如何让主线程等待ThreadPoolExecutor中其他线程完成

发布于 2024-08-15 02:13:43 字数 1572 浏览 7 评论 0 原文

我正在使用 ThreadPoolExecutor 在我的 Java 应用程序中实现线程。

我有一个 XML,我需要解析它并将其每个节点添加到线程中以执行完成。我的实现是这样的:

parse_tp是一个创建的线程池对象& ParseQuotesXML 是具有 run 方法的类。

        try {     
           List children = root.getChildren();               
        Iterator iter = children.iterator();

        //Parsing the XML     
        while(iter.hasNext()) {       
           Element child = (Element) iter.next();           
           ParseQuotesXML quote = new ParseQuotesXML(child, this);         
           parse_tp.execute(quote);         
        }
    System.out.println("Print it after all the threads have completed");
        catch(Exception ex) {  
        ex.printStackTrace();      
        }
        finally {  
    System.out.println("Print it in the end.");
if(!parse_tp.isShutdown()) {
                if(parse_tp.getActiveCount() == 0 && parse_tp.getQueue().size() == 0 ) {
                    parse_tp.shutdown();                    
                } else {
                    try {
                        parse_tp.awaitTermination(30, TimeUnit.SECONDS);
                    } catch (InterruptedException ex) {
                        log.info("Exception while terminating the threadpool "+ex.getMessage());
                        ex.printStackTrace();
                    }
                }
            }
          parse_tp.shutdown();  
        }

问题是,这两个打印输出语句是在其他线程退出之前打印的。我想让主线程等待所有其他线程完成。 在正常的线程实现中,我可以使用 join() 函数来完成此操作,但无法在 ThreadPool Executor 中实现相同的效果。还想问一下finally块中写的代码是否正确关闭线程池?

谢谢, 阿米特

I am using the ThreadPoolExecutor to implement threading in my Java Application.

I have a XML which I need to parse and add each node of it to a thread to execute the completion. My implementation is like this:

parse_tp is a threadpool object created & ParseQuotesXML is the class with the run method.

        try {     
           List children = root.getChildren();               
        Iterator iter = children.iterator();

        //Parsing the XML     
        while(iter.hasNext()) {       
           Element child = (Element) iter.next();           
           ParseQuotesXML quote = new ParseQuotesXML(child, this);         
           parse_tp.execute(quote);         
        }
    System.out.println("Print it after all the threads have completed");
        catch(Exception ex) {  
        ex.printStackTrace();      
        }
        finally {  
    System.out.println("Print it in the end.");
if(!parse_tp.isShutdown()) {
                if(parse_tp.getActiveCount() == 0 && parse_tp.getQueue().size() == 0 ) {
                    parse_tp.shutdown();                    
                } else {
                    try {
                        parse_tp.awaitTermination(30, TimeUnit.SECONDS);
                    } catch (InterruptedException ex) {
                        log.info("Exception while terminating the threadpool "+ex.getMessage());
                        ex.printStackTrace();
                    }
                }
            }
          parse_tp.shutdown();  
        }

The problem is, the two print out statements are printed before the other threads exit. I want to make the main thread wait for all other threads to complete.
In normal Thread implementation I can do it using join() function but not getting a way to achieve the same in ThreadPool Executor. Also would like to ask if the code written in finally block to close the threadpool proper ?

Thanks,
Amit

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

笔芯 2024-08-22 02:13:43

CountDownLatch< /a> 就是为此目的而设计的。示例可在此处此处。当事先未知线程数时,请考虑 Phaser,Java 1.7 中的新功能,或 UpDownLatch

A CountDownLatch is designed for this very purpose. Examples may be found here and here. When the number of threads is not known in advance, consider a Phaser, new in Java 1.7, or an UpDownLatch.

错爱 2024-08-22 02:13:43

为了回答你的第二个问题,我认为你在清理线程池方面做得很合理。

关于你的第一个问题,我认为你想要使用的方法是 提交而不是执行。这里没有尝试用文本来解释这一切,而是我编写的一个单元测试的编辑片段,该测试执行了许多任务,每个任务都完成总工作的一部分,然后在起点会合以添加结果:

final AtomicInteger messagesReceived = new AtomicInteger(0);

// ThreadedListenerAdapter is the class that I'm testing 
// It's not germane to the question other than as a target for a thread pool.
final ThreadedListenerAdapter<Integer> adapter = 
    new ThreadedListenerAdapter<Integer>(listener);
int taskCount = 10;

List<FutureTask<Integer>> taskList = new ArrayList<FutureTask<Integer>>();

for (int whichTask = 0; whichTask < taskCount; whichTask++) {
    FutureTask<Integer> futureTask = 
        new FutureTask<Integer>(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            // Does useful work that affects messagesSent
            return messagesSent;
        }
    });
    taskList.add(futureTask);
}

for (FutureTask<Integer> task : taskList) {
    LocalExecutorService.getExecutorService().submit(task);
}

for (FutureTask<Integer> task : taskList) {
    int result = 0;
    try {
        result = task.get();
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
    } catch (ExecutionException ex) {
        throw new RuntimeException("ExecutionException in task " + task, ex);
    }
    assertEquals(maxMessages, result);
}

int messagesSent = taskCount * maxMessages;
assertEquals(messagesSent, messagesReceived.intValue());

我认为这个片段与您想要做的类似。关键组件是 提交get 方法。

To answer your second question, I think you are doing a reasonable job trying to clean up your thread pool.

With respect to your first question, I think the method that you want to use is submit rather than execute. Rather than try to explain it all in text, here's an edited fragment from a unit test that I wrote that makes many tasks, has each of them do a fragment of the total work and then meets back at the starting point to add the results:

final AtomicInteger messagesReceived = new AtomicInteger(0);

// ThreadedListenerAdapter is the class that I'm testing 
// It's not germane to the question other than as a target for a thread pool.
final ThreadedListenerAdapter<Integer> adapter = 
    new ThreadedListenerAdapter<Integer>(listener);
int taskCount = 10;

List<FutureTask<Integer>> taskList = new ArrayList<FutureTask<Integer>>();

for (int whichTask = 0; whichTask < taskCount; whichTask++) {
    FutureTask<Integer> futureTask = 
        new FutureTask<Integer>(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            // Does useful work that affects messagesSent
            return messagesSent;
        }
    });
    taskList.add(futureTask);
}

for (FutureTask<Integer> task : taskList) {
    LocalExecutorService.getExecutorService().submit(task);
}

for (FutureTask<Integer> task : taskList) {
    int result = 0;
    try {
        result = task.get();
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
    } catch (ExecutionException ex) {
        throw new RuntimeException("ExecutionException in task " + task, ex);
    }
    assertEquals(maxMessages, result);
}

int messagesSent = taskCount * maxMessages;
assertEquals(messagesSent, messagesReceived.intValue());

I think this fragment is similar to what you're trying to do. The key components were the submit and get methods.

纸伞微斜 2024-08-22 02:13:43

首先,您可以使用 ThreadPoolExecutor.submit() 方法,该方法返回 Future 实例,然后在提交所有工作项后,您可以迭代这些 future 并调用 Future.get() 在他们每个人身上。

或者,您可以准备可运行的工作项并使用 ThreadPoolExecutor.invokeAll(),它将等待所有工作项完成,然后调用相同的 Future.get 即可获取执行结果或异常() 方法。

First of all you can use ThreadPoolExecutor.submit() method, which returns Future instance, then after you submitted all your work items you can iterate trough those futures and call Future.get() on each of them.

Alternatively, you can prepare your runnable work items and submit them all at once using ThreadPoolExecutor.invokeAll(), which will wait until all work items completed and then you can get the execution results or exception calling the same Future.get() method.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文