Канал чтения веб-сокетов одновременно считывает несколько сообщений, вероятно, потому, что разделитель не указан

#c# #.net-core #websocket

#c# #.net-core #websocket

Вопрос:

Проблема в том, что некоторые сообщения удваиваются, утраиваются и так далее. Потому что каналы чтения и записи не синхронизированы. Как мне это исправить? По сути, это оболочка веб-сокета вокруг ClientWebSocket. Вероятно, мне нужно реализовать некоторую логику разделителя. Я взял большую часть кода из реализации Microsoft SignalR.

Обратите внимание на следующее:

 [10:36:47 INF] Message received: {"e":"trade","E":1638952607313,"s":"BNBUSDT","t":481675406,"p":"585.10000000","q":"0.02200000","b":3414633304,"a":3414634268,"T":1638952607313,"m":true,"M":true}
[10:36:47 INF] Message received: {"e":"trade","E":1638952607313,"s":"BNBUSDT","t":481675407,"p":"585.10000000","q":"1.18400000","b":3414633657,"a":3414634268,"T":1638952607313,"m":true,"M":true}
{"e":"trade","E":1638952607313,"s":"BNBUSDT","t":481675408,"p":"585.10000000","q":"0.45200000","b":3414634033,"a":3414634268,"T":1638952607313,"m":true,"M":true}
{"e":"trade","E":1638952607313,"s":"BNBUSDT","t":481675409,"p":"585.10000000","q":"0.76100000","b":3414634061,"a":3414634268,"T":1638952607313,"m":true,"M":true}
[10:36:47 INF] Message received: {"e":"trade","E":1638952607778,"s":"BNBUSDT","t":481675410,"p":"585.20000000","q":"0.03300000","b":3414634286,"a":3414633982,"T":1638952607778,"m":false,"M":true}
[10:36:48 INF] Message received: {"e":"trade","E":1638952608019,"s":"BNBUSDT","t":481675411,"p":"585.20000000","q":"0.01700000","b":3414634290,"a":3414633982,"T":1638952608018,"m":false,"M":true}
{"e":"trade","E":1638952608019,"s":"BNBUSDT","t":481675412,"p":"585.20000000","q":"0.15700000","b":3414634291,"a":3414633982,"T":1638952608019,"m":false,"M":true}
[10:36:48 INF] Message received: {"e":"trade","E":1638952608163,"s":"BNBUSDT","t":481675413,"p":"585.20000000","q":"0.26600000","b":3414634298,"a":3414633982,"T":1638952608162,"m":false,"M":true}
 
 async Task IWebSocketClient.StartAsync(CancellationToken cancellationToken)
{
    ThrowIfDisposed();

    Guard.Against.Null(_options.BaseAddress, nameof(_options.BaseAddress));

    if (State == WebSocketState.Open)
        throw new InvalidOperationException("Client was already connected");

    try
    {
        await _webSocket.ConnectAsync(_options.BaseAddress, cancellationToken).ConfigureAwait(false);

        Log.Write(LogLevel.Debug, $"Socket {ClientId} connected.");
    }
    catch (Exception ex) when (ex is WebSocketException)
    {
        Log.Write(LogLevel.Error, ex.Message);
    }

    var receiving = StartReceivingAsync(cancellationToken);
    var sending = StartSendingAsync();

    await Task.Delay(500000, cancellationToken).ConfigureAwait(false);

    var trigger = await Task.WhenAny(receiving, sending);
}

/// <summary>
///     Factory method.
/// </summary>
/// <returns></returns>
public static IWebSocketClient Create(WebSocketClientOptions options) => new WebSocketClient(options);

private static ArraySegment<byte> GetArraySegment(ReadOnlyMemory<byte> memory)
{
    if (!MemoryMarshal.TryGetArray(memory, out var result))
    {
        throw new InvalidOperationException("Buffer backed by array was expected");
    }

    return resu<
}

private async Task StartReceivingAsync(CancellationToken cancellationToken)
{
    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            // Do a 0 byte read so that idle connections don't allocate a buffer when waiting for a read
            var result = await _webSocket.ReceiveAsync(GetArraySegment(Memory<byte>.Empty), cancellationToken)
                .ConfigureAwait(false);

            if (result.MessageType == WebSocketMessageType.Close)
            {
                return;
            }

            var memory = Writer.GetMemory();

            var arraySegment = GetArraySegment(memory);
            var receiveResult =
                await _webSocket.ReceiveAsync(arraySegment, cancellationToken).ConfigureAwait(false);

            // Need to check again for netcoreapp3.0 and later because a close can happen between a 0-byte read and the actual read
            if (receiveResult.MessageType == WebSocketMessageType.Close)
            {
                return;
            }

            Log.Write(LogLevel.Trace,
                $"Message received. Type: {receiveResult.MessageType}, size: {receiveResult.Count}, EndOfMessage: {receiveResult.EndOfMessage}.");

            Writer.Advance(receiveResult.Count);
            var flushResult = await Writer.FlushAsync(cancellationToken).ConfigureAwait(false);

            // We canceled in the middle of applying back pressure
            // or if the consumer is done
            if (flushResult.IsCanceled || flushResult.IsCompleted)
            {
                break;
            }
        }
    }
    catch (WebSocketException ex) when (ex.WebSocketErrorCode == WebSocketError.ConnectionClosedPrematurely)
    {
        // Client has closed the WebSocket connection without completing the close handshake
        Log.Write(LogLevel.Debug, "Socket connection closed prematurely.");
    }
    catch (OperationCanceledException)
    {
        // Ignore aborts, don't treat them like transport errors
    }
    catch (Exception ex)
    {
        if (!_aborted amp;amp; !cancellationToken.IsCancellationRequested)
        {
            await Writer.CompleteAsync(ex).ConfigureAwait(false);
            Log.Write(LogLevel.Error, "Transport error detected.");
        }
    }
    finally
    {
        // We're done writing.
        await Writer.CompleteAsync().ConfigureAwait(false);
        Log.Write(LogLevel.Information, "Socket transport receiving task completed.");
    }
}

private bool WebSocketCanSend()
    => _webSocket.State is not (WebSocketState.Aborted or WebSocketState.Closed or WebSocketState.CloseSent);

private async Task StartSendingAsync()
{
    Exception? error = null;

    try
    {
        while (true)
        {
            var result = await Reader.ReadAsync().ConfigureAwait(false);

            if (result.IsCanceled)
            {
                break;
            }

            var buffer = result.Buffer;

            // Get a frame from the application
            try
            {
                // Process the line.
                _messageReceivedSubject.OnNext(Encoding.UTF8.GetString(buffer));
            }
            finally
            {
                Reader.AdvanceTo(buffer.End);
            }
        }
    }
    catch (Exception? ex)
    {
        error = ex;
    }
    finally
    {
        // Send the close frame before calling into user code
        if (WebSocketCanSend())
        {
            try
            {
                // We're done sending, send the close frame to the client if the websocket is still open
                await _webSocket
                    .CloseOutputAsync(
                        error != null
                            ? WebSocketCloseStatus.InternalServerError
                            : WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None)
                    .ConfigureAwait(false);
            }
            catch (Exception)
            {
                Log.Write(LogLevel.Debug, "Closing webSocket failed.");
            }
        }

        Log.Write(LogLevel.Information, "Socket transport sending task completed.");
        await Reader.CompleteAsync().ConfigureAwait(false);
    }
}