Does Reactive Extensions support rolling buffers?

Posted 2024-12-07 14:48:12

I'm using reactive extensions to collate data into buffers of 100ms:

this.subscription = this.dataService
    .Where(x => !string.Equals("FOO", x.Key.Source))
    .Buffer(TimeSpan.FromMilliseconds(100))
    .ObserveOn(this.dispatcherService)
    .Where(x => x.Count != 0)
    .Subscribe(this.OnBufferReceived);

This works fine. However, I want slightly different behavior than that provided by the Buffer operation. Essentially, I want to reset the timer if another data item is received. Only when no data has been received for the entire 100ms do I want to handle it. This opens up the possibility of never handling the data, so I should also be able to specify a maximum count. I would imagine something along the lines of:

.SlidingBuffer(TimeSpan.FromMilliseconds(100), 10000)

I've had a look around and haven't been able to find anything like this in Rx. Can anyone confirm/deny this?
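To make the requested semantics concrete, here is a minimal sketch that does not use Rx at all: it replays a list of timestamped items and groups them the way the question describes. `SlidingBufferSketch` and `Batch` are hypothetical names used only for illustration, and the sketch assumes a batch should close when the gap between two items reaches the inactivity window, or when the count cap is hit.

```csharp
using System;
using System.Collections.Generic;

public static class SlidingBufferSketch
{
    // Groups timestamped items: a batch closes when the gap to the next item is
    // at least `inactivity`, or as soon as it holds `maxCount` items.
    public static List<List<T>> Batch<T>(
        List<(TimeSpan At, T Value)> events, TimeSpan inactivity, int maxCount)
    {
        var batches = new List<List<T>>();
        var current = new List<T>();
        var lastAt = TimeSpan.Zero;

        foreach (var (at, value) in events)
        {
            // A quiet period since the previous item closes the open batch.
            if (current.Count > 0 && at - lastAt >= inactivity)
            {
                batches.Add(current);
                current = new List<T>();
            }

            current.Add(value);
            lastAt = at;

            // The count cap guarantees that a busy source is still flushed.
            if (current.Count == maxCount)
            {
                batches.Add(current);
                current = new List<T>();
            }
        }

        if (current.Count > 0) batches.Add(current); // flush the tail at end of input
        return batches;
    }
}
```

With items 1 to 6 arriving at 0, 50, 300, 310, 320 and 330 ms, an inactivity window of 100 ms and a cap of 3, this yields the batches [1, 2], [3, 4, 5] and [6]: the first closes on inactivity, the second on the cap, and the last at the end of the input.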

Comments (6)

拥抱我好吗 2024-12-14 14:48:12

This is possible by combining the built-in Window and Throttle methods of Observable. First, let's solve the simpler problem where we ignore the maximum count condition:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
{
    var closes = stream.Throttle(delay);
    return stream.Window(() => closes).SelectMany(window => window.ToList());
}

The powerful Window method did the heavy lifting. Now it's easy enough to see how to add a maximum count:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? max=null)
{
    var closes = stream.Throttle(delay);
    if (max != null)
    {
        var overflows = stream.Where((x,index) => index+1>=max);
        closes = closes.Merge(overflows);
    }
    return stream.Window(() => closes).SelectMany(window => window.ToList());
}

I'll write a post explaining this on my blog. https://gist.github.com/2244036

Documentation for the Window method:

芯好空 2024-12-14 14:48:12

I wrote an extension to do most of what you're after - BufferWithInactivity.

Here it is:

public static IObservable<IEnumerable<T>> BufferWithInactivity<T>(
    this IObservable<T> source,
    TimeSpan inactivity,
    int maximumBufferSize)
{
    return Observable.Create<IEnumerable<T>>(o =>
    {
        var gate = new object();
        var buffer = new List<T>();
        var mutable = new SerialDisposable();
        var subscription = (IDisposable)null;
        var scheduler = Scheduler.ThreadPool;

        Action dump = () =>
        {
            var bts = buffer.ToArray();
            buffer = new List<T>();
            if (o != null)
            {
                o.OnNext(bts);
            }
        };

        Action dispose = () =>
        {
            if (subscription != null)
            {
                subscription.Dispose();
            }
            mutable.Dispose();
        };

        Action<Action<IObserver<IEnumerable<T>>>> onErrorOrCompleted =
            onAction =>
            {
                lock (gate)
                {
                    dispose();
                    dump();
                    if (o != null)
                    {
                        onAction(o);
                    }
                }
            };

        Action<Exception> onError = ex =>
            onErrorOrCompleted(x => x.OnError(ex));

        Action onCompleted = () => onErrorOrCompleted(x => x.OnCompleted());

        Action<T> onNext = t =>
        {
            lock (gate)
            {
                buffer.Add(t);
                if (buffer.Count == maximumBufferSize)
                {
                    dump();
                    mutable.Disposable = Disposable.Empty;
                }
                else
                {
                    mutable.Disposable = scheduler.Schedule(inactivity, () =>
                    {
                        lock (gate)
                        {
                            dump();
                        }
                    });
                }
            }
        };

        subscription =
            source
                .ObserveOn(scheduler)
                .Subscribe(onNext, onError, onCompleted);

        return () =>
        {
            lock (gate)
            {
                o = null;
                dispose();
            }
        };
    });
}

吃兔兔 2024-12-14 14:48:12

With Rx Extensions 2.0, you can answer both requirements with a new Buffer overload accepting a timeout and a size:

this.subscription = this.dataService
    .Where(x => !string.Equals("FOO", x.Key.Source))
    .Buffer(TimeSpan.FromMilliseconds(100), 1)
    .ObserveOn(this.dispatcherService)
    .Where(x => x.Count != 0)
    .Subscribe(this.OnBufferReceived);

See https://msdn.microsoft.com/en-us/library/hh229200(v=vs.103).aspx for the documentation.

零時差 2024-12-14 14:48:12

As Rohit Sharma mentioned in his comment on Colonel Panic's solution, there is a problem where items are buffered but are not pushed to the subscriber until another item is generated.

As described in that comment, the problem is p.Window(() => closes), because it opens up a gap in which events can be missed.

That lambda is going to be invoked after each window is processed. And the Window operator is going to call Subscribe on what the lambda returns each time, because as far as it knows, you might return a completely different IObservable from that lambda every time.
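The repeated invocation described here can be observed directly. The following is a small sketch, assuming the System.Reactive NuGet package; `WindowFactoryDemo` and `CountFactoryCalls` are hypothetical names. It only counts how often the closing-selector lambda passed to Window is called while two windows are closed by hand.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class WindowFactoryDemo
{
    // Returns how many times Window invoked the closing-selector lambda.
    public static int CountFactoryCalls()
    {
        var calls = 0;
        var source = new Subject<int>();
        var closes = new Subject<Unit>();

        using (source
            .Window(() => { calls++; return closes; }) // lambda form, as in the answer
            .Subscribe(window => { }))
        {
            closes.OnNext(Unit.Default); // closes the first window
            closes.OnNext(Unit.Default); // closes the second window
        }

        return calls;
    }

    public static void Main() => Console.WriteLine(CountFactoryCalls());
}
```

Each invocation is followed by a fresh Subscribe on whatever the lambda returned, so even when the lambda always hands back the same sequence, there is a short interval between one window closing and the next subscription being established.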

Because the same closing sequence is now shared by every window, the maxCount handling has to change. Without the change, the counter would never be reset, and once the limit had been hit, every subsequent event would count as being over maxCount.

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? maxCount = null)
{
    var publish = stream.Publish(p =>
    {
        var closes = p.Throttle(delay);

        if (maxCount != null)
        {
            Int32 i = 0;

            var overflows = p.Where(x =>
            {
                ++i;

                if (i >= maxCount)
                {
                    i = 0;
                    return true;
                }

                return false;
            });

            closes = closes.Merge(overflows);
        }

        return p.Window(closes).SelectMany(window => window.ToList());
    });

    return publish;
}

Update:
After further tests I found that, in some cases, items are still not pushed to the subscriber correctly.

Here is the workaround, which has worked for us for 4 months without any problems.

The workaround is adding .Delay(...) with any TimeSpan.

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? maxCount = null)
{
    var publish = stream.Publish(p =>
    {
        var closes = p.Throttle(delay);

        if (maxCount != null)
        {
            var overflows = stream.Where((x, index) => index + 1 >= maxCount);
            closes = closes.Merge(overflows);
        }

        return p.Window(() => closes).SelectMany(window => window.ToList()).Delay(TimeSpan.Zero);
    });

    return publish;
}

独自唱情﹋歌 2024-12-14 14:48:12

I guess this can be implemented on top of the Buffer method as shown below:

public static IObservable<IList<T>> SlidingBuffer<T>(this IObservable<T> obs, TimeSpan span, int max)
{
    // Observable.CreateWithDisposable is the pre-1.0 name; later Rx versions use Observable.Create.
    return Observable.CreateWithDisposable<IList<T>>(cl =>
    {
        var acc = new List<T>();
        return obs.Buffer(span)
            .Subscribe(next =>
            {
                if (next.Count == 0) // no activity in time span
                {
                    // Emit a copy: acc is cleared and reused afterwards.
                    cl.OnNext(new List<T>(acc));
                    acc.Clear();
                }
                else
                {
                    acc.AddRange(next);
                    if (acc.Count >= max) // max items collected
                    {
                        cl.OnNext(new List<T>(acc));
                        acc.Clear();
                    }
                }
            }, err => cl.OnError(err), () => { cl.OnNext(new List<T>(acc)); cl.OnCompleted(); });
    });
}

NOTE: I haven't tested it, but I hope it gives you the idea.

红尘作伴 2024-12-14 14:48:12

Colonel Panic's solution is almost perfect. The only thing that is missing is a Publish component, in order to make the solution work with cold sequences too.

/// <summary>
/// Projects each element of an observable sequence into a buffer that's sent out
/// when either a given inactivity timespan has elapsed, or it's full,
/// using the specified scheduler to run timers.
/// </summary>
public static IObservable<IList<T>> BufferUntilInactive<T>(
    this IObservable<T> source, TimeSpan dueTime, int maxCount,
    IScheduler scheduler = default)
{
    if (maxCount < 1) throw new ArgumentOutOfRangeException(nameof(maxCount));
    scheduler ??= Scheduler.Default;
    return source.Publish(published =>
    {
        var combinedBoundaries = Observable.Merge
        (
            published.Throttle(dueTime, scheduler),
            published.Skip(maxCount - 1)
        );

        return published
            .Window(() => combinedBoundaries)
            .SelectMany(window => window.ToList());
    });
}

Beyond adding the Publish, I've also replaced the original .Where((_, index) => index + 1 >= maxCount) with the equivalent but shorter .Skip(maxCount - 1). For completeness there is also an IScheduler parameter, which configures the scheduler where the timer is run.
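As a rough usage sketch, the operator above can be driven from a Subject standing in for the question's dataService. This assumes the System.Reactive NuGet package; `BufferExtensions`, `Demo` and `Run` are hypothetical names, and the operator body is repeated only so that the sketch compiles on its own. The timings depend on real timers, so the exact batches are not guaranteed here.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class BufferExtensions
{
    // Same operator as in the answer above.
    public static IObservable<IList<T>> BufferUntilInactive<T>(
        this IObservable<T> source, TimeSpan dueTime, int maxCount,
        IScheduler scheduler = default)
    {
        if (maxCount < 1) throw new ArgumentOutOfRangeException(nameof(maxCount));
        scheduler ??= Scheduler.Default;
        return source.Publish(published =>
        {
            var combinedBoundaries = Observable.Merge(
                published.Throttle(dueTime, scheduler),
                published.Skip(maxCount - 1));

            return published
                .Window(() => combinedBoundaries)
                .SelectMany(window => window.ToList());
        });
    }
}

public static class Demo
{
    public static List<IList<int>> Run()
    {
        var batches = new List<IList<int>>();
        using var source = new Subject<int>(); // hot source standing in for dataService
        using var subscription = source
            .BufferUntilInactive(TimeSpan.FromMilliseconds(100), maxCount: 3)
            .Where(batch => batch.Count != 0)   // drop empty buffers, as in the question
            .Subscribe(batch => { lock (batches) batches.Add(batch); });

        source.OnNext(1);
        source.OnNext(2);
        Thread.Sleep(300);                              // quiet period longer than dueTime
        for (var i = 3; i <= 6; i++) source.OnNext(i);  // burst longer than maxCount
        Thread.Sleep(300);
        source.OnCompleted();
        return batches;
    }

    public static void Main()
    {
        foreach (var batch in Run()) Console.WriteLine(string.Join(", ", batch));
    }
}
```

In the question's own pipeline, the call would take the place of .Buffer(TimeSpan.FromMilliseconds(100)), for example .BufferUntilInactive(TimeSpan.FromMilliseconds(100), 10000).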
