如何在自定义ReadOnlySequence实现结束时修剪零?

发布于 2025-01-22 17:20:35 字数 5015 浏览 0 评论 0 原文

Web插座服务器期望JSON字符串。 _websocket.sendasync 发送 readonlymemory< byte> 。问题是 sequence.first 具有落后零。由于尾随的零,它导致无效的JSON消息。问题是我如何修剪它们?

使用

var request = new JsonRpcRequest<object>
{
    JsonRpc = "2.0",
    Id = 1,
    Method = "public/get_instruments",
    Params = @params
};

var message = JsonSerializer.Serialize(request);

using var buffer = MemoryPool<byte>.Shared.Rent(Encoding.UTF8.GetByteCount(message));
Encoding.UTF8.GetEncoder().Convert(message, buffer.Memory.Span, true, out _, out _, out _);

var seq = new OwnedMemorySequence<byte>();
seq.Append(buffer);

var msg = new ChannelWebSocket.Message
{
    MessageType = WebSocketMessageType.Text,
    Payload = seq
};

await client.Output.WriteAsync(msg).ConfigureAwait(false);

代码

private async Task OutputLoopAsync(CancellationToken cancellationToken)
{
    await foreach (var message in _output.Reader.ReadAllAsync())
    {
        var sequence = message.Payload.ReadOnlySequence;
        if (sequence.IsEmpty)
            continue;

        while (!sequence.IsSingleSegment)
        {
            await _webSocket.SendAsync(sequence.First, message.MessageType, false, cancellationToken);
            sequence = sequence.Slice(sequence.First.Length);
        }

        await _webSocket.SendAsync(sequence.First, message.MessageType, true, cancellationToken);
        message.Payload.Dispose();
    }

    await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cancellationToken);
}

public sealed class Message
{
    public WebSocketMessageType MessageType { get; set; }
    public OwnedMemorySequence<byte> Payload { get; set; } = null!;
}

public sealed class OwnedMemorySequence<T> : IDisposable
{
    private readonly CollectionDisposable _disposable = new();
    private readonly MemorySequence<T> _sequence = new();

    public ReadOnlySequence<T> ReadOnlySequence => _sequence.ReadOnlySequence;

    public OwnedMemorySequence<T> Append(IMemoryOwner<T> memoryOwner)
    {
        _disposable.Add(memoryOwner);
        _sequence.Append(memoryOwner.Memory);
        return this;
    }

    public ReadOnlySequence<T> CreateReadOnlySequence(int firstBufferStartIndex, int lastBufferEndIndex)
    {
        return _sequence.CreateReadOnlySequence(firstBufferStartIndex, lastBufferEndIndex);
    }

    public void Dispose()
    {
        _disposable.Dispose();
    }
}

public static class MemoryOwnerSliceExtensions
{
    public static IMemoryOwner<T> Slice<T>(this IMemoryOwner<T> owner, int start, int length)
    {
        if (start == 0 && length == owner.Memory.Length)
            return owner;
        return new SliceOwner<T>(owner, start, length);
    }

    public static IMemoryOwner<T> Slice<T>(this IMemoryOwner<T> owner, int start)
    {
        if (start == 0)
            return owner;
        return new SliceOwner<T>(owner, start);
    }

    private sealed class SliceOwner<T> : IMemoryOwner<T>
    {
        private readonly IMemoryOwner<T> _owner;

        public SliceOwner(IMemoryOwner<T> owner, int start, int length)
        {
            _owner = owner;
            Memory = _owner.Memory.Slice(start, length);
        }

        public SliceOwner(IMemoryOwner<T> owner, int start)
        {
            _owner = owner;
            Memory = _owner.Memory[start..];
        }

        public Memory<T> Memory { get; }

        public void Dispose()
        {
            _owner.Dispose();
        }
    }
}

public sealed class MemorySequence<T>
{
    private MemorySegment? _head;
    private MemorySegment? _tail;

    public ReadOnlySequence<T> ReadOnlySequence => CreateReadOnlySequence(0, _tail?.Memory.Length ?? 0);

    public MemorySequence<T> Append(ReadOnlyMemory<T> buffer)
    {
        if (_tail == null)
            _head = _tail = new MemorySegment(buffer, 0);
        else
            _tail = _tail.Append(buffer);
        return this;
    }

    public ReadOnlySequence<T> CreateReadOnlySequence(int firstBufferStartIndex, int lastBufferEndIndex)
    {
        return _tail == null ? new ReadOnlySequence<T>(Array.Empty<T>()) : new ReadOnlySequence<T>(_head!, firstBufferStartIndex, _tail, lastBufferEndIndex);
    }

    private sealed class MemorySegment : ReadOnlySequenceSegment<T>
    {
        public MemorySegment(ReadOnlyMemory<T> memory, long runningIndex)
        {
            Memory = memory;
            RunningIndex = runningIndex;
        }

        public MemorySegment Append(ReadOnlyMemory<T> nextMemory)
        {
            var next = new MemorySegment(nextMemory, RunningIndex + Memory.Length);
            Next = next;
            return next;
        }
    }
}

The web socket server expects JSON strings. _webSocket.SendAsync sends ReadOnlyMemory<byte>. The issue is that sequence.First has trailing zeros. It results in an invalid JSON message because of the trailing zeros. The question is how do I trim them?

Usage

var request = new JsonRpcRequest<object>
{
    JsonRpc = "2.0",
    Id = 1,
    Method = "public/get_instruments",
    Params = @params
};

var message = JsonSerializer.Serialize(request);

using var buffer = MemoryPool<byte>.Shared.Rent(Encoding.UTF8.GetByteCount(message));
Encoding.UTF8.GetEncoder().Convert(message, buffer.Memory.Span, true, out _, out _, out _);

var seq = new OwnedMemorySequence<byte>();
seq.Append(buffer);

var msg = new ChannelWebSocket.Message
{
    MessageType = WebSocketMessageType.Text,
    Payload = seq
};

await client.Output.WriteAsync(msg).ConfigureAwait(false);

Code

private async Task OutputLoopAsync(CancellationToken cancellationToken)
{
    await foreach (var message in _output.Reader.ReadAllAsync())
    {
        var sequence = message.Payload.ReadOnlySequence;
        if (sequence.IsEmpty)
            continue;

        while (!sequence.IsSingleSegment)
        {
            await _webSocket.SendAsync(sequence.First, message.MessageType, false, cancellationToken);
            sequence = sequence.Slice(sequence.First.Length);
        }

        await _webSocket.SendAsync(sequence.First, message.MessageType, true, cancellationToken);
        message.Payload.Dispose();
    }

    await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cancellationToken);
}

public sealed class Message
{
    public WebSocketMessageType MessageType { get; set; }
    public OwnedMemorySequence<byte> Payload { get; set; } = null!;
}

public sealed class OwnedMemorySequence<T> : IDisposable
{
    private readonly CollectionDisposable _disposable = new();
    private readonly MemorySequence<T> _sequence = new();

    public ReadOnlySequence<T> ReadOnlySequence => _sequence.ReadOnlySequence;

    public OwnedMemorySequence<T> Append(IMemoryOwner<T> memoryOwner)
    {
        _disposable.Add(memoryOwner);
        _sequence.Append(memoryOwner.Memory);
        return this;
    }

    public ReadOnlySequence<T> CreateReadOnlySequence(int firstBufferStartIndex, int lastBufferEndIndex)
    {
        return _sequence.CreateReadOnlySequence(firstBufferStartIndex, lastBufferEndIndex);
    }

    public void Dispose()
    {
        _disposable.Dispose();
    }
}

public static class MemoryOwnerSliceExtensions
{
    public static IMemoryOwner<T> Slice<T>(this IMemoryOwner<T> owner, int start, int length)
    {
        if (start == 0 && length == owner.Memory.Length)
            return owner;
        return new SliceOwner<T>(owner, start, length);
    }

    public static IMemoryOwner<T> Slice<T>(this IMemoryOwner<T> owner, int start)
    {
        if (start == 0)
            return owner;
        return new SliceOwner<T>(owner, start);
    }

    private sealed class SliceOwner<T> : IMemoryOwner<T>
    {
        private readonly IMemoryOwner<T> _owner;

        public SliceOwner(IMemoryOwner<T> owner, int start, int length)
        {
            _owner = owner;
            Memory = _owner.Memory.Slice(start, length);
        }

        public SliceOwner(IMemoryOwner<T> owner, int start)
        {
            _owner = owner;
            Memory = _owner.Memory[start..];
        }

        public Memory<T> Memory { get; }

        public void Dispose()
        {
            _owner.Dispose();
        }
    }
}

public sealed class MemorySequence<T>
{
    private MemorySegment? _head;
    private MemorySegment? _tail;

    public ReadOnlySequence<T> ReadOnlySequence => CreateReadOnlySequence(0, _tail?.Memory.Length ?? 0);

    public MemorySequence<T> Append(ReadOnlyMemory<T> buffer)
    {
        if (_tail == null)
            _head = _tail = new MemorySegment(buffer, 0);
        else
            _tail = _tail.Append(buffer);
        return this;
    }

    public ReadOnlySequence<T> CreateReadOnlySequence(int firstBufferStartIndex, int lastBufferEndIndex)
    {
        return _tail == null ? new ReadOnlySequence<T>(Array.Empty<T>()) : new ReadOnlySequence<T>(_head!, firstBufferStartIndex, _tail, lastBufferEndIndex);
    }

    private sealed class MemorySegment : ReadOnlySequenceSegment<T>
    {
        public MemorySegment(ReadOnlyMemory<T> memory, long runningIndex)
        {
            Memory = memory;
            RunningIndex = runningIndex;
        }

        public MemorySegment Append(ReadOnlyMemory<T> nextMemory)
        {
            var next = new MemorySegment(nextMemory, RunningIndex + Memory.Length);
            Next = next;
            return next;
        }
    }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

内心旳酸楚 2025-01-29 17:20:35

您不需要修剪。您只需要相应地切成记忆。

请注意 MemoryPool&lt; t&gt; .rent 的报价)返回” [...]一个能够持有的内存块,至少 minbuffersize t。<< /em>“这里的重要位是“ 至少 ”,这意味着允许返回的内存大于请求的大小。

您需要做的就是,如果将其添加到 onsedmemorysequence&lt; t&gt; _ seporence 成员之前,请从内存中创建所需大小的切片。

You don't need to trim. You just need to slice your memory accordingly.

Pay attention to MemoryPool<T>.Rent (quote from the doc) returning "[...] a memory block capable capable of holding at least minBufferSize elements of T." The important bit here is "at least", meaning the returned memory is allowed to be larger than the requested size.

All you need to do is to create a slice of the requested size from the memory if it happens to be larger than the requested size before adding it to OwnedMemorySequence<T>_sequence member .

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文