#c# #.net-core #websocket
#c# #.net-core #websocket
Вопрос:
Проблема в том, что некоторые сообщения удваиваются, утраиваются и так далее. Потому что каналы чтения и записи не синхронизированы. Как мне это исправить? По сути, это оболочка веб-сокета вокруг ClientWebSocket. Вероятно, мне нужно реализовать некоторую логику разделителя. Я взял большую часть кода из реализации Microsoft SignalR.
Обратите внимание на следующее:
[10:36:47 INF] Message received: {"e":"trade","E":1638952607313,"s":"BNBUSDT","t":481675406,"p":"585.10000000","q":"0.02200000","b":3414633304,"a":3414634268,"T":1638952607313,"m":true,"M":true}
[10:36:47 INF] Message received: {"e":"trade","E":1638952607313,"s":"BNBUSDT","t":481675407,"p":"585.10000000","q":"1.18400000","b":3414633657,"a":3414634268,"T":1638952607313,"m":true,"M":true}
{"e":"trade","E":1638952607313,"s":"BNBUSDT","t":481675408,"p":"585.10000000","q":"0.45200000","b":3414634033,"a":3414634268,"T":1638952607313,"m":true,"M":true}
{"e":"trade","E":1638952607313,"s":"BNBUSDT","t":481675409,"p":"585.10000000","q":"0.76100000","b":3414634061,"a":3414634268,"T":1638952607313,"m":true,"M":true}
[10:36:47 INF] Message received: {"e":"trade","E":1638952607778,"s":"BNBUSDT","t":481675410,"p":"585.20000000","q":"0.03300000","b":3414634286,"a":3414633982,"T":1638952607778,"m":false,"M":true}
[10:36:48 INF] Message received: {"e":"trade","E":1638952608019,"s":"BNBUSDT","t":481675411,"p":"585.20000000","q":"0.01700000","b":3414634290,"a":3414633982,"T":1638952608018,"m":false,"M":true}
{"e":"trade","E":1638952608019,"s":"BNBUSDT","t":481675412,"p":"585.20000000","q":"0.15700000","b":3414634291,"a":3414633982,"T":1638952608019,"m":false,"M":true}
[10:36:48 INF] Message received: {"e":"trade","E":1638952608163,"s":"BNBUSDT","t":481675413,"p":"585.20000000","q":"0.26600000","b":3414634298,"a":3414633982,"T":1638952608162,"m":false,"M":true}
async Task IWebSocketClient.StartAsync(CancellationToken cancellationToken)
{
ThrowIfDisposed();
Guard.Against.Null(_options.BaseAddress, nameof(_options.BaseAddress));
if (State == WebSocketState.Open)
throw new InvalidOperationException("Client was already connected");
try
{
await _webSocket.ConnectAsync(_options.BaseAddress, cancellationToken).ConfigureAwait(false);
Log.Write(LogLevel.Debug, $"Socket {ClientId} connected.");
}
catch (Exception ex) when (ex is WebSocketException)
{
Log.Write(LogLevel.Error, ex.Message);
}
var receiving = StartReceivingAsync(cancellationToken);
var sending = StartSendingAsync();
await Task.Delay(500000, cancellationToken).ConfigureAwait(false);
var trigger = await Task.WhenAny(receiving, sending);
}
/// <summary>
/// Factory method.
/// </summary>
/// <returns></returns>
public static IWebSocketClient Create(WebSocketClientOptions options) => new WebSocketClient(options);
private static ArraySegment<byte> GetArraySegment(ReadOnlyMemory<byte> memory)
{
if (!MemoryMarshal.TryGetArray(memory, out var result))
{
throw new InvalidOperationException("Buffer backed by array was expected");
}
return resu<
}
private async Task StartReceivingAsync(CancellationToken cancellationToken)
{
try
{
while (!cancellationToken.IsCancellationRequested)
{
// Do a 0 byte read so that idle connections don't allocate a buffer when waiting for a read
var result = await _webSocket.ReceiveAsync(GetArraySegment(Memory<byte>.Empty), cancellationToken)
.ConfigureAwait(false);
if (result.MessageType == WebSocketMessageType.Close)
{
return;
}
var memory = Writer.GetMemory();
var arraySegment = GetArraySegment(memory);
var receiveResult =
await _webSocket.ReceiveAsync(arraySegment, cancellationToken).ConfigureAwait(false);
// Need to check again for netcoreapp3.0 and later because a close can happen between a 0-byte read and the actual read
if (receiveResult.MessageType == WebSocketMessageType.Close)
{
return;
}
Log.Write(LogLevel.Trace,
$"Message received. Type: {receiveResult.MessageType}, size: {receiveResult.Count}, EndOfMessage: {receiveResult.EndOfMessage}.");
Writer.Advance(receiveResult.Count);
var flushResult = await Writer.FlushAsync(cancellationToken).ConfigureAwait(false);
// We canceled in the middle of applying back pressure
// or if the consumer is done
if (flushResult.IsCanceled || flushResult.IsCompleted)
{
break;
}
}
}
catch (WebSocketException ex) when (ex.WebSocketErrorCode == WebSocketError.ConnectionClosedPrematurely)
{
// Client has closed the WebSocket connection without completing the close handshake
Log.Write(LogLevel.Debug, "Socket connection closed prematurely.");
}
catch (OperationCanceledException)
{
// Ignore aborts, don't treat them like transport errors
}
catch (Exception ex)
{
if (!_aborted amp;amp; !cancellationToken.IsCancellationRequested)
{
await Writer.CompleteAsync(ex).ConfigureAwait(false);
Log.Write(LogLevel.Error, "Transport error detected.");
}
}
finally
{
// We're done writing.
await Writer.CompleteAsync().ConfigureAwait(false);
Log.Write(LogLevel.Information, "Socket transport receiving task completed.");
}
}
private bool WebSocketCanSend()
=> _webSocket.State is not (WebSocketState.Aborted or WebSocketState.Closed or WebSocketState.CloseSent);
private async Task StartSendingAsync()
{
Exception? error = null;
try
{
while (true)
{
var result = await Reader.ReadAsync().ConfigureAwait(false);
if (result.IsCanceled)
{
break;
}
var buffer = result.Buffer;
// Get a frame from the application
try
{
// Process the line.
_messageReceivedSubject.OnNext(Encoding.UTF8.GetString(buffer));
}
finally
{
Reader.AdvanceTo(buffer.End);
}
}
}
catch (Exception? ex)
{
error = ex;
}
finally
{
// Send the close frame before calling into user code
if (WebSocketCanSend())
{
try
{
// We're done sending, send the close frame to the client if the websocket is still open
await _webSocket
.CloseOutputAsync(
error != null
? WebSocketCloseStatus.InternalServerError
: WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None)
.ConfigureAwait(false);
}
catch (Exception)
{
Log.Write(LogLevel.Debug, "Closing webSocket failed.");
}
}
Log.Write(LogLevel.Information, "Socket transport sending task completed.");
await Reader.CompleteAsync().ConfigureAwait(false);
}
}