BlockingCollection - 高同步问题
将消息从多个线程获取到队列并让一个单独的线程一次处理该队列的一个项目的最佳方法是什么?
当我尝试断开活动与多个线程的连接时,我经常使用这种模式。
我为此使用了 BlockingCollection,如下面的代码摘录所示:
// start this task in a static constructor
Task.Factory.StartNew(() => ProcessMultiUseQueueEntries(), TaskCreationOptions.LongRunning);
private static BlockingCollection<Tuple<XClientMsgExt, BOInfo, string, BOStatus>> _q = new BlockingCollection<Tuple<XClientMsgExt, BOInfo, string, BOStatus>>();
/// <summary>
/// queued - Simple mechanism that will log the fact that this user is sending an xMsg (FROM a user)
/// </summary>
public static void LogXMsgFromUser(XClientMsgExt xMsg)
{
_q.Add(new Tuple<XClientMsgExt, BOInfo, string, BOStatus>(xMsg, null, "", BOStatus.Ignore));
}
/// <summary>
/// queued - Simple mechanism that will log the data being executed by this user
/// </summary>
public static void LogBOToUser(BOInfo boInfo)
{
_q.Add(new Tuple<XClientMsgExt, BOInfo, string, BOStatus>(null, boInfo, "", BOStatus.Ignore));
}
/// <summary>
/// queued - Simple mechanism that will log the status of the BO being executed by this user (causes the red square to flash)
/// </summary>
public static void LogBOStatus(string UserID, BOStatus status)
{
_q.Add(new Tuple<XClientMsgExt, BOInfo, string, BOStatus>(null, null, UserID, status));
}
/// <summary>
/// An endless thread that will keep checking the Queue for new entrants.
/// NOTE - no error handling since this can't fail... :) lol etc
/// </summary>
private static void ProcessMultiUseQueueEntries()
{
while (true) // eternal loop
{
Tuple<XClientMsgExt, BOInfo, string, BOStatus> tuple = _q.Take();
// Do stuff
}
}
这工作得很好 - 所以我想 - 直到 VS2010 中的性能向导开始突出显示 _q.Take() 行作为最高争用我的代码中的一行!
注意,我还使用了带有 ManualResetEvent 组合的标准 ConcurrentQueue,每次将项目插入队列时,我都会发出重置事件信号,允许工作线程检查和处理队列,但这也具有在 . WaitOne() 方法...
是否有其他方法来解决这种常见模式,即让许多线程将对象添加到并发队列中 - 并让单个线程在自己的时间内一次处理一个项目...
谢谢!!
What is the best way of getting messages from many threads onto a queue and have a separate thread processing items of this queue one at a time?
I am frequently using this pattern when trying to disconnect activities from many threads.
I am using a BlockingCollection for this as shown in a code extract below:
// start this task in a static constructor
Task.Factory.StartNew(() => ProcessMultiUseQueueEntries(), TaskCreationOptions.LongRunning);
private static BlockingCollection<Tuple<XClientMsgExt, BOInfo, string, BOStatus>> _q = new BlockingCollection<Tuple<XClientMsgExt, BOInfo, string, BOStatus>>();
/// <summary>
/// queued - Simple mechanism that will log the fact that this user is sending an xMsg (FROM a user)
/// </summary>
public static void LogXMsgFromUser(XClientMsgExt xMsg)
{
_q.Add(new Tuple<XClientMsgExt, BOInfo, string, BOStatus>(xMsg, null, "", BOStatus.Ignore));
}
/// <summary>
/// queued - Simple mechanism that will log the data being executed by this user
/// </summary>
public static void LogBOToUser(BOInfo boInfo)
{
_q.Add(new Tuple<XClientMsgExt, BOInfo, string, BOStatus>(null, boInfo, "", BOStatus.Ignore));
}
/// <summary>
/// queued - Simple mechanism that will log the status of the BO being executed by this user (causes the red square to flash)
/// </summary>
public static void LogBOStatus(string UserID, BOStatus status)
{
_q.Add(new Tuple<XClientMsgExt, BOInfo, string, BOStatus>(null, null, UserID, status));
}
/// <summary>
/// An endless thread that will keep checking the Queue for new entrants.
/// NOTE - no error handling since this can't fail... :) lol etc
/// </summary>
private static void ProcessMultiUseQueueEntries()
{
while (true) // eternal loop
{
Tuple<XClientMsgExt, BOInfo, string, BOStatus> tuple = _q.Take();
// Do stuff
}
}
This works fine - so I thought - until the Performance Wizard in VS2010 started to highlight the _q.Take() row as the highest contention line in my code!
Note I have also used a standard ConcurrentQueue with a ManualResetEvent combination and each time I insert an item onto the queue I signal the resetevent allowing the worker thread to examine and process the Queue but this also had the same net effect of being highlighted on the .WaitOne() method...
Are there other ways of solving this common pattern of having many threads adding objects into a concurrent queue - and have a single thread ploughing its way through the items one at a time and in its own time...
Thanks!!
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
竞争最高的线路?是的,因为它是一个阻塞集合!该调用将阻塞(例如,它可能正在等待
WaitHandle
),直到另一个元素添加到集合中。您确定这是一个问题吗?这听起来正是我所期望的。
如果不清楚我的意思,请考虑以下代码:
您期望上面的
Take
调用运行多长时间?我希望它会永远等待,因为blocker
中没有添加任何内容。因此,如果我分析上述代码的“性能”,我会在长时间运行的方法列表的顶部看到Take
。但这并不表示存在问题;再次强调:您想要阻止该调用。在完全独立的注释中,我是否可以建议将
Tuple
替换为属性具有描述性名称的类型? (当然,这个建议与你的问题无关;这只是一条一般性建议。)Highest contention line? Yes, because it is a blocking collection! That call will block (e.g., it might be waiting on a
WaitHandle
) until another element is added to the collection.Are you sure this is a problem? It sounds like exactly what I'd expect.
In case it isn't clear what I mean, consider this code:
How long would you expect that
Take
call above to run? I'd expect it to wait forever because nothing's being added toblocker
. Thus if I profiled the "performance" of the above code, I'd seeTake
right up at the top of the list of long-running methods. But this would be not be indicative of a problem; again: you want that call to block.On a completely separate note, might I recommend replacing
Tuple<XClientMsgExt, BOInfo, string, BOStatus>
with a type whose properties have descriptive names? (This suggestion has nothing to do with your question, of course; it's just a piece of general advice.)_q.Take()
是争用最高的行,其本身毫无意义。如果许多线程正在等待项目,就会出现争用。最大的问题是这种争用是否会损害您的性能。您需要找到以下几个问题的答案:如果您能够阻止队列增长,并且 CPU 资源不会花在获取物品上,那么就没有问题。除非您从集合中获得的线程数超出了必要的阅读量。
That
_q.Take()
is the highest contention line is meaningless in itself. There's going to be contention if many threads are waiting for items. The big question is whether that contention is costing you in terms of performance. A few questions you need to find the answers to:If you're able to keep the queue from growing and CPU resources aren't being spent on taking items, then there's no problem. Except perhaps you have more threads than necessary reading from the collection.
您是否考虑过使用实际的 MSMQ ?可能看起来有点矫枉过正,但它确实提供了您所需要的。并不是说您的应用程序不能既是编写器又具有专用线程来读取和处理消息。
Have you considered using an actual MSMQ? May seem like overkill, but it does provide what you need. No saying your application can't be both writer and have a dedicated thread to read and process the messages.