如何确保线程池已完成?

发布于 2024-12-03 15:42:15 字数 1803 浏览 1 评论 0原文

设置:

我正在改变程序在幕后工作的方式。在当前版本中,工作原理如下:

public void threadWork( List<MyCallable> workQueue )
{
    ExecutorService pool = Executors.newFixedThreadPool(someConst);
    List<Future<myOutput>> returnValues = new ArrayList<Future<myOutput>>();
    List<myOutput> finishedStuff = new ArrayList<myOutput>();

    for( int i = 0; i < workQueue.size(); i++ )
    {
        returnValues.add( pool.submit( workQueue.get(i) ) );
    }

    while( !returnValues.isEmpty() )
    {
        try
        {
            // Future.get() waits for a value from the callable
            finishedStuff.add( returnValues.remove(0).get(0) );
        }
        catch(Throwable iknowthisisbaditisjustanexample){}
    }

    doLotsOfThings(finsihedStuff);
}

但是新系统将使用私有内部 Runnable 来调用将数据写入全局变量的同步方法。我的基本设置是:

public void threadReports( List<String> workQueue )
{
    ExecutorService pool = Executors.newFixedThreadPool(someConst); 
    List<MyRunnable> runnables = new ArrayList<MyRunnable>()

    for ( int i = 0; i < modules.size(); i++ )
    {
        runnables.add( new MyRunnable( workQueue.get(i) );
        pool.submit(threads.get(i));
    }

    while( !runnables.isEmpty() )
    {
        try 
        {
            runnables.remove(0).wait(); // I realized that this wouldn't work
        } 
        catch(Throwable iknowthisisbaditisjustanexample){}
    }
    doLotsOfThings(finsihedStuff); // finishedStuff is the global the Runnables write to
}

如果您在第二段代码的尝试中阅读我的评论,您会注意到我不知道如何使用 wait()。我原以为它基本上就像 thread.join() 但在阅读文档后我发现它不是。

我可以根据需要更改某些结构,但是需要完成工作、使用可运行对象、将可运行对象写入全局变量以及使用线程池的基本系统。


问题

我怎样才能在我 doLotsOfThings() 之前等待线程池完全完成?

The setup:

I am in the process of changing the way a program works under the hood. In the current version works like this:

public void threadWork( List<MyCallable> workQueue )
{
    ExecutorService pool = Executors.newFixedThreadPool(someConst);
    List<Future<myOutput>> returnValues = new ArrayList<Future<myOutput>>();
    List<myOutput> finishedStuff = new ArrayList<myOutput>();

    for( int i = 0; i < workQueue.size(); i++ )
    {
        returnValues.add( pool.submit( workQueue.get(i) ) );
    }

    while( !returnValues.isEmpty() )
    {
        try
        {
            // Future.get() waits for a value from the callable
            finishedStuff.add( returnValues.remove(0).get(0) );
        }
        catch(Throwable iknowthisisbaditisjustanexample){}
    }

    doLotsOfThings(finsihedStuff);
}

But the new system is going to use a private inner Runnable to call a synchronized method that writes the data into a global variable. My basic setup is:

public void threadReports( List<String> workQueue )
{
    ExecutorService pool = Executors.newFixedThreadPool(someConst); 
    List<MyRunnable> runnables = new ArrayList<MyRunnable>()

    for ( int i = 0; i < modules.size(); i++ )
    {
        runnables.add( new MyRunnable( workQueue.get(i) );
        pool.submit(threads.get(i));
    }

    while( !runnables.isEmpty() )
    {
        try 
        {
            runnables.remove(0).wait(); // I realized that this wouldn't work
        } 
        catch(Throwable iknowthisisbaditisjustanexample){}
    }
    doLotsOfThings(finsihedStuff); // finishedStuff is the global the Runnables write to
}

If you read my comment in the try of the second piece of code you will notice that I don't know how to use wait(). I had thought it was basically like thread.join() but after reading the documentation I see it is not.

I'm okay with changing some structure as needed, but the basic system of taking work, using runnables, having the runnables write to a global variable, and using a threadpool are requirements.


The Question

How can I wait for the threadpool to be completely finished before I doLotsOfThings()?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(5

撕心裂肺的伤痛 2024-12-10 15:42:15

您应该调用 ExecutorService。 shutdown() 然后 ExecutorService.awaitTermination

...
pool.shutdown();
if (pool.awaitTermination(<long>,<TimeUnit>)) {
    // finished before timeout
    doLotsOfThings(finsihedStuff);
} else {
    // Timeout occured.
}

You should call ExecutorService.shutdown() and then ExecutorService.awaitTermination.

...
pool.shutdown();
if (pool.awaitTermination(<long>,<TimeUnit>)) {
    // finished before timeout
    doLotsOfThings(finsihedStuff);
} else {
    // Timeout occured.
}
淡笑忘祈一世凡恋 2024-12-10 15:42:15

试试这个:

pool.shutdown();
pool.awaitTermination(WHATEVER_TIMEOUT, TimeUnit.SECONDS);

Try this:

pool.shutdown();
pool.awaitTermination(WHATEVER_TIMEOUT, TimeUnit.SECONDS);
瞳孔里扚悲伤 2024-12-10 15:42:15

您是否考虑过使用现在包含的 Fork/Join 框架Java 7 中。如果您不想使用 Java 7,则可以在 此处获取它的 jar

Have you considered using the Fork/Join framework that is now included in Java 7. If you do not want to use Java 7 yet you can get the jar for it here.

请别遗忘我 2024-12-10 15:42:15
public void threadReports( List<String> workQueue )
{
    ExecutorService pool = Executors.newFixedThreadPool(someConst); 
    Set<Future<?>> futures = new HashSet<Future<?>>();

    for ( int i = 0; i < modules.size(); i++ )
    {
        futures.add(pool.submit(threads.get(i)));
    }

    while( !futures.isEmpty() )
    {
        Set<Future<?>> removed = new Set<Future<?>>();
        for(Future<?> f : futures) {
            f.get(100, TimeUnit.MILLISECONDS);
            if(f.isDone()) removed.add(f);
        }
        for(Future<?> f : removed) futures.remove(f);
    }
    doLotsOfThings(finsihedStuff); // finishedStuff is the global the Runnables write to
}
public void threadReports( List<String> workQueue )
{
    ExecutorService pool = Executors.newFixedThreadPool(someConst); 
    Set<Future<?>> futures = new HashSet<Future<?>>();

    for ( int i = 0; i < modules.size(); i++ )
    {
        futures.add(pool.submit(threads.get(i)));
    }

    while( !futures.isEmpty() )
    {
        Set<Future<?>> removed = new Set<Future<?>>();
        for(Future<?> f : futures) {
            f.get(100, TimeUnit.MILLISECONDS);
            if(f.isDone()) removed.add(f);
        }
        for(Future<?> f : removed) futures.remove(f);
    }
    doLotsOfThings(finsihedStuff); // finishedStuff is the global the Runnables write to
}
北斗星光 2024-12-10 15:42:15

shutdown 是 ExecutorService 的生命周期方法,调用后会使执行器不可用。在方法中创建和销毁线程池与创建/销毁线程一样糟糕:它几乎违背了使用线程池的目的,即通过启用透明重用来减少线程创建的开销。

如果可能,您应该保持 ExecutorService 生命周期与应用程序同步。 - 在第一次需要时创建,在应用程序关闭时关闭。

为了实现执行一堆任务并等待它们的目标,ExecutorService 提供了方法 invokeAll(Collection>tasks)(并且如果你想等待一段特定的时间,则使用带有超时的版本。)

使用这种方法和上面提到的一些要点,有问题的代码变成:

public void threadReports( List<String> workQueue ) {

    List<MyRunnable> runnables = new ArrayList<MyRunnable>(workQueue.size());
    for (String work:workQueue) {
        runnables.add(new MyRunnable(work));
    }
    // Executor is obtained from some applicationContext that takes care of lifecycle mgnt
    // invokeAll(...) will block and return when all callables are executed 
    List<Future<MyRunnable>> results = applicationContext.getExecutor().invokeAll(runnables); 

    // I wouldn't use a global variable unless you have a VERY GOOD reason for that.  
    // b/c all the threads of the pool doing work will be contending for the lock on that variable.
    // doLotsOfThings(finishedStuff);  

    // Note that the List of Futures holds the individual results of each execution. 
    // That said, the preferred way to harvest your results would be:
    doLotsOfThings(results);
}

PS:不确定为什么 threadReports无效。它可以/应该返回 doLotsOfThings 的计算以实现更实用的设计。

shutdownis a lifecycle method of the ExecutorService and renders the executor unusable after the call. Creating and destroying ThreadPools in a method is as bad as creating/destroying threads: it pretty much defeats the purpose of using threadpool, which is to reduce the overhead of thread creation by enabling transparent reuse.

If possible, you should maintain your ExecutorService lifecycle in sync with your application. - create when first needed, shutdown when your app is closing down.

To achieve your goal of executing a bunch of tasks and waiting for them, the ExecutorService provides the method invokeAll(Collection<? extends Callable<T>> tasks) (and the version with timeout if you want to wait a specific period of time.)

Using this method and some of the points mentioned above, the code in question becomes:

public void threadReports( List<String> workQueue ) {

    List<MyRunnable> runnables = new ArrayList<MyRunnable>(workQueue.size());
    for (String work:workQueue) {
        runnables.add(new MyRunnable(work));
    }
    // Executor is obtained from some applicationContext that takes care of lifecycle mgnt
    // invokeAll(...) will block and return when all callables are executed 
    List<Future<MyRunnable>> results = applicationContext.getExecutor().invokeAll(runnables); 

    // I wouldn't use a global variable unless you have a VERY GOOD reason for that.  
    // b/c all the threads of the pool doing work will be contending for the lock on that variable.
    // doLotsOfThings(finishedStuff);  

    // Note that the List of Futures holds the individual results of each execution. 
    // That said, the preferred way to harvest your results would be:
    doLotsOfThings(results);
}

PS: Not sure why threadReports is void. It could/should return the calculation of doLotsOfThings to achieve a more functional design.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文