拥有资源的生产者-消费者

发布于 2024-11-07 22:09:24 字数 2425 浏览 7 评论 0原文

我正在尝试使用一组资源来实现生产者/消费者模式,因此每个线程都有一个与之关联的资源。例如,我可能有一个任务队列,其中每个任务都需要一个 StreamWriter 来写入其结果。每个任务还​​必须有参数传递给它。

我从 Joseph Albahari 的实现开始(我的修改版本见下文)。

我将 Action 队列替换为 Action 队列,其中 T 是资源,并将与线程关联的资源传递给操作。但是,这给我留下了如何将参数传递给 Action 的问题。显然,Action 必须替换为委托,但这留下了当任务排队时(从 ProducerConsumerQueue 类外部)如何传递参数的问题。关于如何做到这一点有什么想法吗?

class ProducerConsumerQueue<T>
    {
        readonly object _locker = new object();            
        Thread[] _workers;
        Queue<Action<T>> _itemQ = new Queue<Action<T>>();

        public ProducerConsumerQueue(T[] resources)
        {
            _workers = new Thread[resources.Length];

            // Create and start a separate thread for each worker
            for (int i = 0; i < resources.Length; i++)
            {
                Thread thread = new Thread(() => Consume(resources[i]));
                thread.SetApartmentState(ApartmentState.STA);
                _workers[i] = thread;
                _workers[i].Start();
            }
        }        

        public void Shutdown(bool waitForWorkers)
        {
            // Enqueue one null item per worker to make each exit.
            foreach (Thread worker in _workers)
                EnqueueItem(null);

            // Wait for workers to finish
            if (waitForWorkers)
                foreach (Thread worker in _workers)
                    worker.Join();
        }

        public void EnqueueItem(Action<T> item)
        {
            lock (_locker)
            {
                _itemQ.Enqueue(item);           // We must pulse because we're
                Monitor.Pulse(_locker);         // changing a blocking condition.
            }
        }

        void Consume(T parameter)
        {
            while (true)                        // Keep consuming until
            {                                   // told otherwise.
                Action<T> item;
                lock (_locker)
                {
                    while (_itemQ.Count == 0) Monitor.Wait(_locker);
                    item = _itemQ.Dequeue();
                }
                if (item == null) return;         // This signals our exit.
                item(parameter);                           // Execute item.
            }
        }
    }

I'm trying to implement the producer/consumer pattern with a set of resources, so each thread has one resource associated with it. For example, I may have a queue of tasks where each task requires a StreamWriter to write its result. Each task also has to have parameters passed to it.

I started with Joseph Albahari's implementation (see below for my modified version).

I replaced the queue of Action with a queue of Action<T> where T is the resource, and pass the resource associated with the thread to the Action. But, this leaves me with the problem of how to pass parameters to the Action. Obviously, the Action must be replaced with a delegate but this leaves the problem of how to pass parameters when tasks are enqueued (from outside the ProducerConsumerQueue class). Any ideas on how to do this?

class ProducerConsumerQueue<T>
    {
        readonly object _locker = new object();            
        Thread[] _workers;
        Queue<Action<T>> _itemQ = new Queue<Action<T>>();

        public ProducerConsumerQueue(T[] resources)
        {
            _workers = new Thread[resources.Length];

            // Create and start a separate thread for each worker
            for (int i = 0; i < resources.Length; i++)
            {
                Thread thread = new Thread(() => Consume(resources[i]));
                thread.SetApartmentState(ApartmentState.STA);
                _workers[i] = thread;
                _workers[i].Start();
            }
        }        

        public void Shutdown(bool waitForWorkers)
        {
            // Enqueue one null item per worker to make each exit.
            foreach (Thread worker in _workers)
                EnqueueItem(null);

            // Wait for workers to finish
            if (waitForWorkers)
                foreach (Thread worker in _workers)
                    worker.Join();
        }

        public void EnqueueItem(Action<T> item)
        {
            lock (_locker)
            {
                _itemQ.Enqueue(item);           // We must pulse because we're
                Monitor.Pulse(_locker);         // changing a blocking condition.
            }
        }

        void Consume(T parameter)
        {
            while (true)                        // Keep consuming until
            {                                   // told otherwise.
                Action<T> item;
                lock (_locker)
                {
                    while (_itemQ.Count == 0) Monitor.Wait(_locker);
                    item = _itemQ.Dequeue();
                }
                if (item == null) return;         // This signals our exit.
                item(parameter);                           // Execute item.
            }
        }
    }

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

无妨# 2024-11-14 22:09:24

ProducerConsumerQueue 中的类型 T 不一定是您的资源,它可以是包含您的资源的复合类型。对于 .NET4,最简单的方法是使用 Tuple。生产/消费者队列只是吃掉并吐出 T,因此在您的 Action 中,您可以仅使用属性来获取资源和参数。如果您使用 Tuple,您将使用 Item1 获取资源,使用 Item2 获取参数。

如果您不使用 .NET4,则过程类似,但您只需创建自己的类:

public class WorkItem<T>
{
    private StreamWriter resource;
    private T parameter;

    public WorkItem(StreamWriter resource, T parameter)
    {
        this.resource = resource;
        this.parameter = parameter;
    }

    public StreamWriter Resource { get { return resource; } }
    public T Parameter { get { return parameter; } }
}

事实上,使其通用可能对您的情况进行了过度设计。您可以将 T 定义为您想要的类型。

另外,作为参考,.NET4 中包含可能适用于您的用例的多线程处理新方法,例如并发队列和并行任务库。它们还可以与信号量等传统方法结合使用。

编辑:

继续使用这种方法,这里有一个小示例类,演示如何使用:

  • 信号量来控制对有限资源的访问
  • 并发队列来在线程之间安全地管理该资源
  • 使用任务并行库进行任务管理

这是 Processor 类:

public class Processor
{
    private const int count = 3;
    private ConcurrentQueue<StreamWriter> queue = new ConcurrentQueue<StreamWriter>();
    private Semaphore semaphore = new Semaphore(count, count);

    public Processor()
    {
        // Populate the resource queue.
        for (int i = 0; i < count; i++) queue.Enqueue(new StreamWriter("sample" + i));
    }

    public void Process(int parameter)
    {
        // Wait for one of our resources to become free.
        semaphore.WaitOne();
        StreamWriter resource;
        queue.TryDequeue(out resource);

        // Dispatch the work to a task.
        Task.Factory.StartNew(() => Process(resource, parameter));
    }

    private Random random = new Random();

    private void Process(StreamWriter resource, int parameter)
    {
        // Do work in background with resource.
        Thread.Sleep(random.Next(10) * 100);
        resource.WriteLine("Parameter = {0}", parameter);
        queue.Enqueue(resource);
        semaphore.Release();
    }
}

现在我们可以像这样使用该类:

var processor = new Processor();
for (int i = 0; i < 10; i++)
    processor.Process(i);

同时调度的任务不会超过三个,每个任务都有自己的 StreamWriter 资源这是被回收的。

The type T in ProducerConsumerQueue<T> doesn't have to be your resource it can be a composite type that contains your resource. With .NET4 the easiest way to do this is with Tuple<StreamWriter, YourParameterType>. The produce/consumer queue just eats and spits out T so in your Action<T> you can just use properties to get the resource and the parameter. If you are using Tuple you would use Item1 to get the resource and Item2 to get the parameter.

If you are not use .NET4, the process is similar but you just create your own class:

public class WorkItem<T>
{
    private StreamWriter resource;
    private T parameter;

    public WorkItem(StreamWriter resource, T parameter)
    {
        this.resource = resource;
        this.parameter = parameter;
    }

    public StreamWriter Resource { get { return resource; } }
    public T Parameter { get { return parameter; } }
}

In fact, making it generic may be overdesigning for your situation. You can just define T to be the type you want it to be.

Also, for reference, there are new ways to do multi-threading included in .NET4 that may applicable to your use case such as concurrent queues and the Parallel Task Library. They can also be combined with traditional approaches such as semaphores.

Edit:

Continuing with this approach, here is a small sample class that demonstrates using:

  • a semaphore to control access to a limited resource
  • a concurrent queue to manage that resource safely between threads
  • task management using the Task Parallel Library

Here is the Processor class:

public class Processor
{
    private const int count = 3;
    private ConcurrentQueue<StreamWriter> queue = new ConcurrentQueue<StreamWriter>();
    private Semaphore semaphore = new Semaphore(count, count);

    public Processor()
    {
        // Populate the resource queue.
        for (int i = 0; i < count; i++) queue.Enqueue(new StreamWriter("sample" + i));
    }

    public void Process(int parameter)
    {
        // Wait for one of our resources to become free.
        semaphore.WaitOne();
        StreamWriter resource;
        queue.TryDequeue(out resource);

        // Dispatch the work to a task.
        Task.Factory.StartNew(() => Process(resource, parameter));
    }

    private Random random = new Random();

    private void Process(StreamWriter resource, int parameter)
    {
        // Do work in background with resource.
        Thread.Sleep(random.Next(10) * 100);
        resource.WriteLine("Parameter = {0}", parameter);
        queue.Enqueue(resource);
        semaphore.Release();
    }
}

and now we can use the class like this:

var processor = new Processor();
for (int i = 0; i < 10; i++)
    processor.Process(i);

and no more than three tasks will be scheduled at the same time, each with their own StreamWriter resource which is recycled.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文