等待 ExecutorService 的所有任务完成的最简单方法是什么?我的任务主要是计算,所以我只想运行大量作业 - 每个核心一个。现在我的设置如下所示:
ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {
es.execute(new ComputeDTask(singleTable));
}
try{
es.wait();
}
catch (InterruptedException e){
e.printStackTrace();
}
ComputeDTask 实现了可运行的。这似乎正确执行了任务,但代码在 wait()
上崩溃,并出现 IllegalMonitorStateException
。这很奇怪,因为我玩了一些玩具示例,它似乎有效。
uniquePhrases
包含数万个元素。我应该使用其他方法吗?我正在寻找尽可能简单的东西
What is the simplest way to to wait for all tasks of ExecutorService
to finish? My task is primarily computational, so I just want to run a large number of jobs - one on each core. Right now my setup looks like this:
ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {
es.execute(new ComputeDTask(singleTable));
}
try{
es.wait();
}
catch (InterruptedException e){
e.printStackTrace();
}
ComputeDTask
implements runnable. This appears to execute the tasks correctly, but the code crashes on wait()
with IllegalMonitorStateException
. This is odd, because I played around with some toy examples and it appeared to work.
uniquePhrases
contains several tens of thousands of elements. Should I be using another method? I am looking for something as simple as possible
发布评论
评论(17)
最简单的方法是使用
ExecutorService.invokeAll()
一行代码即可完成您想要的操作。用您的话说,您需要修改或包装ComputeDTask
来实现Callable<>
,这可以为您提供更多的灵活性。可能在您的应用程序中有一个有意义的 Callable.call() 实现,但如果不使用Executors.callable()
。正如其他人所指出的,如果合适,您可以使用
invokeAll()
的超时版本。在此示例中,answers
将包含一堆Future
,它们将返回 null(请参阅Executors.callable()
的定义。可能您想要做的是稍微重构,这样您就可以获得有用的答案,或者对底层ComputeDTask的引用,但我无法从您的示例中看出,如果不清楚,请注意,在所有任务完成之前,
invokeAll()
不会返回(即answers
集合中的所有Future
将报告 <。 code>.isDone() 如果要求的话。)这可以避免所有手动关闭、awaitTermination 等...并且允许您在多个周期中整齐地重用此 ExecutorService(如果需要)。有一些关于SO的相关问题:
如何等待让所有线程完成
从 java 线程返回值
invokeAll() 不愿意接受 Collection> < /p>
我需要同步吗?
这些都不是严格针对您的问题的,但它们确实提供了一些关于人们如何看待
Executor
的信息应该使用 /ExecutorService
。The simplest approach is to use
ExecutorService.invokeAll()
which does what you want in a one-liner. In your parlance, you'll need to modify or wrapComputeDTask
to implementCallable<>
, which can give you quite a bit more flexibility. Probably in your app there is a meaningful implementation ofCallable.call()
, but here's a way to wrap it if not usingExecutors.callable()
.As others have pointed out, you could use the timeout version of
invokeAll()
if appropriate. In this example,answers
is going to contain a bunch ofFuture
s which will return nulls (see definition ofExecutors.callable()
. Probably what you want to do is a slight refactoring so you can get a useful answer back, or a reference to the underlyingComputeDTask
, but I can't tell from your example.If it isn't clear, note that
invokeAll()
will not return until all the tasks are completed. (i.e., all theFuture
s in youranswers
collection will report.isDone()
if asked.) This avoids all the manual shutdown, awaitTermination, etc... and allows you to reuse thisExecutorService
neatly for multiple cycles, if desired.There are a few related questions on SO:
How to wait for all threads to finish
Return values from java threads
invokeAll() not willing to accept a Collection<Callable<t>>
Do I need to synchronize?
None of these are strictly on-point for your question, but they do provide a bit of color about how folks think
Executor
/ExecutorService
ought to be used.如果要等待所有任务完成,请使用
shutdown
方法而不是wait
。然后使用awaitTermination
。另外,您可以使用
Runtime .availableProcessors
获取硬件线程的数量,以便您可以正确初始化线程池。If you want to wait for all tasks to complete, use the
shutdown
method instead ofwait
. Then follow it withawaitTermination
.Also, you can use
Runtime.availableProcessors
to get the number of hardware threads so you can initialize your threadpool properly.如果等待 ExecutorService 中的所有任务完成并不是您的目标,而是等待特定批次的任务完成,您可以使用
CompletionService
— 具体来说,一个ExecutorCompletionService
< /a>.这个想法是创建一个
ExecutorCompletionService
来包装您的Executor
,通过CompletionService
提交一些已知数量的任务code>,然后使用
take()
(阻塞)或poll()
(事实并非如此)。一旦您得出了与您提交的任务相对应的所有预期结果,您就知道它们都已完成。让我再声明一次,因为从界面上看并不明显:您必须知道向
CompletionService
中放入了多少内容,才能知道要尝试提取多少内容。这对于take()
方法尤其重要:调用它一次太多,它会阻塞您的调用线程,直到其他线程向同一个CompletionService
提交另一个作业。有一些示例展示了如何使用
Java 并发实践一书中的 CompletionService
。If waiting for all tasks in the
ExecutorService
to finish isn't precisely your goal, but rather waiting until a specific batch of tasks has completed, you can use aCompletionService
— specifically, anExecutorCompletionService
.The idea is to create an
ExecutorCompletionService
wrapping yourExecutor
, submit some known number of tasks through theCompletionService
, then draw that same number of results from the completion queue using eithertake()
(which blocks) orpoll()
(which does not). Once you've drawn all the expected results corresponding to the tasks you submitted, you know they're all done.Let me state this one more time, because it's not obvious from the interface: You must know how many things you put into the
CompletionService
in order to know how many things to try to draw out. This matters especially with thetake()
method: call it one time too many and it will block your calling thread until some other thread submits another job to the sameCompletionService
.There are some examples showing how to use
CompletionService
in the book Java Concurrency in Practice.如果要等待执行器服务完成执行,请调用
shutdown()
,然后 awaitTermination(units, unitType),例如awaitTermination(1, MINUTE)
。 ExecutorService 不会在它自己的监视器上阻塞,因此您不能使用wait
等。If you want to wait for the executor service to finish executing, call
shutdown()
and then, awaitTermination(units, unitType), e.g.awaitTermination(1, MINUTE)
. The ExecutorService does not block on it's own monitor, so you can't usewait
etc.您可以等待作业在一定的时间间隔内完成:
或者您可以使用ExecutorService。提交(Runnable)并收集Future< /em> 它返回的对象,并依次调用每个对象的 get() 以等待它们完成。
InterruptedException对于正确处理非常重要。它可以让您或您的库的用户安全地终止一个漫长的进程。
You could wait jobs to finish on a certain interval:
Or you could use ExecutorService.submit(Runnable) and collect the Future objects that it returns and call get() on each in turn to wait for them to finish.
InterruptedException is extremely important to handle properly. It is what lets you or the users of your library terminate a long process safely.
使用
只需在每个线程中
并作为屏障
Just use
In each thread
and as barrier
有几种方法。
您可以先调用 ExecutorService.shutdown 然后 ExecutorService.awaitTermination 返回:
所以:
您只需在循环中调用
awaitTermination
即可。使用
awaitTermination
:此实现的完整示例:
使用
CountDownLatch
:另一个选项是创建一个CountDownLatch 带有
count
等于并行任务的数量。每个线程调用countDownLatch.countDown();
,而主线程调用countDownLatch.await();
。此实现的完整示例:
使用
CyclicBarrier
:另一种方法是使用循环屏障
还有其他方法,但这些方法需要更改您的初始要求,即:
There are several approaches.
You can call first ExecutorService.shutdown and then ExecutorService.awaitTermination which returns:
So:
You just have to call
awaitTermination
in a loop.Using
awaitTermination
:A full example of this implementation:
Using
CountDownLatch
:Another option is to create a CountDownLatch with a
count
equal to the number of parallel tasks. Each thread callscountDownLatch.countDown();
, while the main thread callscountDownLatch.await();
.A full example of this implementation:
Using
CyclicBarrier
:Another approach is to use a Cyclic Barrier
There are other approaches as well but those would require changes to your initial requirements, namely:
IllegalMonitorStateException 的根本原因:
从您的代码中,您刚刚在 ExecutorService 上调用了 wait() 而没有拥有锁。
下面的代码将修复
IllegalMonitorStateException
按照以下方法之一等待已提交给
ExecutorService
的所有任务完成。遍历
ExecutorService
上submit
的所有Future
任务,并通过阻塞调用get()
检查状态onFuture
object使用 invokeAll 在
ExecutorService
上
使用CountDownLatch
使用 ForkJoinPool 或newWorkStealingPool < code>执行器(自 java 8 起)
按照 Oracle 文档中的建议关闭池页面
如果您想在使用选项 5 而不是选项 1 到 4 时优雅地等待所有任务完成,请更改
至
while(condition)
每 1 分钟检查一次。Root cause for IllegalMonitorStateException:
From your code, you have just called wait() on ExecutorService without owning the lock.
Below code will fix
IllegalMonitorStateException
Follow one of below approaches to wait for completion of all tasks, which have been submitted to
ExecutorService
.Iterate through all
Future
tasks fromsubmit
onExecutorService
and check the status with blocking callget()
onFuture
objectUsing invokeAll on
ExecutorService
Using CountDownLatch
Using ForkJoinPool or newWorkStealingPool of
Executors
(since java 8)Shutdown the pool as recommended in oracle documentation page
If you want to gracefully wait for all tasks for completion when you are using option 5 instead of options 1 to 4, change
to
a
while(condition)
which checks for every 1 minute.您可以使用 ExecutorService.invokeAll 方法,它将执行所有任务并等待所有线程完成其任务。
这里是完整的 javadoc
您还可以使用此方法的重载版本来指定超时。
以下是
ExecutorService.invokeAll
的示例代码You can use
ExecutorService.invokeAll
method, It will execute all task and wait till all threads finished their task.Here is complete javadoc
You can also user overloaded version of this method to specify the timeout.
Here is sample code with
ExecutorService.invokeAll
我也有这样的情况,我有一组文档要爬取。我从应该处理的初始“种子”文档开始,该文档包含指向也应该处理的其他文档的链接,等等。
在我的主程序中,我只想编写如下所示的内容,其中
Crawler
控制一堆线程。如果我想导航一棵树,也会发生同样的情况;我会弹出根节点,每个节点的处理器会根据需要将子节点添加到队列中,并且一堆线程将处理树中的所有节点,直到没有更多的节点。
我在 JVM 中找不到任何东西,我认为这有点令人惊讶。因此,我编写了一个类
ThreadPool
,可以直接使用它,也可以创建子类来添加适合该领域的方法,例如schedule(Document)
。希望有帮助!线程池 Javadoc | Maven
I also have the situation that I have a set of documents to be crawled. I start with an initial "seed" document which should be processed, that document contains links to other documents which should also be processed, and so on.
In my main program, I just want to write something like the following, where
Crawler
controls a bunch of threads.The same situation would happen if I wanted to navigate a tree; i would pop in the root node, the processor for each node would add children to the queue as necessary, and a bunch of threads would process all the nodes in the tree, until there were no more.
I couldn't find anything in the JVM which I thought was a bit surprising. So I wrote a class
ThreadPool
which one can either use directly or subclass to add methods suitable for the domain, e.g.schedule(Document)
. Hope it helps!ThreadPool Javadoc | Maven
添加集合中的所有线程并使用
invokeAll
提交。如果您可以使用
ExecutorService
的invokeAll
方法,JVM将在所有线程完成之前不会继续执行下一行。这里有一个很好的例子:
通过 ExecutorService 调用 All
Add all threads in collection and submit it using
invokeAll
.If you can use
invokeAll
method ofExecutorService
, JVM won’t proceed to next line until all threads are complete.Here there is a good example:
invokeAll via ExecutorService
将您的任务提交到 Runner 中,然后等待调用方法 waitTillDone(),如下所示:
要使用它,请添加此 gradle/maven 依赖项:
'com.github.com。 matejtymes:javafixes:1.0'
有关更多详细信息,请参见此处:https://github.com/MatejTymes/JavaFixes 或此处:http:// matejtymes.blogspot.com/2016/04/executor-that-notizes-you-when-task.html
Submit your tasks into the Runner and then wait calling the method waitTillDone() like this:
To use it add this gradle/maven dependency:
'com.github.matejtymes:javafixes:1.0'
For more details look here: https://github.com/MatejTymes/JavaFixes or here: http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html
我将等待执行程序以您认为适合任务完成的指定超时终止。
I will just wait for the executor to terminate with a specified timeout that you think it is suitable for the tasks to complete.
听起来你需要
ForkJoinPool
并使用全局池来执行任务。美妙之处在于
pool.awaitQuithesis
,其中该方法将阻止利用调用者的线程来执行其任务,然后在真正为空时返回。Sounds like you need
ForkJoinPool
and use the global pool to execute tasks.The beauty is in
pool.awaitQuiescence
where the method will block utilize the caller's thread to execute its tasks and then return when it is really empty.这个怎么样?
如果您不向 ExecutorService 添加新任务,这可能会等待所有当前任务完成
how about this?
if you do not add new tasks to ExecutorService , this may wait for all current tasks completed
一个简单的替代方法是使用线程和连接。
请参阅:加入线程
A simple alternative to this is to use threads along with join.
Refer : Joining Threads
你可以使用这个:
you can use this :