#c# #task-parallel-library #azure-eventhub
#c# #задача-параллельная библиотека #azure-eventhub
Вопрос:
Я работаю с клиентом Azure Eventhub producer и считываю сообщения из потока kafka, затем передаю его для десериализации / сопоставления, а затем передаю в Event Hub. У меня есть цикл потребления, который создает задачу для каждого потребления, а затем два метода для выполнения обработки (это, похоже, значительно улучшило скорость с точки зрения задержки Кафки. Однако Event hub заставляет вас создавать пакет событий, который я не обязательно хочу использовать. Пока я просто хочу отправлять данные по одному сообщению за раз. Чтобы создать новый пакет, я должен вызвать Dispose(). Я столкнулся с проблемой, когда к моменту вызова Dispose() происходит еще один вызов функции, и я получаю сообщение об ошибке, в котором говорится, что объект используется event hub.
Я также пытался использовать перегрузку для eventHubProducerClient.SendAsync, который позволяет передавать IEnumerable, но я сталкиваюсь с той же проблемой с этим.
Итак, я считаю, что это проблема синхронизации, или, может быть, мне нужно где-то заблокировать?
Любая помощь будет оценена.
public void Execute()
{
using (_consumer)
{
try
{
_consumer.Subscribe(_streamConsumerSettings.Topic);
while (true)
{
var result = _consumer.Consume(1000);
if (result == null)
{
continue;
}
var process = Task.Factory.StartNew(() => ProcessMessage(result?.Message?.Value));
var send = process.ContinueWith(t => SendMessage(process.Result));
}
}
catch (ConsumeException e)
{
_logger.LogError(e, e.StackTrace ?? e.Message);
_cancelConsume = true;
_consumer.Close();
RestartConsumer();
}
}
}
public static EquipmentJson ProcessMessage(byte[] result)
{
var json = _messageProcessor.DeserializeAndMap(result);
return json;
}
public static void SendMessage(EquipmentJson message)
{
try
{
_eventHubClient.AddToBatch(message);
}
catch (Exception e)
{
_logger.LogError(e, e.StackTrace ?? e.Message);
}
}
public async Task AddToBatch(EquipmentJson message)
{
if
(!string.IsNullOrEmpty(message.EquipmentLocation))
{
try
{
var batch = await _equipmentLocClient.CreateBatchAsync();
batch.TryAdd(new EventData(Encoding.UTF8.GetBytes(message.EquipmentLocation)));
await _eventHubProducerClient.SendAsync(batch);
batch.Dispose();
_logger.LogInformation($"Data sent {DateTimeOffset.UtcNow}");
}
catch (Exception e)
{
_logger.LogError(e, e.StackTrace ?? e.Message);
}
}
}
public class EventHubClient : IEventHubClient
{
private readonly ILoggerAdapter<EventHubClient> _logger;
private readonly EventHubClientSettings _eventHubClientSettings;
private IMapper _mapper;
private static EventHubProducerClient _equipmentLocClient;
public EventHubClient(ILoggerAdapter<EventHubClient> logger, EventHubClientSettings eventHubClientSettings, IMapper mapper)
{
_logger = logger;
_eventHubClientSettings = eventHubClientSettings;
_mapper = mapper;
_equipmentLocClient = new EventHubProducerClient(_eventHubClientSettings.ConnectionString, _eventHubClientSettings.EquipmentLocation);
}
}
}
Комментарии:
1. Можете ли вы помочь мне понять, какую клиентскую библиотеку Eventhubs вы используете, и включить создание клиента в свой фрагмент?
2. Я использую последнюю библиотеку из MSFT quickstart. Azure. Обмен сообщениями. EventHubs; Обновил сообщение выше с созданием клиента. Я регистрирую класс EventHubClient как одноэлементный, что, вероятно, является моей проблемой. Поскольку я несколько раз сталкиваюсь с этим классом в разных задачах.
3. Система. Исключение InvalidOperationException: пакет событий в настоящее время используется для связи со службой Event Hubs; события не могут быть добавлены до завершения активной операции. в Azure. Обмен сообщениями. EventHubs. Производитель. EventDataBatch. AssertNotLocked() в Azure. Обмен сообщениями. EventHubs. Производитель. EventDataBatch. TryAdd(EventData EventData)
4. Это интересно. Я не вижу ничего очевидного; ваше использование клиента и поток вашей отправки в полном порядке. Клиент безопасен для одновременного использования и как долговременный объект, и каждая операция отправки независима. Несколько вызовов для отправки могут быть активны одновременно без необходимости синхронизации. Эта трассировка стека указывает на то, что что-то пытается изменить пакет во время выполнения операции отправки. Я не могу воспроизвести поведение в консольном приложении, используя фрагменты вашего кода.
5. Единственное предположение, которое я могу предложить на данный момент, заключается в том, что, похоже, что-то в вашем потоке выполняет два ожидаемых вызова подряд, не уступая TryAdd.
Ответ №1:
Основываясь на моих предположениях в комментариях, мне любопытно, может ли помочь рефакторинг для использования, async/await
а не явное продолжение в основном цикле. Возможно, что-то похожее на следующий фрагмент LINQPad:
async Task Main()
{
while (true)
{
var message = await Task.Factory.StartNew(() => GetText());
var events = new[] { new EventData(Encoding.UTF8.GetBytes(message)) };
await Send(events).ConfigureAwait(false);
}
}
public EventHubProducerClient client = new EventHubProducerClient("<< CONNECTION STRING >>");
public async Task Send(EventData[] events)
{
try
{
await client.SendAsync(events).ConfigureAwait(false);
"Sent".Dump();
}
catch (Exception ex)
{
ex.Dump();
}
}
public string GetText()
{
Thread.Sleep(250);
return "Test";
}
Если вы настроены на сохранение продолжения, мне интересно, может ли помочь небольшой структурный рефакторинг в продолжении, как для ускорения создания событий, так и для соблюдения await
инструкций. Возможно, что-то похожее на следующий фрагмент LINQPad:
async Task Main()
{
while(true)
{
var t = Task.Factory.StartNew(() => GetText());
var _ = t.ContinueWith(async q =>
{
var events = new[] { new EventData(Encoding.UTF8.GetBytes(t.Result)) };
await Send(events).ConfigureAwait(false);
});
await Task.Yield();
}
}
public EventHubProducerClient client = new EventHubProducerClient("<< CONNECTION STRING >>");
public async Task Send(EventData[] events)
{
try
{
await client.SendAsync(events).ConfigureAwait(false);
"Sent".Dump();
}
catch (Exception ex)
{
ex.Dump();
}
}
public string GetText()
{
Thread.Sleep(250);
return "Test";
}
Комментарии:
1. Я рассмотрю ваше предложение и дам вам знать. Спасибо!