生产者消费者队列不处置

发布于 2024-09-26 17:53:09 字数 3318 浏览 1 评论 0原文

我已经构建了一个生产者消费者队列,它包装了 .net 4.0 的 ConcurrentQueue,并在生产(Enqueue)和消费(while(true) 线程之间使用 SlimManualResetEvent 信号发送。 队列看起来像:

public class ProducerConsumerQueue<T> : IDisposable, IProducerConsumerQueue<T>
{
    private bool _IsActive=true;

    public int Count
    {
        get
        {
            return this._workerQueue.Count;
        }
    }

    public bool IsActive
    {
        get { return _IsActive; }
        set { _IsActive = value; }
    }

    public event Dequeued<T> OnDequeued = delegate { };
    public event LoggedHandler OnLogged = delegate { };

    private ConcurrentQueue<T> _workerQueue = new ConcurrentQueue<T>();

    private object _locker = new object();

    Thread[] _workers;

    #region IDisposable Members

    int _workerCount=0;

    ManualResetEventSlim _mres = new ManualResetEventSlim();

    public void Dispose()
    {
        _IsActive = false;

        _mres.Set();

        LogWriter.Write("55555555555");

          for (int i = 0; i < _workerCount; i++)
          // Wait for the consumer's thread to finish.
          {
             _workers[i].Join();        
          }
           LogWriter.Write("6666666666");
     // Release any OS resources.
    }
    public ProducerConsumerQueue(int workerCount)
    {
        try
        {
            _workerCount = workerCount;
            _workers = new Thread[workerCount];
            // Create and start a separate thread for each worker
            for (int i = 0; i < workerCount; i++)
                (_workers[i] = new Thread(Work)).Start();
        }
        catch (Exception ex)
        {
            OnLogged(ex.Message + ex.StackTrace);
        }

    }
    #endregion

    #region IProducerConsumerQueue<T> Members

    public void EnqueueTask(T task)
    {
        if (_IsActive)
        {
            _workerQueue.Enqueue(task);
            //Monitor.Pulse(_locker);
            _mres.Set();
        }
    }

    public void Work()
    {
      while (_IsActive)
      {
          try
          {
              T item = Dequeue();
              if (item != null)
                  OnDequeued(item);
          }
          catch (Exception ex)
          {
              OnLogged(ex.Message + ex.StackTrace);
          }              
      }
    }

    #endregion
    private T Dequeue()
    {
        try
        {
            T dequeueItem;
            //if (_workerQueue.Count > 0)
            //{
            _workerQueue.TryDequeue(out dequeueItem);
            if (dequeueItem != null)
                return dequeueItem;
            //}
            if (_IsActive)
            {
                _mres.Wait();
                _mres.Reset();
            }
            //_workerQueue.TryDequeue(out dequeueItem);
            return dequeueItem;
        }
        catch (Exception ex)
        {
            OnLogged(ex.Message + ex.StackTrace);
            T dequeueItem;
            //if (_workerQueue.Count > 0)
            //{
            _workerQueue.TryDequeue(out dequeueItem);
            return dequeueItem;
        }

    }


    public void Clear()
    {
        _workerQueue = new ConcurrentQueue<T>();
    }
}

}

当调用 Dispose 时,它​​有时会在连接上阻塞(消耗一个线程)并且 dispose 方法被卡住。我猜它会卡在重置事件的等待中,但为此我将其称为处置集。 有什么建议吗?

i have built a Producer Consumer queue wrapping a ConcurrentQueue of .net 4.0 with SlimManualResetEvent signaling between the producing (Enqueue) and the consuming (while(true) thread based.
the queue looks like:

public class ProducerConsumerQueue<T> : IDisposable, IProducerConsumerQueue<T>
{
    private bool _IsActive=true;

    public int Count
    {
        get
        {
            return this._workerQueue.Count;
        }
    }

    public bool IsActive
    {
        get { return _IsActive; }
        set { _IsActive = value; }
    }

    public event Dequeued<T> OnDequeued = delegate { };
    public event LoggedHandler OnLogged = delegate { };

    private ConcurrentQueue<T> _workerQueue = new ConcurrentQueue<T>();

    private object _locker = new object();

    Thread[] _workers;

    #region IDisposable Members

    int _workerCount=0;

    ManualResetEventSlim _mres = new ManualResetEventSlim();

    public void Dispose()
    {
        _IsActive = false;

        _mres.Set();

        LogWriter.Write("55555555555");

          for (int i = 0; i < _workerCount; i++)
          // Wait for the consumer's thread to finish.
          {
             _workers[i].Join();        
          }
           LogWriter.Write("6666666666");
     // Release any OS resources.
    }
    public ProducerConsumerQueue(int workerCount)
    {
        try
        {
            _workerCount = workerCount;
            _workers = new Thread[workerCount];
            // Create and start a separate thread for each worker
            for (int i = 0; i < workerCount; i++)
                (_workers[i] = new Thread(Work)).Start();
        }
        catch (Exception ex)
        {
            OnLogged(ex.Message + ex.StackTrace);
        }

    }
    #endregion

    #region IProducerConsumerQueue<T> Members

    public void EnqueueTask(T task)
    {
        if (_IsActive)
        {
            _workerQueue.Enqueue(task);
            //Monitor.Pulse(_locker);
            _mres.Set();
        }
    }

    public void Work()
    {
      while (_IsActive)
      {
          try
          {
              T item = Dequeue();
              if (item != null)
                  OnDequeued(item);
          }
          catch (Exception ex)
          {
              OnLogged(ex.Message + ex.StackTrace);
          }              
      }
    }

    #endregion
    private T Dequeue()
    {
        try
        {
            T dequeueItem;
            //if (_workerQueue.Count > 0)
            //{
            _workerQueue.TryDequeue(out dequeueItem);
            if (dequeueItem != null)
                return dequeueItem;
            //}
            if (_IsActive)
            {
                _mres.Wait();
                _mres.Reset();
            }
            //_workerQueue.TryDequeue(out dequeueItem);
            return dequeueItem;
        }
        catch (Exception ex)
        {
            OnLogged(ex.Message + ex.StackTrace);
            T dequeueItem;
            //if (_workerQueue.Count > 0)
            //{
            _workerQueue.TryDequeue(out dequeueItem);
            return dequeueItem;
        }

    }


    public void Clear()
    {
        _workerQueue = new ConcurrentQueue<T>();
    }
}

}

when calling Dispose it sometimes blocks on the join (one thread consuming) and the dispose method is stuck. i guess it get's stuck on the Wait of the resetEvents but for that i call the set on the dispose.
any suggestions?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

围归者 2024-10-03 17:53:09

更新:我理解您关于内部需要队列的观点。我建议使用 BlockingCollection 是基于以下事实:您的代码包含大量提供阻止行为的逻辑。自己编写这样的逻辑很容易出现错误(我从经验中知道这一点);因此,当框架内有一个现有的类至少可以为您完成一些工作时,通常最好选择它。

如何使用 BlockingCollection 实现此类的完整示例有点太大,无法包含在此答案中,因此我发布了一个有效的 pastebin.com 上的示例;请随意查看并看看您的想法。

我还在此处编写了一个示例程序来演示上述示例。

我的代码正确吗?我不会太自信地说“是”;毕竟,我还没有编写单元测试,也没有对其运行任何诊断等。这只是一个基本草案,让您了解如何使用 BlockingCollection 而不是 ConcurrentQueue; 清理了很多逻辑(在我看来),并使其更容易专注于类的主要目的(从队列中消费项目并通知订阅者),而不是其实现有些困难的方面(内部队列的阻塞行为)。


评论中提出的问题:

您不使用 BlockingCollection 有什么原因吗?

您的答案:

[...]我需要一个队列。

来自 关于 BlockingCollection的默认构造函数的 MSDN 文档类

默认的基础集合是 ConcurrentQueue.

如果您选择实现自己的类而不是使用 BlockingCollection唯一原因是您需要一个 FIFO 队列,那么...您可能需要重新考虑你的决定。使用默认无参数构造函数实例化的 BlockingCollection 是一个 FIFO 队列。

也就是说,虽然我认为我无法对您发布的代码进行全面分析,但我至少可以提供一些提示:

  1. 我会非常犹豫是否以您在这里的方式使用事件一个处理这种棘手的多线程行为的类。调用代码可以附加它想要的任何事件处理程序,而这些处理程序又可能会抛出异常(您无法捕获)、长时间阻塞,甚至可能由于完全超出您控制范围的原因而陷入死锁——这在以下情况下非常糟糕:阻塞队列的情况。
  2. 您的 DequeueDispose 方法中存在竞争条件。

查看 Dequeue 方法中的这些行:

if (_IsActive) // point A
{
    _mres.Wait(); // point C
    _mres.Reset(); // point D
}

现在看看 Dispose 中的这两行:

_IsActive = false;

_mres.Set(); // point B

假设您有三个线程,T1、T2 和 T3。 T1 和 T2 均位于 A 点,其中每个都检查 _IsActive 并找到 true< /代码>。然后调用 Dispose,并且 T3_IsActive 设置为 false(但 T1 > 和 T2 已通过点 A),然后到达点 B,在此处调用 _mres.Set().然后 T1 到达点 C,移动到点 D,并调用 _mres.Reset()。现在 T2 到达点 C 并将永远卡住,因为 _mres.Set 将不会再次被调用(任何执行 Enqueue< 的线程) /code>会发现_IsActive == false并立即返回,并且执行Dispose的线程已经经过了B点)。

我很乐意尝试为解决此竞争条件提供一些帮助,但我怀疑 BlockingCollection 实际上并不正是您所需的类。如果您可以提供更多信息来让我相信情况并非如此,也许我会再看一下。

Update: I understand your point about needing a queue internally. My suggestion to use a BlockingCollection<T> is based on the fact that your code contains a lot of logic to provide the blocking behavior. Writing such logic yourself is very prone to bugs (I know this from experience); so when there's an existing class within the framework that does at least some of the work for you, it's generally preferable to go with that.

A complete example of how you can implement this class using a BlockingCollection<T> is a little bit too large to include in this answer, so I've posted a working example on pastebin.com; feel free to take a look and see what you think.

I also wrote an example program demonstrating the above example here.

Is my code correct? I wouldn't say yes with too much confidence; after all, I haven't written unit tests, run any diagnostics on it, etc. It's just a basic draft to give you an idea how using BlockingCollection<T> instead of ConcurrentQueue<T> cleans up a lot of your logic (in my opinion) and makes it easier to focus on the main purpose of your class (consuming items from a queue and notifying subscribers) rather than a somewhat difficult aspect of its implementation (the blocking behavior of the internal queue).


Question posed in a comment:

Any reason you're not using BlockingCollection<T>?

Your answer:

[...] i needed a queue.

From the MSDN documentation on the default constructor for the BlockingCollection<T> class:

The default underlying collection is a ConcurrentQueue<T>.

If the only reason you opted to implement your own class instead of using BlockingCollection<T> is that you need a FIFO queue, well then... you might want to rethink your decision. A BlockingCollection<T> instantiated using the default parameterless constructor is a FIFO queue.

That said, while I don't think I can offer a comprehensive analysis of the code you've posted, I can at least offer a couple of pointers:

  1. I'd be very hesitant to use events in the way that you are here for a class that deals with such tricky multithreaded behavior. Calling code can attach any event handlers it wants, and these can in turn throw exceptions (which you don't catch), block for long periods of time, or possibly even deadlock for reasons completely outside your control--which is very bad in the case of a blocking queue.
  2. There's a race condition in your Dequeue and Dispose methods.

Look at these lines of your Dequeue method:

if (_IsActive) // point A
{
    _mres.Wait(); // point C
    _mres.Reset(); // point D
}

And now take a look at these two lines from Dispose:

_IsActive = false;

_mres.Set(); // point B

Let's say you have three threads, T1, T2, and T3. T1 and T2 are both at point A, where each checks _IsActive and finds true. Then Dispose is called, and T3 sets _IsActive to false (but T1 and T2 have already passed point A) and then reaches point B, where it calls _mres.Set(). Then T1 gets to point C, moves on to point D, and calls _mres.Reset(). Now T2 reaches point C and will be stuck forever since _mres.Set will not be called again (any thread executing Enqueue will find _IsActive == false and return immediately, and the thread executing Dispose has already passed point B).

I'd be happy to try and offer some help on solving this race condition, but I'm skeptical that BlockingCollection<T> isn't in fact exactly the class you need for this. If you can provide some more information to convince me that this isn't the case, maybe I'll take another look.

请别遗忘我 2024-10-03 17:53:09

由于_IsActive未标记为易失性并且所有访问都没有,因此每个核心可以为该值和该缓存拥有单独的缓存可能永远不会刷新。因此,在 Dispose 中将 _IsActive 标记为 false 实际上不会影响所有正在运行的线程。

http://igoro.com/archive/volatile-keyword- in-c-内存模型-解释/

private volatile bool _IsActive=true;

Since _IsActive isn't marked as volatile and there's no lock around all access, each core can have a separate cache for this value and that cache may never get refreshed. So marking _IsActive to false in Dispose will not actually affect all running threads.

http://igoro.com/archive/volatile-keyword-in-c-memory-model-explained/

private volatile bool _IsActive=true;
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文