在rx.net中,如何使主题与TaskCompletionsource行为相似?

发布于 2025-02-05 17:46:52 字数 1686 浏览 1 评论 0 原文

在rx.net中,如何使主题类似于 taskcompletionsource.task 行为?

即使完成,它也需要缓存并回复第一个事件。 asyncSubject 也不是 replaySubject(BufferSize:1)都会做到这一点。

例如(让我们称其为 PromisesUbject ):

//var subj = new ReplaySubject<int>(bufferSize: 1);
var subj = new PromiseSubject<int>();

subj.Subscribe(i => Console.WriteLine(i));

subj.OnNext(1);
subj.OnNext(2);
subj.OnNext(3);
subj.OnCompleted();

subj.Subscribe(i => Console.WriteLine(i));

Console.ReadLine();

预期输出:

1
1

我可以使用 taskcompletionsource taskobservableExtensions.toObservable >主题base 衍生的主题实现,但是使用RX运算符的组成,是否有一种优雅的方式?

更新,我通过 taskCompletionsource 的初步尝试:

public class PromiseSubject<T> : ISubject<T>
{
    private readonly TaskCompletionSource<(bool HasValue, T Value)> _tcs;
    private readonly IObservable<T> _observable;

    public PromiseSubject()
    {
        _tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
        _observable = _tcs.Task.ToObservable()
            .Where(r => r.HasValue).Select(r => r.Value!);
    }

    public void OnCompleted() =>
        _tcs.TrySetResult((false, default!));

    public void OnError(Exception error) =>
        _tcs.TrySetException(error);

    public void OnNext(T value) =>
        _tcs.TrySetResult((true, value));

    public IDisposable Subscribe(IObserver<T> observer) =>
        _observable.Subscribe(observer);
}

In Rx.NET, how do I make a Subject to resemble TaskCompletionSource.Task behavior?

It needs to cache and reply the first event, even if completed. Neither AsyncSubject nor ReplaySubject(bufferSize: 1) would do that.

For example (let's call it PromiseSubject):

//var subj = new ReplaySubject<int>(bufferSize: 1);
var subj = new PromiseSubject<int>();

subj.Subscribe(i => Console.WriteLine(i));

subj.OnNext(1);
subj.OnNext(2);
subj.OnNext(3);
subj.OnCompleted();

subj.Subscribe(i => Console.WriteLine(i));

Console.ReadLine();

Expected output:

1
1

I can possibly cook it up using TaskCompletionSource, TaskObservableExtensions.ToObservable and a custom SubjectBase-derived subject implementation, but is there an elegant way of doing it using a composition of Rx operators?

Updated, my initial attempt via TaskCompletionSource:

public class PromiseSubject<T> : ISubject<T>
{
    private readonly TaskCompletionSource<(bool HasValue, T Value)> _tcs;
    private readonly IObservable<T> _observable;

    public PromiseSubject()
    {
        _tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
        _observable = _tcs.Task.ToObservable()
            .Where(r => r.HasValue).Select(r => r.Value!);
    }

    public void OnCompleted() =>
        _tcs.TrySetResult((false, default!));

    public void OnError(Exception error) =>
        _tcs.TrySetException(error);

    public void OnNext(T value) =>
        _tcs.TrySetResult((true, value));

    public IDisposable Subscribe(IObserver<T> observer) =>
        _observable.Subscribe(observer);
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

帅哥哥的热头脑 2025-02-12 17:46:52

这是Jeff Mercado的答案的简化版本。我认为只需完成第一个 bufferSize:1)之后,就可以简单地实现理想的行为。

实际上, asyncSubject&lt; t&gt; ,如@noseratio指出的,in 评论更简单,而且更有效,因为它将其单个值存储在字段中而不是数组中。

public class WriteOnceSubject<T> : ISubject<T>
{
    private readonly AsyncSubject<T> subject = new();

    public void OnNext(T value) { subject.OnNext(value); subject.OnCompleted(); }
    public void OnError(Exception error) => subject.OnError(error);
    public void OnCompleted() => subject.OnCompleted();

    public IDisposable Subscribe(IObserver<T> observer) => subject.Subscribe(observer);
}

因此,在这一系列事件中:

writeOnceSubject.OnNext(1);
writeOnceSubject.OnNext(2);
writeOnceSubject.OnNext(3);
writeOnceSubject.OnCompleted();
writeOnceSubject.OnError(new Exception());

...除第一个以外的所有命令都将是无效的。稍后订阅 writeOncesubject 时,它将发射值 1 ,该值存储在其缓冲区中,然后是 on Completed Notification。

This is a simplified version of Jeff Mercado's answer. I think that the desirable behavior can be achieved simply by completing a ReplaySubject(bufferSize: 1) after the first OnNext.

Actually an AsyncSubject<T>, as pointed out by @noseratio in a comment, is even simpler, and also slightly more efficient because it stores its single value in a field instead of an array.

public class WriteOnceSubject<T> : ISubject<T>
{
    private readonly AsyncSubject<T> subject = new();

    public void OnNext(T value) { subject.OnNext(value); subject.OnCompleted(); }
    public void OnError(Exception error) => subject.OnError(error);
    public void OnCompleted() => subject.OnCompleted();

    public IDisposable Subscribe(IObserver<T> observer) => subject.Subscribe(observer);
}

So in this sequence of events:

writeOnceSubject.OnNext(1);
writeOnceSubject.OnNext(2);
writeOnceSubject.OnNext(3);
writeOnceSubject.OnCompleted();
writeOnceSubject.OnError(new Exception());

...all commands except the first will be no-ops. When the writeOnceSubject is subscribed later, it will emit the value 1 that is stored in its buffer, followed by an OnCompleted notification.

蝶…霜飞 2025-02-12 17:46:52

您可以用两个主题来编写它,一个重播以发射值,如果设置为了散发值,另一个则可以控制初始化。

public class PromiseSubject<T> : ISubject<T>
{
    private readonly Subject<T> initialize = new();
    private readonly ReplaySubject<T> subject = new(1);
    public PromiseSubject() => initialize.Subscribe(subject);
    
    public void OnCompleted() => initialize.OnCompleted();
    public void OnError(Exception error) => initialize.OnError(error);
    public void OnNext(T value)
    {
        initialize.OnNext(value);
        initialize.OnCompleted();
    }

    public IDisposable Subscribe(IObserver<T> observer) => subject.Subscribe(observer);
}

You could write it in terms of two subjects, one replay subject to emit the value if set, and another to control initialization.

public class PromiseSubject<T> : ISubject<T>
{
    private readonly Subject<T> initialize = new();
    private readonly ReplaySubject<T> subject = new(1);
    public PromiseSubject() => initialize.Subscribe(subject);
    
    public void OnCompleted() => initialize.OnCompleted();
    public void OnError(Exception error) => initialize.OnError(error);
    public void OnNext(T value)
    {
        initialize.OnNext(value);
        initialize.OnCompleted();
    }

    public IDisposable Subscribe(IObserver<T> observer) => subject.Subscribe(observer);
}
伪装你 2025-02-12 17:46:52

您所描述的听起来与 WriteOnceBlock&lt; t&gt; 来自TPL DataFlow库。数据流块具有方便的扩展方法 Asobervable ,因此,基于此想法,实现将看起来像这样:

public class WriteOnceSubject<T> : ISubject<T>
{
    private readonly WriteOnceBlock<T> _block = new WriteOnceBlock<T>(x => x);

    public void OnCompleted() => _block.Complete();
    public void OnError(Exception error) => ((ISourceBlock<T>)_block).Fault(error);
    public void OnNext(T value) => _block.Post(value);

    public IDisposable Subscribe(IObserver<T> observer)
        => _block.AsObservable().Subscribe(observer);
}

不幸的是,这个想法不起作用。 writeOncesubject&lt; t&gt; 的订阅者仅获得 oncompleted() Notification。没有 OnNext()发射。我刚刚发布了意外地表现出Asobservable”> bug Report bug Report >在Github上有关此问题。


更新:此处是 Microsoft的反馈史蒂芬·图布(Stephen Toub)的错误报告:

writeOnceBlock 只有一个值,该值可以多次消耗,因此块在给出值后立即完成。 ASOBSERVABLE 检查源是否已完成,并将其视为不再有数据的指示。因此,如果您在将数据传递给 writeNceBlock 之前订阅观察者,则 writeNceBlock 将尽职尽责地将这些数据传播到完成之前的链接目标,并且观察者将接收到它,但是如果观察者在 writeNceBlock 完成后订阅,则将假定没有数据来完成,并且它本身会发出信号完成。

有可能从 Asobervable 中删除这些检查,如果源已经完成,则可能会以某种费用,但现在 wrighonceblock Asobservable 不是完美的。


What you describe sounds quite similar to the WriteOnceBlock<T> from the TPL Dataflow library. The dataflow blocks have a convenient extension method AsObservable, so based on this idea an implementation would look like this:

public class WriteOnceSubject<T> : ISubject<T>
{
    private readonly WriteOnceBlock<T> _block = new WriteOnceBlock<T>(x => x);

    public void OnCompleted() => _block.Complete();
    public void OnError(Exception error) => ((ISourceBlock<T>)_block).Fault(error);
    public void OnNext(T value) => _block.Post(value);

    public IDisposable Subscribe(IObserver<T> observer)
        => _block.AsObservable().Subscribe(observer);
}

Unfortunately this idea doesn't work. The subscribers of the WriteOnceSubject<T> are getting only an OnCompleted() notification. No OnNext() is emitted. I just posted a bug report on GitHub about this issue.


Update: Here is Microsoft's feedback regarding the bug report, by Stephen Toub:

WriteOnceBlock only ever has a single value, which is consumable any number of times, and as such the block completes as soon as it's been given a value. AsObservable checks whether a source has completed and takes that as an indication that no more data will be coming. So if you subscribe the observer prior to data being passed to the WriteOnceBlock, the WriteOnceBlock will dutifully propagate that data to linked targets prior to completing and the observer will receive it, but if the observer is subscribed after the WriteOnceBlock has completed, it'll assume no data is coming, and it'll itself signal completion.

It's possible those checks could be removed from AsObservable, at some expense if the source has already completed, but at present WriteOnceBlock composability with AsObservable isn't perfect.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文