如何取消 BlockingCollection 上的 GetConsumingEnumerable()
在下面的代码中,当生产者不生产并且我想突破 foreach 并退出任务时,我使用 CancellationToken 唤醒 GetConsumingEnumerable()。但我没有看到 IsCancellationRequested 被记录,并且我的 Task.Wait(timeOut) 等待完整的超时时间。我做错了什么?
userToken.Task = Task.Factory.StartNew(state =>
{
userToken.CancelToken = new CancellationTokenSource();
foreach (var broadcast in userToken.BroadcastQueue.GetConsumingEnumerable(userToken.CancelToken.Token))
{
if (userToken.CancelToken.IsCancellationRequested)
{
Log.Write("BroadcastQueue IsCancellationRequested");
break;
...
}
}
return 0;
}, "TaskSubscribe", TaskCreationOptions.LongRunning);
之后...
UserToken.CancelToken.Cancel();
try
{
task.Wait(timeOut);
}
catch (AggregateException ar)
{
Log.Write("AggregateException " + ar.InnerException, MsgType.InfoMsg);
}
catch (OperationCanceledException)
{
Log.Write("BroadcastQueue Cancelled", MsgType.InfoMsg);
}
In the following code I'm using the CancellationToken to wake up the GetConsumingEnumerable() when the producer is not producing and I want to break out of the foreach and exit the Task. But I dont see IsCancellationRequested being logged and my Task.Wait(timeOut) waits for the full timeOut period. What am I doing wrong?
userToken.Task = Task.Factory.StartNew(state =>
{
userToken.CancelToken = new CancellationTokenSource();
foreach (var broadcast in userToken.BroadcastQueue.GetConsumingEnumerable(userToken.CancelToken.Token))
{
if (userToken.CancelToken.IsCancellationRequested)
{
Log.Write("BroadcastQueue IsCancellationRequested");
break;
...
}
}
return 0;
}, "TaskSubscribe", TaskCreationOptions.LongRunning);
later...
UserToken.CancelToken.Cancel();
try
{
task.Wait(timeOut);
}
catch (AggregateException ar)
{
Log.Write("AggregateException " + ar.InnerException, MsgType.InfoMsg);
}
catch (OperationCanceledException)
{
Log.Write("BroadcastQueue Cancelled", MsgType.InfoMsg);
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
您可以使用
CompleteAdding()
,这表示不会再向集合中添加更多项目。如果使用 GetConsumingEnumerable,foreach 将正常结束,因为它知道等待更多项目是没有意义的。基本上,一旦您完成向 BlockingCollection 添加项目,只需执行以下操作:
任何使用 GetConsumingEnumerable 执行 foreach 循环的线程都将停止循环。
You could use
CompleteAdding()
which signifies that no more items will be added to the collection. IfGetConsumingEnumerable
is used, the foreach will end gracefully as it will know there's no point in waiting for more items.Basically once you have finished adding items to the
BlockingCollection
just do:Any threads which are doing a foreach loop with GetConsumingEnumerable will stop looping.
我已经创建了快速原型,它似乎对我有用。
请注意令牌取消请求之前的 Thread.Sleep(1000)。
您可能正在为 Token 变量创建一个竞争条件,因为您创建并访问了>item.CancelToken 不同线程中的变量。
例如,旨在取消任务的代码可能会在错误(先前或空)取消标记上调用取消。
I've created quick prototype, and it seems work for me.
Note Thread.Sleep(1000) right before token cancel request.
It is possible that you are creating a race condition for Token variable, since you create and access
item.CancelToken
variable in different threads.E.g. the code that is intended to cancel task might invoke cancel on wrong (previous or null) cancellation token.