iAsyncencencentering和节流

发布于 2025-02-07 19:16:41 字数 105 浏览 2 评论 0原文

消耗iaSyncenncenumerable< t>时是否可以防风数据? 我有一系列数据流很快,我只对每n秒钟的最后一个元素感兴趣。

谢谢。

Is it possible to throttle the data when consuming IAsyncEnumerable<T>?
I have a stream of data coming in rapidly, and I'm only interested in the last element every N seconds.

Thanks.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

原来分手还会想你 2025-02-14 19:16:41

拥有一个不同的端点似乎是有意义的,该端点返回最近的事件,而不是处理流。

如果您必须处理队列/流,则可以食用这些事件,并将每个新的传入分配给最新之类的东西,并按照所需的间隔阅读。

It would seem to make sense to have a different endpoint which returns the most recent event rather than a dealing with the stream.

If you have to deal with a queue/stream you could consume the events and assign each new incoming one to something like latest and read that at the desired interval.

夏了南城 2025-02-14 19:16:41

一种解决方案是将IASYNCENUMEASS转换为 iobservable&lt; t&gt;,并利用system.systems.reeactive

首先,您需要一个转换器。我找不到内置,所以我

using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public class AsyncEnumerableToObservable<T> : IObservable<T>
{
    private readonly IAsyncEnumerable<T> _source;
    private readonly Subject<T> _subject = new();

    public AsyncEnumerableToObservable(IAsyncEnumerable<T> source)
    {
        _source = source;
        BeginConsume();
    }

    public IDisposable Subscribe(IObserver<T> observer) => _subject.Subscribe(observer);

    private async void BeginConsume()
    {
        try
        {
            await foreach (var item in _source)
            {
                _subject.OnNext(item);
            }

            _subject.OnCompleted();
        }
        catch (Exception e)
        {
            _subject.OnError(e);
        }
    }
}

public static class AsyncEnumerableExtensions
{
    public static IObservable<T> ToObservable<T>(this IAsyncEnumerable<T> source)
    {
        return new AsyncEnumerableToObservable<T>(source);
    }
}

使用此转换器创建了自己的创建,您可以使用myEnumerable.toObservable(),并使用示例用于从system节流的方法。反应性

static class Program
{
    static async Task Main(string[] args)
    {
        IAsyncEnumerable<int> seq = CreateSeq();
        seq.ToObservable().Sample(TimeSpan.FromSeconds(1)).Subscribe(Console.WriteLine);

        await Task.Delay(TimeSpan.FromSeconds(10));
    }

    private static async IAsyncEnumerable<int> CreateSeq()
    {
        int i = 0;
        while (true)
        {
            await Task.Yield();
            i++;
            yield return i;
        }
    }
}

One solution is to convert IAsyncEnumerable<T> to IObservable<T> and leverage power of System.Reactive

First, you need a converter. I couldn't find builtin so I've created my own

using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public class AsyncEnumerableToObservable<T> : IObservable<T>
{
    private readonly IAsyncEnumerable<T> _source;
    private readonly Subject<T> _subject = new();

    public AsyncEnumerableToObservable(IAsyncEnumerable<T> source)
    {
        _source = source;
        BeginConsume();
    }

    public IDisposable Subscribe(IObserver<T> observer) => _subject.Subscribe(observer);

    private async void BeginConsume()
    {
        try
        {
            await foreach (var item in _source)
            {
                _subject.OnNext(item);
            }

            _subject.OnCompleted();
        }
        catch (Exception e)
        {
            _subject.OnError(e);
        }
    }
}

public static class AsyncEnumerableExtensions
{
    public static IObservable<T> ToObservable<T>(this IAsyncEnumerable<T> source)
    {
        return new AsyncEnumerableToObservable<T>(source);
    }
}

With this converter you can use myEnumerable.ToObservable() and use Sample method for throttling from System.Reactive

static class Program
{
    static async Task Main(string[] args)
    {
        IAsyncEnumerable<int> seq = CreateSeq();
        seq.ToObservable().Sample(TimeSpan.FromSeconds(1)).Subscribe(Console.WriteLine);

        await Task.Delay(TimeSpan.FromSeconds(10));
    }

    private static async IAsyncEnumerable<int> CreateSeq()
    {
        int i = 0;
        while (true)
        {
            await Task.Yield();
            i++;
            yield return i;
        }
    }
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文