How to batch a ChannelReader<T>, enforcing a maximum interval policy between consuming and processing any individual item?

Posted on 2025-01-30 16:51:37, 2276 words, 3 views, 0 comments

I am using a Channel<T> in a producer-consumer scenario, and I have the requirement to consume the channel in batches of 10 items each, without letting any consumed item stay idle in a buffer for more than 5 seconds. This duration is the maximum latency allowed between reading an item from the channel and processing a batch that contains this item. The maximum latency policy has precedence over the batch size policy, so a batch should be processed even with fewer than 10 items, in order to satisfy the max-latency requirement.

I was able to implement the first requirement, in the form of a ReadAllBatches extension method for the ChannelReader<T> class:

public static async IAsyncEnumerable<T[]> ReadAllBatches<T>(
    this ChannelReader<T> channelReader, int batchSize)
{
    List<T> buffer = new();
    while (true)
    {
        T item;
        try { item = await channelReader.ReadAsync(); }
        catch (ChannelClosedException) { break; }
        buffer.Add(item);
        if (buffer.Count == batchSize)
        {
            yield return buffer.ToArray();
            buffer.Clear();
        }
    }
    if (buffer.Count > 0) yield return buffer.ToArray();
    await channelReader.Completion; // Propagate possible failure
}

I am planning to use it like this:

await foreach (Item[] batch in myChannel.Reader.ReadAllBatches(10))
{
    Console.WriteLine($"Processing batch of {batch.Length} items");
}

My question is: how can I enhance my ReadAllBatches<T> implementation with an additional TimeSpan timeout parameter that enforces the aforementioned maximum latency policy, without installing third-party packages in my project?

Important: The requested implementation should not be susceptible to the memory leak issue that has been reported here. So the loop that consumes the channel should not cause a steady increase in the memory used by the application if the producer that writes the items to the channel becomes idle for a prolonged period of time.

Note: I am aware that a similar question exists regarding batching the IAsyncEnumerable<T> interface, but I am not interested in that. I am interested in a method that targets the ChannelReader<T> type directly, for performance reasons.

Comments (2)

不疑不惑不回忆 2025-02-06 16:51:37

Below is an implementation of an idea that was posted on GitHub, by tkrafael.

I had the same "leak" issue and resolved it by:

  • First read uses main token (if I have no items to handle, just wait until one arrives)
  • All the remaining items must be read within x milliseconds

This way I will never get an empty read due to the timeout cancellation token (ok, maybe one empty read when the application is being shut down) and I get correct behaviour when items arrive from the channel's writer.

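The internal CancellationTokenSource is scheduled for cancellation with a timer (CancelAfter) immediately after the first element of a batch is consumed. The first read of each batch observes only the external cancellationToken, so while nothing is buffered no timer is involved at all, even if the channel stays empty for a long time.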

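using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// ReadAllBatches must be declared inside a static class (extension method).
/// <summary>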
/// Reads all of the data from the channel in batches, enforcing a maximum
/// interval policy between consuming an item and emitting it in a batch.
/// </summary>
public static IAsyncEnumerable<T[]> ReadAllBatches<T>(
    this ChannelReader<T> source, int batchSize, TimeSpan timeSpan)
{
    ArgumentNullException.ThrowIfNull(source);
    if (batchSize < 1) throw new ArgumentOutOfRangeException(nameof(batchSize));
    if (timeSpan < TimeSpan.Zero)
        throw new ArgumentOutOfRangeException(nameof(timeSpan));
    return Implementation();

    async IAsyncEnumerable<T[]> Implementation(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        CancellationTokenSource timerCts = CancellationTokenSource
            .CreateLinkedTokenSource(cancellationToken);
        try
        {
            List<T> buffer = new();
            while (true)
            {
                CancellationToken token = buffer.Count == 0 ?
                    cancellationToken : timerCts.Token;
                (T Value, bool HasValue) item;
                try
                {
                    item = (await source.ReadAsync(token).ConfigureAwait(false), true);
                }
                catch (ChannelClosedException) { break; }
                catch (OperationCanceledException)
                {
                    if (cancellationToken.IsCancellationRequested) break;
                    // Timeout occurred.
                    Debug.Assert(timerCts.IsCancellationRequested);
                    Debug.Assert(buffer.Count > 0);
                    item = default;
                }
                if (buffer.Count == 0) timerCts.CancelAfter(timeSpan);
                if (item.HasValue)
                {
                    buffer.Add(item.Value);
                    if (buffer.Count < batchSize) continue;
                }
                yield return buffer.ToArray();
                buffer.Clear();
                if (!timerCts.TryReset())
                {
                    timerCts.Dispose();
                    timerCts = CancellationTokenSource
                        .CreateLinkedTokenSource(cancellationToken);
                }
            }
            // Emit what's left before throwing exceptions.
            if (buffer.Count > 0) yield return buffer.ToArray();

            cancellationToken.ThrowIfCancellationRequested();

            // Propagate possible failure of the channel.
            if (source.Completion.IsCompleted)
                await source.Completion.ConfigureAwait(false);
        }
        finally { timerCts.Dispose(); }
    }
}

Usage example:

await foreach (Item[] batch in myChannel.Reader
    .ReadAllBatches(10, TimeSpan.FromSeconds(5)))
{
    Console.WriteLine($"Processing batch of {batch.Length} items");
}
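The per-batch timer handling in the ReadAllBatches implementation above hinges on CancellationTokenSource.TryReset (added in .NET 6): after a batch is emitted, TryReset succeeds if the timer has not fired yet, so the same source is reused, and it fails once the timeout has canceled the source, which is why the code then disposes it and creates a new linked one. The sketch below illustrates those two outcomes with a plain (non-linked) source, assuming .NET 6 or later; TryResetDemo is a hypothetical name and the 20 ms delay is an arbitrary value chosen for the demo.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TryResetDemo
{
    // Reports whether TryReset succeeds (1) while a CancelAfter timer is still
    // pending and (2) after that timer has already canceled the source.
    public static async Task<(bool BeforeTimeout, bool AfterTimeout)> RunAsync()
    {
        using var cts = new CancellationTokenSource();

        // Batch filled up before its deadline: the source can be reused.
        cts.CancelAfter(TimeSpan.FromMinutes(1));
        bool beforeTimeout = cts.TryReset();

        // Batch timed out: wait until the timer has canceled the source.
        cts.CancelAfter(TimeSpan.FromMilliseconds(20));
        while (!cts.IsCancellationRequested) await Task.Delay(10);
        bool afterTimeout = cts.TryReset(); // A canceled source cannot be reset.

        return (beforeTimeout, afterTimeout);
    }
}
```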

This implementation is non-destructive, meaning that no items that have been consumed from the channel are in danger of being lost. In case the enumeration is canceled or the channel is faulted, any consumed items will be emitted in a final batch, before the propagation of the error.

Note: In case the source ChannelReader<T> is completed at the same time that the cancellationToken is canceled, the cancellation has precedence over completion. This is the same behavior as all native methods of the ChannelReader<T> and ChannelWriter<T> classes. It means that it's possible (although rare) for an OperationCanceledException to be thrown, even if all the work has completed.
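A minimal sketch of that precedence with the built-in unbounded channel, assuming .NET Core 3.0 or later: reading from an already completed, empty channel reports ChannelClosedException when the token is live, but OperationCanceledException when the token is canceled as well. CancellationPrecedenceDemo is a hypothetical name used only for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class CancellationPrecedenceDemo
{
    // Completes an empty channel, optionally cancels the token, then calls
    // ReadAsync and reports which exception type it observed.
    public static async Task<string> ReadFromCompletedChannelAsync(bool cancel)
    {
        var channel = Channel.CreateUnbounded<int>();
        channel.Writer.Complete(); // No item will ever be available.

        using var cts = new CancellationTokenSource();
        if (cancel) cts.Cancel(); // Token canceled "at the same time" as completion.

        try
        {
            await channel.Reader.ReadAsync(cts.Token);
            return "no exception";
        }
        catch (ChannelClosedException)
        {
            return nameof(ChannelClosedException); // Completion observed.
        }
        catch (OperationCanceledException)
        {
            return nameof(OperationCanceledException); // Cancellation observed.
        }
    }
}
```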

栩栩如生 2025-02-06 16:51:37

I created a custom logger provider that sends messages to Kafka, and this is how I implemented the buffered channel:

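using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public delegate Task BufferedChannelFlushHandler<in TEvent>(IReadOnlyList<TEvent> batch, CancellationToken cancellationToken);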

public class BufferedChannel<TEvent>
where TEvent : notnull
{
    private readonly Channel<TEvent> _channel;
    private readonly BufferedChannelOptions _options;
    private readonly BufferedChannelFlushHandler<TEvent> _flushHandler;

    public ChannelWriter<TEvent> Writer => _channel.Writer;

    public ChannelReader<TEvent> Reader => _channel.Reader;

    public BufferedChannel(
        BufferedChannelOptions options,
        BufferedChannelFlushHandler<TEvent> flushHandler)
    {
        // Validate the arguments themselves, not their names (requires .NET 6+).
        ArgumentNullException.ThrowIfNull(options);
        ArgumentNullException.ThrowIfNull(flushHandler);

        _options = options;

        _channel = Channel.CreateUnbounded<TEvent>(
            new UnboundedChannelOptions
            {
                SingleWriter = false,
                SingleReader = false
            });

        _flushHandler = flushHandler;
    }

    public async ValueTask ConsumeAsync()
    {
        try
        {
            var maxSize = _options!.MaxSize;

            var currentBatch = new List<TEvent>(maxSize);
            var startTime = DateTimeOffset.UtcNow;

            // Reader.Completion is the Task that completes when no more data
            // will ever be available to read from this channel (for example,
            // when the writer is completed).

            while (
                await Reader.WaitToReadAsync().ConfigureAwait(false)
                && Reader.Completion.Status != TaskStatus.RanToCompletion
            )
            {
                var item = await Reader.ReadAsync().ConfigureAwait(false);

                if (item is not null)
                {
                    currentBatch.Add(item);
                }

                if (currentBatch.Count >= maxSize || IsPastMaxLifetime(startTime))
                {
                    await FlushBufferAsync().ConfigureAwait(false);
                }
            }

            if (currentBatch.Count > 0)
            {
                await FlushBufferAsync().ConfigureAwait(false);
            }

            async ValueTask FlushBufferAsync()
            {
                var batch = currentBatch.ToArray();
                await _flushHandler(batch, default).ConfigureAwait(false);
                currentBatch.Clear();
                startTime = DateTimeOffset.UtcNow;

                Debug.Assert(batch.Length > 0, "Should not be affected when currentBatch is cleared");
            }
        }
        catch (Exception ex)
        {
            Debug.WriteLine($"[Consumer]: {ex.Message}");
            throw;
        }
    }

    private bool IsPastMaxLifetime(DateTimeOffset startTime) =>
        startTime.Add(_options!.MaxLifetime) < DateTimeOffset.UtcNow;
}

public class BufferedChannelOptions
{
    public int MaxSize { get; set; } = 1_000;

    public TimeSpan MaxLifetime { get; set; } = TimeSpan.FromSeconds(5);
}

Then you just call ConsumeAsync, and it will loop continuously, flushing the data to the external destination (e.g. Kafka), until the channel's writer is completed.
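Note that in ConsumeAsync above, IsPastMaxLifetime is evaluated only after a new item has been read, and startTime is measured from the previous flush rather than from the first item of the current batch. So if the producer goes idle, a partially filled batch stays in currentBatch until the next item arrives or the channel completes. Below is a minimal sketch of one way to bound that wait while keeping the flush-callback shape, borrowing the idea from the other answer: no timeout while the batch is empty, and a timeout only while items are pending. ChannelBatching and ConsumeBatchesAsync are hypothetical names, and the sketch assumes .NET Core 3.0 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelBatching
{
    // Hypothetical helper: flushes batches of up to maxSize items, and flushes
    // a partial batch once maxLifetime has elapsed since its first item was
    // read, even if the producer has gone idle in the meantime.
    public static async Task ConsumeBatchesAsync<T>(
        ChannelReader<T> reader, int maxSize, TimeSpan maxLifetime,
        Func<IReadOnlyList<T>, Task> flush)
    {
        var batch = new List<T>(maxSize);
        var clock = Stopwatch.StartNew();
        TimeSpan deadline = TimeSpan.Zero; // Set when the first item of a batch is read.
        while (true)
        {
            TimeSpan remaining = deadline - clock.Elapsed;
            if (batch.Count > 0 && remaining <= TimeSpan.Zero)
            {
                // Max lifetime reached: flush the partial batch.
                await flush(batch.ToArray()).ConfigureAwait(false);
                batch.Clear();
                continue;
            }
            // No timeout while the batch is empty, so an idle producer does not
            // cause a stream of canceled waits; a timeout only while items are pending.
            using var timeout = batch.Count == 0
                ? null : new CancellationTokenSource(remaining);
            try
            {
                if (!await reader.WaitToReadAsync(timeout?.Token ?? CancellationToken.None)
                    .ConfigureAwait(false))
                    break; // Completed successfully and fully drained.
            }
            catch (OperationCanceledException)
                when (timeout?.IsCancellationRequested == true)
            {
                continue; // Deadline reached; the check at the top flushes.
            }
            catch
            {
                break; // Channel faulted: flush pending items, then rethrow below.
            }
            // Move every immediately available item into the batch.
            while (batch.Count < maxSize && reader.TryRead(out T item))
            {
                if (batch.Count == 0) deadline = clock.Elapsed + maxLifetime;
                batch.Add(item);
            }
            if (batch.Count >= maxSize)
            {
                await flush(batch.ToArray()).ConfigureAwait(false);
                batch.Clear();
            }
        }
        if (batch.Count > 0) await flush(batch.ToArray()).ConfigureAwait(false);
        await reader.Completion.ConfigureAwait(false); // Propagate a possible fault.
    }
}
```

Because the deadline is taken from the first item of each batch, every consumed item should be flushed within roughly maxLifetime (plus timer resolution), which matches the max-latency definition in the question; if the channel faults, the pending items are still flushed before the fault is rethrown by awaiting reader.Completion.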
