快速重复 TakeWhile 导致无限循环
如何使以下可观察的重复,直到stream.DataAvailable为假? 目前看来它永远不会停止。
Defer 部分内的 AsyncReadChunk 和 Observable.Return 进行 OnNext 调用,然后进行 OnCompleted 调用。 当 Repeat 收到 OnNext 调用时,它将其传递给 TakeWhile。当 TakeWhile 不满足时,它会完成可观察量,但我认为 OnNext 之后的 OnCompleted 速度太快,以至于导致 Repeat 重新订阅可观察量并导致无限循环。
我该如何纠正这种行为?
public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize)
{
return Observable.Defer(() =>
{
try
{
return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0]);
}
catch (Exception)
{
return Observable.Return(new byte[0]);
}
})
.Repeat()
.TakeWhile((dataChunk, index) => dataChunk.Length > 0);
}
How can I make the following observable repeat until stream.DataAvailable is false?
Currently it looks like it never stops.
AsyncReadChunk and Observable.Return inside the Defer section make OnNext call then OnCompleted call.
When Repeat receives the OnNext call it passes it to TakeWhile. When TakeWhile's is not satisfied it completes the observable but I think the OnCompleted that comes right after the OnNext is so fast that it makes Repeat to re-subscribes to the observable and causes the infinite loop.
How can I correct this behaviour?
public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize)
{
return Observable.Defer(() =>
{
try
{
return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0]);
}
catch (Exception)
{
return Observable.Return(new byte[0]);
}
})
.Repeat()
.TakeWhile((dataChunk, index) => dataChunk.Length > 0);
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
自我回答:(下面是问题作者 Samet 发布的答案。但是,他将答案作为问题的一部分发布。我将其移至单独的答案中,标记为社区wiki,因为作者自己没有动它。)
我通过重构发现这是调度程序的问题。 Return 使用 Immediate 调度程序,而 Repeat 使用 CurrentThread。固定代码如下。
SELF ANSWER: (Below is an answer posted by Samet, the author of the question. However, he posted the answer as part of the question. I'm moving it into a separate answer, marking as community wiki, since the author hasn't moved it himself.)
I discovered by refactoring that it is a problem with schedulers. The Return uses Immediate scheduler while Repeat uses CurrentThread. The fixed code is below.