生产者-消费者场景的正确实现和“优雅”的实现线程池终止

发布于 2024-10-22 14:15:03 字数 2880 浏览 1 评论 0原文

我正在开发我的第一个多线程项目,因此有一些我不确定的事情。有关我的设置的详细信息位于上一个问题< /a>,简而言之:我有一个由 Executors.newFixedThreadPool(N) 实现的线程池。一个线程被赋予一个操作,该操作对本地和远程资源进行一系列查询并迭代填充 ArrayBlockingQueue,而其余线程则调用该操作上的 take() 方法。排队并处理队列中的对象。

尽管小型和受监督的测试似乎运行正常,但我不确定如何处理特殊场景,例如开始(队列还没有项目)、结束(队列已清空)以及任何最终的 InterruptedExceptions< /代码>。我在这里读了一些关于 SO 的文章,然后让我读到了 GoetzKabutz。人们似乎一致认为不应忽视这些例外情况。但是我不确定提供的示例与我的情况有何关系,我没有在代码中的任何地方调用 thread.interrupt()...说到这一点,我不确定我是否应该这样做所以...

总而言之,鉴于下面的代码,我如何最好地处理特殊情况,例如终止条件和 InterrrruptedExceptions?希望这些问题有意义,否则我会尽力进一步描述。

提前致谢,


编辑:我已经致力于实施一段时间了,我遇到了一个新的问题,所以我想我应该更新一下情况。我不幸遇到了 ConcurrentModificationException,这很可能是由于线程池的不完全关闭/终止造成的。当我发现我可以使用 isTermminate() 时,我就尝试了,然后由于不同步 wait(),我得到了 IllegalMonitorStateException 。代码的当前状态如下:

我已经遵​​循了@Jonathan的回答中的一些建议,但是我认为他的建议并不像我需要/想要的那样。背景故事与我上面提到的相同,相关代码如下:

持有/管理池的类,以及可运行对象的提交:

public void serve() {
    try {
        this.started = true;
        pool.execute(new QueryingAction(pcqs));
        for(;;){
            PathwayImpl p = bq.take();

            if (p.getId().equals("0")){
                System.out.println("--DEBUG: Termination criteria found, shutdown initiated..");
                pool.shutdown();
                            // give 3 minutes per item in queue to finish up
                pool.awaitTermination(3 * bq.size(), TimeUnit.MINUTES);
                break;
            }
            int sortMethod = AnalysisParameters.getInstance().getSort_method();
            pool.submit(new AnalysisAction(p)); 
        }
      } catch (Exception ex) {
          ex.printStackTrace();
          System.err.println("Unexpected error in core analysis, terminating execution!");
          System.exit(0);
      }finally{   pool.shutdown();     }
}

public boolean isDone(){
    if(this.started)
        return pool.isTerminated();
    else
        return false;
    }

通过位于单独类中的以下代码将元素添加到队列中:

this.queue.offer(path, offer_wait, TimeUnit.MINUTES);

... offer() 而不是 take() 背后的动机正如 Jonathan 提到的。不可预见的块很烦人并且很难弄清楚,因为我的分析需要很长时间。所以我需要相对快速地知道失败是否是由于坏块,或者是否只是处理数字......


最后;这是我的测试类中的代码,我在其中检查“并发服务”(此处名为 cs)与其余要分析的对象之间的交互:

cs.serve();
synchronized (this) {
    while(!cs.isDone())
    this.wait(5000);
}
ReportGenerator rg = new ReportGenerator();
rg.doReports();

我意识到这是一个非常长的问题,但我尝试详细说明具体的。希望这不会太拖累,如果有的话我深表歉意......

I am working on my first multi-threaded project and thus have a couple of things that I am unsure of. Details on my setup was on a previous question, in short: I have a thread pool implemented by Executors.newFixedThreadPool(N). One thread is given an action which does a series of queries to local and remote resources and iteratively populates an ArrayBlockingQueue, while the rest of the threads invoke take() method on the queue and process the objects in the queue.

Even though small and supervised tests seem to run OK, I am unsure about how I handle special scenarios such as the beginning (the queue has no items yet), the end (the queue is emptied), and any eventual InterruptedExceptions. I have done some reading here on SO, which then led me to two really nice articles by Goetz and Kabutz. The consensus seems to be that one should not ignore these exceptions. However I am unsure how the examples supplied relates to my situation, I have not invoked thread.interrupt() anywhere in my code... Speaking of which, I'm getting unsure if I should have done so...

To sum it up, given the code below, how do I best handle the special cases, such as termination criteria and the InterrruptedExceptions? Hope the questions make sense, otherwise I'll do my best to describe it further.

Thanks in advance,


edit: I have been working on the implementation for a while now, and I have come across a new hiccup so I figured I'd update the situation. I have had the misfortune of coming across ConcurrentModificationException which was most likely due to incomplete shutdown/termination of the thread pool. As soon as I figured out I could use isTerminated() I tried that, then I got a IllegalMonitorStateException due to an unsynchronized wait(). The current state of the code is below:

I have followed some of the advices from @Jonathan's answer, however I don't think his proposal works quite like what I need/want. The background story is the same as I have mentioned above, and relevant bits of code are as follows:

Class holding/managing the pool, and submission of runnables:

public void serve() {
    try {
        this.started = true;
        pool.execute(new QueryingAction(pcqs));
        for(;;){
            PathwayImpl p = bq.take();

            if (p.getId().equals("0")){
                System.out.println("--DEBUG: Termination criteria found, shutdown initiated..");
                pool.shutdown();
                            // give 3 minutes per item in queue to finish up
                pool.awaitTermination(3 * bq.size(), TimeUnit.MINUTES);
                break;
            }
            int sortMethod = AnalysisParameters.getInstance().getSort_method();
            pool.submit(new AnalysisAction(p)); 
        }
      } catch (Exception ex) {
          ex.printStackTrace();
          System.err.println("Unexpected error in core analysis, terminating execution!");
          System.exit(0);
      }finally{   pool.shutdown();     }
}

public boolean isDone(){
    if(this.started)
        return pool.isTerminated();
    else
        return false;
    }

Elements are added to the queue by the following code on located in a separate class:

this.queue.offer(path, offer_wait, TimeUnit.MINUTES);

... motivation behind offer() instead of take() is as Jonathan mentioned. Unforeseen blocks are annoying and hard to figure out as my analysis take a long time as it is. So I need to know relatively quick if the fails due to a bad block, or if it's just crunching numbers...


and finally; here's the code in my test class where I check the interaction between the "concurrency service" (named cs here) and the rest of the objects to be analyzed:

cs.serve();
synchronized (this) {
    while(!cs.isDone())
    this.wait(5000);
}
ReportGenerator rg = new ReportGenerator();
rg.doReports();

I realize that this has been a VERY long question but I tried to be detailed and specific. Hopefully it won't be too much of a drag, and I apologize in case it is...

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

陌路终见情 2024-10-29 14:15:03

不要使用块的 take,而是使用类似这样的东西:

PathwayImpl p = null;
synchronized (bq) {
    try {
        while (bq.isEmpty() && !stopSignal) {
            bq.wait(3000); // Wait up to 3 seconds and check again
        }

        if (!stopSignal) {
            p = bq.poll();
        }
    }
    catch (InterruptedException ie) {
        // Broke us out of waiting, loop around to test the stopSignal again
    }
}

这假设该块包含在某种 while (!stopSignal) {...} 中。

然后,在添加到队列的代码中,执行以下操作:

synchronized (bq) {
    bq.add(item);
    bq.notify();
}

对于 InterruptedException,它们适合向线程发出信号以立即测试停止信号,而不是等到下一次超时 - 并且 -测试。我建议再次测试您的停止信号,并可能记录异常。

我在发出恐慌信号而不是正常关闭时使用它们,但这种情况很少有必要。

Instead of using take, which blocks, use something more like this:

PathwayImpl p = null;
synchronized (bq) {
    try {
        while (bq.isEmpty() && !stopSignal) {
            bq.wait(3000); // Wait up to 3 seconds and check again
        }

        if (!stopSignal) {
            p = bq.poll();
        }
    }
    catch (InterruptedException ie) {
        // Broke us out of waiting, loop around to test the stopSignal again
    }
}

This assumes that the block is enclosed in some sort of while (!stopSignal) {...}.

Then, in the code that adds to the queue, do this:

synchronized (bq) {
    bq.add(item);
    bq.notify();
}

As for InterruptedExceptions, they are good for signaling the thread to test the stop signal immediately, instead of waiting until the next timeout-and-test. I suggest just testing your stop signal again, and possibly logging the exception.

I use them when signaling a panic, versus a normal shutdown, but it is rare that such a situation is necessary.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文