Workaround for the WaitHandle.WaitAll 64 handle limit?

Posted on 2024-08-29 22:07:09

My application spawns loads of different small worker threads via ThreadPool.QueueUserWorkItem which I keep track of via multiple ManualResetEvent instances. I use the WaitHandle.WaitAll method to block my application from closing until these threads have completed.

I never had any issues before. However, as my application comes under more load (that is, more threads being created), I am now beginning to get this exception:

WaitHandles must be less than or equal to 64 - missing documentation

What is the best alternative solution to this?

Code Snippet

List<AutoResetEvent> events = new List<AutoResetEvent>();

// multiple instances of...
var evt = new AutoResetEvent(false);
events.Add(evt);
ThreadPool.QueueUserWorkItem(delegate
{
    // do work
    evt.Set();
});

...
WaitHandle.WaitAll(events.ToArray());

Workaround

int threadCount = 0;
ManualResetEvent finished = new ManualResetEvent(false);

...
Interlocked.Increment(ref threadCount);
ThreadPool.QueueUserWorkItem(delegate
{
    try
    {
         // do work
    }
    finally
    {
        if (Interlocked.Decrement(ref threadCount) == 0)
        {
             finished.Set();
        }
    }
});

...
finished.WaitOne();

Comments (8)

雨巷深深 2024-09-05 22:07:09

Create a variable that keeps track of the number of running tasks:

int numberOfTasks = 100;

Create a signal:

ManualResetEvent signal = new ManualResetEvent(false);

Decrement the number of tasks whenever a task is finished:

if (Interlocked.Decrement(ref numberOfTasks) == 0)
{

If there is no task remaining, set the signal:

    signal.Set();
}

Meanwhile, somewhere else, wait for the signal to be set:

signal.WaitOne();
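
Put together, the fragments above might look like the following minimal sketch. The names CounterSignalSketch, RunAll and completed are illustrative and not part of the original answer, and incrementing completed merely stands in for real work. It assumes the number of tasks is known before anything is queued, which is what keeps a plain counter safe here.

```csharp
using System;
using System.Threading;

public static class CounterSignalSketch
{
    // Hypothetical helper: queues taskCount work items and blocks until all are done.
    // Returns how many work items actually ran.
    public static int RunAll(int taskCount)
    {
        if (taskCount <= 0) return 0;       // nothing to wait for

        int remaining = taskCount;           // fixed before any work item is queued
        int completed = 0;

        using (var signal = new ManualResetEvent(false))
        {
            for (int i = 0; i < taskCount; i++)
            {
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    try
                    {
                        Interlocked.Increment(ref completed);   // stand-in for real work
                    }
                    finally
                    {
                        // The work item that brings the counter to zero releases the waiter.
                        if (Interlocked.Decrement(ref remaining) == 0)
                        {
                            signal.Set();
                        }
                    }
                });
            }

            // Only one handle is waited on, so the 64-handle limit never comes into play.
            signal.WaitOne();
        }

        return completed;
    }
}
```

A call such as CounterSignalSketch.RunAll(200) waits for 200 work items, well past the point where WaitHandle.WaitAll would throw.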
゛清羽墨安 2024-09-05 22:07:09

Starting with .NET 4.0, you have two more (and IMO, cleaner) options available to you.

The first is to use the CountdownEvent class. It removes the need to handle the incrementing and decrementing on your own:

int tasks = ...; /* however many tasks you're performing */

// Dispose when done.
using (var e = new CountdownEvent(tasks))
{
    // Queue work (repeat once per task; the callback receives a state argument).
    ThreadPool.QueueUserWorkItem(_ => {
        // Do work
        ...

        // Signal when done.
        e.Signal();
    });

    // Wait till the countdown reaches zero.
    e.Wait();
}

However, there's an even more robust solution, and that's to use the Task class, like so:

// The source of your work items, create a sequence of Task instances.
Task[] tasks = Enumerable.Range(0, 100).Select(i =>
    // Create task here.
    Task.Factory.StartNew(() => {
        // Do work.
    })

    // No signalling, no anything.
).ToArray();

// Wait on all the tasks.
Task.WaitAll(tasks);

Using the Task class and the call to WaitAll is much cleaner, IMO, as you're weaving fewer threading primitives throughout your code (notice, no wait handles); you don't have to set up a counter or handle incrementing/decrementing, you just set up your tasks and then wait on them. This lets the code express what you want to do rather than the primitives of how to do it (at least in terms of managing the parallelization).
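
As a rough, self-contained illustration of that point, the sketch below starts more than 64 tasks and waits on all of them with a single call. TaskWaitAllSketch, RunAll and the completed counter are made-up names for this example only, and the increment stands in for real work.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class TaskWaitAllSketch
{
    // Hypothetical helper: starts taskCount tasks, waits for all of them,
    // and returns how many task bodies ran.
    public static int RunAll(int taskCount)
    {
        int completed = 0;

        Task[] tasks = Enumerable.Range(0, taskCount)
            .Select(i => Task.Factory.StartNew(() =>
            {
                Interlocked.Increment(ref completed);   // stand-in for real work
            }))
            .ToArray();

        // Blocks until every task has finished. If any task faulted,
        // the failures are rethrown here wrapped in an AggregateException.
        Task.WaitAll(tasks);

        return completed;
    }
}
```

Task.WaitAll is not subject to the 64-handle limit of WaitHandle.WaitAll, so TaskWaitAllSketch.RunAll(200) works without any batching or manual counting.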

.NET 4.5 offers even more options. You can simplify the generation of the sequence of Task instances by calling the static Run method on the Task class:

// The source of your work items, create a sequence of Task instances.
Task[] tasks = Enumerable.Range(0, 100).Select(i =>
    // Create task here.
    Task.Run(() => {
        // Do work.
    })

    // No signalling, no anything.
).ToArray();

// Wait on all the tasks.
Task.WaitAll(tasks);

Or, you could take advantage of the TPL Dataflow library (its types live in the System.Threading.Tasks.Dataflow namespace, so it's official, even though it's a download from NuGet, like Entity Framework) and use an ActionBlock<TInput>, like so:

// Create the action block.  Since there's not a non-generic
// version, make it object, and pass null to signal, or
// make T the type that takes the input to the action
// and pass that.
var actionBlock = new ActionBlock<object>(o => {
    // Do work.
});

// Post 100 times.
foreach (int i in Enumerable.Range(0, 100)) actionBlock.Post(null);

// Signal complete, this doesn't actually stop
// the block, but says that everything is done when the currently
// posted items are completed.
actionBlock.Complete();

// Wait for everything to complete, the Completion property
// exposes a Task which can be waited on.
actionBlock.Completion.Wait();

Note that the ActionBlock<TInput> processes one item at a time by default, so if you want it to process multiple items concurrently, you have to set the number of concurrent items in the constructor by passing an ExecutionDataflowBlockOptions instance and setting the MaxDegreeOfParallelism property:

var actionBlock = new ActionBlock<object>(o => {
    // Do work.
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

If your action is truly thread safe, then you can set the MaxDegreeOfParallelism property to DataflowBlockOptions.Unbounded:

var actionBlock = new ActionBlock<object>(o => {
    // Do work.
}, new ExecutionDataflowBlockOptions { 
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});

The point being, you have fine-grained control over how parallel you want your operations to be.

Of course, if you have a sequence of items that you want passed into your ActionBlock<TInput> instance, then you can link an ISourceBlock<TOutput> implementation to feed the ActionBlock<TInput>, like so:

// The buffer block.
var buffer = new BufferBlock<int>();

// Create the action block.  T is int here, matching the type of
// the items that flow out of the buffer block.
var actionBlock = new ActionBlock<int>(o => {
    // Do work.
});

// Link the action block to the buffer block.
// NOTE: An IDisposable is returned here, you might want to dispose
// of it, although not totally necessary if everything works, but
// still, good housekeeping.
using (var link = buffer.LinkTo(actionBlock,
    // Want to propagate completion state to the action block.
    new DataflowLinkOptions {
        PropagateCompletion = true,
    },
    // Can filter on items flowing through if you want.
    i => true))
{ 
    // Post 100 times to the *buffer*
    foreach (int i in Enumerable.Range(0, 100)) buffer.Post(i);

    // Signal complete on the *buffer*. This doesn't actually stop
    // the blocks; because PropagateCompletion is set, the action block
    // completes once the currently posted items have flowed through
    // and been processed.
    buffer.Complete();

    // Wait for everything to complete, the Completion property
    // exposes a Task which can be waited on.
    actionBlock.Completion.Wait();
}

Depending on what you need to do, the TPL Dataflow library becomes a much more attractive option, in that it handles the concurrency across all the tasks linked together, and it allows you to be very specific about just how parallel you want each piece to be, while maintaining proper separation of concerns for each block.

甜尕妞 2024-09-05 22:07:09

Your workaround is not correct. The reason is that the Set and WaitOne could race if the last work item causes the threadCount to go to zero before the queueing thread has had a chance to queue all work items. The fix is simple. Treat your queueing thread as if it were a work item itself. Initialize threadCount to 1 and do a decrement and signal when the queueing is complete.

int threadCount = 1;
ManualResetEvent finished = new ManualResetEvent(false);
...
Interlocked.Increment(ref threadCount); 
ThreadPool.QueueUserWorkItem(delegate 
{ 
    try 
    { 
         // do work 
    } 
    finally 
    { 
        if (Interlocked.Decrement(ref threadCount) == 0) 
        { 
             finished.Set(); 
        } 
    } 
}); 
... 
if (Interlocked.Decrement(ref threadCount) == 0)
{
  finished.Set();
}
finished.WaitOne(); 

As a personal preference I like using the CountdownEvent class to do the counting for me.

var finished = new CountdownEvent(1);
...
finished.AddCount();
ThreadPool.QueueUserWorkItem(delegate 
{ 
    try 
    { 
         // do work 
    } 
    finally 
    { 
      finished.Signal();
    } 
}); 
... 
finished.Signal();
finished.Wait(); 
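
For reference, here is how the CountdownEvent variant might look when the elided parts are filled in with a simple loop. This is only a sketch: CountdownEventSketch, RunAll and completed are hypothetical names, and the increment stands in for whatever each work item really does.

```csharp
using System;
using System.Threading;

public static class CountdownEventSketch
{
    // Hypothetical helper: queues taskCount work items and waits for all of them.
    // Returns how many work items ran.
    public static int RunAll(int taskCount)
    {
        int completed = 0;

        // The initial count of 1 represents the queueing thread itself, so the
        // event cannot reach zero while items are still being queued.
        using (var finished = new CountdownEvent(1))
        {
            for (int i = 0; i < taskCount; i++)
            {
                finished.AddCount();                 // register the item before queueing it
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    try
                    {
                        Interlocked.Increment(ref completed);   // stand-in for real work
                    }
                    finally
                    {
                        finished.Signal();           // this work item is done
                    }
                });
            }

            finished.Signal();   // queueing is complete; give up the queueing thread's count
            finished.Wait();     // returns once every work item has signalled
        }

        return completed;
    }
}
```

Because the queueing thread holds its own count until the loop ends, the number of work items does not need to be known up front.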
痴者 2024-09-05 22:07:09

Adding to dtb's answer, you can wrap this into a nice simple class.

public class Countdown : IDisposable
{
    private readonly ManualResetEvent done;
    private readonly int total;
    private long current;

    public Countdown(int total)
    {
        this.total = total;
        current = total;
        done = new ManualResetEvent(false);
    }

    public void Signal()
    {
        if (Interlocked.Decrement(ref current) == 0)
        {
            done.Set();
        }
    }

    public void Wait()
    {
        done.WaitOne();
    }

    public void Dispose()
    {
        ((IDisposable)done).Dispose();
    }
}
很酷又爱笑 2024-09-05 22:07:09

Adding to dtb's answer, for the case where we want callbacks:

using System;
using System.Runtime.Remoting.Messaging;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        Main m = new Main();
        m.TestMRE();
        Console.ReadKey();

    }
}

class Main
{
    CalHandler handler = new CalHandler();
    int numberofTasks =0;
    public void TestMRE()
    {

        for (int j = 0; j <= 3; j++)
        {
            Console.WriteLine("Outer Loop is :" + j.ToString());
            ManualResetEvent signal = new ManualResetEvent(false);
            numberofTasks = 4;
            for (int i = 0; i <= 3; i++)
            {
                CalHandler.count caller = new CalHandler.count(handler.messageHandler);
                caller.BeginInvoke(i, new AsyncCallback(NumberCallback),signal);
            }
            signal.WaitOne();
        }

    }

    private void NumberCallback(IAsyncResult result)
    {
        AsyncResult asyncResult = (AsyncResult)result;

        CalHandler.count caller = (CalHandler.count)asyncResult.AsyncDelegate;

        int num = caller.EndInvoke(asyncResult);

        Console.WriteLine("Number is :"+ num.ToString());

        ManualResetEvent mre = (ManualResetEvent)asyncResult.AsyncState;
        if (Interlocked.Decrement(ref numberofTasks) == 0)
        {
            mre.Set();
        }
    }

}
public class CalHandler
{
    public delegate int count(int number);

    public int messageHandler ( int number )
    {
        return number;
    }

}
归途 2024-09-05 22:07:09
protected void WaitAllExt(WaitHandle[] waitHandles)
{
    //workaround for limitation of WaitHandle.WaitAll by <=64 wait handles
    const int waitAllArrayLimit = 64;
    var prevEndInd = -1;
    while (prevEndInd < waitHandles.Length - 1)
    {
        var stInd = prevEndInd + 1;
        var eInd = stInd + waitAllArrayLimit - 1;
        if (eInd > waitHandles.Length - 1)
        {
            eInd = waitHandles.Length - 1;
        }
        prevEndInd = eInd;

        //do wait
        var whSubarray = waitHandles.Skip(stInd).Take(eInd - stInd + 1).ToArray();
        WaitHandle.WaitAll(whSubarray);
    }

}
恋你朝朝暮暮 2024-09-05 22:07:09

I solved it by simply paginating the events to wait on, without much performance loss, and it's working perfectly in a production environment. The code follows:

        var events = new List<ManualResetEvent>();

        // code omitted

        var newEvent = new ManualResetEvent(false);
        events.Add(newEvent);
        ThreadPool.QueueUserWorkItem(c => {

            //thread code
            newEvent.Set();
        });

        // code omitted

        var wait = true;
        while (wait)
        {
            WaitHandle.WaitAll(events.Take(60).ToArray());
            events.RemoveRange(0, events.Count > 59 ? 60 : events.Count);
            wait = events.Any();

        }
ゝ杯具 2024-09-05 22:07:09

Here is another solution. Here, "events" is a list of ManualResetEvent instances, and its size can be greater than 64 (MAX_EVENTS_NO).

const int MAX_EVENTS_NO = 64; // maximum number of handles WaitHandle.WaitAll accepts

int len = events.Count;
if (len <= MAX_EVENTS_NO)
{
    WaitHandle.WaitAll(events.ToArray());
}
else
{
    int start = 0;
    int num = MAX_EVENTS_NO;
    while (true)
    {
        // The last chunk may be shorter than MAX_EVENTS_NO.
        if (start + num > len)
        {
            num = len - start;
        }
        List<ManualResetEvent> sublist = events.GetRange(start, num);
        WaitHandle.WaitAll(sublist.ToArray());
        start += num;
        if (start >= len)
            break;
    }
}
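
The same chunking idea can be sketched as a small reusable helper. BatchedWait, WaitAllInBatches and MaxHandlesPerWait are names invented for this example; the helper returns the number of batches it waited on purely so that its behaviour is easy to check.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class BatchedWait
{
    // WaitHandle.WaitAll rejects arrays with more than 64 handles.
    private const int MaxHandlesPerWait = 64;

    // Hypothetical helper: waits for all handles, at most 64 at a time.
    // Returns the number of WaitAll calls that were needed.
    public static int WaitAllInBatches(IReadOnlyList<WaitHandle> handles)
    {
        int batches = 0;

        for (int start = 0; start < handles.Count; start += MaxHandlesPerWait)
        {
            int count = Math.Min(MaxHandlesPerWait, handles.Count - start);

            // Copy the current slice into an array of the exact size.
            var batch = new WaitHandle[count];
            for (int i = 0; i < count; i++)
            {
                batch[i] = handles[start + i];
            }

            WaitHandle.WaitAll(batch);   // blocks until this slice is fully signalled
            batches++;
        }

        return batches;
    }
}
```

The batches are waited on one after another, which costs nothing in total wait time because every handle has to be signalled before the caller can continue anyway. A List<ManualResetEvent> can be passed directly, since IReadOnlyList<T> is covariant.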
