java多线程从队列中取出数据执行

发布于 2022-09-05 04:47:44 字数 2629 浏览 17 评论 0

先用python实现, 把每一个要处理的参数存放到队列Queue中, 然后创建线程从队列中取出

class ThreadExecuteJob(threading.Thread):

"""Threaded Url Grab"""

def __init__(self, queue):
    threading.Thread.__init__(self)
    self.queue = queue

def run(self):
    while 1:
        i = self.queue.get()
        try:
            print "执行的参数" + str(i)   
            # 发送执行完毕信号
            self.queue.task_done()
        except Exception as e:
            logging.warning(str(fun) + "---" + str(e.message) + "---" + str(e.args))

def execute_job_thread_pool(queue):

"""
:param queue: 队列
:param arg:   函数的参数
:return:
"""
for i in xrange(6):
    t = ThreadExecuteJob(queue)
    t.setDaemon(True)
    t.start()
if __name__ == "__main__":
    import Queue
    day_q = Queue.Queue()
    for i in xrange(6):
       day_q.put(i)
    execute_job_thread_pool(day_q)
    day_q.join()
    
    

用python实现的多线程取出0-5个数打印出来。现在用java粗略实现的如下, 但是队列数据取完了主线程没有停止, 如何让主线程停止,如何像python那样队列数据取完主线程也停止

public class HelloJava8 {

public static void main(String[] args) throws InterruptedException {
    // TODO Auto-generated method stub
    
    BlockingQueue s = new LinkedBlockingQueue();
    s.put("java");
    s.put("python");
    s.put("php");
    s.put("c++");
    s.put("c");
    ArrayList<Thread> threadList = new ArrayList<Thread>();
    Thread t1 = new Thread(new Consumer("zhangsan", s));
    t1.start();
    threadList.add(t1);
    Thread t2 = new Thread(new Consumer("lisi", s));
    t2.start();
    threadList.add(t2);
    Thread t3 = new Thread(new Consumer("wangwu", s));
    t3.start();
    threadList.add(t3);
    Thread t4 = new Thread(new Consumer("afei", s));
    t4.start();
    threadList.add(t4);
    Thread t5 = new Thread(new Consumer("jb", s));
    t5.start();
    threadList.add(t5);

    
    for(Thread thread: threadList) {
        thread.join();
    } 
    
    System.out.println("主线程执行完毕");
}

}

class Consumer implements Runnable {
    private String name;
    private BlockingQueue s = null;

    public Consumer(String name, BlockingQueue s) {
        this.name = name;
        this.s = s;
    }

    public void run() {
        try {
            while (true) {
                String product = (String) s.take();
                System.out.println(name + "消费(" + product.toString() + ").");
                System.out.println("===============");
                Thread.sleep(300);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

心奴独伤 2022-09-12 04:47:45

Consumer类的run方法中,while的条件应该改为!s.isEmpty()
主线程未能执行结束的原因是子线程都没有结束造成的,因为BlockingQueue.take()造成了阻塞,看一下文档中写的

take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入

也就是说当BlockingQueue中的对象被消费完毕之后,再次调用其take方法,就会进入等待状态,而题主在程序中使用while(true)作为循环条件,则所有的线程都会不断的调用队列的take方法,而进入阻塞状态,无法退出线程,子线程无法退出,则join之后的主线程System.out.println("主线程执行完毕");则永远在等待,无法执行,故主线程也无法退出。

谷夏 2022-09-12 04:47:45

楼上的解释完全正确,不过可以利用 s.poll() 返回值是否为 null,作为跳出条件。
下面诗完整代码。

package com.demo;

import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class Consumer implements Runnable {
    private String name;

    private BlockingQueue<String> s = null;

    public Consumer(String name, BlockingQueue<String> s) {
        this.name = name;
        this.s = s;
    }

    public void run() {
        try {
            while (true) {
                String product = s.poll();
                if (product == null) {
                    break;
                }
                int t = (new Random()).nextInt(10);
                System.out.println(name + "消费(" + product + ").");
                System.out.println(name + " ==== sleep " + t + " secs ====");
                Thread.sleep(t * 1000);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

}

public class HelloJava8 {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> s = new LinkedBlockingQueue<String>();
        s.put("java");
        s.put("python");
        s.put("php");
        s.put("c++");
        s.put("c");

        ArrayList<Thread> threadList = new ArrayList<Thread>();

        Thread t1 = new Thread(new Consumer("zhangsan", s));
        t1.start();
        threadList.add(t1);
        Thread t2 = new Thread(new Consumer("lisi", s));
        t2.start();
        threadList.add(t2);
        Thread t3 = new Thread(new Consumer("wangwu", s));
        t3.start();
        threadList.add(t3);
        Thread t4 = new Thread(new Consumer("afei", s));
        t4.start();
        threadList.add(t4);
        Thread t5 = new Thread(new Consumer("jb", s));
        t5.start();
        threadList.add(t5);

        for (Thread thread : threadList) {
            thread.join();
        }

        System.out.println("主线程执行完毕");
    }
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文