我的生产者-消费者队列设计有什么问题?
我从 此处 开始 C# 代码示例。我尝试对其进行调整有几个原因:1)在我的场景中,所有任务都将在消费者启动之前预先放入队列中,2)我想将工作人员抽象为一个单独的类,而不是让WorkerQueue
类中的原始 Thread
成员。
但我的队列似乎没有自行处理,它只是挂起,当我中断 Visual Studio 时,它卡在 WorkerThread
的 _th.Join()
行上 # 1.另外,有没有更好的方法来组织这个?公开 WaitOne()
和 Join()
方法似乎是错误的,但我想不出一种合适的方法来让 WorkerThread
与队列交互。
另外,顺便说一句 - 如果我在 using
块的顶部调用 q.Start(#)
,则只有某些线程会启动(例如线程 1、2 ,以及 8 个进程处理每个任务)。这是为什么呢?这是某种竞争条件,还是我做错了什么?
using System;
using System.Collections.Generic;
using System.Text;
using System.Messaging;
using System.Threading;
using System.Linq;
namespace QueueTest
{
class Program
{
static void Main(string[] args)
{
using (WorkQueue q = new WorkQueue())
{
q.Finished += new Action(delegate { Console.WriteLine("All jobs finished"); });
Random r = new Random();
foreach (int i in Enumerable.Range(1, 10))
q.Enqueue(r.Next(100, 500));
Console.WriteLine("All jobs queued");
q.Start(8);
}
}
}
class WorkQueue : IDisposable
{
private Queue<int> _jobs = new Queue<int>();
private int _job_count;
private EventWaitHandle _wh = new AutoResetEvent(false);
private object _lock = new object();
private List<WorkerThread> _th;
public event Action Finished;
public WorkQueue()
{
}
public void Start(int num_threads)
{
_job_count = _jobs.Count;
_th = new List<WorkerThread>(num_threads);
foreach (int i in Enumerable.Range(1, num_threads))
{
_th.Add(new WorkerThread(i, this));
_th[_th.Count - 1].JobFinished += new Action<int>(WorkQueue_JobFinished);
}
}
void WorkQueue_JobFinished(int obj)
{
lock (_lock)
{
_job_count--;
if (_job_count == 0 && Finished != null)
Finished();
}
}
public void Enqueue(int job)
{
lock (_lock)
_jobs.Enqueue(job);
_wh.Set();
}
public void Dispose()
{
Enqueue(Int32.MinValue);
_th.ForEach(th => th.Join());
_wh.Close();
}
public int GetNextJob()
{
lock (_lock)
{
if (_jobs.Count > 0)
return _jobs.Dequeue();
else
return Int32.MinValue;
}
}
public void WaitOne()
{
_wh.WaitOne();
}
}
class WorkerThread
{
private Thread _th;
private WorkQueue _q;
private int _i;
public event Action<int> JobFinished;
public WorkerThread(int i, WorkQueue q)
{
_i = i;
_q = q;
_th = new Thread(DoWork);
_th.Start();
}
public void Join()
{
_th.Join();
}
private void DoWork()
{
while (true)
{
int job = _q.GetNextJob();
if (job != Int32.MinValue)
{
Console.WriteLine("Thread {0} Got job {1}", _i, job);
Thread.Sleep(job * 10); // in reality would to actual work here
if (JobFinished != null)
JobFinished(job);
}
else
{
Console.WriteLine("Thread {0} no job available", _i);
_q.WaitOne();
}
}
}
}
}
I'm starting with the C# code example here. I'm trying to adapt it for a couple reasons: 1) in my scenario, all tasks will be put in the queue up-front before consumers will start, and 2) I wanted to abstract the worker into a separate class instead of having raw Thread
members within the WorkerQueue
class.
My queue doesn't seem to dispose of itself though, it just hangs, and when I break in Visual Studio it's stuck on the _th.Join()
line for WorkerThread
#1. Also, is there a better way to organize this? Something about exposing the WaitOne()
and Join()
methods seems wrong, but I couldn't think of an appropriate way to let the WorkerThread
interact with the queue.
Also, an aside - if I call q.Start(#)
at the top of the using
block, only some of the threads every kick in (e.g. threads 1, 2, and 8 process every task). Why is this? Is it a race condition of some sort, or am I doing something wrong?
using System;
using System.Collections.Generic;
using System.Text;
using System.Messaging;
using System.Threading;
using System.Linq;
namespace QueueTest
{
class Program
{
static void Main(string[] args)
{
using (WorkQueue q = new WorkQueue())
{
q.Finished += new Action(delegate { Console.WriteLine("All jobs finished"); });
Random r = new Random();
foreach (int i in Enumerable.Range(1, 10))
q.Enqueue(r.Next(100, 500));
Console.WriteLine("All jobs queued");
q.Start(8);
}
}
}
class WorkQueue : IDisposable
{
private Queue<int> _jobs = new Queue<int>();
private int _job_count;
private EventWaitHandle _wh = new AutoResetEvent(false);
private object _lock = new object();
private List<WorkerThread> _th;
public event Action Finished;
public WorkQueue()
{
}
public void Start(int num_threads)
{
_job_count = _jobs.Count;
_th = new List<WorkerThread>(num_threads);
foreach (int i in Enumerable.Range(1, num_threads))
{
_th.Add(new WorkerThread(i, this));
_th[_th.Count - 1].JobFinished += new Action<int>(WorkQueue_JobFinished);
}
}
void WorkQueue_JobFinished(int obj)
{
lock (_lock)
{
_job_count--;
if (_job_count == 0 && Finished != null)
Finished();
}
}
public void Enqueue(int job)
{
lock (_lock)
_jobs.Enqueue(job);
_wh.Set();
}
public void Dispose()
{
Enqueue(Int32.MinValue);
_th.ForEach(th => th.Join());
_wh.Close();
}
public int GetNextJob()
{
lock (_lock)
{
if (_jobs.Count > 0)
return _jobs.Dequeue();
else
return Int32.MinValue;
}
}
public void WaitOne()
{
_wh.WaitOne();
}
}
class WorkerThread
{
private Thread _th;
private WorkQueue _q;
private int _i;
public event Action<int> JobFinished;
public WorkerThread(int i, WorkQueue q)
{
_i = i;
_q = q;
_th = new Thread(DoWork);
_th.Start();
}
public void Join()
{
_th.Join();
}
private void DoWork()
{
while (true)
{
int job = _q.GetNextJob();
if (job != Int32.MinValue)
{
Console.WriteLine("Thread {0} Got job {1}", _i, job);
Thread.Sleep(job * 10); // in reality would to actual work here
if (JobFinished != null)
JobFinished(job);
}
else
{
Console.WriteLine("Thread {0} no job available", _i);
_q.WaitOne();
}
}
}
}
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(4)
工作线程都阻塞在 DoWork() 中的 _q.WaitOne() 调用上。调用线程的Join()方法会死锁,线程永远不会退出。您需要添加一种机制来向工作线程发出退出信号。在工作线程中使用 WaitAny 进行测试的 ManualResetEvent 将完成工作。
一个调试技巧:熟悉“调试”+“Windows”+“线程”窗口。它允许您在线程之间切换并查看它们的调用堆栈。您自己很快就会发现这个问题。
The worker threads are all blocking on the _q.WaitOne() call in DoWork(). Calling the thread's Join() method will deadlock, the threads never exit. You'll need to add a mechanism to signal to worker thread to exit. A ManualResetEvent, tested with WaitAny in the worker, will get the job done.
One debugging tip: get familiar with the Debug + Windows + Threads window. It lets you switch between threads and look at their call stacks. You'd have quickly found this problem by yourself.
您在
DoWork
末尾执行WaitOne()
,但在线程开始运行后从未设置它。请注意,在“成功”
WaitOne
后,AutoResetEvent
将返回到未设置状态You do a
WaitOne()
at the end ofDoWork
but you never set it after the threads start running.Note that
AutoResetEvent
will go back to not set state after a 'successful'WaitOne
DoWork 方法中的循环永远不会结束。这将导致线程始终处于繁忙状态,并且该 thread.Join() 将永远阻塞,等待其完成。
你有一个 WaitOne,但我认为没有必要,除非你有一个原因希望你的线程池在工作完成后继续存在:
如果你希望线程继续存在,这样你就不必在工作完成时重新分配更多线程调用 WorkQueue.Start 后,您必须使用 AutoResetEvent 执行更详细的操作。
Your loop in your DoWork method never finishes. This will cause the thread to always be busy and this thread.Join() will block forever, waiting for it to complete.
You have a WaitOne, but I don't think it's necessary unless there is a reason you want your threadpool to stick around after your work is complete:
If you want the threads to stick around so you don't have to realloc more threads when WorkQueue.Start is called, you'd have to do something more elaborate with the AutoResetEvent.
您的主要问题是其他答案中描述的确定性僵局。
然而,正确的处理方法不是解决死锁,而是完全消除该事件。
生产者-消费者模型的整体思想是客户端同时将元素入队和出队,这就是需要同步机制的原因。如果您预先将所有元素入队,然后仅同时出队,则只需要出队锁,因为“事件”用于让“消费者”等待新元素入队;这在你的情况下不会发生(根据你的描述)。
此外,“单一责任”设计原则建议线程代码应与“阻塞队列”代码分开。让“阻塞队列”成为一个自己的类,然后在线程管理类中使用它。
Your main problem is the deterministic deadlock described in the other answers.
The correct way to handle it, though, is not to fix the deadlock, but to eliminate the Event altogether.
The whole idea of the Producer-Consumer model is that the clients En-queue and De-queue elements concurrently, and that's why sync mechanisms are required. If you're enqueuing all of the elements beforehand and then only dequeue concurrently, you only need a lock on the dequeue, since the "Event" is used to let "Consumers" wait for new elements to be enqueued; this will not happen in your case (based on your description).
Also, the "single responsibility" design principle suggests that the threading code should be separated from the "Blocking Queue" code. Make the "Blocking Queue" a class of its own, then use it in your thread-management class.