ManualResetEvent 与 Thread.Sleep

发布于 2024-07-26 12:58:37 字数 2382 浏览 9 评论 0原文

我实现了以下后台处理线程,其中 Jobs 是一个 Queue

static void WorkThread()
{
    while (working)
    {
        var job;

        lock (Jobs)
        {
            if (Jobs.Count > 0)
                job = Jobs.Dequeue();
        }

        if (job == null)
        {
            Thread.Sleep(1);
        }
        else
        {
            // [snip]: Process job.
        }
    }
}

这在输入作业和实际开始作业之间产生了明显的延迟运行(一次输入批量作业,每个作业都[相对]小。)延迟并不是什么大问题,但我开始考虑这个问题,并进行了以下更改:

static ManualResetEvent _workerWait = new ManualResetEvent(false);
// ...
    if (job == null)
    {
        lock (_workerWait)
        {
            _workerWait.Reset();
        }
        _workerWait.WaitOne();
    }

线程添加的位置jobs 现在会锁定 _workerWait 并在添加作业完成后调用 _workerWait.Set()。 这个解决方案(看起来)立即开始处理作业,并且延迟完全消失了。

我的问题部分是“为什么会发生这种情况?”,考虑到 Thread.Sleep(int) 可以很好地睡眠比您指定的时间更长的时间,部分是“ManualResetEvent如何> 达到这种性能水平吗?”。

编辑:由于有人询问了排队项目的功能,这里就是它,以及目前的完整系统。

public void RunTriggers(string data)
{
    lock (this.SyncRoot)
    {
        this.Triggers.Sort((a, b) => { return a.Priority - b.Priority; });

        foreach (Trigger trigger in this.Triggers)
        {
            lock (Jobs)
            {
                Jobs.Enqueue(new TriggerData(this, trigger, data));
                _workerWait.Set();
            }
        }
    }
}

static private ManualResetEvent _workerWait = new ManualResetEvent(false);
static void WorkThread()
{
    while (working)
    {
        TriggerData job = null;

        lock (Jobs)
        {
            if (Jobs.Count > 0)
                job = Jobs.Dequeue();

            if (job == null)
            {
                _workerWait.Reset();
            }
        }

        if (job == null)
            _workerWait.WaitOne();
        else
        {
            try
            {
                foreach (Match m in job.Trigger.Regex.Matches(job.Data))
                    job.Trigger.Value.Action(job.World, m);
            }
            catch (Exception ex)
            {
                job.World.SendLineToClient("\r\n\x1B[32m -- {0} in trigger ({1}): {2}\x1B[m",
                    ex.GetType().ToString(), job.Trigger.Name, ex.Message);
            }
        }
    }
}

I implemented the following background processing thread, where Jobs is a Queue<T>:

static void WorkThread()
{
    while (working)
    {
        var job;

        lock (Jobs)
        {
            if (Jobs.Count > 0)
                job = Jobs.Dequeue();
        }

        if (job == null)
        {
            Thread.Sleep(1);
        }
        else
        {
            // [snip]: Process job.
        }
    }
}

This produced a noticable delay between when the jobs were being entered and when they were actually starting to be run (batches of jobs are entered at once, and each job is only [relatively] small.) The delay wasn't a huge deal, but I got to thinking about the problem, and made the following change:

static ManualResetEvent _workerWait = new ManualResetEvent(false);
// ...
    if (job == null)
    {
        lock (_workerWait)
        {
            _workerWait.Reset();
        }
        _workerWait.WaitOne();
    }

Where the thread adding jobs now locks _workerWait and calls _workerWait.Set() when it's done adding jobs. This solution (seemingly) instantly starts processing jobs, and the delay is gone altogether.

My question is partly "Why does this happen?", granted that Thread.Sleep(int) can very well sleep for longer than you specify, and partly "How does the ManualResetEvent achieve this level of performance?".

EDIT: Since someone asked about the function that's queueing items, here it is, along with the full system as it stands at the moment.

public void RunTriggers(string data)
{
    lock (this.SyncRoot)
    {
        this.Triggers.Sort((a, b) => { return a.Priority - b.Priority; });

        foreach (Trigger trigger in this.Triggers)
        {
            lock (Jobs)
            {
                Jobs.Enqueue(new TriggerData(this, trigger, data));
                _workerWait.Set();
            }
        }
    }
}

static private ManualResetEvent _workerWait = new ManualResetEvent(false);
static void WorkThread()
{
    while (working)
    {
        TriggerData job = null;

        lock (Jobs)
        {
            if (Jobs.Count > 0)
                job = Jobs.Dequeue();

            if (job == null)
            {
                _workerWait.Reset();
            }
        }

        if (job == null)
            _workerWait.WaitOne();
        else
        {
            try
            {
                foreach (Match m in job.Trigger.Regex.Matches(job.Data))
                    job.Trigger.Value.Action(job.World, m);
            }
            catch (Exception ex)
            {
                job.World.SendLineToClient("\r\n\x1B[32m -- {0} in trigger ({1}): {2}\x1B[m",
                    ex.GetType().ToString(), job.Trigger.Name, ex.Message);
            }
        }
    }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

我是有多爱你 2024-08-02 12:58:37

这些事件是操作系统/内核提供的内核原语,专为此类事情而设计。 内核提供了一个边界,您可以在该边界上保证原子操作,这对于同步很重要(在硬件支持的情况下也可以在用户空间中完成一些原子性)。

简而言之,当线程等待事件时,它会被放入该事件的等待列表中并标记为不可运行。
当事件发出信号时,内核唤醒等待列表中的事件并将它们标记为可运行,并且它们可以继续运行。 当事件发出信号时,线程可以立即唤醒,这自然是一个巨大的好处,而不是长时间休眠并时不时地重新检查条件。

即使一毫秒也是一个非常非常长的时间,您可能在这段时间内处理了数千个事件。 此外,时间分辨率传统上为 10 毫秒,因此睡眠时间少于 10 毫秒通常只会导致 10 毫秒睡眠。 有了事件,就可以立即唤醒线程并进行调度

The events are kernel primitives provided by the OS/Kernel that's designed just for this sort of things. The kernel provides a boundary upon which you can guarantee atomic operations which is important for synchronization(Some atomicity can be done in user space too with hardware support).

In short, when a thread waits on an event it's put on a waiting list for that event and marked as non-runnable.
When the event is signaled, the kernel wakes up the ones in the waiting list and marks them as runnable and they can continue to run. It's naturally a huge benefit that a thread can wake up immediately when the event is signalled, vs sleeping for a long time and recheck the condition every now and then.

Even one millisecond is a really really long time, you could have processed thousands of event in that time. Also the time resolution is traditionally 10ms, so sleeping less than 10ms usually just results in a 10ms sleep anyway. With an event, a thread can be woken up and scheduled immediately

ぺ禁宫浮华殁 2024-08-02 12:58:37

首先锁定 _workerWait 是没有意义的,Event 是一个系统(内核)对象,设计用于在线程之间发送信号(并在 Win32 API 中大量用于异步操作)。 因此,多个线程设置或重置它是非常安全的,无需额外的同步。

至于您的主要问题,还需要查看将事物放入队列的逻辑,以及有关每个作业完成了多少工作的一些信息(工作线程是否花费更多时间处理工作或等待工作)。

最好的解决方案可能是使用对象实例来锁定并使用 Monitor.PulseMonitor.Wait 作为条件变量。

编辑:查看要排队的代码,似乎答案 #1116297 说得对:考虑到许多工作项的处理速度非常快,1 毫秒的延迟太长了,无法等待。

拥有唤醒工作线程的机制的方法是正确的(因为不存在具有阻塞出队操作的.NET并发队列)。 然而,与使用事件相比,条件变量会更高效(因为在非竞争情况下,它不需要内核转换):

object sync = new Object();
var queue = new Queue<TriggerData>();

public void EnqueueTriggers(IEnumerable<TriggerData> triggers) {
  lock (sync) {
    foreach (var t in triggers) {
      queue.Enqueue(t);
    }
    Monitor.Pulse(sync);  // Use PulseAll if there are multiple worker threads
  }
}

void WorkerThread() {
  while (!exit) {
    TriggerData job = DequeueTrigger();
    // Do work
  }
}

private TriggerData DequeueTrigger() {
  lock (sync) {
    if (queue.Count > 0) {
      return queue.Dequeue();
    }
    while (queue.Count == 0) {
      Monitor.Wait(sync);
    }
    return queue.Dequeue();
  }
}

Monitor.Wait 将释放参数上的锁,等到 针对锁调用 Pulse()PulseAll(),然后重新进入锁并返回。 需要重新检查等待条件,因为其他一些线程可能已从队列中读取该项目。

First locking on _workerWait is pointless, an Event is a system (kernel) object designed for signaling between threads (and heavily used in the Win32 API for asynchronous operations). Therefore it is quite safe for multiple threads to set or reset it without additional synchronization.

As to your main question, need to see the logic for placing things on the queue as well, and some information on how much work is done for each job (is the worker thread spending more time processing work or on waiting for work).

Likely the best solution would be to use an object instance to lock on and use Monitor.Pulse and Monitor.Wait as a condition variable.

Edit: With a view of the code to enqueue, it appears that answer #1116297 has it right: a 1ms delay is too long to wait, given that many of the work items will be extremely quick to process.

The approach of having a mechanism to wake up the worker thread is correct (as there is no .NET concurrent queue with a blocking dequeue operation). However rather than using an event, a condition variable is going to be a little more efficient (as in non-contended cases it does not require a kernel transition):

object sync = new Object();
var queue = new Queue<TriggerData>();

public void EnqueueTriggers(IEnumerable<TriggerData> triggers) {
  lock (sync) {
    foreach (var t in triggers) {
      queue.Enqueue(t);
    }
    Monitor.Pulse(sync);  // Use PulseAll if there are multiple worker threads
  }
}

void WorkerThread() {
  while (!exit) {
    TriggerData job = DequeueTrigger();
    // Do work
  }
}

private TriggerData DequeueTrigger() {
  lock (sync) {
    if (queue.Count > 0) {
      return queue.Dequeue();
    }
    while (queue.Count == 0) {
      Monitor.Wait(sync);
    }
    return queue.Dequeue();
  }
}

Monitor.Wait will release the lock on the parameter, wait until Pulse() or PulseAll() is called against the lock, then re-enter the lock and return. Need to recheck the wait condition because some other thread could have read the item off the queue.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文