如何取消 BlockingCollection 上的 GetConsumingEnumerable()

发布于 2025-01-01 18:24:14 字数 999 浏览 1 评论 0原文

在下面的代码中,当生产者不生产并且我想突破 foreach 并退出任务时,我使用 CancellationToken 唤醒 GetConsumingEnumerable()。但我没有看到 IsCancellationRequested 被记录,并且我的 Task.Wait(timeOut) 等待完整的超时时间。我做错了什么?

userToken.Task = Task.Factory.StartNew(state =>
{
    userToken.CancelToken = new CancellationTokenSource();

    foreach (var broadcast in userToken.BroadcastQueue.GetConsumingEnumerable(userToken.CancelToken.Token))
    {
        if (userToken.CancelToken.IsCancellationRequested)
        {
            Log.Write("BroadcastQueue IsCancellationRequested");
            break;
            ...
        }
    }

    return 0;
}, "TaskSubscribe", TaskCreationOptions.LongRunning);

之后...

UserToken.CancelToken.Cancel();          
try
{
    task.Wait(timeOut);
}
catch (AggregateException ar)
{
    Log.Write("AggregateException " + ar.InnerException, MsgType.InfoMsg);
}
catch (OperationCanceledException)
{
    Log.Write("BroadcastQueue Cancelled", MsgType.InfoMsg);
}

In the following code I'm using the CancellationToken to wake up the GetConsumingEnumerable() when the producer is not producing and I want to break out of the foreach and exit the Task. But I dont see IsCancellationRequested being logged and my Task.Wait(timeOut) waits for the full timeOut period. What am I doing wrong?

userToken.Task = Task.Factory.StartNew(state =>
{
    userToken.CancelToken = new CancellationTokenSource();

    foreach (var broadcast in userToken.BroadcastQueue.GetConsumingEnumerable(userToken.CancelToken.Token))
    {
        if (userToken.CancelToken.IsCancellationRequested)
        {
            Log.Write("BroadcastQueue IsCancellationRequested");
            break;
            ...
        }
    }

    return 0;
}, "TaskSubscribe", TaskCreationOptions.LongRunning);

later...

UserToken.CancelToken.Cancel();          
try
{
    task.Wait(timeOut);
}
catch (AggregateException ar)
{
    Log.Write("AggregateException " + ar.InnerException, MsgType.InfoMsg);
}
catch (OperationCanceledException)
{
    Log.Write("BroadcastQueue Cancelled", MsgType.InfoMsg);
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

孤寂小茶 2025-01-08 18:24:14

您可以使用CompleteAdding(),这表示不会再向集合中添加更多项目。如果使用 GetConsumingEnumerable,foreach 将正常结束,因为它知道等待更多项目是没有意义的。

基本上,一旦您完成向 BlockingCollection 添加项目,只需执行以下操作:

myBlockingCollection.CompleteAdding() 

任何使用 GetConsumingEnumerable 执行 foreach 循环的线程都将停止循环。

You could use CompleteAdding() which signifies that no more items will be added to the collection. If GetConsumingEnumerable is used, the foreach will end gracefully as it will know there's no point in waiting for more items.

Basically once you have finished adding items to the BlockingCollection just do:

myBlockingCollection.CompleteAdding() 

Any threads which are doing a foreach loop with GetConsumingEnumerable will stop looping.

偷得浮生 2025-01-08 18:24:14

我已经创建了快速原型,它似乎对我有用。

请注意令牌取消请求之前的 Thread.Sleep(1000)。
您可能正在为 Token 变量创建一个竞争条件,因为您创建并访问了>item.CancelToken 不同线程中的变量。

例如,旨在取消任务的代码可能会在错误(先前或空)取消标记上调用取消。

static void Main(string[] args)
{
    CancellationTokenSource token = null;
    BlockingCollection<string> coll = new BlockingCollection<string>();
    var t = Task.Factory.StartNew(state =>
    {
        token = new CancellationTokenSource();
        try
        {
            foreach (var broadcast in coll.GetConsumingEnumerable(token.Token))
            {
                if (token.IsCancellationRequested)
                {
                    return;
                }
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Cancel");
            return;
        }
    }, "TaskSubscribe", TaskCreationOptions.LongRunning);
    Thread.Sleep(1000);
    token.Cancel();
    t.Wait();
}

I've created quick prototype, and it seems work for me.

Note Thread.Sleep(1000) right before token cancel request.
It is possible that you are creating a race condition for Token variable, since you create and access item.CancelToken variable in different threads.

E.g. the code that is intended to cancel task might invoke cancel on wrong (previous or null) cancellation token.

static void Main(string[] args)
{
    CancellationTokenSource token = null;
    BlockingCollection<string> coll = new BlockingCollection<string>();
    var t = Task.Factory.StartNew(state =>
    {
        token = new CancellationTokenSource();
        try
        {
            foreach (var broadcast in coll.GetConsumingEnumerable(token.Token))
            {
                if (token.IsCancellationRequested)
                {
                    return;
                }
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Cancel");
            return;
        }
    }, "TaskSubscribe", TaskCreationOptions.LongRunning);
    Thread.Sleep(1000);
    token.Cancel();
    t.Wait();
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文