AutoResetEvent 未正确阻止

发布于 2024-09-15 19:54:26 字数 3336 浏览 8 评论 0原文

我有一个线程,它创建可变数量的工作线程并在它们之间分配任务。这是通过向线程传递一个 TaskQueue 对象来解决的,您将在下面看到其实现。

这些工作线程只是迭代它们所获得的TaskQueue 对象,执行每个任务。

private class TaskQueue : IEnumerable<Task>
{
    public int Count
    {
        get
        {
            lock(this.tasks)
            {
                return this.tasks.Count;
            }
        }
    }

    private readonly Queue<Task> tasks = new Queue<Task>();
    private readonly AutoResetEvent taskWaitHandle = new AutoResetEvent(false);

    private bool isFinishing = false;
    private bool isFinished = false;

    public void Enqueue(Task task)
    {
        Log.Trace("Entering Enqueue, lock...");
        lock(this.tasks)
        {
            Log.Trace("Adding task, current count = {0}...", Count);
            this.tasks.Enqueue(task);

            if (Count == 1)
            {
                Log.Trace("Count = 1, so setting the wait handle...");
                this.taskWaitHandle.Set();
            }
        }
        Log.Trace("Exiting enqueue...");
    }

    public Task Dequeue()
    {
        Log.Trace("Entering Dequeue...");
        if (Count == 0)
        {
            if (this.isFinishing)
            {
                Log.Trace("Finishing (before waiting) - isCompleted set, returning empty task.");
                this.isFinished = true;
                return new Task();
            }

            Log.Trace("Count = 0, lets wait for a task...");
            this.taskWaitHandle.WaitOne();
            Log.Trace("Wait handle let us through, Count = {0}, IsFinishing = {1}, Returned = {2}", Count, this.isFinishing);

            if(this.isFinishing)
            {
                Log.Trace("Finishing - isCompleted set, returning empty task.");
                this.isFinished = true;
                return new Task();
            }
        }

        Log.Trace("Entering task lock...");
        lock(this.tasks)
        {
            Log.Trace("Entered task lock, about to dequeue next item, Count = {0}", Count);
            return this.tasks.Dequeue();
        }
    }

    public void Finish()
    {
        Log.Trace("Setting TaskQueue state to isFinishing = true and setting wait handle...");
        this.isFinishing = true;

        if (Count == 0)
        {
            this.taskWaitHandle.Set();
        }
    }

    public IEnumerator<Task> GetEnumerator()
    {
        while(true)
        {
            Task t = Dequeue();
            if(this.isFinished)
            {
                yield break;
            }

            yield return t;
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

正如您所看到的,我使用 AutoResetEvent 对象来确保工作线程不会过早退出,即在获取任何任务之前退出。

简而言之:

  • 主线程通过将任务放入其任务队列来将任务分配给线程,
  • 主线程通过调用任务队列的完成来通知线程没有更多任务要执行() 方法
  • 工作线程通过调用任务队列的 Dequeue() 方法来检索分配给它的下一个任务。

问题是 Dequeue() 方法经常 < strong>抛出 InvalidOperationException,表示队列为空。正如您所看到的,我添加了一些日志记录,事实证明,AutoResetEvent 不会阻止Dequeue(),即使没有调用它的 >Set() 方法。

据我了解,调用 AutoResetEvent.Set() 将允许等待线程继续(之前调用 AutoResetEvent.WaitOne()),然后自动调用 AutoResetEvent.Reset(),阻塞下一个等待者。

那么可能有什么问题呢?我是不是搞错了什么?我有什么地方有错误吗? 我已经在上面坐了三个小时了,但我不知道出了什么问题。 请帮我!

非常感谢!

I have a thread, which creates a variable number of worker threads and distributes tasks between them. This is solved by passing the threads a TaskQueue object, whose implementation you will see below.

These worker threads simply iterate over the TaskQueue object they were given, executing each task.

private class TaskQueue : IEnumerable<Task>
{
    public int Count
    {
        get
        {
            lock(this.tasks)
            {
                return this.tasks.Count;
            }
        }
    }

    private readonly Queue<Task> tasks = new Queue<Task>();
    private readonly AutoResetEvent taskWaitHandle = new AutoResetEvent(false);

    private bool isFinishing = false;
    private bool isFinished = false;

    public void Enqueue(Task task)
    {
        Log.Trace("Entering Enqueue, lock...");
        lock(this.tasks)
        {
            Log.Trace("Adding task, current count = {0}...", Count);
            this.tasks.Enqueue(task);

            if (Count == 1)
            {
                Log.Trace("Count = 1, so setting the wait handle...");
                this.taskWaitHandle.Set();
            }
        }
        Log.Trace("Exiting enqueue...");
    }

    public Task Dequeue()
    {
        Log.Trace("Entering Dequeue...");
        if (Count == 0)
        {
            if (this.isFinishing)
            {
                Log.Trace("Finishing (before waiting) - isCompleted set, returning empty task.");
                this.isFinished = true;
                return new Task();
            }

            Log.Trace("Count = 0, lets wait for a task...");
            this.taskWaitHandle.WaitOne();
            Log.Trace("Wait handle let us through, Count = {0}, IsFinishing = {1}, Returned = {2}", Count, this.isFinishing);

            if(this.isFinishing)
            {
                Log.Trace("Finishing - isCompleted set, returning empty task.");
                this.isFinished = true;
                return new Task();
            }
        }

        Log.Trace("Entering task lock...");
        lock(this.tasks)
        {
            Log.Trace("Entered task lock, about to dequeue next item, Count = {0}", Count);
            return this.tasks.Dequeue();
        }
    }

    public void Finish()
    {
        Log.Trace("Setting TaskQueue state to isFinishing = true and setting wait handle...");
        this.isFinishing = true;

        if (Count == 0)
        {
            this.taskWaitHandle.Set();
        }
    }

    public IEnumerator<Task> GetEnumerator()
    {
        while(true)
        {
            Task t = Dequeue();
            if(this.isFinished)
            {
                yield break;
            }

            yield return t;
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

As you can see, I'm using an AutoResetEvent object to make sure that the worker threads don't exit prematurely, i.e. before getting any tasks.

In a nutshell:

  • the main thread assigns a task to a thread by Enqeueue-ing a task to its TaskQueue
  • the main thread notifies the thread that are no more tasks to execute by calling the TaskQueue's Finish() method
  • the worker thread retrieves the next task assigned to it by calling the TaskQueue's Dequeue() method

The problem is that the Dequeue() method often throws an InvalidOperationException, saying that the Queue is empty. As you can see I added some logging, and it turns out, that the AutoResetEvent doesn't block the Dequeue(), even though there were no calls to its Set() method.

As I understand it, calling AutoResetEvent.Set() will allow a waiting thread to proceed (who previously called AutoResetEvent.WaitOne()), and then automatically calls AutoResetEvent.Reset(), blocking the next waiter.

So what can be wrong? Did I get something wrong? Do I have an error somewhere?
I'm sitting above this for 3 hours now, but I cannot figure out what's wrong.
Please help me!

Thank you very much!

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

老娘不死你永远是小三 2024-09-22 19:54:26

您的出列代码不正确。你检查了锁上的伯爵,然后摸索着裤子的接缝,然后你期望任务有一些东西。释放锁时不能保留假设:)。您的 Count 检查和tasks.Dequeue 必须在锁定下发生:

bool TryDequeue(out Tasks task)
{
  task = null;
  lock (this.tasks) {
    if (0 < tasks.Count) {
      task = tasks.Dequeue();
    }
  }
  if (null == task) {
    Log.Trace ("Queue was empty");
  }
  return null != task;
 }

您的 Enqueue() 代码同样充满了问题。您的入队/出队不能确保进度(即使队列中有项目,出队线程也会被阻塞等待)。您的 Enqueue() 签名是错误的。总的来说,你的帖子是非常非常糟糕的代码。坦率地说,我认为你在这里试图咀嚼的东西超出了你能咬的范围......哦,还有永远不要在锁定状态下登录

我强烈建议您使用 ConcurrentQueue

如果您无法访问 .Net 4.0,这里有一个可以帮助您入门的实现:

public class ConcurrentQueue<T>:IEnumerable<T>
{
    volatile bool fFinished = false;
    ManualResetEvent eventAdded = new ManualResetEvent(false);
    private Queue<T> queue = new Queue<T>();
    private object syncRoot = new object();

    public void SetFinished()
    {
        lock (syncRoot)
        {
            fFinished = true;
            eventAdded.Set();
        }
    }

    public void Enqueue(T t)
    {
        Debug.Assert (false == fFinished);
        lock (syncRoot)
        {
            queue.Enqueue(t);
            eventAdded.Set();
        }
    }

    private bool Dequeue(out T t)
    {
        do
        {
            lock (syncRoot)
            {
                if (0 < queue.Count)
                {
                    t = queue.Dequeue();
                    return true;
                }
                if (false == fFinished)
                {
                    eventAdded.Reset ();
                }
            }
            if (false == fFinished)
            {
                eventAdded.WaitOne();
            }
            else
            {
                break;
            }
        } while (true);
        t = default(T);
        return false;
    }


    public IEnumerator<T> GetEnumerator()
    {
        T t;
        while (Dequeue(out t))
        {
            yield return t;
        }
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

Your dequeue code is incorrect. You check the Count under lock, then fly by the seams of your pants, and then you expect the tasks to have something. You cannot retain assumptions while you release the lock :). Your Count check and tasks.Dequeue must occur under lock:

bool TryDequeue(out Tasks task)
{
  task = null;
  lock (this.tasks) {
    if (0 < tasks.Count) {
      task = tasks.Dequeue();
    }
  }
  if (null == task) {
    Log.Trace ("Queue was empty");
  }
  return null != task;
 }

You Enqueue() code is similarly riddled with problems. Your Enqueue/Dequeue don't ensure progress (you will have dequeue threads blocked waiting even though there are items in the queue). Your signature of Enqueue() is wrong. Overall your post is very very poor code. Frankly, I think you're trying to chew more than you can bite here... Oh, and never log under lock.

I strongly suggest you just use ConcurrentQueue.

If you don't have access to .Net 4.0 here is an implementation to get you started:

public class ConcurrentQueue<T>:IEnumerable<T>
{
    volatile bool fFinished = false;
    ManualResetEvent eventAdded = new ManualResetEvent(false);
    private Queue<T> queue = new Queue<T>();
    private object syncRoot = new object();

    public void SetFinished()
    {
        lock (syncRoot)
        {
            fFinished = true;
            eventAdded.Set();
        }
    }

    public void Enqueue(T t)
    {
        Debug.Assert (false == fFinished);
        lock (syncRoot)
        {
            queue.Enqueue(t);
            eventAdded.Set();
        }
    }

    private bool Dequeue(out T t)
    {
        do
        {
            lock (syncRoot)
            {
                if (0 < queue.Count)
                {
                    t = queue.Dequeue();
                    return true;
                }
                if (false == fFinished)
                {
                    eventAdded.Reset ();
                }
            }
            if (false == fFinished)
            {
                eventAdded.WaitOne();
            }
            else
            {
                break;
            }
        } while (true);
        t = default(T);
        return false;
    }


    public IEnumerator<T> GetEnumerator()
    {
        T t;
        while (Dequeue(out t))
        {
            yield return t;
        }
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}
小巷里的女流氓 2024-09-22 19:54:26

我正在等待更详细的答复,但我只想指出一些非常重要的事情。

如果您使用的是 .NET 3.5,您可以使用 ConcurrentQueue。向后移植包含在 Rx 扩展库,适用于 .NET 3.5。

由于您想要阻塞行为,因此需要将 ConcurrentQueue 包装在 BlockingCollection(也可作为 Rx 的一部分)。

A more detailed answer from me is pending, but I just want to point out something very important.

If you're using .NET 3.5, you can use the ConcurrentQueue<T> class. A backport is included in the Rx extensions library, which is available for .NET 3.5.

Since you want blocking behavior, you would need to wrap a ConcurrentQueue<T> in a BlockingCollection<T> (also available as part of Rx).

凌乱心跳 2024-09-22 19:54:26

看起来您正在尝试复制阻塞队列。 .NET 4.0 BCL 中已存在一个 BlockingCollection。如果 .NET 4.0 不适合您,那么您可以使用此代码。它使用 Monitor.WaitMonitor.Pulse 方法而不是 AutoResetEvent

public class BlockingCollection<T>
{
    private Queue<T> m_Queue = new Queue<T>();

    public T Take() // Dequeue
    {
        lock (m_Queue)
        {
            while (m_Queue.Count <= 0)
            {
                Monitor.Wait(m_Queue);
            }
            return m_Queue.Dequeue();
        }
    }

    public void Add(T data) // Enqueue
    {
        lock (m_Queue)
        {
            m_Queue.Enqueue(data);
            Monitor.Pulse(m_Queue);
        }
    }
}

更新:

相当确定如果您希望它是线程安全的,则不可能使用AutoResetEvent实现生产者-消费者队列对于多个生产者和多个消费者(如果有人能想出一个反例,我准备被证明是错误的)。当然,你会在互联网上看到一些例子,但它们都是错误的。事实上,微软的一次此类尝试是缺陷在于队列可能会实时锁定

It looks like you are trying to replicate a blocking queue. One already exists in the .NET 4.0 BCL as a BlockingCollection. If .NET 4.0 is not an option for you then you can use this code. It use the Monitor.Wait and Monitor.Pulse method instead of AutoResetEvent.

public class BlockingCollection<T>
{
    private Queue<T> m_Queue = new Queue<T>();

    public T Take() // Dequeue
    {
        lock (m_Queue)
        {
            while (m_Queue.Count <= 0)
            {
                Monitor.Wait(m_Queue);
            }
            return m_Queue.Dequeue();
        }
    }

    public void Add(T data) // Enqueue
    {
        lock (m_Queue)
        {
            m_Queue.Enqueue(data);
            Monitor.Pulse(m_Queue);
        }
    }
}

Update:

I am fairly certain that it is not possible to implement a producer-consumer queue using AutoResetEvent if you want it to be thread-safe for multiple producers and multiple consumers (I am prepared to be proven wrong if someone can come up with a counter example). Sure, you will see examples on the internet, but they are all wrong. In fact, one such attempt by Microsoft is flawed in that the queue can get live-locked.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文