Java: Notify the main class when all threads in a thread pool are finished / same instance of an object in different threads

Posted on 2024-11-24 05:33:19

How do I notify my main class which instantiates a ThreadPoolExecutor when all threads within the ThreadPoolExecutor are completed?

ThreadPoolExecutor threadPool = null;
ThreadClass threadClass1;
ThreadClass threadClass2;
final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(maxPoolSize);

public MyClass() {
        threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, queue);

        threadClass1 = new ThreadClass();
        threadClass2 = new ThreadClass();

        threadPool.execute(threadClass1);
        threadPool.execute(threadClass2);

        // Now I would like to do something until the threadPool is done working.
        // The threads fill a ConcurrentLinkedQueue and I would like to poll
        // the queue as it gets filled by the threads and output
        // it to XML via JAX-RS.

}

EDIT 1

While my threads fetch data from somewhere and fill this information into a ConcurrentLinkedQueue, I basically would like to perform some action in MyClass to update the XML output with the results. When all threads are terminated, I would like to return true to the JAX-RS webservice which instantiated MyClass, so the webservice knows all data has been fetched and it can now display the final XML file.

EDIT 2

I am passing a Queue to threads so they can add items to the queue. When one driver is done adding items to the articleQueue I want to perform an action within my main class, polling the entity from the Queue and handing it over to the response object to display it in some way.

When I pass the queue to the threads, are they working with the same object or with a "copy" of the object, so that changes within the thread do not affect the main object? That is not the behavior I want. When I check the size of the articleQueue within the Driver it is 18, while the size of the articleQueue in the DriverController is 0.

Is there a nicer way to react when a thread has added something to the queue other than my while loop? How do I have to modify my code to access the same object within different classes?

DriverController

public class DriverController {

    Queue<Article> articleQueue;

    ThreadPoolExecutor threadPool = null;
    final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(
            maxPoolSize);

    public DriverController(Response response) {

        articleQueue = new ConcurrentLinkedQueue<Article>();
        threadPool = new ThreadPoolExecutor();
        Driver driver = new Driver(this.articleQueue);

        threadPool.execute(driver);
        // More drivers would be executed here which add to the queue

        while (threadPool.getActiveCount() > 0) {
            // this.articleQueue.size() gives back 0 here ... why?
            if(articleQueue.size()>0){
                response.addArticle(articleQueue.poll());
            }
        }

    }
}

Driver

public class Driver implements Runnable{

    private Queue<Article> articleQueue;

    public Driver(Queue<Article> articleQueue) {
        this.articleQueue = articleQueue;
    }

    public boolean getData() {
        // Here would be the code where the article is created ...

        this.articleQueue.offer(article);
        return true;
    }

    public void run() {
        this.getData();
        // this.articleQueue.size() gives back 18 here ...

    }
}

Comments (6)

拍不死你 2024-12-01 05:33:19

You should try the following snippet:

//Now I would like to wait until the threadPool is done working
threadPool.shutdown();
while (!threadPool.isTerminated()) {
    try {
        threadPool.awaitTermination(10, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

琉璃繁缕 2024-12-01 05:33:19

Maybe an ExecutorCompletionService is the right thing for you:

http://download.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ExecutorCompletionService.html

Example from the link above:

void solve(Executor e, Collection<Callable<Result>> solvers)
  throws InterruptedException, ExecutionException {
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
    for (Callable<Result> s : solvers)
        ecs.submit(s);
    int n = solvers.size();
    for (int i = 0; i < n; ++i) {
        Result r = ecs.take().get();
        if (r != null) 
            use(r);
    }
}
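
The documentation excerpt above depends on a Result type and a use() method that are not defined here. As a rough, self-contained sketch of the same idea, the following hands results back as each task finishes. The class name CompletionOrderDemo, the helper collect(), and the simulated sleep are all made up for illustration and are not part of the original question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionOrderDemo {

    // Hypothetical helper: runs taskCount tasks and collects each result
    // as soon as its task has finished.
    static List<Integer> collect(int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(taskCount);
        CompletionService<Integer> ecs = new ExecutorCompletionService<>(pool);
        List<Integer> results = new ArrayList<>();
        try {
            for (int i = 1; i <= taskCount; i++) {
                final int id = i;
                // Stand-in for a real Driver fetching an article.
                ecs.submit(() -> {
                    Thread.sleep(20L * id);
                    return id;
                });
            }
            // take() blocks until the next finished task is available.
            for (int i = 0; i < taskCount; i++) {
                results.add(ecs.take().get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a task failed", e);
        } finally {
            pool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println("collected: " + collect(3));
    }
}
```

Once the second loop has run taskCount times, every submitted task is known to be finished, so that is the point where the caller could report "all done".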

似狗非友 2024-12-01 05:33:19

Instead of using execute you should use submit. This will return a Future instance on which you can wait for the task(s) to complete. That way you don't need polling or shutting down the pool.
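
A minimal sketch of that suggestion, assuming trivial placeholder tasks: SubmitAndWait and runAll() are hypothetical names, and the println stands in for whatever the real drivers do. The pool is shut down at the end only so the demo JVM can exit; a long-lived pool could stay open.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitAndWait {

    // Hypothetical helper: submits taskCount tasks, blocks until each
    // Future reports completion, and returns how many completed.
    static int runAll(int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                final int id = i;
                futures.add(pool.submit(() -> System.out.println("task " + id + " ran")));
            }
            int completed = 0;
            for (Future<?> future : futures) {
                future.get(); // blocks until this particular task is done
                completed++;
            }
            return completed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a task failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll(3) + " tasks completed");
    }
}
```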

花想c 2024-12-01 05:33:19

I don't think there's a way to do this explicitly. You could poll getActiveCount() until it drops to zero, or poll getCompletedTaskCount() until it reaches the number of tasks you submitted.

Why not collect the Future objects returned upon submission and check for all of those being completed? Simply call get() on each one in turn. Since that call blocks, you'll simply wait for each in turn and gradually fall through the set until you've waited on each one.

Alternatively you could submit the threads, and call shutdown() on the executor. That way, the submitted tasks will be executed, and then the terminated() method is called. If you override this then you'll get a callback once all tasks are completed (you couldn't use that executor again, obviously).
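
A sketch of the terminated() approach, under the assumption that a plain Runnable callback is enough to notify the main class. NotifyingPool, its constructor parameters, and demo() are hypothetical names introduced only for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class NotifyingPool extends ThreadPoolExecutor {

    private final Runnable onTerminated;

    // Hypothetical constructor: a fixed-size pool plus a callback to run
    // once the pool has fully terminated.
    public NotifyingPool(int threads, Runnable onTerminated) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<Runnable>());
        this.onTerminated = onTerminated;
    }

    // Invoked by ThreadPoolExecutor after shutdown(), once all submitted
    // tasks have finished.
    @Override
    protected void terminated() {
        try {
            onTerminated.run();
        } finally {
            super.terminated();
        }
    }

    // Demo: returns true if the callback fired within five seconds.
    static boolean demo() {
        CountDownLatch done = new CountDownLatch(1);
        NotifyingPool pool = new NotifyingPool(2, done::countDown);
        pool.execute(() -> { /* placeholder for real work */ });
        pool.execute(() -> { /* placeholder for real work */ });
        pool.shutdown(); // no new tasks; terminated() runs after the last one
        try {
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("callback fired: " + demo());
    }
}
```

The callback runs on one of the pool's own threads (or on the thread that called shutdown() if the pool was already idle), so anything it touches in the main class needs to be thread-safe.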

对你再特殊 2024-12-01 05:33:19

Judging from the reference documentation you have a few options:

ThreadPoolExecutor threadPool = null;
ThreadClass threadClass1;
ThreadClass threadClass2;
final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(maxPoolSize);

public MyClass() {
    threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, queue);

    threadClass1 = new ThreadClass();
    threadClass2 = new ThreadClass();

    threadPool.execute(threadClass1);
    threadPool.execute(threadClass2);

    // Now I would like to wait until the threadPool is done working

    // Option 1:  shutdown() and awaitTermination()
    threadPool.shutdown();
    try {
        threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    }
    catch (InterruptedException e) {
        e.printStackTrace();
    }

    // Option 2:  getActiveCount()
    while (threadPool.getActiveCount() > 0) {
        try {
            Thread.sleep(1000);
        }
        catch (InterruptedException ignored) {}
    }

    // Option 3:  getCompletedTaskCount()
    // totalNumTasks is the number of tasks submitted (2 in this snippet)
    while (threadPool.getCompletedTaskCount() < totalNumTasks) {
        try {
            Thread.sleep(1000);
        }
        catch (InterruptedException ignored) {}
    }

}

All things considered, I think shutdown() and awaitTermination() is the best option of the three.

匿名的好友 2024-12-01 05:33:19

I think you're overengineering things a bit. You don't really care about the threads or the thread pool, and rightly so. Java provides nice abstractions so that you don't have to. You just need to know when your tasks are complete, and methods exist for that. Just submit your jobs, and wait for the futures to say they're done. If you really want to know as soon as a single task completes, you can watch all the futures and take action as soon as any one is finished. If not and you only care that everything is finished, you can remove some complexity from the code I'm about to post. Try this on for size (note MultithreadedJaxrsResource is executable):

import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import java.util.*;
import java.util.concurrent.*;

@Path("foo")
public class MultithreadedJaxrsResource {
    private ExecutorService executorService;

    public MultithreadedJaxrsResource(ExecutorService executorService) {
        this.executorService = executorService;
    }

    @GET
    @Produces(MediaType.APPLICATION_XML)
    public AllMyArticles getStuff() {
        List<Future<Article>> futures = new ArrayList<Future<Article>>();
        // Submit all the tasks to run
        for (int i = 0; i < 10; i++) {
            futures.add(executorService.submit(new Driver(i + 1)));
        }
        AllMyArticles articles = new AllMyArticles();
        // Wait for all tasks to finish
        // If you only care that everything is done and not about seeing
        // when each one finishes, this outer do/while can go away, and
        // you only need a single for loop to wait on each future.
        boolean allDone;
        do {
            allDone = true;
            Iterator<Future<Article>> futureIterator = futures.iterator();
            while (futureIterator.hasNext()) {
                Future<Article> future =  futureIterator.next();
                if (future.isDone()) {
                    try {
                        articles.articles.add(future.get());
                        futureIterator.remove();
                    } catch (InterruptedException e) {
                        // thread was interrupted. don't do that.
                        throw new IllegalStateException("broken", e);
                    } catch (ExecutionException e) {
                        // execution of the Callable failed with an
                        // exception. check it out.
                        throw new IllegalStateException("broken", e);
                    }
                } else {
                    allDone = false;
                }
            }
        } while (!allDone);
        return articles;
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        AllMyArticles stuff =
            new MultithreadedJaxrsResource(executorService).getStuff();
        System.out.println(stuff.articles);
        executorService.shutdown();
    }
}

class Driver implements Callable<Article> {
    private int i; // Just to differentiate the instances

    public Driver(int i) {
        this.i = i;
    }

    public Article call() {
        // Simulate taking some time for each call
        try {
            Thread.sleep(1000 / i);
        } catch (InterruptedException e) {
            System.err.println("oops");
        }
        return new Article(i);
    }
}

class AllMyArticles {
    public final List<Article> articles = new ArrayList<Article>();
}

class Article {
    public final int i;

    public Article(int i) {
        this.i = i;
    }

    @Override
    public String toString() {
        return "Article{" +
                       "i=" + i +
                       '}';
    }
}

Done that way, you can plainly see that the tasks are returned in the order they complete, as the last task finishes first thanks to sleeping the shortest time. If you don't care about completion order and just want to wait for all to finish, the loop becomes much simpler:

for (Future<Article> future : futures) {
    try {
        articles.articles.add(future.get());
    } catch (InterruptedException e) {
        // thread was interrupted. don't do that.
        throw new IllegalStateException("broken", e);
    } catch (ExecutionException e) {
        // execution of the Callable failed with an exception. check it out.
        throw new IllegalStateException("broken", e);
    }
}