在rx.net中,如何使主题与TaskCompletionsource行为相似?
在rx.net中,如何使主题
类似于 taskcompletionsource.task
行为?
即使完成,它也需要缓存并回复第一个事件。 asyncSubject
也不是 replaySubject(BufferSize:1)
都会做到这一点。
例如(让我们称其为 PromisesUbject
):
//var subj = new ReplaySubject<int>(bufferSize: 1);
var subj = new PromiseSubject<int>();
subj.Subscribe(i => Console.WriteLine(i));
subj.OnNext(1);
subj.OnNext(2);
subj.OnNext(3);
subj.OnCompleted();
subj.Subscribe(i => Console.WriteLine(i));
Console.ReadLine();
预期输出:
1
1
我可以使用 taskcompletionsource
, taskobservableExtensions.toObservable
>主题base 衍生的主题实现,但是使用RX运算符的组成,是否有一种优雅的方式?
更新,我通过 taskCompletionsource
的初步尝试:
public class PromiseSubject<T> : ISubject<T>
{
private readonly TaskCompletionSource<(bool HasValue, T Value)> _tcs;
private readonly IObservable<T> _observable;
public PromiseSubject()
{
_tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
_observable = _tcs.Task.ToObservable()
.Where(r => r.HasValue).Select(r => r.Value!);
}
public void OnCompleted() =>
_tcs.TrySetResult((false, default!));
public void OnError(Exception error) =>
_tcs.TrySetException(error);
public void OnNext(T value) =>
_tcs.TrySetResult((true, value));
public IDisposable Subscribe(IObserver<T> observer) =>
_observable.Subscribe(observer);
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
这是Jeff Mercado的答案的简化版本。我认为只需完成第一个
bufferSize:1)
之后,就可以简单地实现理想的行为。实际上,
asyncSubject&lt; t&gt;
,如@noseratio指出的,in 评论更简单,而且更有效,因为它将其单个值存储在字段中而不是数组中。因此,在这一系列事件中:
...除第一个以外的所有命令都将是无效的。稍后订阅
writeOncesubject
时,它将发射值1
,该值存储在其缓冲区中,然后是on Completed
Notification。This is a simplified version of Jeff Mercado's answer. I think that the desirable behavior can be achieved simply by completing a
ReplaySubject(bufferSize: 1)
after the firstOnNext
.Actually an
AsyncSubject<T>
, as pointed out by @noseratio in a comment, is even simpler, and also slightly more efficient because it stores its single value in a field instead of an array.So in this sequence of events:
...all commands except the first will be no-ops. When the
writeOnceSubject
is subscribed later, it will emit the value1
that is stored in its buffer, followed by anOnCompleted
notification.您可以用两个主题来编写它,一个重播以发射值,如果设置为了散发值,另一个则可以控制初始化。
You could write it in terms of two subjects, one replay subject to emit the value if set, and another to control initialization.
您所描述的听起来与 WriteOnceBlock&lt; t&gt; 来自TPL DataFlow库。数据流块具有方便的扩展方法 Asobervable ,因此,基于此想法,实现将看起来像这样:
不幸的是,这个想法不起作用。
writeOncesubject&lt; t&gt;
的订阅者仅获得oncompleted()
Notification。没有OnNext()
发射。我刚刚发布了意外地表现出Asobservable”> bug Report bug Report >在Github上有关此问题。更新:此处是 Microsoft的反馈史蒂芬·图布(Stephen Toub)的错误报告:
What you describe sounds quite similar to the
WriteOnceBlock<T>
from the TPL Dataflow library. The dataflow blocks have a convenient extension methodAsObservable
, so based on this idea an implementation would look like this:Unfortunately this idea doesn't work. The subscribers of the
WriteOnceSubject<T>
are getting only anOnCompleted()
notification. NoOnNext()
is emitted. I just posted a bug report on GitHub about this issue.Update: Here is Microsoft's feedback regarding the bug report, by Stephen Toub: