How to lazily partition an IAsyncEnumerable?

Posted on 2025-01-31 18:43:08 · 814 characters · 3 views · 0 comments

I have an IAsyncEnumerable that returns what is essentially a sequence of Key/IEnumerable<Value> pairs. I have code consuming this and other similar enumerables, that assumes it will be receiving a unique collection of keys. But one of my data sources does not obey this constraint. It does, however, keep duplicate keys grouped together. (You won't see [k1, k2, k1].)

This should be fairly simple to resolve with a wrapper that partitions the data by key and concatenates the values, except that I don't see any usable partitioning operator in System.Linq.Async. There are GroupBy and ToLookup, but both of these are eager operators that will consume the entire sequence immediately. This is not suitable for my purposes, due to large amounts of data being involved.

Is there any simple way to partition an IAsyncEnumerable similar to GroupBy, grouping inputs according to a key selector, but keeping its behavior fully lazy and generating new groupings on demand when the key changes?

EDIT: I looked to see if MoreLINQ has anything like this, and found GroupAdjacent, but the code shows that, while it does not eagerly consume the entire input sequence, it will still eagerly consume the entire group when starting a new group. I'm looking for a method that will return a lazy enumerable in its groupings. It's trickier than it sounds!

Comments (1)

对你的占有欲 2025-02-07 18:43:08

Here is a GroupAdjacent operator for asynchronous sequences. It is similar to the operator of the same name in the MoreLINQ package, with the difference that it doesn't buffer the elements of the emitted groupings. Each grouping must be enumerated fully, in order, one grouping at a time; otherwise an InvalidOperationException is thrown.

This implementation requires the package System.Linq.Async, because it emits groupings that implement the IAsyncGrouping<out TKey, out TElement> interface.

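// Required namespaces: System, System.Collections.Generic, System.Diagnostics,
// System.Linq, System.Runtime.CompilerServices, System.Threading.
// Place GroupAdjacent and the nested AsyncGrouping class inside a static class.
/// <summary>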
/// Groups the adjacent elements of a sequence according to a specified
/// key selector function.
/// </summary>
/// <remarks>
/// The groups don't contain buffered elements.
/// Enumerating the groups in the correct order is mandatory.
/// </remarks>
public static IAsyncEnumerable<IAsyncGrouping<TKey, TSource>>
    GroupAdjacent<TSource, TKey>(
        this IAsyncEnumerable<TSource> source,
        Func<TSource, TKey> keySelector,
        IEqualityComparer<TKey> keyComparer = null)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(keySelector);
    keyComparer ??= EqualityComparer<TKey>.Default;
    return Implementation();

    async IAsyncEnumerable<IAsyncGrouping<TKey, TSource>> Implementation(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        Tuple<TSource, TKey, bool> sharedState = null;
        var enumerator = source.GetAsyncEnumerator(cancellationToken);
        try
        {
            if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
                yield break;
            var firstItem = enumerator.Current;
            var firstKey = keySelector(firstItem);
            sharedState = new(firstItem, firstKey, true);

            Tuple<TSource, TKey, bool> previousState = null;
            while (true)
            {
                var state = Volatile.Read(ref sharedState);
                if (ReferenceEquals(state, previousState))
                    throw new InvalidOperationException("Out of order enumeration.");
                var (_, key, exists) = state; // the first item is yielded by GetAdjacent
                if (!exists) yield break;
                previousState = state;
                yield return new AsyncGrouping<TKey, TSource>(key, GetAdjacent(state));
            }
        }
        finally { await enumerator.DisposeAsync().ConfigureAwait(false); }

        async IAsyncEnumerable<TSource> GetAdjacent(Tuple<TSource, TKey, bool> state)
        {
            if (!ReferenceEquals(Volatile.Read(ref sharedState), state))
                throw new InvalidOperationException("Out of order enumeration.");
            var (stateItem, stateKey, stateExists) = state;
            Debug.Assert(stateExists);
            yield return stateItem;
            Tuple<TSource, TKey, bool> nextState;
            while (true)
            {
                if (!ReferenceEquals(Volatile.Read(ref sharedState), state))
                    throw new InvalidOperationException("Out of order enumeration.");
                if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
                {
                    nextState = new(default, default, false);
                    break;
                }
                var item = enumerator.Current;
                var key = keySelector(item);
                if (!keyComparer.Equals(key, stateKey))
                {
                    nextState = new(item, key, true);
                    break;
                }
                yield return item;
            }
            if (!ReferenceEquals(Interlocked.CompareExchange(
                ref sharedState, nextState, state), state))
                throw new InvalidOperationException("Out of order enumeration.");
        }
    }
}

private class AsyncGrouping<TKey, TElement> : IAsyncGrouping<TKey, TElement>
{
    private readonly TKey _key;
    private readonly IAsyncEnumerable<TElement> _sequence;

    public AsyncGrouping(TKey key, IAsyncEnumerable<TElement> sequence)
    {
        _key = key;
        _sequence = sequence;
    }

    public TKey Key => _key;

    public IAsyncEnumerator<TElement> GetAsyncEnumerator(
        CancellationToken cancellationToken = default)
    {
        return _sequence.GetAsyncEnumerator(cancellationToken);
    }
}
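
The ordering rule described above can be illustrated with a small consumer. This is only a sketch: it assumes the GroupAdjacent operator above is in scope (in an accessible static class) and that the System.Linq.Async package is referenced. The names GroupAdjacentDemo, DemoSource, ConsumeInOrderAsync and SkippingThrowsAsync are hypothetical and exist only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class GroupAdjacentDemo
{
    // Hypothetical source: equal keys are always adjacent, as in the question.
    private static async IAsyncEnumerable<(string Key, double Value)> DemoSource()
    {
        yield return ("k1", 1.0);
        await Task.Yield(); // simulate asynchronous arrival of data
        yield return ("k1", 2.0);
        yield return ("k2", 3.0);
    }

    // Correct use: drain each grouping completely before requesting the next one.
    public static async Task<List<string>> ConsumeInOrderAsync()
    {
        var log = new List<string>();
        await foreach (var grouping in DemoSource().GroupAdjacent(x => x.Key))
        {
            await foreach (var item in grouping)
                log.Add($"{grouping.Key}:{item.Value}");
        }
        return log;
    }

    // Incorrect use: a grouping that is never enumerated leaves the shared
    // state unchanged, so requesting the next grouping throws.
    public static async Task<bool> SkippingThrowsAsync()
    {
        try
        {
            await foreach (var grouping in DemoSource().GroupAdjacent(x => x.Key))
            {
                // The grouping's elements are deliberately not enumerated.
            }
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }
}
```

Because the groupings share the single underlying enumerator, leaving an inner loop early (for example with break) and then asking for the next grouping is expected to fail in the same way as skipping a grouping entirely.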

Usage example:

IAsyncEnumerable<IGrouping<string, double>> source = //...

IAsyncEnumerable<IAsyncGrouping<string, double>> merged = source
    .GroupAdjacent(g => g.Key)
    .Select(gg => new AsyncGrouping<string, double>(
        gg.Key, gg.Select(g => g.ToAsyncEnumerable()).Concat()));

This example starts with a sequence that contains groupings, and the goal is to combine any adjacent groupings that have the same key into a single asynchronous grouping that contains all of their elements. After applying the GroupAdjacent(g => g.Key) operator we get this type:

IAsyncEnumerable<IAsyncGrouping<string, IGrouping<string, double>>>

So in this phase each asynchronous grouping contains inner groupings, not individual elements. We need to Concat this nested structure in order to get what we want. The Concat operator exists in the System.Interactive.Async package, and it has this signature:

public static IAsyncEnumerable<TSource> Concat<TSource>(
    this IAsyncEnumerable<IAsyncEnumerable<TSource>> sources);

The ToAsyncEnumerable operator (System.Linq.Async) is applied to each synchronous inner grouping, converting it to an IAsyncEnumerable<double> so that the nested sequence satisfies this signature.
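
To make the flattening step concrete, here is a hand-rolled sketch of what concatenating a sequence of asynchronous sequences amounts to. It is not the package's implementation; the names ConcatSketch, Flatten, Outer and Inner are hypothetical, and the code uses only the base class library so it can be run on its own. It starts enumerating each inner sequence only when it is reached, which is what preserves the laziness of the groupings.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class ConcatSketch
{
    // Hypothetical stand-in for Concat: yields the elements of each inner
    // sequence in turn, starting an inner sequence only when it is reached.
    public static async IAsyncEnumerable<T> Flatten<T>(
        this IAsyncEnumerable<IAsyncEnumerable<T>> sources,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await foreach (var inner in sources
            .WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            await foreach (var item in inner
                .WithCancellation(cancellationToken).ConfigureAwait(false))
            {
                yield return item;
            }
        }
    }

    // Hypothetical inner sequence that produces the given values asynchronously.
    public static async IAsyncEnumerable<double> Inner(params double[] values)
    {
        foreach (var value in values)
        {
            await Task.Yield();
            yield return value;
        }
    }

    // Hypothetical outer sequence containing two inner sequences.
    public static async IAsyncEnumerable<IAsyncEnumerable<double>> Outer()
    {
        yield return Inner(1.0, 2.0);
        await Task.Yield();
        yield return Inner(3.0);
    }

    // Collects the flattened elements so the order can be inspected.
    public static async Task<List<double>> CollectAsync()
    {
        var result = new List<double>();
        await foreach (var item in Outer().Flatten())
            result.Add(item);
        return result;
    }
}
```

In the usage example, the outer sequence plays the role of one asynchronous grouping emitted by GroupAdjacent, and each inner sequence is one of the original synchronous groupings after ToAsyncEnumerable.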
