Проблема с совместимостью TPL .Net с производителем Azure EventHub

#c# #task-parallel-library #azure-eventhub

#c# #задача-параллельная библиотека #azure-eventhub

Вопрос:

Я работаю с клиентом Azure Eventhub producer и считываю сообщения из потока kafka, затем передаю его для десериализации / сопоставления, а затем передаю в Event Hub. У меня есть цикл потребления, который создает задачу для каждого потребления, а затем два метода для выполнения обработки (это, похоже, значительно улучшило скорость с точки зрения задержки Кафки. Однако Event hub заставляет вас создавать пакет событий, который я не обязательно хочу использовать. Пока я просто хочу отправлять данные по одному сообщению за раз. Чтобы создать новый пакет, я должен вызвать Dispose(). Я столкнулся с проблемой, когда к моменту вызова Dispose() происходит еще один вызов функции, и я получаю сообщение об ошибке, в котором говорится, что объект используется event hub.

Я также пытался использовать перегрузку для eventHubProducerClient.SendAsync, который позволяет передавать IEnumerable, но я сталкиваюсь с той же проблемой с этим.

Итак, я считаю, что это проблема синхронизации, или, может быть, мне нужно где-то заблокировать?

Любая помощь будет оценена.

        public void Execute()
                {
                    using (_consumer)
                    {
                        try
                        {
                            _consumer.Subscribe(_streamConsumerSettings.Topic);
                            while (true)
                            {
                                var result = _consumer.Consume(1000);
        
                                if (result == null)
                                {
                                    continue;
                                }
                                var process = Task.Factory.StartNew(() => ProcessMessage(result?.Message?.Value));
                                var send = process.ContinueWith(t => SendMessage(process.Result));                        
                            }
        
                        }
                        catch (ConsumeException e)
                        {
                            _logger.LogError(e, e.StackTrace ?? e.Message);
                            _cancelConsume = true;
                            _consumer.Close();
                            RestartConsumer();
                        }
                    }
                }
    
         public static EquipmentJson ProcessMessage(byte[] result)
         {
                    var json = _messageProcessor.DeserializeAndMap(result);
                    return json;
         }
        
         public static void SendMessage(EquipmentJson message)
         {
                    try 
                    {   
        
                        _eventHubClient.AddToBatch(message);             
                        
                    }
                    catch (Exception e)
                    {
                        _logger.LogError(e, e.StackTrace ?? e.Message);
                    }
          }
    
     
    
        public async Task AddToBatch(EquipmentJson message)
                {
                    if 
      (!string.IsNullOrEmpty(message.EquipmentLocation))
                    {
                        try
                        {
                            var batch = await _equipmentLocClient.CreateBatchAsync();
                            batch.TryAdd(new EventData(Encoding.UTF8.GetBytes(message.EquipmentLocation)));
                            await _eventHubProducerClient.SendAsync(batch);
                            batch.Dispose();
                            _logger.LogInformation($"Data sent {DateTimeOffset.UtcNow}");
                        }
                        catch (Exception e)
                        {
                            _logger.LogError(e, e.StackTrace ?? e.Message);
                        }
                    }
                }

 public class EventHubClient : IEventHubClient
    {
        private readonly ILoggerAdapter<EventHubClient> _logger;
        private readonly EventHubClientSettings _eventHubClientSettings;
        private IMapper _mapper;

        
        private static EventHubProducerClient _equipmentLocClient;


        public EventHubClient(ILoggerAdapter<EventHubClient> logger, EventHubClientSettings eventHubClientSettings, IMapper mapper)
        {
            _logger = logger;
            _eventHubClientSettings = eventHubClientSettings;
            _mapper = mapper;
            _equipmentLocClient = new EventHubProducerClient(_eventHubClientSettings.ConnectionString, _eventHubClientSettings.EquipmentLocation);

        }
    }
}
  

Комментарии:

1. Можете ли вы помочь мне понять, какую клиентскую библиотеку Eventhubs вы используете, и включить создание клиента в свой фрагмент?

2. Я использую последнюю библиотеку из MSFT quickstart. Azure. Обмен сообщениями. EventHubs; Обновил сообщение выше с созданием клиента. Я регистрирую класс EventHubClient как одноэлементный, что, вероятно, является моей проблемой. Поскольку я несколько раз сталкиваюсь с этим классом в разных задачах.

3. Система. Исключение InvalidOperationException: пакет событий в настоящее время используется для связи со службой Event Hubs; события не могут быть добавлены до завершения активной операции. в Azure. Обмен сообщениями. EventHubs. Производитель. EventDataBatch. AssertNotLocked() в Azure. Обмен сообщениями. EventHubs. Производитель. EventDataBatch. TryAdd(EventData EventData)

4. Это интересно. Я не вижу ничего очевидного; ваше использование клиента и поток вашей отправки в полном порядке. Клиент безопасен для одновременного использования и как долговременный объект, и каждая операция отправки независима. Несколько вызовов для отправки могут быть активны одновременно без необходимости синхронизации. Эта трассировка стека указывает на то, что что-то пытается изменить пакет во время выполнения операции отправки. Я не могу воспроизвести поведение в консольном приложении, используя фрагменты вашего кода.

5. Единственное предположение, которое я могу предложить на данный момент, заключается в том, что, похоже, что-то в вашем потоке выполняет два ожидаемых вызова подряд, не уступая TryAdd.

Ответ №1:

Основываясь на моих предположениях в комментариях, мне любопытно, может ли помочь рефакторинг для использования, async/await а не явное продолжение в основном цикле. Возможно, что-то похожее на следующий фрагмент LINQPad:

 async Task Main()
{
    while (true)
    {
        var message = await Task.Factory.StartNew(() => GetText());
        var events = new[] { new EventData(Encoding.UTF8.GetBytes(message)) };
        
        await Send(events).ConfigureAwait(false);
    }
}

public EventHubProducerClient client = new EventHubProducerClient("<< CONNECTION STRING >>");

public async Task Send(EventData[] events)
{
    try
    {
        await client.SendAsync(events).ConfigureAwait(false);
        "Sent".Dump();
    }
    catch (Exception ex)
    {
        ex.Dump();
    }
}

public string GetText()
{
    Thread.Sleep(250);
    return "Test";
}
  

Если вы настроены на сохранение продолжения, мне интересно, может ли помочь небольшой структурный рефакторинг в продолжении, как для ускорения создания событий, так и для соблюдения await инструкций. Возможно, что-то похожее на следующий фрагмент LINQPad:

 async Task Main()
{
    while(true)
    {
        var t = Task.Factory.StartNew(() => GetText());
        var _ = t.ContinueWith(async q =>
        {
            var events = new[] { new EventData(Encoding.UTF8.GetBytes(t.Result)) };
            await Send(events).ConfigureAwait(false);
        });
        
        await Task.Yield();
    }
}

public EventHubProducerClient client = new EventHubProducerClient("<< CONNECTION STRING >>");

public async Task Send(EventData[] events)
{
    try
    {
        await client.SendAsync(events).ConfigureAwait(false);
        "Sent".Dump();
    }
    catch (Exception ex)
    {
        ex.Dump();
    }
}

public string GetText()
{
    Thread.Sleep(250);
    return "Test";
}

  

Комментарии:

1. Я рассмотрю ваше предложение и дам вам знать. Спасибо!