Двухфазная фиксация потока данных TPL

#c# #.net #dataflow #tpl-dataflow #bufferblock

#c# #.net #поток данных #tpl-поток данных #bufferblock

Вопрос:

Я хотел реализовать что-то вроде протокола двухфазной фиксации для приема сообщений.

Для того, чтобы сделать это, я реализовал ITargetBlock сам:

   public class Worker : ITargetBlock<Message>
  {
    // Is connected to remote server
    // Maintaining connection removed for brevity in this example
    private bool _isConnectionAlive;
    private readonly ActionBlock<MessageWithSource> _action;

    public Worker()
    {
      _action = new ActionBlock<MessageWithSource>(DoWork);
    }

    public DataflowMessageStatus OfferMessage(
      DataflowMessageHeader messageHeader, Message messageValue,
      ISourceBlock<Message> source, bool consumeToAccept)
    {
      if (consumeToAccept || source == null)
      {
        return DataflowMessageStatus.Declined;
      }

      if (!_isConnectionAlive)
      {
        return DataflowMessageStatus.Postponed;
      }

      var reservedMessage = source.ReserveMessage(messageHeader, this);
      if (reservedMessage)
      {
        _action.Post(new MessageWithSource(messageValue, source, messageHeader));
      }

      return DataflowMessageStatus.Postponed;
    }

    // Other methods removed for brevity

    private async Task DoWork(MessageWithSource value)
    {
      try
      {
        // sending message to the server removed for brevity


        // commit that we finished processing without error
        var message = value.SourceBlock.ConsumeMessage(value.MessageHeader, this, out _);

        if (message != value.Message)
        {
          // In which cases can we get here?
          throw new InvalidOperationException("Consumed some other message... oh my");
        }
      }
      catch (WebSocketException)
      {
        // Release reservation if we can't finish work, so other Workers can pickup this message and process it
        value.SourceBlock.ReleaseReservation(value.MessageHeader, this);
      }
    }

    private class MessageWithSource
    {
      public Message Message { get; }
      public ISourceBlock<Message> SourceBlock { get; }
      public DataflowMessageHeader MessageHeader { get; }
    }
  }
  

В документах говорится, ConsumeMessage что может возвращать экземпляр, отличный от ранее предложенного.

Интересно, в каких случаях и каким образом это происходит?

Комментарии:

1. @StephenCleary будет очень признателен, если вы сможете взглянуть

2. Зачем использовать семантику транзакции в потоке данных , а тем более двухфазную фиксацию? Какие инициаторы транзакций задействованы? Способ обработки проблем с потоком данных заключается в перенаправлении сообщения о сбое или ошибки в другой блок, например, с использованием предиката в LinkTo() . Таким образом, вы избегаете блокирования других сообщений во входной очереди.

3. Перенаправление становится намного проще, если сообщения упаковываются в «конверт», который указывает, является ли это хорошим или «плохим» сообщением. Конверт может включать счетчик повторных попыток, чтобы гарантировать, что одно и то же сообщение не повторяется бесконечно.

4. @PanagiotisKanavos Я хотел семантику транзакции, потому что мне нужно обеспечить упорядоченность, и некоторые из этих Worker экземпляров могут быть на некоторое время нарушены. Может быть, есть другой хороший способ сделать это?

5. То, что вы описываете, — это постановка в очередь, обмен сообщениями и повторные попытки. Это не имеет ничего общего с семантикой транзакции. Семантика транзакции означает, что либо все серверы принимают и фиксируют операцию, либо все серверы отбрасывают ее. 2PC — вот как они это делают.