将sendasync调用限制为每秒5条消息

发布于 2025-02-03 02:02:25 字数 6546 浏览 5 评论 0原文

我正在实施Binance的API。

说:

WebSocket连接的限制为每秒5个传入消息。考虑一条消息:

  • ping框架
  • 乒乓球框架
  • JSON受控消息(例如订阅,退订)

for ex。有一个简单的Web套接字包装器,例如官方 binance Connector 。根据上述限制,sendAsync应每秒限制5条消息。如果几个线程同时调用SendAsync 5次(包括内置的clientwebsocket类),则将失败。我该如何优雅地解决这个问题?使用有界通道是一个解决方案吗?

public class BinanceWebSocket : IDisposable
{
    private IBinanceWebSocketHandler handler;
    private List<Func<string, Task>> onMessageReceivedFunctions;
    private List<CancellationTokenRegistration> onMessageReceivedCancellationTokenRegistrations;
    private CancellationTokenSource loopCancellationTokenSource;
    private Uri url;
    private int receiveBufferSize;

    public BinanceWebSocket(IBinanceWebSocketHandler handler, string url, int receiveBufferSize = 8192)
    {
        this.handler = handler;
        this.url = new Uri(url);
        this.receiveBufferSize = receiveBufferSize;
        this.onMessageReceivedFunctions = new List<Func<string, Task>>();
        this.onMessageReceivedCancellationTokenRegistrations = new List<CancellationTokenRegistration>();
    }

    public async Task ConnectAsync(CancellationToken cancellationToken)
    {
        if (this.handler.State != WebSocketState.Open)
        {
            this.loopCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            await this.handler.ConnectAsync(this.url, cancellationToken);
            await Task.Factory.StartNew(() => this.ReceiveLoop(loopCancellationTokenSource.Token, this.receiveBufferSize), loopCancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }
    }

    public async Task DisconnectAsync(CancellationToken cancellationToken)
    {
        if (this.loopCancellationTokenSource != null)
        {
            this.loopCancellationTokenSource.Cancel();
        }
        if (this.handler.State == WebSocketState.Open)
        {
            await this.handler.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, cancellationToken);
            await this.handler.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cancellationToken);
        }
    }

    public void OnMessageReceived(Func<string, Task> onMessageReceived, CancellationToken cancellationToken)
    {
        this.onMessageReceivedFunctions.Add(onMessageReceived);

        if (cancellationToken != CancellationToken.None)
        {
            var reg = cancellationToken.Register(() =>
                this.onMessageReceivedFunctions.Remove(onMessageReceived));

            this.onMessageReceivedCancellationTokenRegistrations.Add(reg);
        }
    }

    public async Task SendAsync(string message, CancellationToken cancellationToken)
    {
        byte[] byteArray = Encoding.ASCII.GetBytes(message);

        await this.handler.SendAsync(new ArraySegment<byte>(byteArray), WebSocketMessageType.Text, true, cancellationToken);
    }

    public void Dispose()
    {
        this.DisconnectAsync(CancellationToken.None).Wait();

        this.handler.Dispose();

        this.onMessageReceivedCancellationTokenRegistrations.ForEach(ct => ct.Dispose());

        this.loopCancellationTokenSource.Dispose();
    }

    private async Task ReceiveLoop(CancellationToken cancellationToken, int receiveBufferSize = 8192)
    {
        WebSocketReceiveResult receiveResult = null;
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                var buffer = new ArraySegment<byte>(new byte[receiveBufferSize]);
                receiveResult = await this.handler.ReceiveAsync(buffer, cancellationToken);

                if (receiveResult.MessageType == WebSocketMessageType.Close)
                {
                    break;
                }
                string content = Encoding.UTF8.GetString(buffer.ToArray());
                this.onMessageReceivedFunctions.ForEach(omrf => omrf(content));
            }
        }
        catch (TaskCanceledException)
        {
            await this.DisconnectAsync(CancellationToken.None);
        }
    }
}

我不是100%确定它解决

sendAsync的第二种方法是在循环中使用频道调用。 Singlereader设置为true,这意味着一次只有一个消费者。从技术上讲,它应该解决这个问题,但我不是100%确定的,因为该频道可能只会限制缓冲区中的数量。

private readonly Channel<string> _messagesTextToSendQueue = Channel.CreateUnbounded<string>(new UnboundedChannelOptions()
{
    SingleReader = true,
    SingleWriter = false
});

public ValueTask SendAsync(string message)
{
    Validations.Validations.ValidateInput(message, nameof(message));

    return _messagesTextToSendQueue.Writer.WriteAsync(message);
}

public void Send(string message)
{
    Validations.Validations.ValidateInput(message, nameof(message));

    _messagesTextToSendQueue.Writer.TryWrite(message);
}

private async Task SendTextFromQueue()
{
    try
    {
        while (await _messagesTextToSendQueue.Reader.WaitToReadAsync())
        {
            while (_messagesTextToSendQueue.Reader.TryRead(out var message))
            {
                try
                {
                    await SendInternalSynchronized(message).ConfigureAwait(false);
                }
                catch (Exception e)
                {
                    Logger.Error(e, L($"Failed to send text message: '{message}'. Error: {e.Message}"));
                }
            }
        }
    }
    catch (TaskCanceledException)
    {
        // task was canceled, ignore
    }
    catch (OperationCanceledException)
    {
        // operation was canceled, ignore
    }
    catch (Exception e)
    {
        if (_cancellationTotal.IsCancellationRequested || _disposing)
        {
            // disposing/canceling, do nothing and exit
            return;
        }

        Logger.Trace(L($"Sending text thread failed, error: {e.Message}. Creating a new sending thread."));
        StartBackgroundThreadForSendingText();
    }
}

I'm implementing Binance's API.

The documentation says:

WebSocket connections have a limit of 5 incoming messages per second. A message is considered:

  • A PING frame
  • A PONG frame
  • A JSON controlled message (e.g. subscribe, unsubscribe)

For ex. there is a simple web socket wrapper such as the one from the official Binance Connector. According to the limitation above, SendAsync should be restricted 5 messages per second. If a few threads call SendAsync 5 times at the same time (including PING frame which is built-in the ClientWebSocket class), it's going to fail. How can I solve the issue with that limitation gracefully? Using bounded channels is a solution?

public class BinanceWebSocket : IDisposable
{
    private IBinanceWebSocketHandler handler;
    private List<Func<string, Task>> onMessageReceivedFunctions;
    private List<CancellationTokenRegistration> onMessageReceivedCancellationTokenRegistrations;
    private CancellationTokenSource loopCancellationTokenSource;
    private Uri url;
    private int receiveBufferSize;

    public BinanceWebSocket(IBinanceWebSocketHandler handler, string url, int receiveBufferSize = 8192)
    {
        this.handler = handler;
        this.url = new Uri(url);
        this.receiveBufferSize = receiveBufferSize;
        this.onMessageReceivedFunctions = new List<Func<string, Task>>();
        this.onMessageReceivedCancellationTokenRegistrations = new List<CancellationTokenRegistration>();
    }

    public async Task ConnectAsync(CancellationToken cancellationToken)
    {
        if (this.handler.State != WebSocketState.Open)
        {
            this.loopCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            await this.handler.ConnectAsync(this.url, cancellationToken);
            await Task.Factory.StartNew(() => this.ReceiveLoop(loopCancellationTokenSource.Token, this.receiveBufferSize), loopCancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        }
    }

    public async Task DisconnectAsync(CancellationToken cancellationToken)
    {
        if (this.loopCancellationTokenSource != null)
        {
            this.loopCancellationTokenSource.Cancel();
        }
        if (this.handler.State == WebSocketState.Open)
        {
            await this.handler.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, cancellationToken);
            await this.handler.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cancellationToken);
        }
    }

    public void OnMessageReceived(Func<string, Task> onMessageReceived, CancellationToken cancellationToken)
    {
        this.onMessageReceivedFunctions.Add(onMessageReceived);

        if (cancellationToken != CancellationToken.None)
        {
            var reg = cancellationToken.Register(() =>
                this.onMessageReceivedFunctions.Remove(onMessageReceived));

            this.onMessageReceivedCancellationTokenRegistrations.Add(reg);
        }
    }

    public async Task SendAsync(string message, CancellationToken cancellationToken)
    {
        byte[] byteArray = Encoding.ASCII.GetBytes(message);

        await this.handler.SendAsync(new ArraySegment<byte>(byteArray), WebSocketMessageType.Text, true, cancellationToken);
    }

    public void Dispose()
    {
        this.DisconnectAsync(CancellationToken.None).Wait();

        this.handler.Dispose();

        this.onMessageReceivedCancellationTokenRegistrations.ForEach(ct => ct.Dispose());

        this.loopCancellationTokenSource.Dispose();
    }

    private async Task ReceiveLoop(CancellationToken cancellationToken, int receiveBufferSize = 8192)
    {
        WebSocketReceiveResult receiveResult = null;
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                var buffer = new ArraySegment<byte>(new byte[receiveBufferSize]);
                receiveResult = await this.handler.ReceiveAsync(buffer, cancellationToken);

                if (receiveResult.MessageType == WebSocketMessageType.Close)
                {
                    break;
                }
                string content = Encoding.UTF8.GetString(buffer.ToArray());
                this.onMessageReceivedFunctions.ForEach(omrf => omrf(content));
            }
        }
        catch (TaskCanceledException)
        {
            await this.DisconnectAsync(CancellationToken.None);
        }
    }
}

Second way which I'm not 100% sure it solves it

SendAsync is being called in a loop using Channels. SingleReader is set to true, which means there will be only one consumer at a time. It technically should solve the issue, but I'm not 100% sure because the channel might only be limiting the amount in the buffer.

private readonly Channel<string> _messagesTextToSendQueue = Channel.CreateUnbounded<string>(new UnboundedChannelOptions()
{
    SingleReader = true,
    SingleWriter = false
});

public ValueTask SendAsync(string message)
{
    Validations.Validations.ValidateInput(message, nameof(message));

    return _messagesTextToSendQueue.Writer.WriteAsync(message);
}

public void Send(string message)
{
    Validations.Validations.ValidateInput(message, nameof(message));

    _messagesTextToSendQueue.Writer.TryWrite(message);
}

private async Task SendTextFromQueue()
{
    try
    {
        while (await _messagesTextToSendQueue.Reader.WaitToReadAsync())
        {
            while (_messagesTextToSendQueue.Reader.TryRead(out var message))
            {
                try
                {
                    await SendInternalSynchronized(message).ConfigureAwait(false);
                }
                catch (Exception e)
                {
                    Logger.Error(e, L(
quot;Failed to send text message: '{message}'. Error: {e.Message}"));
                }
            }
        }
    }
    catch (TaskCanceledException)
    {
        // task was canceled, ignore
    }
    catch (OperationCanceledException)
    {
        // operation was canceled, ignore
    }
    catch (Exception e)
    {
        if (_cancellationTotal.IsCancellationRequested || _disposing)
        {
            // disposing/canceling, do nothing and exit
            return;
        }

        Logger.Trace(L(
quot;Sending text thread failed, error: {e.Message}. Creating a new sending thread."));
        StartBackgroundThreadForSendingText();
    }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

青衫儰鉨ミ守葔 2025-02-10 02:02:25

我会尽量保持尽可能简单,并使用信号量较小为了实现这一目标,我创建了一个类来执行此任务。

public class ThrottlingLimiter
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;

    public ThrottlingLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        if (maxActionsPerTimeUnit < 1)
            throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));

        if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > int.MaxValue)
            throw new ArgumentOutOfRangeException(nameof(timeUnit));

        _semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
        _timeUnit = timeUnit;
    }

    public async Task WaitAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        ScheduleSemaphoreRelease();
    }

    private async void ScheduleSemaphoreRelease()
    {
        await Task.Delay(_timeUnit).ConfigureAwait(false);
        _semaphore.Release();
    }
}

现在要使用此类,您要做的就是设置您的限制和时间板,

 public async Task SendData(List<string> allMessages)
 {
     // Limiting 5 calls per second
     ThrottlingLimiter throttlingLimiter = new ThrottlingLimiter(5, TimeSpan.FromSeconds(1));

     await Task.WhenAll(allMessages.Select(async message =>
     {
        await throttlingLimiter.WaitAsync();

        try {
            await SendInternalSynchronized(message);
            // I am not sure what this SendInternalSynchronized returns but I would return some thing to keep a track if this call is successful or not
        }
        catch (Exception e)
        {
           Logger.Error(e, L($"Failed to send text message: {message}'. Error: {e.Message}"));
        }
      });
 }

因此基本上会发生的事情是,无论您的列表有多大,theTrottlinglimiter都只会每秒发送5条消息,并等待下一个秒接下来的5条消息。

因此,在您的情况下,将所有数据从您的呼叫中获取将

 await _messagesTextToSendQueue.Reader.WaitToReadAsync();

其存储到列表或任何集合中,然后将其传递给Senddata功能。

I would try to keep it as simple as possible and use Semaphore Slim to achieve this, I have created a class to perform this task.

public class ThrottlingLimiter
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;

    public ThrottlingLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        if (maxActionsPerTimeUnit < 1)
            throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));

        if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > int.MaxValue)
            throw new ArgumentOutOfRangeException(nameof(timeUnit));

        _semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
        _timeUnit = timeUnit;
    }

    public async Task WaitAsync(CancellationToken cancellationToken = default)
    {
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        ScheduleSemaphoreRelease();
    }

    private async void ScheduleSemaphoreRelease()
    {
        await Task.Delay(_timeUnit).ConfigureAwait(false);
        _semaphore.Release();
    }
}

Now to Use this class, all you have to do is set your limit and timeSpan

 public async Task SendData(List<string> allMessages)
 {
     // Limiting 5 calls per second
     ThrottlingLimiter throttlingLimiter = new ThrottlingLimiter(5, TimeSpan.FromSeconds(1));

     await Task.WhenAll(allMessages.Select(async message =>
     {
        await throttlingLimiter.WaitAsync();

        try {
            await SendInternalSynchronized(message);
            // I am not sure what this SendInternalSynchronized returns but I would return some thing to keep a track if this call is successful or not
        }
        catch (Exception e)
        {
           Logger.Error(e, L(
quot;Failed to send text message: {message}'. Error: {e.Message}"));
        }
      });
 }

so basically what will happen here is, no matter how big your list is, the ThrottlingLimiter will only send 5 messages per second and wait for the next second to send the next 5 messages.

so, in your case, get all the data from your call to

 await _messagesTextToSendQueue.Reader.WaitToReadAsync();

store that into a list or any collection and pass that to the SendData function.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文