合并最新的,但只推左边

发布于 2024-11-18 18:14:57 字数 1133 浏览 1 评论 0原文

我需要实现一个 CombineLatest 版本(我在这里将其称为 WithLatest),它为左侧的每个项目和右侧的最新项目调用选择器。它不应该只推动右侧的项目发生变化。

我认为这是构建Observable.Create还是现有扩展的组合并不是特别重要;无论哪种方式,我都会将其设为“盒装”扩展方法。

示例

var left = new Subject<int>();
var right = new Subject<int>();

left.WithLatest(right, (l,r) => l + " " + r).Dump();

left.OnNext(1);   // <1>
left.OnNext(2);   // <2>
right.OnNext(1);  // <3>
right.OnNext(2);  // <4>
left.OnNext(3);   // <5>

应该产生

2 1
3 2

编辑:我的示例的逻辑是:

  1. 左侧填充 1。右侧为空,没有推送任何值。
  2. Left 更新为 2(它忘记了之前的值)。右侧仍然是空的,因此没有任何内容被推送。
  3. Right 填充为 1,因此 Left = 2(最新值),Right = 1 被压入。 到目前为止,WithLatestCombineLatest 之间没有任何区别,
  4. 右侧已更新 - 没有推送任何内容。 这就是不同之处
  5. Left 更新为 3,因此 Left = 3,Right = 2(最新值)被推送。

有人建议我尝试:

var lr = right.ObserveOn(Scheduler.TaskPool).Latest();
left.Select(l => l + " " + lr.First()).Dump();

但这会阻止我的测试的当前线程。

I need to implement a version of CombineLatest (I'll call it WithLatest here) that calls the selector for every item on the left and the latest item on the right. It shouldn't push for items on the right changing only.

I think whether this is built Observable.Create or a combination of existing extensions is not particularly important; I'll be making this a "boxed" extension method either way.

Example

var left = new Subject<int>();
var right = new Subject<int>();

left.WithLatest(right, (l,r) => l + " " + r).Dump();

left.OnNext(1);   // <1>
left.OnNext(2);   // <2>
right.OnNext(1);  // <3>
right.OnNext(2);  // <4>
left.OnNext(3);   // <5>

should yield

2 1
3 2

Edit: The logic of my example goes:

  1. Left becomes populated with 1. Right is empty, no values pushed.
  2. Left becomes updated with 2 (it forgets the previous value). Right is still empty, so nothing is pushed.
  3. Right becomes populated with 1, so Left = 2 (the latest value), Right = 1 is pushed. Up to this point, there is no difference between WithLatest and CombineLatest
  4. Right is updated -- nothing is pushed. This is what's different
  5. Left is updated with 3, so Left = 3, Right = 2 (the latest value) is pushed.

It's been suggested that I try:

var lr = right.ObserveOn(Scheduler.TaskPool).Latest();
left.Select(l => l + " " + lr.First()).Dump();

but this blocks on the current thread for my test.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(6

荒芜了季节 2024-11-25 18:14:57

您可以使用现有的运算符来完成此操作。

Func<int, int, string> selector = (l, r) => l + " " + r;

var query = right.Publish(rs => left.Zip(rs.MostRecent(0), selector).SkipUntil(rs));
  • Publish 确保我们只订阅 right 一次,并在 rs 的所有订阅者之间共享订阅。

  • MostRecentIObservable 转换为 IEnumerable,它始终从源可观察值生成最近发出的值。

  • 每次可观察对象发出一个值时,IObservableIEnumerable 之间的

    Zip 都会发出一个值。

  • SkipUntil 跳过在 right 发出值之前发生的对 (l, r)

You can do this using existing operators.

Func<int, int, string> selector = (l, r) => l + " " + r;

var query = right.Publish(rs => left.Zip(rs.MostRecent(0), selector).SkipUntil(rs));
  • Publish ensures we only ever subscribe to right once and share the subscription among all subscribers to rs.

  • MostRecent turns an IObservable<T> into an IEnumerable<T> that always yields the most recently emitted value from the source observable.

  • Zip between IObservable<T> and IEnumerable<U> emits a value each time the observable emits a value.

  • SkipUntil skips the pairs (l, r) which occur before right ever emits a value.

无法言说的痛 2024-11-25 18:14:57

我也对“仅向左侧推送”的 CombineLatest 有同样的需求。

我将该解决方案设为 Observable.Sample 的“重载”,因为这就是该方法的作用:
它使用采样器(左)对(右)进行采样,并具有提供resultSelector的附加功能(如CombineLatest)。

public static IObservable<TResult> Sample<TSource, TSample, TResult>(
    this IObservable<TSource> source,
    IObservable<TSample> sampler,
    Func<TSource, TSample, TResult> resultSelector)
{
    var multiSampler = sampler.Publish().RefCount();
    return source.CombineLatest(multiSampler, resultSelector).Sample(multiSampler);
}

I also had the same need for a CombineLatest which "pushes only for the left".

I made the solution an "overload" of Observable.Sample, because that's what the method does:
It samples a source (right) with a sampler (left), with the additional capability of providing a resultSelector (like in CombineLatest).

public static IObservable<TResult> Sample<TSource, TSample, TResult>(
    this IObservable<TSource> source,
    IObservable<TSample> sampler,
    Func<TSource, TSample, TResult> resultSelector)
{
    var multiSampler = sampler.Publish().RefCount();
    return source.CombineLatest(multiSampler, resultSelector).Sample(multiSampler);
}
娜些时光,永不杰束 2024-11-25 18:14:57

根据帖子作者选择的解决方案,我认为有一个更简单的解决方案,利用 DistinctUntilChanged

public static IObservable<TResult> CombineLatestOnLeft<TLeft, TRight, TResult>(this IObservable<TLeft> leftSource, IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector) {
        return leftSource
            .Select<TLeft, Tuple<TLeft, int>>(Tuple.Create<TLeft, int>)
            .CombineLatest(rightSource,
                (l, r) => new { Index = l.Item2, Left = l.Item1, Right = r })
            .DistinctUntilChanged(x => x.Index)
            .Select(x => selector(x.Left, x.Right));
    }

或者即使

public static IObservable<TResult> CombineLatestOnLeft<TLeft, TRight, TResult>(this IObservable<TLeft> leftSource, IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector) {
        return leftSource
            .CombineLatest(rightSource,
                (l, r) => new { Left = l, Right = r })
            .DistinctUntilChanged(x => x.Left)
            .Select(x => selector(x.Left, x.Right));
    }

您只关心 leftSource 的不同值

Based on the solution picked by the post author I think there's an even simpler solution utilizing DistinctUntilChanged:

public static IObservable<TResult> CombineLatestOnLeft<TLeft, TRight, TResult>(this IObservable<TLeft> leftSource, IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector) {
        return leftSource
            .Select<TLeft, Tuple<TLeft, int>>(Tuple.Create<TLeft, int>)
            .CombineLatest(rightSource,
                (l, r) => new { Index = l.Item2, Left = l.Item1, Right = r })
            .DistinctUntilChanged(x => x.Index)
            .Select(x => selector(x.Left, x.Right));
    }

or even

public static IObservable<TResult> CombineLatestOnLeft<TLeft, TRight, TResult>(this IObservable<TLeft> leftSource, IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector) {
        return leftSource
            .CombineLatest(rightSource,
                (l, r) => new { Left = l, Right = r })
            .DistinctUntilChanged(x => x.Left)
            .Select(x => selector(x.Left, x.Right));
    }

if you only care about distinct values of leftSource

流年已逝 2024-11-25 18:14:57

我今天为项目制作了一个 RX 运算符来执行此操作。

这是我的解决方案:

    public static IObservable<Tuple<TSource, TTarget>> JoinLeftSoft<TSource, TTarget>(
        this IObservable<TSource> source, IObservable<TTarget> right)
    {
        return source
            .Select(x => new Tuple<object, TSource>(new object(), x))
            .CombineLatest(right, (l, r) => new Tuple<object, TSource, TTarget>(l.Item1, l.Item2, r))
            .DistinctUntilChanged(t => t.Item1)
            .Select(t => new Tuple<TSource, TTarget>(t.Item2, t.Item3));
    }

I made a RX operator for project today that does this.

Here's my solutions:

    public static IObservable<Tuple<TSource, TTarget>> JoinLeftSoft<TSource, TTarget>(
        this IObservable<TSource> source, IObservable<TTarget> right)
    {
        return source
            .Select(x => new Tuple<object, TSource>(new object(), x))
            .CombineLatest(right, (l, r) => new Tuple<object, TSource, TTarget>(l.Item1, l.Item2, r))
            .DistinctUntilChanged(t => t.Item1)
            .Select(t => new Tuple<TSource, TTarget>(t.Item2, t.Item3));
    }
一个人练习一个人 2024-11-25 18:14:57

在最新的System.Reactive上,我们可以使用WithLatestFrom扩展方法。

left.WithLatestFrom(right, (l, r) => l + " " + r).Dump();

结果将正确如下。

3 2 

On latest System.Reactive, we can use WithLatestFrom extension method.

left.WithLatestFrom(right, (l, r) => l + " " + r).Dump();

The result would be below correctly.

3 2 
笑,眼淚并存 2024-11-25 18:14:57

这是使用 Create 的 hacky 方法 - 并没有真正构建它,如果它实际上不起作用,那就是我的错:)

public static IObservable<TRet> WithLatest<TLeft, TRight, TRet>(
        this IObservable<TLeft> lhs, 
        IObservable<TRight> rhs, 
        Func<TLeft, TRight, TRet> sel)
{
    return Observable.Create<TRet>(subj => {
        bool rhsSet = false;
        bool deaded = false;
        var latestRhs = default(TRight);

        Action onDeaded = null;

        var rhsDisp = rhs.Subscribe(
            x => { latestRhs = x; rhsSet = true; }, 
            ex => { subj.OnError(ex); onDeaded(); });

        var lhsDisp = lhs
            .Where(_ => deaded == false && rhsSet == true)
            .Subscribe(
                x => subj.OnNext(sel(x, latestRhs)),
                ex => { subj.OnError(ex); onDeaded(); },
                () => { subj.OnCompleted(); onDeaded(); });

        onDeaded = () => {
            deaded = true;
            if (lhsDisp != null) {
                lhsDisp.Dispose();
                lhsDisp = null;
            }
            if (rhsDisp != null) {
                rhsDisp.Dispose();
                rhsDisp = null;
            }
        };

        return onDeaded;
    });
}

Here's the hacky way using Create - didn't actually build it, mea culpa if it doesn't actually work :)

public static IObservable<TRet> WithLatest<TLeft, TRight, TRet>(
        this IObservable<TLeft> lhs, 
        IObservable<TRight> rhs, 
        Func<TLeft, TRight, TRet> sel)
{
    return Observable.Create<TRet>(subj => {
        bool rhsSet = false;
        bool deaded = false;
        var latestRhs = default(TRight);

        Action onDeaded = null;

        var rhsDisp = rhs.Subscribe(
            x => { latestRhs = x; rhsSet = true; }, 
            ex => { subj.OnError(ex); onDeaded(); });

        var lhsDisp = lhs
            .Where(_ => deaded == false && rhsSet == true)
            .Subscribe(
                x => subj.OnNext(sel(x, latestRhs)),
                ex => { subj.OnError(ex); onDeaded(); },
                () => { subj.OnCompleted(); onDeaded(); });

        onDeaded = () => {
            deaded = true;
            if (lhsDisp != null) {
                lhsDisp.Dispose();
                lhsDisp = null;
            }
            if (rhsDisp != null) {
                rhsDisp.Dispose();
                rhsDisp = null;
            }
        };

        return onDeaded;
    });
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文