生产者消费者请求取消

发布于 2024-11-29 21:45:19 字数 1862 浏览 1 评论 0原文

public class MainClass {

private static final int size = 5;

private ExecutorService prodExec = Executors.newFixedThreadPool(size);
private ExecutorService consExec = Executors.newFixedThreadPool(size);

//main method here

public void start(String[] args) {

    for (int index = 0; index < size; index++) {
        Runnable producer = new Producer(consExec, listOfIds);
        prodExec.execute(producer);
    }

    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            prodExec.shutdown();
            try {
                prodExec.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException ignore) {
            }

            consExec.shutdown();
            try {
                consExec.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException ignore) {
            }
        }
    });
    }
}
public class Producer implements Runnable {

private ExecutorService consExec;
private List<Long> list;

public Producer(ExecutorService exec, List<Long> list) {
    this.consExec = exec;
    this.list = list;
}

public void run() {
    for (Long id: list) {
        data = get data from db for the id
        consExec.execute(new Consumer(data));
    }
}
}
public class Consumer implements Runnable {

public void run() {
    // call web service
}
}

我想通过按 Ctrl+C 来处理用户请求关闭的情况。我认为这可以在关闭挂钩中完成。然而,如上面的代码所示,每个 Producer 都会获取一个 ID 列表(可能是 250 个?)来处理,即调用 db 检索该 ID 的数据并将数据提交给消费者线程,然后消费者线程调用 Web服务。

当请求关闭以便每个线程不处理尚未处理的 id 时,如何打破每个生产者线程中的 for 循环?我能够让 shutdownHook 工作,但不确定每个线程应如何将逻辑合并到 run 方法中,以便在出现关闭请求时退出 run() 方法。可能是通过在外部设置一个布尔变量(AtomicBoolean),每个线程在处理每个 id 之前在 for 循环中检查?

据我了解,如果我调用 shutdown(),它会执行所有提交的任务,然后完成。在这种情况下,无法停止处理,因为任务已经在执行器服务上排队。

如果我调用 shutdownNow() 而不是 shutdown() 可能会产生意外结果?

public class MainClass {

private static final int size = 5;

private ExecutorService prodExec = Executors.newFixedThreadPool(size);
private ExecutorService consExec = Executors.newFixedThreadPool(size);

//main method here

public void start(String[] args) {

    for (int index = 0; index < size; index++) {
        Runnable producer = new Producer(consExec, listOfIds);
        prodExec.execute(producer);
    }

    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            prodExec.shutdown();
            try {
                prodExec.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException ignore) {
            }

            consExec.shutdown();
            try {
                consExec.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException ignore) {
            }
        }
    });
    }
}
public class Producer implements Runnable {

private ExecutorService consExec;
private List<Long> list;

public Producer(ExecutorService exec, List<Long> list) {
    this.consExec = exec;
    this.list = list;
}

public void run() {
    for (Long id: list) {
        data = get data from db for the id
        consExec.execute(new Consumer(data));
    }
}
}
public class Consumer implements Runnable {

public void run() {
    // call web service
}
}

I would like to handle the scenario when a user requests a shutdown may be by pressing Ctrl+C. I think this could be done in the shutdown hook. However, as in the above code, each Producer gets a list of IDs (250 may be?) to process i.e., call the db to retrieve data for the ID and submit the data to the consumer thread, which then calls out to a web service.

How do I break out of the for loop in each of the Producer thread, when a shutdown has been requested so that each thread does not process the ids that haven't been processes yet? I was able to get the shutDownHook to work, but not sure how each thread should incorporate the logic within the run method to exit the run() method in case of a shutdown request. May be by setting a boolean variable(AtomicBoolean) externally that each thread checks in the for loop before processing every id?

As I understand it, if I call shutdown(), it executes all the submitted tasks and then finishes. In this case, there is no way of stopping processing as the tasks have already beeb queued on the executor service.

If I call shutdownNow() instead of shutdown() it might produce unexpected results?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

自我难过 2024-12-06 21:45:19

shutdownNow 与 shutdown 取决于您是否希望首先完成当前正在运行的任务。

如果你想让他们立即停止,你会做两件事。首先调用 shutdownNow。第二,在 run 方法中测试线程的中断状态。

public class Producer implements Runnable {

private ExecutorService consExec;
private List<Long> list;

public Producer(ExecutorService exec, List<Long> list) {
    this.consExec = exec;
    this.list = list;
}

    public void run() {
        for (Long id: list) {
            if(Thread.currentThread().isInterrupted()){
               //the shutdownNow method has been called (or may a future.cancel(true))
            }
            data = get data from db for the id
            consExec.execute(new Consumer(data));
        }
    }
}

您可以在这里看到 run 方法现在知道当前线程已被中断。然后该运行方法可以清理所有数据并退出

shutdownNow vs shutdown depends on if you want the currently running tasks to complete first.

If you want them to stop immidiately you would do two things. First invoke shutdownNow. And two, within the run method you test the thread's interruption status.

public class Producer implements Runnable {

private ExecutorService consExec;
private List<Long> list;

public Producer(ExecutorService exec, List<Long> list) {
    this.consExec = exec;
    this.list = list;
}

    public void run() {
        for (Long id: list) {
            if(Thread.currentThread().isInterrupted()){
               //the shutdownNow method has been called (or may a future.cancel(true))
            }
            data = get data from db for the id
            consExec.execute(new Consumer(data));
        }
    }
}

You can see here the the run method now knows the current thread has been interrupted. That run method then can clean up any data and exit

若水微香 2024-12-06 21:45:19

如果您使用的是 Java SE 6+,您可以访问 JMX 类。我们发现使用它来“关闭”我们正在运行的服务很有帮助。

本质上,您将服务注册为 JMX 服务。并使用驱动程序类来启动它。

在实际的服务类中,实现本质上使用无限循环来检查条件以查看其是否为真(默认情况下为真)的逻辑。

创建另一个连接到 JMX 服务的驱动程序类,并将循环条件的值修改为 false。然后,当您达到循环条件时,它将(优雅地)关闭并且不处理下一组值。

If you are using Java SE 6+ you have access to the JMX classes. We found it helpful to use this to "shut down" services that we have running.

In essence you register your service as a JMX service. And use a driver class to fire it off.

In your actual service class, implement your logic that in essence uses an infinite loop to check a condition to see if it is true (which it will be by default).

Create another driver class that connects to your JMX service and modifies the value of the loop condition to be false. Then when you hit your loop condition it will (gracefully) shut down and not process the next set of values.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文