将sendasync调用限制为每秒5条消息
我正在实施Binance的API。
WebSocket连接的限制为每秒5个传入消息。考虑一条消息:
- ping框架
- 乒乓球框架
- JSON受控消息(例如订阅,退订)
for ex。有一个简单的Web套接字包装器,例如官方 binance Connector 。根据上述限制,sendAsync
应每秒限制5条消息。如果几个线程同时调用SendAsync 5次(包括内置的clientwebsocket类),则将失败。我该如何优雅地解决这个问题?使用有界通道是一个解决方案吗?
public class BinanceWebSocket : IDisposable
{
private IBinanceWebSocketHandler handler;
private List<Func<string, Task>> onMessageReceivedFunctions;
private List<CancellationTokenRegistration> onMessageReceivedCancellationTokenRegistrations;
private CancellationTokenSource loopCancellationTokenSource;
private Uri url;
private int receiveBufferSize;
public BinanceWebSocket(IBinanceWebSocketHandler handler, string url, int receiveBufferSize = 8192)
{
this.handler = handler;
this.url = new Uri(url);
this.receiveBufferSize = receiveBufferSize;
this.onMessageReceivedFunctions = new List<Func<string, Task>>();
this.onMessageReceivedCancellationTokenRegistrations = new List<CancellationTokenRegistration>();
}
public async Task ConnectAsync(CancellationToken cancellationToken)
{
if (this.handler.State != WebSocketState.Open)
{
this.loopCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
await this.handler.ConnectAsync(this.url, cancellationToken);
await Task.Factory.StartNew(() => this.ReceiveLoop(loopCancellationTokenSource.Token, this.receiveBufferSize), loopCancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
}
public async Task DisconnectAsync(CancellationToken cancellationToken)
{
if (this.loopCancellationTokenSource != null)
{
this.loopCancellationTokenSource.Cancel();
}
if (this.handler.State == WebSocketState.Open)
{
await this.handler.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, cancellationToken);
await this.handler.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cancellationToken);
}
}
public void OnMessageReceived(Func<string, Task> onMessageReceived, CancellationToken cancellationToken)
{
this.onMessageReceivedFunctions.Add(onMessageReceived);
if (cancellationToken != CancellationToken.None)
{
var reg = cancellationToken.Register(() =>
this.onMessageReceivedFunctions.Remove(onMessageReceived));
this.onMessageReceivedCancellationTokenRegistrations.Add(reg);
}
}
public async Task SendAsync(string message, CancellationToken cancellationToken)
{
byte[] byteArray = Encoding.ASCII.GetBytes(message);
await this.handler.SendAsync(new ArraySegment<byte>(byteArray), WebSocketMessageType.Text, true, cancellationToken);
}
public void Dispose()
{
this.DisconnectAsync(CancellationToken.None).Wait();
this.handler.Dispose();
this.onMessageReceivedCancellationTokenRegistrations.ForEach(ct => ct.Dispose());
this.loopCancellationTokenSource.Dispose();
}
private async Task ReceiveLoop(CancellationToken cancellationToken, int receiveBufferSize = 8192)
{
WebSocketReceiveResult receiveResult = null;
try
{
while (!cancellationToken.IsCancellationRequested)
{
var buffer = new ArraySegment<byte>(new byte[receiveBufferSize]);
receiveResult = await this.handler.ReceiveAsync(buffer, cancellationToken);
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
break;
}
string content = Encoding.UTF8.GetString(buffer.ToArray());
this.onMessageReceivedFunctions.ForEach(omrf => omrf(content));
}
}
catch (TaskCanceledException)
{
await this.DisconnectAsync(CancellationToken.None);
}
}
}
我不是100%确定它解决
sendAsync
的第二种方法是在循环中使用频道调用。 Singlereader
设置为true,这意味着一次只有一个消费者。从技术上讲,它应该解决这个问题,但我不是100%确定的,因为该频道可能只会限制缓冲区中的数量。
private readonly Channel<string> _messagesTextToSendQueue = Channel.CreateUnbounded<string>(new UnboundedChannelOptions()
{
SingleReader = true,
SingleWriter = false
});
public ValueTask SendAsync(string message)
{
Validations.Validations.ValidateInput(message, nameof(message));
return _messagesTextToSendQueue.Writer.WriteAsync(message);
}
public void Send(string message)
{
Validations.Validations.ValidateInput(message, nameof(message));
_messagesTextToSendQueue.Writer.TryWrite(message);
}
private async Task SendTextFromQueue()
{
try
{
while (await _messagesTextToSendQueue.Reader.WaitToReadAsync())
{
while (_messagesTextToSendQueue.Reader.TryRead(out var message))
{
try
{
await SendInternalSynchronized(message).ConfigureAwait(false);
}
catch (Exception e)
{
Logger.Error(e, L($"Failed to send text message: '{message}'. Error: {e.Message}"));
}
}
}
}
catch (TaskCanceledException)
{
// task was canceled, ignore
}
catch (OperationCanceledException)
{
// operation was canceled, ignore
}
catch (Exception e)
{
if (_cancellationTotal.IsCancellationRequested || _disposing)
{
// disposing/canceling, do nothing and exit
return;
}
Logger.Trace(L($"Sending text thread failed, error: {e.Message}. Creating a new sending thread."));
StartBackgroundThreadForSendingText();
}
}
I'm implementing Binance's API.
The documentation says:
WebSocket connections have a limit of 5 incoming messages per second. A message is considered:
- A PING frame
- A PONG frame
- A JSON controlled message (e.g. subscribe, unsubscribe)
For ex. there is a simple web socket wrapper such as the one from the official Binance Connector. According to the limitation above, SendAsync
should be restricted 5 messages per second. If a few threads call SendAsync 5 times at the same time (including PING frame which is built-in the ClientWebSocket class), it's going to fail. How can I solve the issue with that limitation gracefully? Using bounded channels is a solution?
public class BinanceWebSocket : IDisposable
{
private IBinanceWebSocketHandler handler;
private List<Func<string, Task>> onMessageReceivedFunctions;
private List<CancellationTokenRegistration> onMessageReceivedCancellationTokenRegistrations;
private CancellationTokenSource loopCancellationTokenSource;
private Uri url;
private int receiveBufferSize;
public BinanceWebSocket(IBinanceWebSocketHandler handler, string url, int receiveBufferSize = 8192)
{
this.handler = handler;
this.url = new Uri(url);
this.receiveBufferSize = receiveBufferSize;
this.onMessageReceivedFunctions = new List<Func<string, Task>>();
this.onMessageReceivedCancellationTokenRegistrations = new List<CancellationTokenRegistration>();
}
public async Task ConnectAsync(CancellationToken cancellationToken)
{
if (this.handler.State != WebSocketState.Open)
{
this.loopCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
await this.handler.ConnectAsync(this.url, cancellationToken);
await Task.Factory.StartNew(() => this.ReceiveLoop(loopCancellationTokenSource.Token, this.receiveBufferSize), loopCancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
}
public async Task DisconnectAsync(CancellationToken cancellationToken)
{
if (this.loopCancellationTokenSource != null)
{
this.loopCancellationTokenSource.Cancel();
}
if (this.handler.State == WebSocketState.Open)
{
await this.handler.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, null, cancellationToken);
await this.handler.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cancellationToken);
}
}
public void OnMessageReceived(Func<string, Task> onMessageReceived, CancellationToken cancellationToken)
{
this.onMessageReceivedFunctions.Add(onMessageReceived);
if (cancellationToken != CancellationToken.None)
{
var reg = cancellationToken.Register(() =>
this.onMessageReceivedFunctions.Remove(onMessageReceived));
this.onMessageReceivedCancellationTokenRegistrations.Add(reg);
}
}
public async Task SendAsync(string message, CancellationToken cancellationToken)
{
byte[] byteArray = Encoding.ASCII.GetBytes(message);
await this.handler.SendAsync(new ArraySegment<byte>(byteArray), WebSocketMessageType.Text, true, cancellationToken);
}
public void Dispose()
{
this.DisconnectAsync(CancellationToken.None).Wait();
this.handler.Dispose();
this.onMessageReceivedCancellationTokenRegistrations.ForEach(ct => ct.Dispose());
this.loopCancellationTokenSource.Dispose();
}
private async Task ReceiveLoop(CancellationToken cancellationToken, int receiveBufferSize = 8192)
{
WebSocketReceiveResult receiveResult = null;
try
{
while (!cancellationToken.IsCancellationRequested)
{
var buffer = new ArraySegment<byte>(new byte[receiveBufferSize]);
receiveResult = await this.handler.ReceiveAsync(buffer, cancellationToken);
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
break;
}
string content = Encoding.UTF8.GetString(buffer.ToArray());
this.onMessageReceivedFunctions.ForEach(omrf => omrf(content));
}
}
catch (TaskCanceledException)
{
await this.DisconnectAsync(CancellationToken.None);
}
}
}
Second way which I'm not 100% sure it solves it
SendAsync
is being called in a loop using Channels. SingleReader
is set to true, which means there will be only one consumer at a time. It technically should solve the issue, but I'm not 100% sure because the channel might only be limiting the amount in the buffer.
private readonly Channel<string> _messagesTextToSendQueue = Channel.CreateUnbounded<string>(new UnboundedChannelOptions()
{
SingleReader = true,
SingleWriter = false
});
public ValueTask SendAsync(string message)
{
Validations.Validations.ValidateInput(message, nameof(message));
return _messagesTextToSendQueue.Writer.WriteAsync(message);
}
public void Send(string message)
{
Validations.Validations.ValidateInput(message, nameof(message));
_messagesTextToSendQueue.Writer.TryWrite(message);
}
private async Task SendTextFromQueue()
{
try
{
while (await _messagesTextToSendQueue.Reader.WaitToReadAsync())
{
while (_messagesTextToSendQueue.Reader.TryRead(out var message))
{
try
{
await SendInternalSynchronized(message).ConfigureAwait(false);
}
catch (Exception e)
{
Logger.Error(e, L(quot;Failed to send text message: '{message}'. Error: {e.Message}"));
}
}
}
}
catch (TaskCanceledException)
{
// task was canceled, ignore
}
catch (OperationCanceledException)
{
// operation was canceled, ignore
}
catch (Exception e)
{
if (_cancellationTotal.IsCancellationRequested || _disposing)
{
// disposing/canceling, do nothing and exit
return;
}
Logger.Trace(L(quot;Sending text thread failed, error: {e.Message}. Creating a new sending thread."));
StartBackgroundThreadForSendingText();
}
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
我会尽量保持尽可能简单,并使用信号量较小为了实现这一目标,我创建了一个类来执行此任务。
现在要使用此类,您要做的就是设置您的限制和时间板,
因此基本上会发生的事情是,无论您的列表有多大,theTrottlinglimiter都只会每秒发送5条消息,并等待下一个秒接下来的5条消息。
因此,在您的情况下,将所有数据从您的呼叫中获取将
其存储到列表或任何集合中,然后将其传递给Senddata功能。
I would try to keep it as simple as possible and use Semaphore Slim to achieve this, I have created a class to perform this task.
Now to Use this class, all you have to do is set your limit and timeSpan
so basically what will happen here is, no matter how big your list is, the ThrottlingLimiter will only send 5 messages per second and wait for the next second to send the next 5 messages.
so, in your case, get all the data from your call to
store that into a list or any collection and pass that to the SendData function.