将功能移出类
我想要这个 Web 套接字包装类中包含超时逻辑 .TimeoutAfter(...)
,因此它是可定制的。我不会粘贴整个类,因为不需要它,但可以在 GitHub。
await _webSocket.ConnectAsync(_url, CancellationToken.None).TimeoutAfter(OpenTimeoutMs).ConfigureAwait(false);
public static class TaskExtensions
{
public static async Task TimeoutAfter(this Task task, int timeoutMs)
{
if (await Task.WhenAny(task, Task.Delay(timeoutMs)) != task)
{
throw new TimeoutException($"Operation timed out after {timeoutMs} ms");
}
}
}
我的想法
最好使用 Func
吗?
public Func<Task> DoTimeout { get; set; }
await _webSocket.ConnectAsync(_url, CancellationToken.None).DoTimeout().ConfigureAwait(false);
Web 套接字包装器
using System;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Serilog;
using Serilog.Events;
namespace DeribitDotNet.Utils
{
public class ResilientWebSocket : IDisposable
{
private const int CloseTimeoutMs = 5000;
private const int OpenTimeoutMs = 5000;
private const int SendTimeoutMs = 5000;
private const int RetryOpenDelayMs = 5000;
private readonly AsyncManualResetEvent _connectionWaitHandle = new AsyncManualResetEvent();
private ClientWebSocket _webSocket;
private int _reconnecting;
private bool _disposed;
private readonly Uri _url;
private byte[] _writeBuffer = new byte[4096];
private byte[] _readBuffer = new byte[262144];
public bool IsConnected => _connectionWaitHandle.WaitTask().IsCompleted;
public event EventHandler Connected;
public event EventHandler Closed;
public event EventHandler<ValueTuple<string, DateTime>> MessageReceived;
public ResilientWebSocket(string url) => _url = new Uri(url);
public async ValueTask Initialise()
{
if (_disposed) throw new ObjectDisposedException("ResilientWebSocket", "Object has been disposed");
await Reconnect().ConfigureAwait(false);
}
public async Task Send(string message)
{
if (Log.IsEnabled(LogEventLevel.Verbose))
{
Log.Verbose($"Sending message: {message}");
}
var errorCounter = 0;
do
{
if (_disposed) throw new ObjectDisposedException("ResilientWebSocket", "Object has been disposed");
try
{
int length;
unsafe
{
fixed (char* pMessage = message)
fixed (byte* pBuffer = _writeBuffer)
{
// Will throw if buffer is too small
length = Encoding.Default.GetBytes(pMessage, message.Length, pBuffer, _writeBuffer.Length);
}
}
if (errorCounter > 0)
{
Log.Debug($"About to send length: {length} Message: {message}");
}
var sendTask = _webSocket.SendAsync(new ArraySegment<byte>(_writeBuffer, 0, length), WebSocketMessageType.Text, true,
CancellationToken.None);
if (errorCounter > 0)
{
Log.Debug($"Send task: {sendTask}. About to await with timeout");
}
await sendTask.TimeoutAfter(SendTimeoutMs).ConfigureAwait(false);
if (errorCounter > 0)
{
Log.Debug($"Awaited Send successfully");
}
break;
}
catch (ArgumentException ae) when (ae.ParamName == "bytes")
{
Log.Warning($"Write buffer size ({_writeBuffer.Length}) too small for message. Doubling..");
_writeBuffer = new byte[_writeBuffer.Length * 2];
}
catch (Exception e)
{
Log.Error(e, $"Error writing message {message} to web socket");
if (_webSocket == null)
{
Log.Debug("Websocket was null. Initiating reconnection");
await Reconnect().ConfigureAwait(false);
}
if (errorCounter++ >= 10)
{
Log.Debug($"Error counter has reached 10. Reconnecting..");
await Reconnect().ConfigureAwait(false);
}
await Task.Delay(200).ConfigureAwait(false);
}
} while (true);
}
public Task Reset()
{
if (_disposed) return Task.CompletedTask;
if (!IsConnected)
{
Log.Debug("Reset() called when web socket is not connected. Ignoring.");
return _connectionWaitHandle.WaitTask();
}
Log.Information("Resetting websocket connection");
return Reconnect();
}
public Task ConnectionReadyAsync() => _connectionWaitHandle.WaitTask();
public void Dispose()
{
if (!_disposed)
{
_disposed = true;
Log.Debug("Disposing web socket");
EnsureClosed().Wait();
}
}
private async Task Reconnect()
{
if (_disposed) return;
if (Interlocked.CompareExchange(ref _reconnecting, 1, 0) == 0)
{
Log.Debug($"Initiating web socket reconnection to {_url}");
try
{
_connectionWaitHandle.Reset();
while (!IsConnected)
{
try
{
await EnsureClosed().ConfigureAwait(false);
_webSocket = new ClientWebSocket
{
Options = {KeepAliveInterval = TimeSpan.FromSeconds(30)}
};
Log.Information($"Opening connection to {_url}");
await _webSocket.ConnectAsync(_url, CancellationToken.None).TimeoutAfter(OpenTimeoutMs).ConfigureAwait(false);
Log.Debug("Web socket opened");
_connectionWaitHandle.Set();
_ = Task.Run(() => ReceiveLoop(_webSocket));
}
catch (Exception e)
{
Log.Error(e, $"Error reconnecting to {_url}");
}
if (!IsConnected)
{
Log.Information("Connection unsuccessful. Sleeping for 5 seconds before retrying..");
await Task.Delay(RetryOpenDelayMs).ConfigureAwait(false);
}
}
}
finally
{
_reconnecting = 0;
}
try
{
Connected?.Invoke(this, EventArgs.Empty);
}
catch (Exception ex)
{
Log.Error(ex, "Error in connection opened event handler");
}
}
else
{
Log.Debug("Reconnection already in progress. Ignoring Reconnect request");
}
}
private async void ReceiveLoop(WebSocket webSocket)
{
try
{
var offset = 0;
while (webSocket != null && webSocket.State != WebSocketState.Closed && webSocket.State != WebSocketState.Aborted && !_disposed)
{
var result = await webSocket.ReceiveAsync(new ArraySegment<byte>(_readBuffer, offset, _readBuffer.Length - offset),
CancellationToken.None).ConfigureAwait(false);
var dateTime = DateTime.UtcNow;
if (result.MessageType == WebSocketMessageType.Close)
{
await EnsureClosed().ConfigureAwait(false);
}
else if (result.CloseStatus == null)
{
if (result.EndOfMessage)
{
var message = Encoding.Default.GetString(_readBuffer, 0, offset + result.Count);
if (Log.IsEnabled(LogEventLevel.Verbose))
{
Log.Verbose($"Message received: {message}");
}
try
{
MessageReceived?.Invoke(this, (message, dateTime));
}
catch (Exception e)
{
Log.Error(e, "Error in message received event handler");
}
offset = 0;
}
else
{
offset += result.Count;
if (offset >= _readBuffer.Length - 1)
{
Log.Warning($"Read buffer size ({_readBuffer.Length}) too small for message. Doubling..");
var newReadBuffer = new byte[_readBuffer.Length * 2];
Array.Copy(_readBuffer, newReadBuffer, offset);
_readBuffer = newReadBuffer;
}
}
}
else
{
Log.Debug($"Web socket closed. Status: {result.CloseStatus} Reason: {result.CloseStatusDescription}");
Closed?.Invoke(this, EventArgs.Empty);
}
}
}
catch (Exception e)
{
Log.Error(e, "Web socket error during receive");
}
await Reconnect().ConfigureAwait(false);
}
private async Task EnsureClosed()
{
try
{
if (_webSocket != null && _webSocket.State != WebSocketState.Aborted && _webSocket.State != WebSocketState.CloseReceived && _webSocket.State != WebSocketState.Closed)
{
Log.Debug($"Closing web socket in state {_webSocket.State}");
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", CancellationToken.None).TimeoutAfter(CloseTimeoutMs);
}
}
catch (Exception e)
{
Log.Error(e, "Error closing web socket");
}
_webSocket = null;
}
}
}
I want to have the timeout logic .TimeoutAfter(...)
out of this web socket wrapper class, so it's customizable. I won't paste the whole class because it's not needed, but it can be found on GitHub.
await _webSocket.ConnectAsync(_url, CancellationToken.None).TimeoutAfter(OpenTimeoutMs).ConfigureAwait(false);
public static class TaskExtensions
{
public static async Task TimeoutAfter(this Task task, int timeoutMs)
{
if (await Task.WhenAny(task, Task.Delay(timeoutMs)) != task)
{
throw new TimeoutException(quot;Operation timed out after {timeoutMs} ms");
}
}
}
My thoughts
Is the best to use Func<something>
?
public Func<Task> DoTimeout { get; set; }
await _webSocket.ConnectAsync(_url, CancellationToken.None).DoTimeout().ConfigureAwait(false);
The web socket wrapper
using System;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Serilog;
using Serilog.Events;
namespace DeribitDotNet.Utils
{
public class ResilientWebSocket : IDisposable
{
private const int CloseTimeoutMs = 5000;
private const int OpenTimeoutMs = 5000;
private const int SendTimeoutMs = 5000;
private const int RetryOpenDelayMs = 5000;
private readonly AsyncManualResetEvent _connectionWaitHandle = new AsyncManualResetEvent();
private ClientWebSocket _webSocket;
private int _reconnecting;
private bool _disposed;
private readonly Uri _url;
private byte[] _writeBuffer = new byte[4096];
private byte[] _readBuffer = new byte[262144];
public bool IsConnected => _connectionWaitHandle.WaitTask().IsCompleted;
public event EventHandler Connected;
public event EventHandler Closed;
public event EventHandler<ValueTuple<string, DateTime>> MessageReceived;
public ResilientWebSocket(string url) => _url = new Uri(url);
public async ValueTask Initialise()
{
if (_disposed) throw new ObjectDisposedException("ResilientWebSocket", "Object has been disposed");
await Reconnect().ConfigureAwait(false);
}
public async Task Send(string message)
{
if (Log.IsEnabled(LogEventLevel.Verbose))
{
Log.Verbose(quot;Sending message: {message}");
}
var errorCounter = 0;
do
{
if (_disposed) throw new ObjectDisposedException("ResilientWebSocket", "Object has been disposed");
try
{
int length;
unsafe
{
fixed (char* pMessage = message)
fixed (byte* pBuffer = _writeBuffer)
{
// Will throw if buffer is too small
length = Encoding.Default.GetBytes(pMessage, message.Length, pBuffer, _writeBuffer.Length);
}
}
if (errorCounter > 0)
{
Log.Debug(quot;About to send length: {length} Message: {message}");
}
var sendTask = _webSocket.SendAsync(new ArraySegment<byte>(_writeBuffer, 0, length), WebSocketMessageType.Text, true,
CancellationToken.None);
if (errorCounter > 0)
{
Log.Debug(quot;Send task: {sendTask}. About to await with timeout");
}
await sendTask.TimeoutAfter(SendTimeoutMs).ConfigureAwait(false);
if (errorCounter > 0)
{
Log.Debug(quot;Awaited Send successfully");
}
break;
}
catch (ArgumentException ae) when (ae.ParamName == "bytes")
{
Log.Warning(quot;Write buffer size ({_writeBuffer.Length}) too small for message. Doubling..");
_writeBuffer = new byte[_writeBuffer.Length * 2];
}
catch (Exception e)
{
Log.Error(e, quot;Error writing message {message} to web socket");
if (_webSocket == null)
{
Log.Debug("Websocket was null. Initiating reconnection");
await Reconnect().ConfigureAwait(false);
}
if (errorCounter++ >= 10)
{
Log.Debug(quot;Error counter has reached 10. Reconnecting..");
await Reconnect().ConfigureAwait(false);
}
await Task.Delay(200).ConfigureAwait(false);
}
} while (true);
}
public Task Reset()
{
if (_disposed) return Task.CompletedTask;
if (!IsConnected)
{
Log.Debug("Reset() called when web socket is not connected. Ignoring.");
return _connectionWaitHandle.WaitTask();
}
Log.Information("Resetting websocket connection");
return Reconnect();
}
public Task ConnectionReadyAsync() => _connectionWaitHandle.WaitTask();
public void Dispose()
{
if (!_disposed)
{
_disposed = true;
Log.Debug("Disposing web socket");
EnsureClosed().Wait();
}
}
private async Task Reconnect()
{
if (_disposed) return;
if (Interlocked.CompareExchange(ref _reconnecting, 1, 0) == 0)
{
Log.Debug(quot;Initiating web socket reconnection to {_url}");
try
{
_connectionWaitHandle.Reset();
while (!IsConnected)
{
try
{
await EnsureClosed().ConfigureAwait(false);
_webSocket = new ClientWebSocket
{
Options = {KeepAliveInterval = TimeSpan.FromSeconds(30)}
};
Log.Information(quot;Opening connection to {_url}");
await _webSocket.ConnectAsync(_url, CancellationToken.None).TimeoutAfter(OpenTimeoutMs).ConfigureAwait(false);
Log.Debug("Web socket opened");
_connectionWaitHandle.Set();
_ = Task.Run(() => ReceiveLoop(_webSocket));
}
catch (Exception e)
{
Log.Error(e, quot;Error reconnecting to {_url}");
}
if (!IsConnected)
{
Log.Information("Connection unsuccessful. Sleeping for 5 seconds before retrying..");
await Task.Delay(RetryOpenDelayMs).ConfigureAwait(false);
}
}
}
finally
{
_reconnecting = 0;
}
try
{
Connected?.Invoke(this, EventArgs.Empty);
}
catch (Exception ex)
{
Log.Error(ex, "Error in connection opened event handler");
}
}
else
{
Log.Debug("Reconnection already in progress. Ignoring Reconnect request");
}
}
private async void ReceiveLoop(WebSocket webSocket)
{
try
{
var offset = 0;
while (webSocket != null && webSocket.State != WebSocketState.Closed && webSocket.State != WebSocketState.Aborted && !_disposed)
{
var result = await webSocket.ReceiveAsync(new ArraySegment<byte>(_readBuffer, offset, _readBuffer.Length - offset),
CancellationToken.None).ConfigureAwait(false);
var dateTime = DateTime.UtcNow;
if (result.MessageType == WebSocketMessageType.Close)
{
await EnsureClosed().ConfigureAwait(false);
}
else if (result.CloseStatus == null)
{
if (result.EndOfMessage)
{
var message = Encoding.Default.GetString(_readBuffer, 0, offset + result.Count);
if (Log.IsEnabled(LogEventLevel.Verbose))
{
Log.Verbose(quot;Message received: {message}");
}
try
{
MessageReceived?.Invoke(this, (message, dateTime));
}
catch (Exception e)
{
Log.Error(e, "Error in message received event handler");
}
offset = 0;
}
else
{
offset += result.Count;
if (offset >= _readBuffer.Length - 1)
{
Log.Warning(quot;Read buffer size ({_readBuffer.Length}) too small for message. Doubling..");
var newReadBuffer = new byte[_readBuffer.Length * 2];
Array.Copy(_readBuffer, newReadBuffer, offset);
_readBuffer = newReadBuffer;
}
}
}
else
{
Log.Debug(quot;Web socket closed. Status: {result.CloseStatus} Reason: {result.CloseStatusDescription}");
Closed?.Invoke(this, EventArgs.Empty);
}
}
}
catch (Exception e)
{
Log.Error(e, "Web socket error during receive");
}
await Reconnect().ConfigureAwait(false);
}
private async Task EnsureClosed()
{
try
{
if (_webSocket != null && _webSocket.State != WebSocketState.Aborted && _webSocket.State != WebSocketState.CloseReceived && _webSocket.State != WebSocketState.Closed)
{
Log.Debug(quot;Closing web socket in state {_webSocket.State}");
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", CancellationToken.None).TimeoutAfter(CloseTimeoutMs);
}
}
catch (Exception e)
{
Log.Error(e, "Error closing web socket");
}
_webSocket = null;
}
}
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
data:image/s3,"s3://crabby-images/d5906/d59060df4059a6cc364216c4d63ceec29ef7fe66" alt="扫码二维码加入Web技术交流群"
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
我不确定我是否完全理解您的用例,但我会尝试一下。也许您可以在这里尝试模板设计模式,因为在我看来您只想自定义超时行为而不是调用/执行顺序。虽然我可能没有正确理解这个问题。
使用委托或 Func 类型也可以,但根据我的经验,它经常会让开发人员感到困惑,特别是当逻辑变得更加复杂时。
I'm not sure that I fully understand your use case, but I'll give it a shot. Perhaps you can try the Template design pattern here, as it looks to me that you only want to customize the time-out behavior and not the call/execution order. Though I may not be understanding the problem correctly.
Using delegates or Func types can also work, but in my experience it often confuses developers down the line, especially if the logic gets more complicated.