ExecutorService,如何等待所有任务完成

发布于 2024-09-10 09:54:44 字数 554 浏览 3 评论 0 原文

等待 ExecutorService 的所有任务完成的最简单方法是什么?我的任务主要是计算,所以我只想运行大量作业 - 每个核心一个。现在我的设置如下所示:

ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {   
    es.execute(new ComputeDTask(singleTable));
}
try{
    es.wait();
} 
catch (InterruptedException e){
    e.printStackTrace();
}

ComputeDTask 实现了可运行的。这似乎正确执行了任务,但代码在 wait() 上崩溃,并出现 IllegalMonitorStateException。这很奇怪,因为我玩了一些玩具示例,它似乎有效。

uniquePhrases 包含数万个元素。我应该使用其他方法吗?我正在寻找尽可能简单的东西

What is the simplest way to to wait for all tasks of ExecutorService to finish? My task is primarily computational, so I just want to run a large number of jobs - one on each core. Right now my setup looks like this:

ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {   
    es.execute(new ComputeDTask(singleTable));
}
try{
    es.wait();
} 
catch (InterruptedException e){
    e.printStackTrace();
}

ComputeDTask implements runnable. This appears to execute the tasks correctly, but the code crashes on wait() with IllegalMonitorStateException. This is odd, because I played around with some toy examples and it appeared to work.

uniquePhrases contains several tens of thousands of elements. Should I be using another method? I am looking for something as simple as possible

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(17

对岸观火 2024-09-17 09:54:44

最简单的方法是使用 ExecutorService.invokeAll() 一行代码即可完成您想要的操作。用您的话说,您需要修改或包装 ComputeDTask 来实现 Callable<>,这可以为您提供更多的灵活性。可能在您的应用程序中有一个有意义的 Callable.call() 实现,但如果不使用 Executors.callable()

ExecutorService es = Executors.newFixedThreadPool(2);
List<Callable<Object>> todo = new ArrayList<Callable<Object>>(singleTable.size());

for (DataTable singleTable: uniquePhrases) { 
    todo.add(Executors.callable(new ComputeDTask(singleTable))); 
}

List<Future<Object>> answers = es.invokeAll(todo);

正如其他人所指出的,如果合适,您可以使用 invokeAll() 的超时版本。在此示例中,answers 将包含一堆 Future,它们将返回 null(请参阅 Executors.callable() 的定义。可能您想要做的是稍微重构,这样您就可以获得有用的答案,或者对底层ComputeDTask的引用,但我无法从您的示例中看出,

如果不清楚,请注意,在所有任务完成之前,invokeAll() 不会返回(即 answers 集合中的所有 Future 将报告 <。 code>.isDone() 如果要求的话。)这可以避免所有手动关闭、awaitTermination 等...并且允许您在多个周期中整齐地重用此 ExecutorService(如果需要)

。有一些关于SO的相关问题:

这些都不是严格针对您的问题的,但它们确实提供了一些关于人们如何看待Executor的信息应该使用 /ExecutorService

The simplest approach is to use ExecutorService.invokeAll() which does what you want in a one-liner. In your parlance, you'll need to modify or wrap ComputeDTask to implement Callable<>, which can give you quite a bit more flexibility. Probably in your app there is a meaningful implementation of Callable.call(), but here's a way to wrap it if not using Executors.callable().

ExecutorService es = Executors.newFixedThreadPool(2);
List<Callable<Object>> todo = new ArrayList<Callable<Object>>(singleTable.size());

for (DataTable singleTable: uniquePhrases) { 
    todo.add(Executors.callable(new ComputeDTask(singleTable))); 
}

List<Future<Object>> answers = es.invokeAll(todo);

As others have pointed out, you could use the timeout version of invokeAll() if appropriate. In this example, answers is going to contain a bunch of Futures which will return nulls (see definition of Executors.callable(). Probably what you want to do is a slight refactoring so you can get a useful answer back, or a reference to the underlying ComputeDTask, but I can't tell from your example.

If it isn't clear, note that invokeAll() will not return until all the tasks are completed. (i.e., all the Futures in your answers collection will report .isDone() if asked.) This avoids all the manual shutdown, awaitTermination, etc... and allows you to reuse this ExecutorService neatly for multiple cycles, if desired.

There are a few related questions on SO:

None of these are strictly on-point for your question, but they do provide a bit of color about how folks think Executor/ExecutorService ought to be used.

千里故人稀 2024-09-17 09:54:44

如果要等待所有任务完成,请使用 shutdown 方法而不是 wait。然后使用 awaitTermination

另外,您可以使用 Runtime .availableProcessors 获取硬件线程的数量,以便您可以正确初始化线程池。

If you want to wait for all tasks to complete, use the shutdown method instead of wait. Then follow it with awaitTermination.

Also, you can use Runtime.availableProcessors to get the number of hardware threads so you can initialize your threadpool properly.

不甘平庸 2024-09-17 09:54:44

如果等待 ExecutorService 中的所有任务完成并不是您的目标,而是等待特定批次的任务完成,您可以使用 CompletionService — 具体来说,一个 ExecutorCompletionService< /a>.

这个想法是创建一个ExecutorCompletionService来包装您的Executor通过 CompletionService 提交一些已知数量的任务code>,然后使用 take() (阻塞)或 poll() (事实并非如此)。一旦您得出了与您提交的任务相对应的所有预期结果,您就知道它们都已完成。

让我再声明一次,因为从界面上看并不明显:您必须知道向 CompletionService 中放入了多少内容,才能知道要尝试提取多少内容。这对于 take() 方法尤其重要:调用它一次太多,它会阻塞您的调用线程,直到其他线程向同一个 CompletionService 提交另一个作业。

一些示例展示了如何使用 Java 并发实践一书中的 CompletionService

If waiting for all tasks in the ExecutorService to finish isn't precisely your goal, but rather waiting until a specific batch of tasks has completed, you can use a CompletionService — specifically, an ExecutorCompletionService.

The idea is to create an ExecutorCompletionService wrapping your Executor, submit some known number of tasks through the CompletionService, then draw that same number of results from the completion queue using either take() (which blocks) or poll() (which does not). Once you've drawn all the expected results corresponding to the tasks you submitted, you know they're all done.

Let me state this one more time, because it's not obvious from the interface: You must know how many things you put into the CompletionService in order to know how many things to try to draw out. This matters especially with the take() method: call it one time too many and it will block your calling thread until some other thread submits another job to the same CompletionService.

There are some examples showing how to use CompletionService in the book Java Concurrency in Practice.

Smile简单爱 2024-09-17 09:54:44

如果要等待执行器服务完成执行,请调用 shutdown(),然后 awaitTermination(units, unitType),例如 awaitTermination(1, MINUTE) 。 ExecutorService 不会在它自己的监视器上阻塞,因此您不能使用 wait 等。

If you want to wait for the executor service to finish executing, call shutdown() and then, awaitTermination(units, unitType), e.g. awaitTermination(1, MINUTE). The ExecutorService does not block on it's own monitor, so you can't use wait etc.

往日 2024-09-17 09:54:44

您可以等待作业在一定的时间间隔内完成:

int maxSecondsPerComputeDTask = 20;
try {
    while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) {
        // consider giving up with a 'break' statement under certain conditions
    }
} catch (InterruptedException e) {
    throw new RuntimeException(e);    
}

或者您可以使用ExecutorService提交Runnable)并收集Future< /em> 它返回的对象,并依次调用每个对象的 get() 以等待它们完成。

ExecutorService es = Executors.newFixedThreadPool(2);
Collection<Future<?>> futures = new LinkedList<<Future<?>>();
for (DataTable singleTable : uniquePhrases) {
    futures.add(es.submit(new ComputeDTask(singleTable)));
}
for (Future<?> future : futures) {
   try {
       future.get();
   } catch (InterruptedException e) {
       throw new RuntimeException(e);
   } catch (ExecutionException e) {
       throw new RuntimeException(e);
   }
}

InterruptedException对于正确处理非常重要。它可以让您或您的库的用户安全地终止一个漫长的进程。

You could wait jobs to finish on a certain interval:

int maxSecondsPerComputeDTask = 20;
try {
    while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) {
        // consider giving up with a 'break' statement under certain conditions
    }
} catch (InterruptedException e) {
    throw new RuntimeException(e);    
}

Or you could use ExecutorService.submit(Runnable) and collect the Future objects that it returns and call get() on each in turn to wait for them to finish.

ExecutorService es = Executors.newFixedThreadPool(2);
Collection<Future<?>> futures = new LinkedList<<Future<?>>();
for (DataTable singleTable : uniquePhrases) {
    futures.add(es.submit(new ComputeDTask(singleTable)));
}
for (Future<?> future : futures) {
   try {
       future.get();
   } catch (InterruptedException e) {
       throw new RuntimeException(e);
   } catch (ExecutionException e) {
       throw new RuntimeException(e);
   }
}

InterruptedException is extremely important to handle properly. It is what lets you or the users of your library terminate a long process safely.

安稳善良 2024-09-17 09:54:44

使用

latch = new CountDownLatch(noThreads)

只需在每个线程中

latch.countDown();

并作为屏障

latch.await();

Just use

latch = new CountDownLatch(noThreads)

In each thread

latch.countDown();

and as barrier

latch.await();
流云如水 2024-09-17 09:54:44

有几种方法。

您可以先调用 ExecutorService.shutdown 然后 ExecutorService.awaitTermination 返回:

true 如果此执行程序终止,false 如果超时
终止前

所以:

有一个名为 awaitTermination 的函数,但是必须超时
其中提供。这并不能保证当返回所有
任务本来就已经完成了。有没有办法实现这个目标?

您只需在循环中调用 awaitTermination 即可。

使用awaitTermination

此实现的完整示例:

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException {
        final int total_threads = 4;
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100));
        }

        int count = 0;

        // This is the relevant part
        // Chose the delay most appropriate for your use case
        executor.shutdown();
        while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
            System.out.println("Waiting "+ count);
            count++;
        }
    }

    private static Runnable parallelWork(long sleepMillis) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
        };
    }
}

使用CountDownLatch

另一个选项是创建一个CountDownLatch 带有 count 等于并行任务的数量。每个线程调用 countDownLatch.countDown();,而线程调用 countDownLatch.await();

此实现的完整示例:

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException {
        final int total_threads = 4;
        CountDownLatch countDownLatch = new CountDownLatch(total_threads);
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100, countDownLatch));
        }
        countDownLatch.await();
        System.out.println("Exit");
        executor.shutdown();
    }

    private static Runnable parallelWork(long sleepMillis, CountDownLatch countDownLatch) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
            countDownLatch.countDown();
        };
    }
}

使用CyclicBarrier

另一种方法是使用循环屏障

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        final int total_threads = 4;
        CyclicBarrier barrier = new CyclicBarrier(total_threads+ 1);
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100, barrier));
        }
        barrier.await();
        System.out.println("Exit");
        executor.shutdown();
    }

    private static Runnable parallelWork(long sleepMillis, CyclicBarrier barrier) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
              // Do something
            }
        };
    }
}

还有其他方法,但这些方法需要更改您的初始要求,即:

提交时如何等待所有任务完成
使用 ExecutorService.execute() 。

There are several approaches.

You can call first ExecutorService.shutdown and then ExecutorService.awaitTermination which returns:

true if this executor terminated and false if the timeout elapsed
before termination

So:

There is a function called awaitTermination But a timeout has to be
provided in it. Which is not a guarantee that when this returns all
the tasks would have been finished. Is there a way to achieve this?

You just have to call awaitTermination in a loop.

Using awaitTermination:

A full example of this implementation:

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException {
        final int total_threads = 4;
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100));
        }

        int count = 0;

        // This is the relevant part
        // Chose the delay most appropriate for your use case
        executor.shutdown();
        while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
            System.out.println("Waiting "+ count);
            count++;
        }
    }

    private static Runnable parallelWork(long sleepMillis) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
        };
    }
}

Using CountDownLatch:

Another option is to create a CountDownLatch with a count equal to the number of parallel tasks. Each thread calls countDownLatch.countDown();, while the main thread calls countDownLatch.await();.

A full example of this implementation:

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException {
        final int total_threads = 4;
        CountDownLatch countDownLatch = new CountDownLatch(total_threads);
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100, countDownLatch));
        }
        countDownLatch.await();
        System.out.println("Exit");
        executor.shutdown();
    }

    private static Runnable parallelWork(long sleepMillis, CountDownLatch countDownLatch) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
            countDownLatch.countDown();
        };
    }
}

Using CyclicBarrier:

Another approach is to use a Cyclic Barrier

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        final int total_threads = 4;
        CyclicBarrier barrier = new CyclicBarrier(total_threads+ 1);
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100, barrier));
        }
        barrier.await();
        System.out.println("Exit");
        executor.shutdown();
    }

    private static Runnable parallelWork(long sleepMillis, CyclicBarrier barrier) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
              // Do something
            }
        };
    }
}

There are other approaches as well but those would require changes to your initial requirements, namely:

How to wait for all tasks to be completed when they are submitted
using ExecutorService.execute() .

疧_╮線 2024-09-17 09:54:44

IllegalMonitorStateException 的根本原因:

抛出该异常表示线程尝试在对象的监视器上等待,或通知在不拥有指定监视器的情况下在对象的监视器上等待的其他线程。

从您的代码中,您刚刚在 ExecutorService 上调用了 wait() 而没有拥有锁。

下面的代码将修复 IllegalMonitorStateException

try 
{
    synchronized(es){
        es.wait(); // Add some condition before you call wait()
    }
} 

按照以下方法之一等待已提交给 ExecutorService 的所有任务完成。

  1. 遍历 ExecutorServicesubmit 的所有 Future 任务,并通过阻塞调用 get() 检查状态on Future object

  2. 使用 invokeAllExecutorService

  3. 使用CountDownLatch

  4. 使用 ForkJoinPoolnewWorkStealingPool < code>执行器(自 java 8 起)

  5. 按照 Oracle 文档中的建议关闭池页面

    void shutdownAndAwaitTermination(ExecutorService pool) {
       池.shutdown(); // 禁止新任务提交
       尝试 {
       // 等待一段时间,让现有任务终止
       if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
           pool.shutdownNow(); // 取消当前正在执行的任务
           // 等待一段时间任务响应被取消
           if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("池没有终止");
       }
    } catch (InterruptedException 即) {
         // 如果当前线程也被中断,则(重新)取消
         pool.shutdownNow();
         // 保存中断状态
         Thread.currentThread().interrupt();
    }
    

    如果您想在使用选项 5 而不是选项 1 到 4 时优雅地等待所有任务完成,请更改

    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
    

    while(condition) 每 1 分钟检查一次。

Root cause for IllegalMonitorStateException:

Thrown to indicate that a thread has attempted to wait on an object's monitor or to notify other threads waiting on an object's monitor without owning the specified monitor.

From your code, you have just called wait() on ExecutorService without owning the lock.

Below code will fix IllegalMonitorStateException

try 
{
    synchronized(es){
        es.wait(); // Add some condition before you call wait()
    }
} 

Follow one of below approaches to wait for completion of all tasks, which have been submitted to ExecutorService.

  1. Iterate through all Future tasks from submit on ExecutorService and check the status with blocking call get() on Future object

  2. Using invokeAll on ExecutorService

  3. Using CountDownLatch

  4. Using ForkJoinPool or newWorkStealingPool of Executors(since java 8)

  5. Shutdown the pool as recommended in oracle documentation page

    void shutdownAndAwaitTermination(ExecutorService pool) {
       pool.shutdown(); // Disable new tasks from being submitted
       try {
       // Wait a while for existing tasks to terminate
       if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
           pool.shutdownNow(); // Cancel currently executing tasks
           // Wait a while for tasks to respond to being cancelled
           if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
       }
    } catch (InterruptedException ie) {
         // (Re-)Cancel if current thread also interrupted
         pool.shutdownNow();
         // Preserve interrupt status
         Thread.currentThread().interrupt();
    }
    

    If you want to gracefully wait for all tasks for completion when you are using option 5 instead of options 1 to 4, change

    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
    

    to

    a while(condition) which checks for every 1 minute.

墨洒年华 2024-09-17 09:54:44

您可以使用 ExecutorService.invokeAll 方法,它将执行所有任务并等待所有线程完成其任务。

这里是完整的 javadoc

您还可以使用此方法的重载版本来指定超时。

以下是 ExecutorService.invokeAll 的示例代码

public class Test {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newFixedThreadPool(3);
        List<Callable<String>> taskList = new ArrayList<>();
        taskList.add(new Task1());
        taskList.add(new Task2());
        List<Future<String>> results = service.invokeAll(taskList);
        for (Future<String> f : results) {
            System.out.println(f.get());
        }
    }

}

class Task1 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(2000);
            return "Task 1 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task1";
        }
    }
}

class Task2 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(3000);
            return "Task 2 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task2";
        }
    }
}

You can use ExecutorService.invokeAll method, It will execute all task and wait till all threads finished their task.

Here is complete javadoc

You can also user overloaded version of this method to specify the timeout.

Here is sample code with ExecutorService.invokeAll

public class Test {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newFixedThreadPool(3);
        List<Callable<String>> taskList = new ArrayList<>();
        taskList.add(new Task1());
        taskList.add(new Task2());
        List<Future<String>> results = service.invokeAll(taskList);
        for (Future<String> f : results) {
            System.out.println(f.get());
        }
    }

}

class Task1 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(2000);
            return "Task 1 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task1";
        }
    }
}

class Task2 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(3000);
            return "Task 2 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task2";
        }
    }
}
南汐寒笙箫 2024-09-17 09:54:44

我也有这样的情况,我有一组文档要爬取。我从应该处理的初始“种子”文档开始,该文档包含指向也应该处理的其他文档的链接,等等。

在我的主程序中,我只想编写如下所示的内容,其中 Crawler 控制一堆线程。

Crawler c = new Crawler();
c.schedule(seedDocument); 
c.waitUntilCompletion()

如果我想导航一棵树,也会发生同样的情况;我会弹出根节点,每个节点的处理器会根据需要将子节点添加到队列中,并且一堆线程将处理树中的所有节点,直到没有更多的节点。

我在 JVM 中找不到任何东西,我认为这有点令人惊讶。因此,我编写了一个类ThreadPool,可以直接使用它,也可以创建子类来添加适合该领域的方法,例如schedule(Document)。希望有帮助!

线程池 Javadoc | Maven

I also have the situation that I have a set of documents to be crawled. I start with an initial "seed" document which should be processed, that document contains links to other documents which should also be processed, and so on.

In my main program, I just want to write something like the following, where Crawler controls a bunch of threads.

Crawler c = new Crawler();
c.schedule(seedDocument); 
c.waitUntilCompletion()

The same situation would happen if I wanted to navigate a tree; i would pop in the root node, the processor for each node would add children to the queue as necessary, and a bunch of threads would process all the nodes in the tree, until there were no more.

I couldn't find anything in the JVM which I thought was a bit surprising. So I wrote a class ThreadPool which one can either use directly or subclass to add methods suitable for the domain, e.g. schedule(Document). Hope it helps!

ThreadPool Javadoc | Maven

奈何桥上唱咆哮 2024-09-17 09:54:44

添加集合中的所有线程并使用 invokeAll 提交。
如果您可以使用ExecutorServiceinvokeAll方法,JVM将在所有线程完成之前不会继续执行下一行。

这里有一个很好的例子:
通过 ExecutorService 调用 All

Add all threads in collection and submit it using invokeAll.
If you can use invokeAll method of ExecutorService, JVM won’t proceed to next line until all threads are complete.

Here there is a good example:
invokeAll via ExecutorService

陪你搞怪i 2024-09-17 09:54:44

将您的任务提交到 Runner 中,然后等待调用方法 waitTillDone(),如下所示:

Runner runner = Runner.runner(2);

for (DataTable singleTable : uniquePhrases) {

    runner.run(new ComputeDTask(singleTable));
}

// blocks until all tasks are finished (or failed)
runner.waitTillDone();

runner.shutdown();

要使用它,请添加此 gradle/maven 依赖项:'com.github.com。 matejtymes:javafixes:1.0'

有关更多详细信息,请参见此处:https://github.com/MatejTymes/JavaFixes 或此处:http:// matejtymes.blogspot.com/2016/04/executor-that-notizes-you-when-task.html

Submit your tasks into the Runner and then wait calling the method waitTillDone() like this:

Runner runner = Runner.runner(2);

for (DataTable singleTable : uniquePhrases) {

    runner.run(new ComputeDTask(singleTable));
}

// blocks until all tasks are finished (or failed)
runner.waitTillDone();

runner.shutdown();

To use it add this gradle/maven dependency: 'com.github.matejtymes:javafixes:1.0'

For more details look here: https://github.com/MatejTymes/JavaFixes or here: http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html

心凉 2024-09-17 09:54:44

我将等待执行程序以您认为适合任务完成的指定超时终止。

 try {  
         //do stuff here 
         exe.execute(thread);
    } finally {
        exe.shutdown();
    }
    boolean result = exe.awaitTermination(4, TimeUnit.HOURS);
    if (!result)

    {
        LOGGER.error("It took more than 4 hour for the executor to stop, this shouldn't be the normal behaviour.");
    }

I will just wait for the executor to terminate with a specified timeout that you think it is suitable for the tasks to complete.

 try {  
         //do stuff here 
         exe.execute(thread);
    } finally {
        exe.shutdown();
    }
    boolean result = exe.awaitTermination(4, TimeUnit.HOURS);
    if (!result)

    {
        LOGGER.error("It took more than 4 hour for the executor to stop, this shouldn't be the normal behaviour.");
    }
小傻瓜 2024-09-17 09:54:44

听起来你需要 ForkJoinPool 并使用全局池来执行任务。

public static void main(String[] args) {
    // the default `commonPool` should be sufficient for many cases.
    ForkJoinPool pool = ForkJoinPool.commonPool(); 
    // The root of your task that may spawn other tasks. 
    // Make sure it submits the additional tasks to the same executor that it is in.
    Runnable rootTask = new YourTask(pool); 
    pool.execute(rootTask);
    pool.awaitQuiescence(...);
    // that's it.
}

美妙之处在于pool.awaitQuithesis,其中该方法将阻止利用调用者的线程来执行其任务,然后在真正为空时返回。

Sounds like you need ForkJoinPool and use the global pool to execute tasks.

public static void main(String[] args) {
    // the default `commonPool` should be sufficient for many cases.
    ForkJoinPool pool = ForkJoinPool.commonPool(); 
    // The root of your task that may spawn other tasks. 
    // Make sure it submits the additional tasks to the same executor that it is in.
    Runnable rootTask = new YourTask(pool); 
    pool.execute(rootTask);
    pool.awaitQuiescence(...);
    // that's it.
}

The beauty is in pool.awaitQuiescence where the method will block utilize the caller's thread to execute its tasks and then return when it is really empty.

老旧海报 2024-09-17 09:54:44

这个怎么样?

Object lock = new Object();
CountDownLatch cdl = new CountDownLatch(threadNum);
for (int i = 0; i < threadNum; i++) {
    executorService.execute(() -> {

        synchronized (lock) {
            cdl.countDown();
            try {
                lock.wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });
}
cdl.await();
synchronized (lock) {
    lock.notifyAll();
}

如果您不向 ExecutorService 添加新任务,这可能会等待所有当前任务完成

how about this?

Object lock = new Object();
CountDownLatch cdl = new CountDownLatch(threadNum);
for (int i = 0; i < threadNum; i++) {
    executorService.execute(() -> {

        synchronized (lock) {
            cdl.countDown();
            try {
                lock.wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });
}
cdl.await();
synchronized (lock) {
    lock.notifyAll();
}

if you do not add new tasks to ExecutorService , this may wait for all current tasks completed

-柠檬树下少年和吉他 2024-09-17 09:54:44

一个简单的替代方法是使用线程和连接。
请参阅:加入线程

A simple alternative to this is to use threads along with join.
Refer : Joining Threads

墨洒年华 2024-09-17 09:54:44

你可以使用这个:

while (!executors.isTerminated()){}

you can use this :

while (!executors.isTerminated()){}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文