基于等待任务的队列
我想知道是否存在 ConcurrentQueue 的实现/包装器,类似到 BlockingCollection 从集合中获取不是阻塞,而是异步的,并且会导致异步等待,直到将项目放入队列中为止。
我已经提出了自己的实现,但它似乎没有按预期执行。我想知道我是否正在重新发明已经存在的东西。
这是我的实现:
public class MessageQueue<T>
{
ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
ConcurrentQueue<TaskCompletionSource<T>> waitingQueue =
new ConcurrentQueue<TaskCompletionSource<T>>();
object queueSyncLock = new object();
public void Enqueue(T item)
{
queue.Enqueue(item);
ProcessQueues();
}
public async Task<T> Dequeue()
{
TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
waitingQueue.Enqueue(tcs);
ProcessQueues();
return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
}
private void ProcessQueues()
{
TaskCompletionSource<T> tcs=null;
T firstItem=default(T);
while (true)
{
bool ok;
lock (queueSyncLock)
{
ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem);
if (ok)
{
waitingQueue.TryDequeue(out tcs);
queue.TryDequeue(out firstItem);
}
}
if (!ok) break;
tcs.SetResult(firstItem);
}
}
}
I'm wondering if there exists an implementation/wrapper for ConcurrentQueue, similar to BlockingCollection where taking from the collection does not block, but is instead asynchronous and will cause an async await until an item is placed in the queue.
I've come up with my own implementation, but it does not seem to be performing as expected. I'm wondering if I'm reinventing something that already exists.
Here's my implementation:
public class MessageQueue<T>
{
ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
ConcurrentQueue<TaskCompletionSource<T>> waitingQueue =
new ConcurrentQueue<TaskCompletionSource<T>>();
object queueSyncLock = new object();
public void Enqueue(T item)
{
queue.Enqueue(item);
ProcessQueues();
}
public async Task<T> Dequeue()
{
TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
waitingQueue.Enqueue(tcs);
ProcessQueues();
return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
}
private void ProcessQueues()
{
TaskCompletionSource<T> tcs=null;
T firstItem=default(T);
while (true)
{
bool ok;
lock (queueSyncLock)
{
ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem);
if (ok)
{
waitingQueue.TryDequeue(out tcs);
queue.TryDequeue(out firstItem);
}
}
if (!ok) break;
tcs.SetResult(firstItem);
}
}
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(10)
我不知道无锁解决方案,但您可以看看新的 数据流库,异步 CTP 的一部分。一个简单的 BufferBlock就足够了,例如:
通过数据流块类型的扩展方法最容易完成生产和消费。
生产非常简单:
消费是异步就绪的:
如果可能的话,我建议您使用数据流;使这样的缓冲区既高效又正确比乍看起来要困难得多。
I don't know of a lock-free solution, but you can take a look at the new Dataflow library, part of the Async CTP. A simple
BufferBlock<T>
should suffice, e.g.:Production and consumption are most easily done via extension methods on the dataflow block types.
Production is as simple as:
and consumption is async-ready:
I do recommend you use Dataflow if possible; making such a buffer both efficient and correct is more difficult than it first appears.
使用 C# 8.0
IAsyncEnumerable
和 数据流库具有
AsyncQueue
的实现,如下所示:Simple approach with C# 8.0
IAsyncEnumerable
and Dataflow libraryWith an implementation of
AsyncQueue
as follows:现在有一个官方方法可以做到这一点:
System.Threading.Channels
。它内置于 .NET Core 3.0 及更高版本(包括 .NET 5.0 和 6.0)的核心运行时中,但它也可以作为 .NET Standard 2.0 和 2.1 上的 NuGet 包提供。您可以在此处阅读文档。排队工作:
完成通道:
从通道读取:
或者,如果您有 .NET Core 3.0 或更高版本:
There is an official way to do this now:
System.Threading.Channels
. It's built into the core runtime on .NET Core 3.0 and higher (including .NET 5.0 and 6.0), but it's also available as a NuGet package on .NET Standard 2.0 and 2.1. You can read through the docs here.To enqueue work:
To complete the channel:
To read from the channel:
Or, if you have .NET Core 3.0 or higher:
实现此目的的一种简单方法是使用
SemaphoreSlim
:其优点在于
SemaphoreSlim
可以处理实现Wait()< 的所有复杂性。 /code> 和
WaitAsync()
功能。缺点是队列长度由信号量和队列本身跟踪,并且它们都神奇地保持同步。One simple and easy way to implement this is with a
SemaphoreSlim
:The beauty of this is that the
SemaphoreSlim
handles all of the complexity of implementing theWait()
andWaitAsync()
functionality. The downside is that queue length is tracked by both the semaphore and the queue itself, and they both magically stay in sync.我的尝试(它在创建“承诺”时引发了一个事件,外部生产者可以使用它来了解何时生产更多项目):
My atempt (it have an event raised when a "promise" is created, and it can be used by an external producer to know when to produce more items):
对于您的用例来说(考虑到学习曲线),这可能有点过分了,但是反应式扩展 提供了异步组合所需的所有粘合剂。
您本质上是订阅更改,当它们可用时,它们就会推送给您,并且您可以让系统将更改推送到单独的线程上。
It may be overkill for your use case (given the learning curve), but Reactive Extentions provides all the glue you could ever want for asynchronous composition.
You essentially subscribe to changes and they are pushed to you as they become available, and you can have the system push the changes on a separate thread.
查看 https://github.com/somdoron/AsyncCollection,您既可以异步出队,也可以使用 C# 8.0 IAsyncEnumerable。
该 API 与 BlockingCollection 非常相似。
使用 IAsyncEnumable:
Check out https://github.com/somdoron/AsyncCollection, you can both dequeue asynchronously and use C# 8.0 IAsyncEnumerable.
The API is very similar to BlockingCollection.
With IAsyncEnumeable:
这是我当前正在使用的实现。
它工作得很好,但是在
queueSyncLock
上有很多争用,因为我大量使用CancellationToken
来取消一些等待任务。当然,这会大大减少我在 BlockingCollection 中看到的阻塞,但是......我想知道是否有一种更平滑、无锁的方法来实现相同的目的
Here's the implementation I'm currently using.
It works good enough, but there's quite a lot of contention on
queueSyncLock
, as I am making quite a lot of use of theCancellationToken
to cancel some of the waiting tasks. Of course, this leads to considerably less blocking I would see with aBlockingCollection
but...I'm wondering if there is a smoother, lock free means of achieving the same end
8 年后,我遇到了这个问题,并准备实现在 nuget 包/命名空间中找到的 MS
AsyncQueue
类:Microsoft.VisualStudio.Threading感谢 @ Theodor Zoulias 提到的这个 api 可能已经过时了,DataFlow 库将是一个不错的选择。
所以我编辑了我的 AsyncQueue<>使用 BufferBlock<> 的实现。几乎相同,但效果更好。
我在 AspNet Core 后台线程中使用它,它完全异步运行。
我发现使用queue.OutputAvailableAsync()解决了我使用AsyncQueue<>时遇到的问题。 -- 尝试确定队列何时完成,而不必检查出队任务。
Well 8 years later I hit this very question and was about to implement the MS
AsyncQueue<T>
class found in nuget package/namespace: Microsoft.VisualStudio.ThreadingThanks to @Theodor Zoulias for mentioning this api may be outdated and the DataFlow lib would be a good alternative.
So I edited my AsyncQueue<> implementation to use BufferBlock<>. Almost the same but works better.
I use this in an AspNet Core background thread and it runs fully async.
I found that using the queue.OutputAvailableAsync() fixed the issue that I had with AsyncQueue<> -- trying to determine when the queue was complete and not having to inspect the dequeue task.
您可以只使用
BlockingCollection
(使用默认的ConcurrentQueue
)并将对Take
的调用包装在Task
中,这样你可以等待
它:You could just use a
BlockingCollection
( using the defaultConcurrentQueue
) and wrap the call toTake
in aTask
so you canawait
it: