ThreadPoolExecutor 的 getActiveCount()
当我调用 getActiveCount() 时,我有一个 ThreadPoolExecutor 似乎在对我撒谎。然而,我没有做过很多多线程编程,所以也许我做错了一些事情。
这是我的 TPE
@Override
public void afterPropertiesSet() throws Exception {
BlockingQueue<Runnable> workQueue;
int maxQueueLength = threadPoolConfiguration.getMaximumQueueLength();
if (maxQueueLength == 0) {
workQueue = new LinkedBlockingQueue<Runnable>();
} else {
workQueue = new LinkedBlockingQueue<Runnable>(maxQueueLength);
}
pool = new ThreadPoolExecutor(
threadPoolConfiguration.getCorePoolSize(),
threadPoolConfiguration.getMaximumPoolSize(),
threadPoolConfiguration.getKeepAliveTime(),
TimeUnit.valueOf(threadPoolConfiguration.getTimeUnit()),
workQueue,
// Default thread factory creates normal-priority,
// non-daemon threads.
Executors.defaultThreadFactory(),
// Run any rejected task directly in the calling thread.
// In this way no records will be lost due to rejection
// however, no records will be added to the workQueue
// while the calling thread is processing a Task, so set
// your queue-size appropriately.
//
// This also means MaxThreadCount+1 tasks may run
// concurrently. If you REALLY want a max of MaxThreadCount
// threads don't use this.
new ThreadPoolExecutor.CallerRunsPolicy());
}
在这个类中,我还有一个 DAO,我将其传递到我的 Runnable (FooWorker
) 中,如下所示:
@Override
public void addTask(FooRecord record) {
if (pool == null) {
throw new FooException(ERROR_THREAD_POOL_CONFIGURATION_NOT_SET);
}
pool.execute(new FooWorker(context, calculator, dao, record));
}
FooWorker
运行 record
(仅非单例)通过 calculator
通过状态机,然后通过 dao
将转换发送到数据库,如下所示:
public void run() {
calculator.calculate(record);
dao.save(record);
}
一旦我的主线程完成创建新任务,我尝试并等待以确保所有线程成功完成:
while (pool.getActiveCount() > 0) {
recordHandler.awaitTermination(terminationTimeout,
terminationTimeoutUnit);
}
我从输出日志(由于线程处理而可能不可靠)看到的是 getActiveCount() 过早返回零,并且 while() 循环正在退出,而我的最后一个线程仍在打印 <代码>计算器。
请注意,我还尝试调用pool.shutdown()
,然后使用awaitTermination
,但下次我的作业运行时,池仍然关闭。
我唯一的猜测是在线程内部,当我将数据发送到dao
时(因为它是Spring在主线程中创建的单例......),java正在考虑线程处于非活动状态,因为(我假设)它正在主线程中处理/等待。
直觉上,仅基于我所看到的,这是我的猜测。但是……真的是这样吗?有没有一种方法可以“正确执行”,而无需在 run() 顶部放置手动递增变量并在末尾放置递减变量来跟踪线程数?
如果答案是“不要传入 dao”,那么我不是必须为每个线程“新建”一个 DAO 吗?我的流程已经是一个(美丽、高效)的野兽,但这真的很糟糕。
I have a ThreadPoolExecutor that seems to be lying to me when I call getActiveCount(). I haven't done a lot of multithreaded programming however, so perhaps I'm doing something incorrectly.
Here's my TPE
@Override
public void afterPropertiesSet() throws Exception {
BlockingQueue<Runnable> workQueue;
int maxQueueLength = threadPoolConfiguration.getMaximumQueueLength();
if (maxQueueLength == 0) {
workQueue = new LinkedBlockingQueue<Runnable>();
} else {
workQueue = new LinkedBlockingQueue<Runnable>(maxQueueLength);
}
pool = new ThreadPoolExecutor(
threadPoolConfiguration.getCorePoolSize(),
threadPoolConfiguration.getMaximumPoolSize(),
threadPoolConfiguration.getKeepAliveTime(),
TimeUnit.valueOf(threadPoolConfiguration.getTimeUnit()),
workQueue,
// Default thread factory creates normal-priority,
// non-daemon threads.
Executors.defaultThreadFactory(),
// Run any rejected task directly in the calling thread.
// In this way no records will be lost due to rejection
// however, no records will be added to the workQueue
// while the calling thread is processing a Task, so set
// your queue-size appropriately.
//
// This also means MaxThreadCount+1 tasks may run
// concurrently. If you REALLY want a max of MaxThreadCount
// threads don't use this.
new ThreadPoolExecutor.CallerRunsPolicy());
}
In this class I also have a DAO that I pass into my Runnable (FooWorker
), like so:
@Override
public void addTask(FooRecord record) {
if (pool == null) {
throw new FooException(ERROR_THREAD_POOL_CONFIGURATION_NOT_SET);
}
pool.execute(new FooWorker(context, calculator, dao, record));
}
FooWorker
runs record
(the only non-singleton) through a state machine via calculator
then sends the transitions to the database via dao
, like so:
public void run() {
calculator.calculate(record);
dao.save(record);
}
Once my main thread is done creating new tasks I try and wait to make sure all threads finished successfully:
while (pool.getActiveCount() > 0) {
recordHandler.awaitTermination(terminationTimeout,
terminationTimeoutUnit);
}
What I'm seeing from output logs (which are presumably unreliable due to the threading) is that getActiveCount() is returning zero too early, and the while() loop is exiting while my last threads are still printing output from calculator
.
Note I've also tried calling pool.shutdown()
then using awaitTermination
but then the next time my job runs the pool is still shut down.
My only guess is that inside a thread, when I send data into the dao
(since it's a singleton created by Spring in the main thread...), java is considering the thread inactive since (I assume) it's processing in/waiting on the main thread.
Intuitively, based only on what I'm seeing, that's my guess. But... Is that really what's happening? Is there a way to "do it right" without putting a manual incremented variable at the top of run()
and a decremented at the end to track the number of threads?
If the answer is "don't pass in the dao", then wouldn't I have to "new" a DAO for every thread? My process is already a (beautiful, efficient) beast, but that would really suck.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
作为<的JavaDoc code>getActiveCount 声明,它是一个近似值:您不应以此为基础做出任何主要业务逻辑决策。
如果您想等待所有计划任务完成,那么您应该简单地使用
如果您需要等待特定任务完成,您应该使用
submit()
而不是execute()
,然后检查Future
对象用于完成(使用isDone()< /code>
如果您想以非阻塞方式进行操作或只需调用
get()< /code>
会阻塞直到任务完成)。
As the JavaDoc of
getActiveCount
states, it's an approximate value: you should not base any major business logic decisions on this.If you want to wait for all scheduled tasks to complete, then you should simply use
If you need to wait for a specific task to finish, you should use
submit()
instead ofexecute()
and then check theFuture
object for completion (either usingisDone()
if you want to do it non-blocking or by simply callingget()
which blocks until the task is done).文档表明
ThreadPoolExecutor
上的方法getActiveCount()
不是一个精确的数字:就我个人而言,当我进行这样的多线程工作时,我使用一个变量,该变量在添加任务时递增,并在获取其输出时递减。
The documentation suggests that the method
getActiveCount()
onThreadPoolExecutor
is not an exact number:Personally, when I am doing multithreaded work such as this, I use a variable that I increment as I add tasks, and decrement as I grab their output.