Java正确使用ExecutorService、CompletionService、BlockingQueue和Observer?

发布于 2024-12-02 20:00:58 字数 4470 浏览 4 评论 0原文

所以,我对多线程还很陌生,并且最近在我的所有程序中都使用了这个想法。在我开始更多地使用它之前,我真的想确保它是使用 Executor、CompletionService 和 BlockingQueue 以及 Observer 来实现多线程的正确有效方法。我将在下面提供示例代码,但首先让我快速解释一下我认为它是如何工作的,也许这会有所帮助。

我拥有的第一件事是一个 BlockingQueue,所有任务都通过 add(Task task) 方法添加到该队列中。创建类后,将调用 run 方法,并使用 while(true) 调用 take 来阻塞队列,直到有内容添加到任务队列中。

一旦有东西被添加到 run() 内的队列中,queue.take() 就会返回队列中的项目。然后我将这个项目传递给 WorkerThread 类来处理它。该workerThread被添加到CompletionService池中,该池处理等待线程完成的情况。

好吧,现在是我不确定是否正确的部分。我还有一个实现 runnable 的内部类,并在类初始化时启动。它的工作是永远循环调用 pool.take()。因此,这本质上是等待其中一个 WorkerThread 完成。我让完成服务处理这个问题。一旦 take() 获得一个值,内部类就会将其传递给通知观察者方法。

这可以吗?让我有点担心的是,主类在任务队列上运行 while(true) 循环,并且内部类也在池上循环等待从 WorkerThread 接收结果?

这是一个示例实现。你认为呢?

     public class HttpSchedulerThreaded extends Observable implements Runnable  {

private ArrayList<Object> list;//holds [0]=VULNINFO, [1]=REQUESTBUILDER OBJECT
protected static Logger logger = Logger.getLogger(HttpScheduler.class.getName());
private CompletionService<VulnInfo> pool;
private ExecutorService executor ;
private Thread responseWorkerThread;
private HttpSchedulerWorker schedulerWorker;
private boolean shouldRun = true;
private CountDownLatch doneSignal;
private String[] vulnClassesIgnoreRedirect;
private boolean followRedirects;
private boolean runJavascriptInResponse;
private boolean isSSL;
private int numThreadsInPool;
private BlockingQueue<VulnInfo> queue;
private boolean isRunning ;
public HttpSchedulerThreaded(int numThreads)
{
    numThreadsInPool = numThreads;
    executor = Executors.newFixedThreadPool(numThreads);
    doneSignal = new CountDownLatch(numThreads);
    pool = new ExecutorCompletionService<VulnInfo>(executor);
    schedulerWorker = new HttpSchedulerWorker();
    responseWorkerThread = new Thread(schedulerWorker);
    queue = new LinkedBlockingQueue<VulnInfo>();
}

public HttpSchedulerThreaded()
{
    numThreadsInPool = 1;
    executor = Executors.newFixedThreadPool(1);
    doneSignal = new CountDownLatch(1);
    pool = new ExecutorCompletionService<VulnInfo>(executor);
    schedulerWorker = new HttpSchedulerWorker();
    responseWorkerThread = new Thread(schedulerWorker);
    queue = new LinkedBlockingQueue<VulnInfo>();
}

public void setThreadCount(int numThreads)
{
    if(!isRunning){
    executor = Executors.newFixedThreadPool(numThreads);
    doneSignal = new CountDownLatch(numThreads);
    pool = new ExecutorCompletionService<VulnInfo>(executor);
    numThreadsInPool = numThreads;
    }
}


public void start()
{
    if(!isRunning){
        responseWorkerThread.start();
        new Thread(this).start();
        isRunning = true;
    }

}


public void add(VulnInfo info) {
    queue.add(info);
}

@Override
public void run() {
    // TODO Auto-generated method stub
    while(shouldRun)
    {   
        try {
            VulnInfo info = queue.take();
            Callable<VulnInfo> worker = new HttpSchedulerRequestSender(info,followRedirects,runJavascriptInResponse,vulnClassesIgnoreRedirect,doneSignal);
            //System.out.println("submitting to pooler: " + info.getID());
            pool.submit(worker);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }   
    }
}

/**
 * Inner class of proxy is a worker thread blocks until the pool has transactions complete as soon as they
 * are complete it will send them to server for completion.
 * @author Steve
 *
 */
class HttpSchedulerWorker  implements Runnable{

    public void run() {
        // TODO Auto-generated method stub
        while(true)
        {
            VulnInfo vulnInfo = null;
            try {
                //System.out.println("taking finished request");
                Future<VulnInfo>    tmp = pool.take();
            //  Future<VulnInfo> tmp = pool.poll();
                if(tmp != null)
                    vulnInfo = tmp.get();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            if(vulnInfo != null)
            {
                //System.out.println("updating all observers: "  + vulnInfo.getID());
                updateObservers(vulnInfo);
            }



        }
    }

}

So, I'm pretty new to multi-threading and have been using this idea in all my programs lately. Before I start using it more I really want to make sure it is a correct efficient way to implement multi-threading using the Executor,CompletionService and a BlockingQueue plus an Observer. I'll provide example code below but let me first quickly explain how I think it works and maybe that will help.

The first thing I have is a BlockingQueue all tasks are added to this queue via an add(Task task) method. Upon creation of the class the run method is called with a while(true) calling take on the queue blocking until something gets added to the task queue.

Once something gets added to the queue inside the run() queue.take() returns the item on queue. Then I take that item and pass it to WorkerThread class that does stuff on it. That workerThread is added to the CompletionService pool which handles the waiting for a thread to finish.

Ok now comes the part i'm not sure is correct. I also have an inner class that implements runnable and is started when the class is initialized. Its job is to loop forever calling pool.take(). So, this essentially waits for one of the WorkerThreads to complete. I let the completion service handle this. Once the take() gets a value the inner class passes it to a notify observer method.

Is this okay implementation.? It concerns me a bit that there is the main classes run with a while(true) looping on task queue and an inner class also looping waiting on pool to receive a result from WorkerThread?

Here is an example implementation. What you think?

     public class HttpSchedulerThreaded extends Observable implements Runnable  {

private ArrayList<Object> list;//holds [0]=VULNINFO, [1]=REQUESTBUILDER OBJECT
protected static Logger logger = Logger.getLogger(HttpScheduler.class.getName());
private CompletionService<VulnInfo> pool;
private ExecutorService executor ;
private Thread responseWorkerThread;
private HttpSchedulerWorker schedulerWorker;
private boolean shouldRun = true;
private CountDownLatch doneSignal;
private String[] vulnClassesIgnoreRedirect;
private boolean followRedirects;
private boolean runJavascriptInResponse;
private boolean isSSL;
private int numThreadsInPool;
private BlockingQueue<VulnInfo> queue;
private boolean isRunning ;
public HttpSchedulerThreaded(int numThreads)
{
    numThreadsInPool = numThreads;
    executor = Executors.newFixedThreadPool(numThreads);
    doneSignal = new CountDownLatch(numThreads);
    pool = new ExecutorCompletionService<VulnInfo>(executor);
    schedulerWorker = new HttpSchedulerWorker();
    responseWorkerThread = new Thread(schedulerWorker);
    queue = new LinkedBlockingQueue<VulnInfo>();
}

public HttpSchedulerThreaded()
{
    numThreadsInPool = 1;
    executor = Executors.newFixedThreadPool(1);
    doneSignal = new CountDownLatch(1);
    pool = new ExecutorCompletionService<VulnInfo>(executor);
    schedulerWorker = new HttpSchedulerWorker();
    responseWorkerThread = new Thread(schedulerWorker);
    queue = new LinkedBlockingQueue<VulnInfo>();
}

public void setThreadCount(int numThreads)
{
    if(!isRunning){
    executor = Executors.newFixedThreadPool(numThreads);
    doneSignal = new CountDownLatch(numThreads);
    pool = new ExecutorCompletionService<VulnInfo>(executor);
    numThreadsInPool = numThreads;
    }
}


public void start()
{
    if(!isRunning){
        responseWorkerThread.start();
        new Thread(this).start();
        isRunning = true;
    }

}


public void add(VulnInfo info) {
    queue.add(info);
}

@Override
public void run() {
    // TODO Auto-generated method stub
    while(shouldRun)
    {   
        try {
            VulnInfo info = queue.take();
            Callable<VulnInfo> worker = new HttpSchedulerRequestSender(info,followRedirects,runJavascriptInResponse,vulnClassesIgnoreRedirect,doneSignal);
            //System.out.println("submitting to pooler: " + info.getID());
            pool.submit(worker);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }   
    }
}

/**
 * Inner class of proxy is a worker thread blocks until the pool has transactions complete as soon as they
 * are complete it will send them to server for completion.
 * @author Steve
 *
 */
class HttpSchedulerWorker  implements Runnable{

    public void run() {
        // TODO Auto-generated method stub
        while(true)
        {
            VulnInfo vulnInfo = null;
            try {
                //System.out.println("taking finished request");
                Future<VulnInfo>    tmp = pool.take();
            //  Future<VulnInfo> tmp = pool.poll();
                if(tmp != null)
                    vulnInfo = tmp.get();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            if(vulnInfo != null)
            {
                //System.out.println("updating all observers: "  + vulnInfo.getID());
                updateObservers(vulnInfo);
            }



        }
    }

}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

总攻大人 2024-12-09 20:00:58

根据我的经验,你的解决方案似乎没问题。我有三个评论/建议:

  1. 一旦创建了新的执行线程 responseWorkerThread = new Thread(schedulerWorker)responseWorkerThread.start(),您基本上就已经分解了这些线程两个循环。这部分看起来还不错。您似乎确实正确使用了 Executor 的 API,但看起来您可能需要更多代码来停止 HttpScheduledWorker 线程并关闭 ExecutionCompletionService 作为 HttpSchedulerThreaded 类的一部分。
  2. 我不确定您使用 queue 是否真的有必要。 ExecutionCompletionService 已经使用 BlockingQueue 来管理提交给它的任务。
  3. 您的“问题”可能更适合 测试版代码审查 站点。

From my experience, your solution seems to be okay. I have three comments/suggestions:

  1. Once you create a new thread of execution responseWorkerThread = new Thread(schedulerWorker) and responseWorkerThread.start(), you've essentially broken apart those two loops. This part looks okay. You do seem to be using the Executors API correctly, but it does look like you may need some more code for stopping the HttpScheduledWorker thread and for shutting down the ExecutionCompletionService as part of the HttpSchedulerThreaded class.
  2. I'm not sure that your use of queue is really necessary. ExecutionCompletionService already uses a BlockingQueue to manage the tasks which are submitted to it.
  3. Your "question" may be a better fit over at the beta Code Review site.
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文