具有 wait() 和 notification() 的 ConcurrentLinkedQueue

发布于 2024-12-27 00:11:26 字数 2329 浏览 6 评论 0原文

我不太熟悉多线程。我试图通过一个生产者线程重复截取屏幕截图,该线程将 BufferedImage 对象添加到 ConcurrentLinkedQueue 中,并且消费者线程将轮询队列中的 >BufferedImage 对象将它们保存在文件中。我可以通过重复轮询(while 循环)来使用它们,但我不知道如何使用 notify()wait() 来使用它们。我曾尝试在较小的程序中使用 wait()notify ,但无法在这里实现。

我有以下代码:

class StartPeriodicTask implements Runnable {
    public synchronized void run() {
        Robot robot = null;
        try {
            robot = new Robot();
        } catch (AWTException e1) {
            e1.printStackTrace();
        }
        Rectangle screenRect = new Rectangle(Toolkit.getDefaultToolkit()
                .getScreenSize());
        BufferedImage image = robot.createScreenCapture(screenRect);
        if(null!=queue.peek()){
            try {
                System.out.println("Empty queue, so waiting....");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }else{
            queue.add(image);
            notify();
        }
    }
}

public class ImageConsumer implements Runnable {
        @Override
        public synchronized void run() {
            while (true) {
                BufferedImage bufferedImage = null;
                if(null==queue.peek()){
                    try {
                        //Empty queue, so waiting....
                        wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }else{
                    bufferedImage = queue.poll();
                    notify();
                }
                File imageFile = getFile();
                if (!imageFile.getParentFile().exists()) {
                    imageFile.getParentFile().mkdirs();
                }
                    try {
                        ImageIO.write(bufferedImage, extension, imageFile);
                        //Image saved
                    catch (IOException e) {
                        tracer.severe("IOException occurred. Image is not saved to file!");
                    }
                }
            }

之前我进行了重复轮询以检查 BufferedImage 对象是否存在。现在我已将 run 方法更改为 synchronized 并尝试实现 wait()notify()。我做的对吗?请帮忙。谢谢。

I am not well-versed in Multi-Threading. I am trying to take screenshot repeatedly by one producer thread, which adds the BufferedImage object to ConcurrentLinkedQueue and a Consumer Thread will poll queue for BufferedImage object to saving them in file. I could consume them by repeated polling(while loop), but I don't know how to consume them using notify() and wait(). I have tried using wait() and notify in smaller programs, but couldn't implement it here.

I have the following code:

class StartPeriodicTask implements Runnable {
    public synchronized void run() {
        Robot robot = null;
        try {
            robot = new Robot();
        } catch (AWTException e1) {
            e1.printStackTrace();
        }
        Rectangle screenRect = new Rectangle(Toolkit.getDefaultToolkit()
                .getScreenSize());
        BufferedImage image = robot.createScreenCapture(screenRect);
        if(null!=queue.peek()){
            try {
                System.out.println("Empty queue, so waiting....");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }else{
            queue.add(image);
            notify();
        }
    }
}

public class ImageConsumer implements Runnable {
        @Override
        public synchronized void run() {
            while (true) {
                BufferedImage bufferedImage = null;
                if(null==queue.peek()){
                    try {
                        //Empty queue, so waiting....
                        wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }else{
                    bufferedImage = queue.poll();
                    notify();
                }
                File imageFile = getFile();
                if (!imageFile.getParentFile().exists()) {
                    imageFile.getParentFile().mkdirs();
                }
                    try {
                        ImageIO.write(bufferedImage, extension, imageFile);
                        //Image saved
                    catch (IOException e) {
                        tracer.severe("IOException occurred. Image is not saved to file!");
                    }
                }
            }

Previously I had a repeated polling to check for existence of BufferedImage Object. Now I have changed run method as synchronised and tried to implement wait() and notify(). Am I doing correct? Please help. Thanks.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

谁与争疯 2025-01-03 00:11:27

您为作业使用了错误的队列。 ConcurrentLinkedQueue 是一个非阻塞队列,这意味着没有生产者消费者语义。如果您只是做一名读者和一名作家,请查看 SynchronousQueue

简而言之,您的代码可以这样重写,

BlockingQueue<?> queue = new SynchrnousQueue<?>();
class StartPeriodicTask implements Runnable {
    public void run() {
        Robot robot = null;
        try {
            robot = new Robot();
        } catch (AWTException e1) {
            e1.printStackTrace();
        }
        Rectangle screenRect = new Rectangle(Toolkit.getDefaultToolkit()
                .getScreenSize());
        BufferedImage image = robot.createScreenCapture(screenRect);
        queue.offer(image); //1
}
public class ImageConsumer implements Runnable {
        @Override
        public void run() {
            while (true) {
                BufferedImage bufferedImage = queue.poll(); //2

                File imageFile = getFile();
                if (!imageFile.getParentFile().exists()) {
                    imageFile.getParentFile().mkdirs();
                }
                    try {
                        ImageIO.write(bufferedImage, extension, imageFile);
                        //Image saved
                    catch (IOException e) {
                        tracer.severe("IOException occurred. Image is not saved to file!");
                    }
            }

就是这样。

让我解释一下。在第 //1 行,生成线程会将图像“放置”到队列中。我引用 place 是因为 SynchronousQueue 没有深度。实际发生的情况是线程告诉队列“如果有任何线程请求此队列中的元素,则给它该线程并让我继续。如果没有,我将等到另一个线程准备好”

行//2是类似于 1,其中消费线程只是等待,直到线程提供。这对于单读者单作者来说效果很好

You are using the wrong Queue for the job. The ConcurrentLinkedQueue is a non-blocking Queue which means that there is no producer consumer semantics. If you are just doing one reader and one writer take a look at SynchronousQueue

Simply put your code can be re-written as such

BlockingQueue<?> queue = new SynchrnousQueue<?>();
class StartPeriodicTask implements Runnable {
    public void run() {
        Robot robot = null;
        try {
            robot = new Robot();
        } catch (AWTException e1) {
            e1.printStackTrace();
        }
        Rectangle screenRect = new Rectangle(Toolkit.getDefaultToolkit()
                .getScreenSize());
        BufferedImage image = robot.createScreenCapture(screenRect);
        queue.offer(image); //1
}
public class ImageConsumer implements Runnable {
        @Override
        public void run() {
            while (true) {
                BufferedImage bufferedImage = queue.poll(); //2

                File imageFile = getFile();
                if (!imageFile.getParentFile().exists()) {
                    imageFile.getParentFile().mkdirs();
                }
                    try {
                        ImageIO.write(bufferedImage, extension, imageFile);
                        //Image saved
                    catch (IOException e) {
                        tracer.severe("IOException occurred. Image is not saved to file!");
                    }
            }

That's really it.

Let me explain. At line //1 the producing thread will 'place' the image on the queue. I quotes place because a SynchrnousQueue has no depth. What actually happens is the thread tells the queue "If there are any threads asking for an element from this queue then give it the that thread and let me continue. If not I'll wait until another thread is ready"

Line //2 is similar to 1 where the consuming thread just waits until a thread is offering. This works great with a single-reader single-writer

年少掌心 2025-01-03 00:11:27

第一个问题是生产者中不必要的等待:

    if(null!=queue.peek()){ // You are the producer, you don't care if the queue is empty
        try {
            System.out.println("Empty queue, so waiting....");
            wait(); // This puts you to bed, your waiting and so is your consumer
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }else{
        queue.add(image);
        notify();
    }

这就是您所需要的:

        queue.add(image);
        notify();

下一个问题是消费者中不必要的 notify 。它在那时产生对其处理的控制,我相信您打算将其作为让生产者继续运行的一种方式,但当然您的代码永远不会达到这一点。所以这个:

            }else{
                bufferedImage = queue.poll();
                notify();
            }
            File imageFile = getFile();
            if (!imageFile.getParentFile().exists()) {
                imageFile.getParentFile().mkdirs();
            }
                try {
                    ImageIO.write(bufferedImage, extension, imageFile);
                    //Image saved
                catch (IOException e) {
                    tracer.severe("IOException occurred. Image is not saved to file!");
                }
            }

应该看起来更像这样:

            }else{
                bufferedImage = queue.poll();

                File imageFile = getFile();
                if (!imageFile.getParentFile().exists()) {
                   imageFile.getParentFile().mkdirs();
                }

                try {
                    ImageIO.write(bufferedImage, extension, imageFile);
                    //Image saved
                catch (IOException e) {
                    tracer.severe("IOException occurred. Image is not saved to file!");
                }
            }

The first problem is the unnecessary wait that you have in your producer:

    if(null!=queue.peek()){ // You are the producer, you don't care if the queue is empty
        try {
            System.out.println("Empty queue, so waiting....");
            wait(); // This puts you to bed, your waiting and so is your consumer
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }else{
        queue.add(image);
        notify();
    }

This is all you should need:

        queue.add(image);
        notify();

The next issue is the unnecessary notify in your consumer. It yields control of its processing at that point, which I believe you intended as a way to get your producer going, but of course your code never gets to that point. So this:

            }else{
                bufferedImage = queue.poll();
                notify();
            }
            File imageFile = getFile();
            if (!imageFile.getParentFile().exists()) {
                imageFile.getParentFile().mkdirs();
            }
                try {
                    ImageIO.write(bufferedImage, extension, imageFile);
                    //Image saved
                catch (IOException e) {
                    tracer.severe("IOException occurred. Image is not saved to file!");
                }
            }

Should look more like this:

            }else{
                bufferedImage = queue.poll();

                File imageFile = getFile();
                if (!imageFile.getParentFile().exists()) {
                   imageFile.getParentFile().mkdirs();
                }

                try {
                    ImageIO.write(bufferedImage, extension, imageFile);
                    //Image saved
                catch (IOException e) {
                    tracer.severe("IOException occurred. Image is not saved to file!");
                }
            }
待天淡蓝洁白时 2025-01-03 00:11:27

一旦 java.util.concurrent 库进入 JDK1.5,就不再需要编写自己的等待/通知逻辑了。在 2012 年,如果您正在执行自己的等待/通知,那么您工作得太辛苦了,应该强烈考虑经过验证的、真正的 java.util.concurrent 等效项。

话虽这么说,我相信轮询是内置 java.util.concurrent.ConcurrentLinkedQueue 背后的想法。换句话说,只要它是 !isEmpty(),消费者就坐在自己的 Thread 和 ConcurrentLinkedQue 中的 .poll() 项中。我见过的大多数实现都会在 !isEmpty() 测试之间进行某种一秒钟的睡眠,但我认为这实际上没有必要。另外,请注意 Vint 人员对我的答案的评论,.poll() 可能会返回 null。考虑 java.util.AbstractQueue 的替代实现,它可能具有更接近您正在寻找的阻塞行为。

这家伙有一个简单的例子: http://www.informit .com/articles/article.aspx?p=1339471&seqNum=4

最后,获取 Goetz 的书《Java Concurrency In Practice》并读它。我几乎可以肯定它有一个方法可以用来替换您自己的等待/通知。

Once the java.util.concurrent library came into the JDK1.5, the need to write your own wait/notify logic went right out the door. In 2012, if you are doing your own wait/notify, you are working too hard and should strongly consider the tried and true java.util.concurrent equivalents.

That being said, I believe polling is the idea behind the built in java.util.concurrent.ConcurrentLinkedQueue. In other words, the consumers sit in their own Thread and .poll() items from the ConcurrentLinkedQue as long as it is !isEmpty(). Most implementations that I've seen throw some sort of a one second sleep between tests of the !isEmpty(), but I don't think that is actually necessary. Also, pay note to the Vint guy's comment on my answer, .poll() may return null. Consider alternative implementations of java.util.AbstractQueue that may have blocking behavior closer to what you are looking for.

This guy's got a simple example: http://www.informit.com/articles/article.aspx?p=1339471&seqNum=4

Finally, get the Goetz book "Java Concurrency In Practice" and read it. I'm almost sure it has a recipe for what to use to replace your own home-grown wait/notifys.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文