Awaitable Task-based queue

Posted on 2024-12-11 07:19:12

I'm wondering if there exists an implementation/wrapper for ConcurrentQueue, similar to BlockingCollection where taking from the collection does not block, but is instead asynchronous and will cause an async await until an item is placed in the queue.

I've come up with my own implementation, but it does not seem to be performing as expected. I'm wondering if I'm reinventing something that already exists.

Here's my implementation:

public class MessageQueue<T>
{
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
        new ConcurrentQueue<TaskCompletionSource<T>>();

    object queueSyncLock = new object();

    public void Enqueue(T item)
    {
        queue.Enqueue(item);
        ProcessQueues();
    }

    public async Task<T> Dequeue()
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        waitingQueue.Enqueue(tcs);
        ProcessQueues();
        return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
    }

    private void ProcessQueues()
    {
        TaskCompletionSource<T> tcs=null;
        T firstItem=default(T);
        while (true)
        {
            bool ok;
            lock (queueSyncLock)
            {
                ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem);
                if (ok)
                {
                    waitingQueue.TryDequeue(out tcs);
                    queue.TryDequeue(out firstItem);
                }
            }
            if (!ok) break;
            tcs.SetResult(firstItem);
        }
    }
}

Comments (10)

太阳男子 2024-12-18 07:19:12

I don't know of a lock-free solution, but you can take a look at the new Dataflow library, part of the Async CTP. A simple BufferBlock<T> should suffice, e.g.:

BufferBlock<int> buffer = new BufferBlock<int>();

Production and consumption are most easily done via extension methods on the dataflow block types.

Production is as simple as:

buffer.Post(13);

and consumption is async-ready:

int item = await buffer.ReceiveAsync();

I do recommend you use Dataflow if possible; making such a buffer both efficient and correct is more difficult than it first appears.
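Since cancellation comes up later in this thread, here is a minimal sketch of receiving from a BufferBlock<T> with a timeout. The class and method names (BufferBlockSketch, ReceiveOrTimeoutAsync) are hypothetical and not part of the Dataflow library; the sketch assumes the System.Threading.Tasks.Dataflow assembly or NuGet package is available.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BufferBlockSketch
{
    // Hypothetical helper: waits for the next item, giving up after `timeout`.
    // Returns (true, item) on success and (false, default) if the wait was cancelled.
    public static async Task<(bool Success, T Item)> ReceiveOrTimeoutAsync<T>(
        BufferBlock<T> buffer, TimeSpan timeout)
    {
        // The token source cancels itself once the timeout elapses.
        using var cts = new CancellationTokenSource(timeout);
        try
        {
            T item = await buffer.ReceiveAsync(cts.Token);
            return (true, item);
        }
        catch (OperationCanceledException)
        {
            // A cancelled receive does not consume anything from the buffer.
            return (false, default(T));
        }
    }
}
```

Note that ReceiveAsync throws InvalidOperationException if the block has been completed and is empty; the sketch deliberately lets that exception propagate.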

不寐倦长更 2024-12-18 07:19:12

A simple approach using C# 8.0 IAsyncEnumerable and the Dataflow library:

// Instantiate an async queue
var queue = new AsyncQueue<int>();

// Then, loop through the elements of queue.
// This loop won't stop until it is canceled or broken out of
// (for that, use queue.WithCancellation(..) or break;)
await foreach(int i in queue) {
    // Writes a line as soon as some other Task calls queue.Enqueue(..)
    Console.WriteLine(i);
}

With an implementation of AsyncQueue as follows:

public class AsyncQueue<T> : IAsyncEnumerable<T>
{
    private readonly SemaphoreSlim _enumerationSemaphore = new SemaphoreSlim(1);
    private readonly BufferBlock<T> _bufferBlock = new BufferBlock<T>();

    public void Enqueue(T item) => _bufferBlock.Post(item);

    public async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken token = default)
    {
        // We lock this so we only ever enumerate once at a time.
        // That way we ensure all items are returned in a continuous
        // fashion with no 'holes' in the data when two foreach compete.
        await _enumerationSemaphore.WaitAsync(token);

        try 
        {
            // Return new elements until cancellationToken is triggered.
            while (true) 
            {
                // Make sure to throw on cancellation so the Task will transfer into a canceled state
                token.ThrowIfCancellationRequested();
                yield return await _bufferBlock.ReceiveAsync(token);
            }
        } 
        finally 
        {
            _enumerationSemaphore.Release();
        }

    }
}

硬不硬你别怂 2024-12-18 07:19:12

There is an official way to do this now: System.Threading.Channels. It's built into the core runtime on .NET Core 3.0 and higher (including .NET 5.0 and 6.0), but it's also available as a NuGet package on .NET Standard 2.0 and 2.1. You can read through the docs here.

var channel = System.Threading.Channels.Channel.CreateUnbounded<int>();

To enqueue work:

// This will succeed and finish synchronously if the channel is unbounded.
channel.Writer.TryWrite(42);

To complete the channel:

channel.Writer.TryComplete();

To read from the channel:

var i = await channel.Reader.ReadAsync();

Or, if you have .NET Core 3.0 or higher:

await foreach (int i in channel.Reader.ReadAllAsync())
{
    // whatever processing on i...
}

热情消退 2024-12-18 07:19:12

One simple and easy way to implement this is with a SemaphoreSlim:

public class AwaitableQueue<T>
{
    private SemaphoreSlim semaphore = new SemaphoreSlim(0);
    private readonly object queueLock = new object();
    private Queue<T> queue = new Queue<T>();

    public void Enqueue(T item)
    {
        lock (queueLock)
        {
            queue.Enqueue(item);
            semaphore.Release();
        }
    }

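    public T WaitAndDequeue(TimeSpan timeSpan, CancellationToken cancellationToken)
    {
        // Wait returns false on timeout; dequeuing anyway would throw on an empty
        // queue or take an item whose semaphore count was never consumed.
        if (!semaphore.Wait(timeSpan, cancellationToken))
            throw new TimeoutException();
        lock (queueLock)
        {
            return queue.Dequeue();
        }
    }

    public async Task<T> WhenDequeue(TimeSpan timeSpan, CancellationToken cancellationToken)
    {
        // Same check as above for the asynchronous wait.
        if (!await semaphore.WaitAsync(timeSpan, cancellationToken))
            throw new TimeoutException();
        lock (queueLock)
        {
            return queue.Dequeue();
        }
    }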
}

The beauty of this is that the SemaphoreSlim handles all of the complexity of implementing the Wait() and WaitAsync() functionality. The downside is that queue length is tracked by both the semaphore and the queue itself, and they both magically stay in sync.

简单 2024-12-18 07:19:12

My attempt (it raises an event when a "promise" is created, which an external producer can use to know when to produce more items):

public class AsyncQueue<T>
{
    private ConcurrentQueue<T> _bufferQueue;
    private ConcurrentQueue<TaskCompletionSource<T>> _promisesQueue;
    private object _syncRoot = new object();

    public AsyncQueue()
    {
        _bufferQueue = new ConcurrentQueue<T>();
        _promisesQueue = new ConcurrentQueue<TaskCompletionSource<T>>();
    }

    /// <summary>
    /// Enqueues the specified item.
    /// </summary>
    /// <param name="item">The item.</param>
    public void Enqueue(T item)
    {
        TaskCompletionSource<T> promise;
        do
        {
            if (_promisesQueue.TryDequeue(out promise) &&
                !promise.Task.IsCanceled &&
                promise.TrySetResult(item))
            {
                return;                                       
            }
        }
        while (promise != null);

        lock (_syncRoot)
        {
            if (_promisesQueue.TryDequeue(out promise) &&
                !promise.Task.IsCanceled &&
                promise.TrySetResult(item))
            {
                return;
            }

            _bufferQueue.Enqueue(item);
        }            
    }

    /// <summary>
    /// Dequeues an item asynchronously.
    /// </summary>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns></returns>
    public Task<T> DequeueAsync(CancellationToken cancellationToken)
    {
        T item;

        if (!_bufferQueue.TryDequeue(out item))
        {
            lock (_syncRoot)
            {
                if (!_bufferQueue.TryDequeue(out item))
                {
                    var promise = new TaskCompletionSource<T>();
                    cancellationToken.Register(() => promise.TrySetCanceled());

                    _promisesQueue.Enqueue(promise);
                    // Raise the event; null-safe because there may be no subscribers.
                    this.PromiseAdded?.Invoke(this, EventArgs.Empty);

                    return promise.Task;
                }
            }
        }

        return Task.FromResult(item);
    }

    /// <summary>
    /// Gets a value indicating whether this instance has promises.
    /// </summary>
    /// <value>
    /// <c>true</c> if this instance has promises; otherwise, <c>false</c>.
    /// </value>
    public bool HasPromises
    {
        get { return _promisesQueue.Where(p => !p.Task.IsCanceled).Count() > 0; }
    }

    /// <summary>
    /// Occurs when a new promise
    /// is generated by the queue
    /// </summary>
    public event EventHandler PromiseAdded;
}

梦在深巷 2024-12-18 07:19:12

It may be overkill for your use case (given the learning curve), but Reactive Extensions provides all the glue you could ever want for asynchronous composition.

You essentially subscribe to changes and they are pushed to you as they become available, and you can have the system push the changes on a separate thread.
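As a rough illustration of that push model, here is a minimal sketch assuming the System.Reactive NuGet package is referenced. The class and method names (RxQueueSketch, RunAsync) are hypothetical; Subject<T>, ObserveOn and TaskPoolScheduler are the library's own types.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

public static class RxQueueSketch
{
    public static async Task<List<int>> RunAsync()
    {
        var received = new List<int>();
        var done = new TaskCompletionSource<bool>();

        // The subject plays the role of the queue: producers call OnNext.
        using var subject = new Subject<int>();

        // ObserveOn moves delivery onto the thread pool while keeping notifications in order.
        using var subscription = subject
            .ObserveOn(TaskPoolScheduler.Default)
            .Subscribe(
                item => received.Add(item),      // pushed to us as items arrive
                () => done.TrySetResult(true));  // signalled after OnCompleted

        subject.OnNext(1);
        subject.OnNext(2);
        subject.OnNext(3);
        subject.OnCompleted();

        // Wait until the subscriber has seen the completion notification.
        await done.Task;
        return received;
    }
}
```

Unlike a queue, a plain Subject<T> does not buffer: items published before anyone subscribes are simply dropped, so this fits best when the consumer is attached up front.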

时间你老了 2024-12-18 07:19:12

Check out https://github.com/somdoron/AsyncCollection; you can both dequeue asynchronously and use C# 8.0 IAsyncEnumerable.

The API is very similar to BlockingCollection.

AsyncCollection<int> collection = new AsyncCollection<int>();

var t = Task.Run(async () =>
{
    while (!collection.IsCompleted)
    {
        var item = await collection.TakeAsync();

        // process
    }
});

for (int i = 0; i < 1000; i++)
{
    collection.Add(i);
}

collection.CompleteAdding();

t.Wait();

With IAsyncEnumerable:

AsyncCollection<int> collection = new AsyncCollection<int>();

var t = Task.Run(async () =>
{
    await foreach (var item in collection)
    {
        // process
    }
});

for (int i = 0; i < 1000; i++)
{
    collection.Add(i);
}

collection.CompleteAdding();

t.Wait();

赢得她心 2024-12-18 07:19:12

Here's the implementation I'm currently using.

public class MessageQueue<T>
{
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
        new ConcurrentQueue<TaskCompletionSource<T>>();
    object queueSyncLock = new object();
    public void Enqueue(T item)
    {
        queue.Enqueue(item);
        ProcessQueues();
    }

    public async Task<T> DequeueAsync(CancellationToken ct)
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        ct.Register(() =>
        {
            lock (queueSyncLock)
            {
                tcs.TrySetCanceled();
            }
        });
        waitingQueue.Enqueue(tcs);
        ProcessQueues();
        return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
    }

    private void ProcessQueues()
    {
        TaskCompletionSource<T> tcs = null;
        T firstItem = default(T);
        lock (queueSyncLock)
        {
            while (true)
            {
                if (waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem))
                {
                    waitingQueue.TryDequeue(out tcs);
                    if (tcs.Task.IsCanceled)
                    {
                        continue;
                    }
                    queue.TryDequeue(out firstItem);
                }
                else
                {
                    break;
                }
                tcs.SetResult(firstItem);
            }
        }
    }
}

It works well enough, but there's quite a lot of contention on queueSyncLock, as I make heavy use of the CancellationToken to cancel some of the waiting tasks. Of course, this leads to considerably less blocking than I would see with a BlockingCollection, but...

I'm wondering if there is a smoother, lock-free means of achieving the same end.
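One way to drop the hand-written lock is to wrap the System.Threading.Channels API mentioned elsewhere in this thread. The following is a sketch under assumptions, not a drop-in replacement: the class name ChannelMessageQueue and its Complete method are hypothetical, and it makes no claim that the channel is lock-free internally, only that waiter and cancellation bookkeeping move into the library.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical wrapper mirroring the MessageQueue<T> surface above.
public class ChannelMessageQueue<T>
{
    private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();

    public void Enqueue(T item)
    {
        // On an unbounded channel TryWrite only fails once the writer is completed.
        if (!_channel.Writer.TryWrite(item))
            throw new InvalidOperationException("The queue has been completed.");
    }

    // Cancelling the token cancels this pending read without consuming an item.
    public ValueTask<T> DequeueAsync(CancellationToken ct = default)
        => _channel.Reader.ReadAsync(ct);

    // After completion, readers drain the remaining items and then
    // ReadAsync throws ChannelClosedException.
    public void Complete() => _channel.Writer.TryComplete();
}
```

A cancelled DequeueAsync surfaces as an OperationCanceledException when awaited, so callers that cancel frequently would catch that rather than inspect the task.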

日暮斜阳 2024-12-18 07:19:12

Well 8 years later I hit this very question and was about to implement the MS AsyncQueue<T> class found in nuget package/namespace: Microsoft.VisualStudio.Threading

Thanks to @Theodor Zoulias for mentioning this api may be outdated and the DataFlow lib would be a good alternative.

So I edited my AsyncQueue<> implementation to use BufferBlock<>. Almost the same but works better.

I use this in an AspNet Core background thread and it runs fully async.

protected async Task MyRun()
{
    BufferBlock<MyObj> queue = new BufferBlock<MyObj>();
    Task enqueueTask = StartDataIteration(queue);

    while (await queue.OutputAvailableAsync())
    {
        var myObj = queue.Receive();
        // do something with myObj
    }

}

public async Task StartDataIteration(BufferBlock<MyObj> queue)
{
    var cursor = await RunQuery();
    while(await cursor.Next()) { 
        queue.Post(cursor.Current);
    }
    queue.Complete(); // <<< signals the consumer when queue.Count reaches 0
}

I found that using the queue.OutputAvailableAsync() fixed the issue that I had with AsyncQueue<> -- trying to determine when the queue was complete and not having to inspect the dequeue task.

假情假意假温柔 2024-12-18 07:19:12

You could just use a BlockingCollection (using the default ConcurrentQueue) and wrap the call to Take in a Task so you can await it:

var bc = new BlockingCollection<T>();

T element = await Task.Run( () => bc.Take() );
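A slightly fuller sketch of the same idea with cancellation support follows. The helper name TakeAsync is hypothetical; BlockingCollection<T>.Take(CancellationToken) and Task.Run are standard library calls. One caveat to keep in mind: each pending call parks a thread-pool thread inside Take until an item arrives, so this is awaitable but not truly asynchronous.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class BlockingCollectionSketch
{
    // Hypothetical helper: runs the blocking Take on the thread pool so the caller can await it.
    // Passing the token to both Take and Task.Run lets a cancelled wait end as a cancelled task.
    public static Task<T> TakeAsync<T>(BlockingCollection<T> bc, CancellationToken ct = default)
        => Task.Run(() => bc.Take(ct), ct);
}
```

With many concurrent waiters this approach can tie up a correspondingly large number of threads, which is the cost the other answers in this thread avoid.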