I am using a Channel<T> in a producer-consumer scenario, and I have the requirement to consume the channel in batches of 10 items each, without letting any consumed item stay idle in a buffer for more than 5 seconds. This duration is the maximum latency allowed between reading an item from the channel and processing a batch that contains this item. The maximum latency policy takes precedence over the batch size policy, so a batch should be processed even with fewer than 10 items, in order to satisfy the max-latency requirement.
I was able to implement the first requirement, in the form of a ReadAllBatches extension method for the ChannelReader<T> class:
public static async IAsyncEnumerable<T[]> ReadAllBatches<T>(
    this ChannelReader<T> channelReader, int batchSize)
{
    List<T> buffer = new();
    while (true)
    {
        T item;
        try { item = await channelReader.ReadAsync(); }
        catch (ChannelClosedException) { break; }
        buffer.Add(item);
        if (buffer.Count == batchSize)
        {
            yield return buffer.ToArray();
            buffer.Clear();
        }
    }
    if (buffer.Count > 0) yield return buffer.ToArray();
    await channelReader.Completion; // Propagate possible failure
}
I am planning to use it like this:
await foreach (Item[] batch in myChannel.Reader.ReadAllBatches(10))
{
    Console.WriteLine($"Processing batch of {batch.Length} items");
}
My question is: how can I enhance my ReadAllBatches<T> implementation with an additional TimeSpan timeout parameter that enforces the aforementioned maximum latency policy, without installing third-party packages in my project?
Important: The requested implementation should not be susceptible to the memory leak issue that has been reported here. So the loop that consumes the channel should not cause a steady increase in the memory used by the application, in case the producer that writes items to the channel has been idle for a prolonged period of time.
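For context, as I understand the linked report, the leak arises when each read attempt is awaited with its own short-lived timeout token while the channel stays empty: a canceled wait can leave a canceled waiter queued inside the channel until the next write occurs, so with an idle producer the waiters accumulate on every timeout. A minimal sketch of the susceptible pattern (my own illustration, not code from the report):

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public class LeakDemo
{
    public static async Task Main()
    {
        // An unbounded channel whose producer has gone idle.
        Channel<int> channel = Channel.CreateUnbounded<int>();

        // Susceptible pattern: a fresh timeout token per read attempt.
        // Each canceled wait can linger inside the channel until the next
        // write, so with an idle producer these accumulate on every timeout.
        for (int i = 0; i < 3; i++)
        {
            using var timeoutCts = new CancellationTokenSource(
                TimeSpan.FromMilliseconds(50));
            try
            {
                await channel.Reader.WaitToReadAsync(timeoutCts.Token);
            }
            catch (OperationCanceledException)
            {
                // The timeout elapsed without any item arriving.
            }
        }
        Console.WriteLine("done");
    }
}
```

A fixed implementation therefore has to avoid arming a timeout while the buffer is empty and nothing is owed a latency guarantee.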
Note: I am aware that a similar question exists about batching the IAsyncEnumerable<T> interface, but I am not interested in that. I am interested in a method that directly targets the ChannelReader<T> type, for performance reasons.
Below is an implementation of an idea that was posted on GitHub, by tkrafael.
The internal CancellationTokenSource is scheduled with a timer for cancellation, immediately after consuming the first element in the batch.
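A sketch along the lines described (my reconstruction, not the author's exact code; member names and argument handling are assumptions, and CancellationTokenSource.TryReset requires .NET 6+):

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelReaderExtensions
{
    public static async IAsyncEnumerable<T[]> ReadAllBatches<T>(
        this ChannelReader<T> channelReader, int batchSize, TimeSpan timeout,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        CancellationTokenSource timerCts = CancellationTokenSource
            .CreateLinkedTokenSource(cancellationToken);
        try
        {
            List<T> buffer = new();
            while (true)
            {
                // While the buffer is empty there is no latency obligation
                // yet, so wait without the timer token. This keeps an idle
                // producer from causing an endless cancel-and-retry loop
                // (the reported memory leak scenario).
                CancellationToken token = buffer.Count == 0
                    ? cancellationToken : timerCts.Token;
                T item;
                bool hasItem = false;
                try
                {
                    item = await channelReader.ReadAsync(token)
                        .ConfigureAwait(false);
                    hasItem = true;
                }
                catch (ChannelClosedException) { break; }
                catch (OperationCanceledException)
                    when (cancellationToken.IsCancellationRequested)
                {
                    break; // Emit any buffered items before propagating.
                }
                catch (OperationCanceledException)
                {
                    item = default!; // The max-latency timer elapsed.
                }
                if (hasItem)
                {
                    buffer.Add(item);
                    // Arm the timer on the first element of the batch.
                    if (buffer.Count == 1) timerCts.CancelAfter(timeout);
                    if (buffer.Count < batchSize) continue;
                }
                // The batch is full, or the timeout elapsed: emit it.
                yield return buffer.ToArray();
                buffer.Clear();
                if (!timerCts.TryReset())
                {
                    timerCts.Dispose();
                    timerCts = CancellationTokenSource
                        .CreateLinkedTokenSource(cancellationToken);
                }
            }
            // Non-destructive: emit whatever was consumed before
            // propagating cancellation, completion, or failure.
            if (buffer.Count > 0) yield return buffer.ToArray();
            cancellationToken.ThrowIfCancellationRequested();
            await channelReader.Completion.ConfigureAwait(false);
        }
        finally { timerCts.Dispose(); }
    }
}
```

Usage mirrors the question's example, with the extra argument: myChannel.Reader.ReadAllBatches(10, TimeSpan.FromSeconds(5)).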
This implementation is non-destructive, meaning that no items that have been consumed from the channel are in danger of being lost. In case the enumeration is canceled or the channel is faulted, any consumed items will be emitted in a final batch, before the propagation of the error.
Note: In case the source ChannelReader<T> is completed at the same time that the cancellationToken is canceled, the cancellation has precedence over the completion. This is the same behavior as all the native methods of the ChannelReader<T> and ChannelWriter<T> classes. It means that it's possible (although rare) for an OperationCanceledException to be thrown, even in case all the work has completed.
I created a custom logger provider that sends messages to Kafka, and this is how I implemented the buffer channel.
Then you just call ConsumeAsync, and it will continually loop, flushing the data to the external source (e.g. Kafka).
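A minimal sketch of the pattern described (the class name, capacity, and flush delegate are my assumptions; the actual Kafka producer call is represented by a generic delegate):

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical sketch of a buffered logging channel; all names are assumed.
public sealed class BufferedLogChannel
{
    private readonly Channel<string> _channel =
        Channel.CreateBounded<string>(new BoundedChannelOptions(10_000)
        {
            // Never block the logging call site; shed the oldest entries
            // if the consumer cannot keep up.
            FullMode = BoundedChannelFullMode.DropOldest,
        });

    // Called by the logger; cheap and non-blocking.
    public bool TryWrite(string message) => _channel.Writer.TryWrite(message);

    // Call when shutting down, so ConsumeAsync can drain and finish.
    public void Complete() => _channel.Writer.Complete();

    // Continually loops, flushing each message to the external source
    // (e.g. producing to a Kafka topic) until the channel completes.
    public async Task ConsumeAsync(Func<string, Task> flush,
        CancellationToken cancellationToken = default)
    {
        await foreach (string message in
            _channel.Reader.ReadAllAsync(cancellationToken))
        {
            await flush(message).ConfigureAwait(false);
        }
    }
}
```

The bounded channel with DropOldest is one plausible choice for a logger, where losing old entries under backpressure is usually preferable to blocking the application.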