反应式节流阀返回时间跨度内添加的所有项目

发布于 2024-12-26 16:41:28 字数 401 浏览 0 评论 0原文

给定一个 IObservable,有没有一种方法可以使用 Throttle 行为(在添加项目时重置计时器,但让它返回其中添加的所有项目的集合) 。

Buffer 提供了类似的功能,它可以在每个时间跨度或计数上将数据分块到 IList 中,但我每次都需要重置该时间 添加一个项目。

我在这里看到了类似的问题,反应式扩展是否支持滚动缓冲区? ,但答案似乎并不理想,而且有点旧,所以我想知道 Rx-Main 的发行版本现在是否支持此功能。

Given an IObservable<T> is there a way to use Throttle behaviour (reset a timer when an item is added, but have it return a collection of all the items added within that time?

Buffer provides a similar functionality it that it chunks the data up into IList<T> on every time span or count. But I need that time to reset each time an item is added.

I've seen a similar question here, Does reactive extensions support rolling buffers?, but the answers don't seem ideal and it's a little old so I wondered if the release version of Rx-Main now supports this functionality out the box.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

情未る 2025-01-02 16:41:29

正如我在另一篇文章中回答的,是的,你可以!使用 ObservableThrottleWindow 方法:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
{
    var closes = stream.Throttle(delay);
    return stream.Window(() => closes).SelectMany(window => window.ToList());
}

As I answered in the other post, yes you can! Using the Throttle and Window methods of Observable:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
{
    var closes = stream.Throttle(delay);
    return stream.Window(() => closes).SelectMany(window => window.ToList());
}
绮烟 2025-01-02 16:41:29

我通过添加 Publish 组件修改了 Colonel Panic 的 BufferUntilInactive 运算符,这样它也可以与冷可观察量一起正常工作:

/// <summary>Projects each element of an observable sequence into consecutive
/// non-overlapping buffers, which are produced based on time and activity,
/// using the specified scheduler to run timers.</summary>
public static IObservable<IList<T>> BufferUntilInactive<T>(
    this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler = default)
{
    scheduler ??= Scheduler.Default;
    return source.Publish(published =>
        published
            .Window(() => published.Throttle(dueTime, scheduler))
            .SelectMany(window => window.ToList())
    );
}

为了完整起见,我还添加了一个可选的 IScheduler 参数,它配置运行计时器的调度程序。

I amended Colonel Panic's BufferUntilInactive operator by adding a Publish component, so that it works correctly with cold observables too:

/// <summary>Projects each element of an observable sequence into consecutive
/// non-overlapping buffers, which are produced based on time and activity,
/// using the specified scheduler to run timers.</summary>
public static IObservable<IList<T>> BufferUntilInactive<T>(
    this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler = default)
{
    scheduler ??= Scheduler.Default;
    return source.Publish(published =>
        published
            .Window(() => published.Throttle(dueTime, scheduler))
            .SelectMany(window => window.ToList())
    );
}

For completeness I've also added an optional IScheduler parameter, which configures the scheduler where the timer is run.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文