如何在流 B 触发时抑制流 A 的下一个事件

发布于 2024-11-05 13:00:51 字数 302 浏览 3 评论 0原文

我想每当流 B 触发时就停止流 A 以获得一个通知。两个流都将保持在线状态并且永远不会完成。

A: o--o--o--o--o--o--o--o--o  
B: --o-----o--------o-------  
R: o-----o-----o--o-----o--o  

或者

A: o--o--o--o--o--o--o--o--o  
B: -oo----oo-------oo-------  
R: o-----o-----o--o-----o--o  

I want to stop stream A for exactly one notification whenever stream B fires. Both streams will stay online and won't ever complete.

A: o--o--o--o--o--o--o--o--o  
B: --o-----o--------o-------  
R: o-----o-----o--o-----o--o  

or

A: o--o--o--o--o--o--o--o--o  
B: -oo----oo-------oo-------  
R: o-----o-----o--o-----o--o  

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

随梦而飞# 2024-11-12 13:00:51

这是我为 类似问题所做的 SkipWhen 运算符的版本 (区别在于,在原来的情况下,多个“B”会跳过多个“A”):

public static IObservable<TSource> SkipWhen<TSource, TOther>(this IObservable<TSource> source, 
    IObservable<TOther> other)
{
    return Observable.Create<TSource>(observer =>
    {
        object lockObject = new object();
        bool shouldSkip = false;

        var otherSubscription = new MutableDisposable();
        var sourceSubscription = new MutableDisposable();

        otherSubscription.Disposable = other.Subscribe(
            x => { lock(lockObject) { shouldSkip = true; } });

        sourceSubscription.Disposable = source.Where(_ =>
        {
            lock(lockObject)
            {
                if (shouldSkip)
                {
                    shouldSkip = false;
                    return false;
                }
                else
                {
                    return true;
                }
            }
        }).Subscribe(observer);

        return new CompositeDisposable(
            sourceSubscription, otherSubscription);
    });
}

如果当前实现成为瓶颈,请考虑更改锁实现以使用 ReaderWriterLockSlim

Here's a version of my SkipWhen operator I did for a similar question (the difference is that, in the original, multiple "B's" would skip multiple "A's"):

public static IObservable<TSource> SkipWhen<TSource, TOther>(this IObservable<TSource> source, 
    IObservable<TOther> other)
{
    return Observable.Create<TSource>(observer =>
    {
        object lockObject = new object();
        bool shouldSkip = false;

        var otherSubscription = new MutableDisposable();
        var sourceSubscription = new MutableDisposable();

        otherSubscription.Disposable = other.Subscribe(
            x => { lock(lockObject) { shouldSkip = true; } });

        sourceSubscription.Disposable = source.Where(_ =>
        {
            lock(lockObject)
            {
                if (shouldSkip)
                {
                    shouldSkip = false;
                    return false;
                }
                else
                {
                    return true;
                }
            }
        }).Subscribe(observer);

        return new CompositeDisposable(
            sourceSubscription, otherSubscription);
    });
}

If the current implementation becomes a bottleneck, consider changing the lock implementation to use a ReaderWriterLockSlim.

明媚如初 2024-11-12 13:00:51

当可观察量很热时(并且没有 refCount),此解决方案将起作用:

streamA
    .takeUntil(streamB)
    .skip(1)
    .repeat()
    .merge(streamA.take(1))
    .subscribe(console.log);
  1. .takeUntil(streamB):使流 A 在流 B 产生一个值。
  2. .skip(1):使流 A 在启动时跳过一个值(或作为 .repeat() 的结果)。
  3. .repeat():使流A无限期地重复(重新连接)。
  4. .merge(streamA.take(1)):在流的开头偏移 .skip(1) 的效果。

让 A 流每 5 秒跳过一次的示例:

var streamA,
    streamB;

streamA = Rx.Observable
    .interval(1000)
    .map(function (x) {
        return 'A:' + x;
}).publish();

streamB = Rx.Observable
    .interval(5000);

streamA
    .takeUntil(streamB)
    .skip(1)
    .repeat()
    .merge(streamA.take(1))
    .subscribe(console.log);

streamA.connect();

您还可以使用此沙箱 http://jsbin.com/gijorid/4/edit?js ,console 在运行代码时在控制台日志中执行BACTION(),手动推送一个值到streamB(这有助于分析代码)。

This solution will work when the observable is hot (and without refCount):

streamA
    .takeUntil(streamB)
    .skip(1)
    .repeat()
    .merge(streamA.take(1))
    .subscribe(console.log);
  1. .takeUntil(streamB): make stream A complete upon stream B producing a value.
  2. .skip(1): make stream A skip one value upon starting (or as a result of .repeat()).
  3. .repeat(): make stream A repeat (reconnect) indefinitely.
  4. .merge(streamA.take(1)): offset the effect of .skip(1) at the beginning of the stream.

Example of making A stream skip every 5 seconds:

var streamA,
    streamB;

streamA = Rx.Observable
    .interval(1000)
    .map(function (x) {
        return 'A:' + x;
}).publish();

streamB = Rx.Observable
    .interval(5000);

streamA
    .takeUntil(streamB)
    .skip(1)
    .repeat()
    .merge(streamA.take(1))
    .subscribe(console.log);

streamA.connect();

You can also use this sandbox http://jsbin.com/gijorid/4/edit?js,console to execute BACTION() in the console log at the time of running the code to manually push a value to streamB (which is helpful for analysing the code).

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文