C# producer/consumer

Posted on 2024-08-10 10:55:46

I've recently come across a C# implementation of the producer/consumer pattern. It's very simple and (for me at least) very elegant.

It seems to have been devised around 2006, so I was wondering whether this implementation is
- safe
- still applicable

The code is below (the original was referenced at http://bytes.com/topic/net/answers/575276-producer-consumer#post2251375):

using System;  
using System.Collections;  
using System.Threading;

public class Test
{  
    static ProducerConsumer queue;

    static void Main()
    {
        queue = new ProducerConsumer();
        new Thread(new ThreadStart(ConsumerJob)).Start();

        Random rng = new Random(0);
        for (int i=0; i < 10; i++)
        {
            Console.WriteLine ("Producing {0}", i);
            queue.Produce(i);
            Thread.Sleep(rng.Next(1000));
        }
    }

    static void ConsumerJob()
    {
        // Make sure we get a different random seed from the
        // first thread
        Random rng = new Random(1);
        // We happen to know we've only got 10 
        // items to receive
        for (int i=0; i < 10; i++)
        {
            object o = queue.Consume();
            Console.WriteLine ("\t\t\t\tConsuming {0}", o);
            Thread.Sleep(rng.Next(1000));
        }
    }
}

public class ProducerConsumer
{
    readonly object listLock = new object();
    Queue queue = new Queue();

    public void Produce(object o)
    {
        lock (listLock)
        {
            queue.Enqueue(o);

            // We always need to pulse, even if the queue wasn't
            // empty before. Otherwise, if we add several items
            // in quick succession, we may only pulse once, waking
            // a single thread up, even if there are multiple threads
            // waiting for items.            
            Monitor.Pulse(listLock);
        }
    }

    public object Consume()
    {
        lock (listLock)
        {
            // If the queue is empty, wait for an item to be added
            // Note that this is a while loop, as we may be pulsed
            // but not wake up before another thread has come in and
            // consumed the newly added object. In that case, we'll
            // have to wait for another pulse.
            while (queue.Count==0)
            {
                // This releases listLock, only reacquiring it
                // after being woken up by a call to Pulse
                Monitor.Wait(listLock);
            }
            return queue.Dequeue();
        }
    }
}


Comments (7)

救赎№ 2024-08-17 10:55:46

The code is older than that - I wrote it some time before .NET 2.0 came out. The concept of a producer/consumer queue is way older than that though :)

Yes, that code is safe as far as I'm aware - but it has some deficiencies:

  • It's non-generic. A modern version would certainly be generic.
  • It has no way of stopping the queue. One simple way of stopping the queue (so that all the consumer threads retire) is to have a "stop work" token which can be put into the queue. You then add as many tokens as you have threads. Alternatively, you have a separate flag to indicate that you want to stop. (This allows the other threads to stop before finishing all the current work in the queue.)
  • If the jobs are very small, consuming a single job at a time may not be the most efficient thing to do.

The ideas behind the code are more important than the code itself, to be honest.
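As a rough illustration of the first two points, here is a minimal sketch of a generic variant with a separate stop flag. The names `Stop`, `TryConsume` and `Demo` are hypothetical and not part of the original code. In this sketch, consumers drain the remaining items before exiting; a variant that stops immediately would check the flag before dequeuing.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical generic variant of the ProducerConsumer class from the question.
public class ProducerConsumer<T>
{
    readonly object listLock = new object();
    readonly Queue<T> queue = new Queue<T>();
    bool stopped; // guarded by listLock

    public void Produce(T item)
    {
        lock (listLock)
        {
            if (stopped) throw new InvalidOperationException("Queue has been stopped.");
            queue.Enqueue(item);
            Monitor.Pulse(listLock);
        }
    }

    // Asks consumers to exit once the remaining items have been drained.
    public void Stop()
    {
        lock (listLock)
        {
            stopped = true;
            // Wake every waiting consumer so that each one can observe the flag.
            Monitor.PulseAll(listLock);
        }
    }

    // Blocks until an item is available; returns false once the queue
    // has been stopped and is empty.
    public bool TryConsume(out T item)
    {
        lock (listLock)
        {
            while (queue.Count == 0 && !stopped)
            {
                Monitor.Wait(listLock);
            }
            if (queue.Count == 0)
            {
                item = default(T);
                return false;
            }
            item = queue.Dequeue();
            return true;
        }
    }
}

public static class Demo
{
    // Produces 0..9 on the calling thread and sums them on a consumer thread.
    public static int Run()
    {
        var queue = new ProducerConsumer<int>();
        int sum = 0;
        var consumer = new Thread(() =>
        {
            int value;
            while (queue.TryConsume(out value)) sum += value;
        });
        consumer.Start();
        for (int i = 0; i < 10; i++) queue.Produce(i);
        queue.Stop();
        consumer.Join(); // after Join, the consumer's writes to sum are visible
        return sum;
    }

    public static void Main()
    {
        Console.WriteLine("Sum of consumed items: {0}", Run());
    }
}
```

With several consumer threads, a single `Stop()` call is enough here because `PulseAll` wakes all of them, whereas the token approach needs one token per thread.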

后来的我们 2024-08-17 10:55:46

You could do something like the following code snippet. It's generic and has a method for enqueue-ing nulls (or whatever flag you'd like to use) to tell the worker threads to exit.

The code is taken from here: http://www.albahari.com/threading/part4.aspx#_Wait_and_Pulse

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace ConsoleApplication1
{

    public class TaskQueue<T> : IDisposable where T : class
    {
        object locker = new object();
        Thread[] workers;
        Queue<T> taskQ = new Queue<T>();

        public TaskQueue(int workerCount)
        {
            workers = new Thread[workerCount];

            // Create and start a separate thread for each worker
            for (int i = 0; i < workerCount; i++)
                (workers[i] = new Thread(Consume)).Start();
        }

        public void Dispose()
        {
            // Enqueue one null task per worker to make each exit.
            foreach (Thread worker in workers) EnqueueTask(null);
            foreach (Thread worker in workers) worker.Join();
        }

        public void EnqueueTask(T task)
        {
            lock (locker)
            {
                taskQ.Enqueue(task);
                Monitor.PulseAll(locker);
            }
        }

        void Consume()
        {
            while (true)
            {
                T task;
                lock (locker)
                {
                    while (taskQ.Count == 0) Monitor.Wait(locker);
                    task = taskQ.Dequeue();
                }
                if (task == null) return;         // This signals our exit
                Console.Write(task);
                Thread.Sleep(1000);              // Simulate time-consuming task
            }
        }
    }
}
℡寂寞咖啡 2024-08-17 10:55:46

Back in the day I learned how Monitor.Wait/Pulse works (and a lot about threads in general) from the above piece of code and the article series it is from. So as Jon says, it has a lot of value to it and is indeed safe and applicable.

However, as of .NET 4, there is a producer-consumer queue implementation in the framework. I only just found it myself but up to this point it does everything I need.
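The answer does not name the type. Assuming it means `BlockingCollection<T>` from `System.Collections.Concurrent`, which was added in .NET 4, a minimal sketch might look like the following. The `Demo` class and the capacity of 4 are illustrative choices only.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class Demo
{
    // Produces 0..9 on the calling thread and sums them on a consumer thread.
    public static int Run()
    {
        int sum = 0;
        // Bounded to 4 pending items: Add blocks while the collection is full.
        using (var queue = new BlockingCollection<int>(boundedCapacity: 4))
        {
            var consumer = new Thread(() =>
            {
                // Blocks while the collection is empty and ends
                // after CompleteAdding once everything is drained.
                foreach (int item in queue.GetConsumingEnumerable())
                    sum += item;
            });
            consumer.Start();

            for (int i = 0; i < 10; i++) queue.Add(i);
            queue.CompleteAdding(); // plays the role of the "stop work" signal
            consumer.Join();        // finish before the collection is disposed
        }
        return sum;
    }

    public static void Main()
    {
        Console.WriteLine("Sum of consumed items: {0}", Run());
    }
}
```

Here `CompleteAdding` replaces the hand-written stop token, and the bounded capacity gives backpressure that the original unbounded queue does not have.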

何以笙箫默 2024-08-17 10:55:46

These days a more modern option is available using the namespace System.Threading.Tasks.Dataflow. It's async/await friendly and much more versatile.

More info here: "How to: Implement a producer-consumer dataflow pattern".

It's included starting from .NET Core; for older .NET versions you may need to install a package with the same name as the namespace.

I know the question is old, but it's the first match in Google for my request, so I decided to update the topic.
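A minimal sketch of what this could look like with a `BufferBlock<T>`, assuming the Dataflow package is available to the project. The `Demo`, `ProduceAsync` and `ConsumeAsync` names are hypothetical and chosen only for illustration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Demo
{
    // Sends 0..9 to the target block, then marks it as complete.
    static async Task ProduceAsync(ITargetBlock<int> target)
    {
        for (int i = 0; i < 10; i++)
            await target.SendAsync(i);
        target.Complete();
    }

    // Receives items until the source is completed and empty.
    static async Task<int> ConsumeAsync(ISourceBlock<int> source)
    {
        int sum = 0;
        while (await source.OutputAvailableAsync())
        {
            sum += await source.ReceiveAsync();
        }
        return sum;
    }

    public static async Task<int> RunAsync()
    {
        var buffer = new BufferBlock<int>();
        Task<int> consumer = ConsumeAsync(buffer); // start consuming first
        await ProduceAsync(buffer);
        return await consumer;
    }

    public static async Task Main()
    {
        Console.WriteLine("Sum of consumed items: {0}", await RunAsync());
    }
}
```

This sketch uses a single consumer. With several consumers reading the same block, another consumer can take the item between `OutputAvailableAsync` and `ReceiveAsync`, so a try-style receive would be safer there.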

菩提树下叶撕阳。 2024-08-17 10:55:46

A modern and simple way to implement the producer/consumer pattern in C# is to use System.Threading.Channels. It's asynchronous and uses ValueTask to reduce memory allocations. Here is an example:

public class ProducerConsumer<T>
{
    protected readonly Channel<T> JobChannel = Channel.CreateUnbounded<T>();

    public IAsyncEnumerable<T> GetAllAsync()
    {
        return JobChannel.Reader.ReadAllAsync();
    }

    public async ValueTask AddAsync(T job)
    {
        await JobChannel.Writer.WriteAsync(job);
    }

    public async ValueTask AddAsync(IEnumerable<T> jobs)
    {
        foreach (var job in jobs)
        {
            await JobChannel.Writer.WriteAsync(job);
        }
    }
}
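
The class above never completes the channel, so a consumer iterating `GetAllAsync()` would wait forever once the producers are done. The following sketch, which is not part of the answer, shows one way a producer and a consumer might be wired together with completion. The `Demo` class and the bounded capacity of 4 are assumptions made for illustration.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class Demo
{
    public static async Task<int> RunAsync()
    {
        // Bounded channel: WriteAsync waits while 4 items are pending.
        Channel<int> channel = Channel.CreateBounded<int>(4);

        Task producer = Task.Run(async () =>
        {
            for (int i = 0; i < 10; i++)
                await channel.Writer.WriteAsync(i);
            // Tell readers that no more items will arrive.
            channel.Writer.Complete();
        });

        int sum = 0;
        // ReadAllAsync ends once the writer is completed and the channel is drained.
        await foreach (int item in channel.Reader.ReadAllAsync())
            sum += item;

        await producer;
        return sum;
    }

    public static async Task Main()
    {
        Console.WriteLine("Sum of consumed items: {0}", await RunAsync());
    }
}
```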
GRAY°灰色天空 2024-08-17 10:55:46

Warning: If you read the comments, you'll understand my answer is wrong :)

There's a possible deadlock in your code.

Imagine the following case. For clarity, I used a single-threaded approach, but it should be easy to convert to a multi-threaded one with sleeps:

// We create some actions...
object locker = new object();

Action action1 = () => {
    lock (locker)
    {
        System.Threading.Monitor.Wait(locker);
        Console.WriteLine("This is action1");
    }
};

Action action2 = () => {
    lock (locker)
    {
        System.Threading.Monitor.Wait(locker);
        Console.WriteLine("This is action2");
    }
};

// ... (stuff happens, etc.)

// Imagine both actions were running
// and there's 0 items in the queue

// And now the producer kicks in...
lock (locker)
{
    // This would add a job to the queue

    Console.WriteLine("Pulse now!");
    System.Threading.Monitor.Pulse(locker);
}

// ... (more stuff)
// and the actions finish now!

Console.WriteLine("Consume action!");
action1(); // Oops... they're locked...
action2();

Please do let me know if this doesn't make any sense.

If this is confirmed, then the answer to your question is, "no, it isn't safe" ;)
I hope this helps.
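
The comments mentioned in the warning are not reproduced here. One likely reason the scenario does not apply to the question's code is that `Consume` only calls `Monitor.Wait` inside `while (queue.Count == 0)`, whereas the snippet above waits unconditionally. A pulse sent while nobody is waiting is not remembered, but the item is still in the queue, so a late consumer never waits for it. The sketch below illustrates this under that assumption; the `Demo` class and the 5-second timeout are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class Demo
{
    static readonly object locker = new object();
    static readonly Queue<string> queue = new Queue<string>();

    static void Produce(string item)
    {
        lock (locker)
        {
            queue.Enqueue(item);
            // If no thread is waiting, this pulse has no effect and is not stored.
            Monitor.Pulse(locker);
        }
    }

    static string Consume()
    {
        lock (locker)
        {
            // The condition is checked before waiting, so an item that was
            // enqueued earlier is picked up without needing a pulse.
            while (queue.Count == 0) Monitor.Wait(locker);
            return queue.Dequeue();
        }
    }

    public static string Run()
    {
        Produce("job"); // pulse happens before any consumer exists

        string result = null;
        var consumer = new Thread(() => result = Consume());
        consumer.IsBackground = true; // do not keep the process alive if it blocks
        consumer.Start();

        bool finished = consumer.Join(TimeSpan.FromSeconds(5));
        return finished ? result : "blocked";
    }

    public static void Main()
    {
        Console.WriteLine(Run());
    }
}
```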

季末如歌 2024-08-17 10:55:46
public class ProducerConsumerProblem
    {
        private int n;
        object obj = new object();
        public ProducerConsumerProblem(int n)
        {
            this.n = n;
        }

        public void Producer()
        {

            for (int i = 0; i < n; i++)
            {
                lock (obj)
                {
                    Console.Write("Producer =>");
                    System.Threading.Monitor.Pulse(obj);
                    System.Threading.Thread.Sleep(1);
                    System.Threading.Monitor.Wait(obj);
                }
            }
        }

        public void Consumer()
        {
            lock (obj)
            {
                for (int i = 0; i < n; i++)
                {
                    System.Threading.Monitor.Wait(obj, 10);
                    Console.Write("<= Consumer");
                    System.Threading.Monitor.Pulse(obj);
                    Console.WriteLine();
                }
            }
        }
    }

    public class Program
    {
        static void Main(string[] args)
        {
            ProducerConsumerProblem f = new ProducerConsumerProblem(10);
            System.Threading.Thread t1 = new System.Threading.Thread(() => f.Producer());
            System.Threading.Thread t2 = new System.Threading.Thread(() => f.Consumer());
            t1.IsBackground = true;
            t2.IsBackground = true;
            t1.Start();
            t2.Start();
            Console.ReadLine();
        }
    }

Output:

Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer