如何修剪自定义 ReadOnlySequence 实现末尾的零?
WebSocket 服务器期望收到 JSON 字符串。_webSocket.SendAsync
发送的是 ReadOnlyMemory<byte>
。问题在于 sequence.First
带有尾随的零。这些尾随的零会导致 JSON 消息无效。请问该如何修剪它们?
使用
// Build the JSON-RPC request and serialize it to a JSON string.
var request = new JsonRpcRequest<object>
{
    JsonRpc = "2.0",
    Id = 1,
    Method = "public/get_instruments",
    Params = @params
};
var message = JsonSerializer.Serialize(request);

// MemoryPool<T>.Rent only guarantees a buffer of AT LEAST the requested size, so the
// rented memory may be longer than the encoded text. Anything past the encoded bytes
// is not part of the message and must not be sent.
//
// Deliberately no `using` here: ownership of the buffer is handed to the sequence, and
// the output loop disposes the payload after it has been sent. Disposing it at the end
// of this scope would return the buffer to the pool while the message is still queued.
var buffer = MemoryPool<byte>.Shared.Rent(Encoding.UTF8.GetByteCount(message));
var bytesWritten = Encoding.UTF8.GetBytes(message, buffer.Memory.Span);

// Expose only the bytes that were actually written; the slice still disposes the
// underlying rented buffer when it is disposed.
var seq = new OwnedMemorySequence<byte>();
seq.Append(buffer.Slice(0, bytesWritten));

var msg = new ChannelWebSocket.Message
{
    MessageType = WebSocketMessageType.Text,
    Payload = seq
};

try
{
    await client.Output.WriteAsync(msg).ConfigureAwait(false);
}
catch
{
    // The message never reached the output loop, so nobody else will release the buffer.
    seq.Dispose();
    throw;
}
代码
/// <summary>
/// Drains the outgoing message queue and writes every message to the WebSocket,
/// one frame per segment of the payload, then closes the socket normally once the
/// queue has completed.
/// </summary>
/// <param name="cancellationToken">
/// Cancels waiting for new messages as well as any send or close that is in flight.
/// </param>
private async Task OutputLoopAsync(CancellationToken cancellationToken)
{
    // Pass the token to the reader too; otherwise cancellation is only observed
    // while a send is in progress, never while idle waiting for the next message.
    await foreach (var message in _output.Reader.ReadAllAsync(cancellationToken))
    {
        try
        {
            var sequence = message.Payload.ReadOnlySequence;
            if (sequence.IsEmpty)
                continue;

            // Every segment except the last is sent as a non-final frame.
            while (!sequence.IsSingleSegment)
            {
                await _webSocket.SendAsync(sequence.First, message.MessageType, false, cancellationToken);
                sequence = sequence.Slice(sequence.First.Length);
            }

            // The remaining single segment completes the message.
            await _webSocket.SendAsync(sequence.First, message.MessageType, true, cancellationToken);
        }
        finally
        {
            // Release the payload's buffers on every path: success, empty payload,
            // send failure, and cancellation.
            message.Payload.Dispose();
        }
    }

    await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cancellationToken);
}
/// <summary>
/// A single outgoing WebSocket message: the payload bytes plus the frame type to send them as.
/// </summary>
public sealed class Message
{
    /// <summary>
    /// The message body. Declared non-nullable; the <c>null!</c> initializer only silences
    /// the compiler, so callers are expected to assign it before the message is used.
    /// </summary>
    public OwnedMemorySequence<byte> Payload { get; set; } = null!;

    /// <summary>The WebSocket message type the payload is sent with.</summary>
    public WebSocketMessageType MessageType { get; set; }
}
/// <summary>
/// A sequence of memory blocks that also tracks the <see cref="IMemoryOwner{T}"/> instances
/// backing those blocks, so they can all be released through a single <see cref="Dispose"/>.
/// </summary>
/// <typeparam name="T">Element type of the underlying memory.</typeparam>
public sealed class OwnedMemorySequence<T> : IDisposable
{
    // Holds the appended blocks in order.
    private readonly MemorySequence<T> _sequence = new();

    // Collects every appended owner; presumably disposes each of them when it is disposed.
    private readonly CollectionDisposable _disposable = new();

    /// <summary>A read-only view over everything appended so far.</summary>
    public ReadOnlySequence<T> ReadOnlySequence => _sequence.ReadOnlySequence;

    /// <summary>
    /// Registers <paramref name="memoryOwner"/> with the disposal collection and appends
    /// its memory to the end of the sequence.
    /// </summary>
    /// <param name="memoryOwner">Owner whose <c>Memory</c> becomes the next block.</param>
    /// <returns>This instance, to allow chained appends.</returns>
    public OwnedMemorySequence<T> Append(IMemoryOwner<T> memoryOwner)
    {
        _disposable.Add(memoryOwner);
        var block = memoryOwner.Memory;
        _sequence.Append(block);
        return this;
    }

    /// <summary>
    /// Creates a read-only view that starts at <paramref name="firstBufferStartIndex"/> within
    /// the first block and ends at <paramref name="lastBufferEndIndex"/> within the last block.
    /// </summary>
    /// <param name="firstBufferStartIndex">Start offset inside the first block.</param>
    /// <param name="lastBufferEndIndex">End offset inside the last block.</param>
    /// <returns>The resulting read-only sequence.</returns>
    public ReadOnlySequence<T> CreateReadOnlySequence(int firstBufferStartIndex, int lastBufferEndIndex)
        => _sequence.CreateReadOnlySequence(firstBufferStartIndex, lastBufferEndIndex);

    /// <summary>Disposes the collection holding the appended memory owners.</summary>
    public void Dispose() => _disposable.Dispose();
}
/// <summary>
/// Extension methods that expose a sub-range of an <see cref="IMemoryOwner{T}"/> as another
/// <see cref="IMemoryOwner{T}"/> whose disposal releases the original owner.
/// </summary>
public static class MemoryOwnerSliceExtensions
{
    /// <summary>
    /// Returns an owner exposing <paramref name="length"/> elements of
    /// <paramref name="owner"/> starting at <paramref name="start"/>.
    /// </summary>
    /// <param name="owner">The owner to slice.</param>
    /// <param name="start">Index of the first element to expose.</param>
    /// <param name="length">Number of elements to expose.</param>
    /// <returns>
    /// <paramref name="owner"/> itself when the requested range covers its whole memory;
    /// otherwise a wrapper over the requested range.
    /// </returns>
    public static IMemoryOwner<T> Slice<T>(this IMemoryOwner<T> owner, int start, int length)
    {
        var coversEverything = start == 0 && length == owner.Memory.Length;
        return coversEverything
            ? owner
            : new SliceOwner<T>(owner, owner.Memory.Slice(start, length));
    }

    /// <summary>
    /// Returns an owner exposing the memory of <paramref name="owner"/> from
    /// <paramref name="start"/> to the end.
    /// </summary>
    /// <param name="owner">The owner to slice.</param>
    /// <param name="start">Index of the first element to expose.</param>
    /// <returns>
    /// <paramref name="owner"/> itself when <paramref name="start"/> is zero;
    /// otherwise a wrapper over the remaining range.
    /// </returns>
    public static IMemoryOwner<T> Slice<T>(this IMemoryOwner<T> owner, int start)
    {
        return start == 0
            ? owner
            : new SliceOwner<T>(owner, owner.Memory[start..]);
    }

    /// <summary>
    /// Wraps an owner together with a pre-computed window of its memory; disposing the
    /// wrapper disposes the wrapped owner.
    /// </summary>
    private sealed class SliceOwner<T> : IMemoryOwner<T>
    {
        private readonly IMemoryOwner<T> _inner;

        public SliceOwner(IMemoryOwner<T> inner, Memory<T> window)
        {
            _inner = inner;
            Memory = window;
        }

        /// <summary>The exposed window of the wrapped owner's memory.</summary>
        public Memory<T> Memory { get; }

        /// <summary>Disposes the wrapped owner.</summary>
        public void Dispose() => _inner.Dispose();
    }
}
/// <summary>
/// Builds a <see cref="ReadOnlySequence{T}"/> out of memory blocks appended one at a time,
/// kept as a singly linked list of segments.
/// </summary>
/// <typeparam name="T">Element type of the memory blocks.</typeparam>
public sealed class MemorySequence<T>
{
    // First and last segment of the linked list; both are null until the first Append.
    private MemorySegment? _head;
    private MemorySegment? _tail;

    /// <summary>
    /// A view over all appended blocks, from the start of the first block to the end
    /// of the last one.
    /// </summary>
    public ReadOnlySequence<T> ReadOnlySequence
    {
        get
        {
            var lastLength = _tail is null ? 0 : _tail.Memory.Length;
            return CreateReadOnlySequence(0, lastLength);
        }
    }

    /// <summary>Adds <paramref name="buffer"/> as the new last block.</summary>
    /// <param name="buffer">The memory block to append.</param>
    /// <returns>This instance, to allow chained appends.</returns>
    public MemorySequence<T> Append(ReadOnlyMemory<T> buffer)
    {
        if (_tail is null)
        {
            // First block: it is both head and tail and starts at running index 0.
            var first = new MemorySegment(buffer, 0);
            _head = first;
            _tail = first;
        }
        else
        {
            _tail = _tail.Append(buffer);
        }

        return this;
    }

    /// <summary>
    /// Creates a view that starts at <paramref name="firstBufferStartIndex"/> within the first
    /// block and ends at <paramref name="lastBufferEndIndex"/> within the last block.
    /// </summary>
    /// <param name="firstBufferStartIndex">Start offset inside the first block.</param>
    /// <param name="lastBufferEndIndex">End offset inside the last block.</param>
    /// <returns>
    /// The requested sequence, or a sequence over an empty array when nothing has been appended
    /// (the indices are ignored in that case).
    /// </returns>
    public ReadOnlySequence<T> CreateReadOnlySequence(int firstBufferStartIndex, int lastBufferEndIndex)
    {
        // _head and _tail are always assigned together, so either both are null or neither is.
        if (_head is null || _tail is null)
        {
            return new ReadOnlySequence<T>(Array.Empty<T>());
        }

        return new ReadOnlySequence<T>(_head, firstBufferStartIndex, _tail, lastBufferEndIndex);
    }

    /// <summary>One node of the linked list backing the sequence.</summary>
    private sealed class MemorySegment : ReadOnlySequenceSegment<T>
    {
        public MemorySegment(ReadOnlyMemory<T> memory, long runningIndex)
        {
            RunningIndex = runningIndex;
            Memory = memory;
        }

        /// <summary>
        /// Links a new segment after this one; its running index is this segment's
        /// running index plus this segment's length.
        /// </summary>
        /// <returns>The newly created segment.</returns>
        public MemorySegment Append(ReadOnlyMemory<T> nextMemory)
        {
            var successor = new MemorySegment(nextMemory, RunningIndex + Memory.Length);
            Next = successor;
            return successor;
        }
    }
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
您不需要修剪,只需要对内存做相应的切片。
请注意
MemoryPool<T>.Rent
(引自文档)返回“[...] 一个至少能够容纳 minBufferSize 个 T 类型元素的内存块。”这里的关键在于“至少”,这意味着返回的内存可以大于请求的大小。您需要做的只是:如果返回的内存大于请求的大小,就在将其添加到 OwnedMemorySequence<T> 的 _sequence 成员之前,先从中切出所需大小的切片。
You don't need to trim. You just need to slice your memory accordingly.
Pay attention to
MemoryPool<T>.Rent
(quote from the doc) returning "[...] a memory block capable of holding at least minBufferSize elements of T." The important bit here is "at least", meaning the returned memory is allowed to be larger than the requested size. All you need to do is create a slice of the requested size from the memory, if it happens to be larger than requested, before adding it to the _sequence member of OwnedMemorySequence<T>.