拥有资源的生产者-消费者
我正在尝试使用一组资源来实现生产者/消费者模式,因此每个线程都有一个与之关联的资源。例如,我可能有一个任务队列,其中每个任务都需要一个 StreamWriter 来写入其结果。每个任务还必须有参数传递给它。
我从 Joseph Albahari 的实现开始(我的修改版本见下文)。
我将 Action
队列替换为 Action
队列,其中 T
是资源,并将与线程关联的资源传递给操作
。但是,这给我留下了如何将参数传递给 Action
的问题。显然,Action
必须替换为委托,但这留下了当任务排队时(从 ProducerConsumerQueue
类外部)如何传递参数的问题。关于如何做到这一点有什么想法吗?
class ProducerConsumerQueue<T>
{
readonly object _locker = new object();
Thread[] _workers;
Queue<Action<T>> _itemQ = new Queue<Action<T>>();
public ProducerConsumerQueue(T[] resources)
{
_workers = new Thread[resources.Length];
// Create and start a separate thread for each worker
for (int i = 0; i < resources.Length; i++)
{
Thread thread = new Thread(() => Consume(resources[i]));
thread.SetApartmentState(ApartmentState.STA);
_workers[i] = thread;
_workers[i].Start();
}
}
public void Shutdown(bool waitForWorkers)
{
// Enqueue one null item per worker to make each exit.
foreach (Thread worker in _workers)
EnqueueItem(null);
// Wait for workers to finish
if (waitForWorkers)
foreach (Thread worker in _workers)
worker.Join();
}
public void EnqueueItem(Action<T> item)
{
lock (_locker)
{
_itemQ.Enqueue(item); // We must pulse because we're
Monitor.Pulse(_locker); // changing a blocking condition.
}
}
void Consume(T parameter)
{
while (true) // Keep consuming until
{ // told otherwise.
Action<T> item;
lock (_locker)
{
while (_itemQ.Count == 0) Monitor.Wait(_locker);
item = _itemQ.Dequeue();
}
if (item == null) return; // This signals our exit.
item(parameter); // Execute item.
}
}
}
I'm trying to implement the producer/consumer pattern with a set of resources, so each thread has one resource associated with it. For example, I may have a queue of tasks where each task requires a StreamWriter
to write its result. Each task also has to have parameters passed to it.
I started with Joseph Albahari's implementation (see below for my modified version).
I replaced the queue of Action
with a queue of Action<T>
where T
is the resource, and pass the resource associated with the thread to the Action
. But, this leaves me with the problem of how to pass parameters to the Action
. Obviously, the Action
must be replaced with a delegate but this leaves the problem of how to pass parameters when tasks are enqueued (from outside the ProducerConsumerQueue
class). Any ideas on how to do this?
class ProducerConsumerQueue<T>
{
readonly object _locker = new object();
Thread[] _workers;
Queue<Action<T>> _itemQ = new Queue<Action<T>>();
public ProducerConsumerQueue(T[] resources)
{
_workers = new Thread[resources.Length];
// Create and start a separate thread for each worker
for (int i = 0; i < resources.Length; i++)
{
Thread thread = new Thread(() => Consume(resources[i]));
thread.SetApartmentState(ApartmentState.STA);
_workers[i] = thread;
_workers[i].Start();
}
}
public void Shutdown(bool waitForWorkers)
{
// Enqueue one null item per worker to make each exit.
foreach (Thread worker in _workers)
EnqueueItem(null);
// Wait for workers to finish
if (waitForWorkers)
foreach (Thread worker in _workers)
worker.Join();
}
public void EnqueueItem(Action<T> item)
{
lock (_locker)
{
_itemQ.Enqueue(item); // We must pulse because we're
Monitor.Pulse(_locker); // changing a blocking condition.
}
}
void Consume(T parameter)
{
while (true) // Keep consuming until
{ // told otherwise.
Action<T> item;
lock (_locker)
{
while (_itemQ.Count == 0) Monitor.Wait(_locker);
item = _itemQ.Dequeue();
}
if (item == null) return; // This signals our exit.
item(parameter); // Execute item.
}
}
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
data:image/s3,"s3://crabby-images/d5906/d59060df4059a6cc364216c4d63ceec29ef7fe66" alt="扫码二维码加入Web技术交流群"
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
ProducerConsumerQueue
中的类型T
不一定是您的资源,它可以是包含您的资源的复合类型。对于 .NET4,最简单的方法是使用Tuple
。生产/消费者队列只是吃掉并吐出T
,因此在您的Action
中,您可以仅使用属性来获取资源和参数。如果您使用Tuple
,您将使用Item1
获取资源,使用Item2
获取参数。如果您不使用 .NET4,则过程类似,但您只需创建自己的类:
事实上,使其通用可能对您的情况进行了过度设计。您可以将 T 定义为您想要的类型。
另外,作为参考,.NET4 中包含可能适用于您的用例的多线程处理新方法,例如并发队列和并行任务库。它们还可以与信号量等传统方法结合使用。
编辑:
继续使用这种方法,这里有一个小示例类,演示如何使用:
这是 Processor 类:
现在我们可以像这样使用该类:
同时调度的任务不会超过三个,每个任务都有自己的 StreamWriter 资源这是被回收的。
The type
T
inProducerConsumerQueue<T>
doesn't have to be your resource it can be a composite type that contains your resource. With .NET4 the easiest way to do this is withTuple<StreamWriter, YourParameterType>
. The produce/consumer queue just eats and spits outT
so in yourAction<T>
you can just use properties to get the resource and the parameter. If you are usingTuple
you would useItem1
to get the resource andItem2
to get the parameter.If you are not use .NET4, the process is similar but you just create your own class:
In fact, making it generic may be overdesigning for your situation. You can just define T to be the type you want it to be.
Also, for reference, there are new ways to do multi-threading included in .NET4 that may applicable to your use case such as concurrent queues and the Parallel Task Library. They can also be combined with traditional approaches such as semaphores.
Edit:
Continuing with this approach, here is a small sample class that demonstrates using:
Here is the
Processor
class:and now we can use the class like this:
and no more than three tasks will be scheduled at the same time, each with their own
StreamWriter
resource which is recycled.