#azure #azure-service-fabric #azure-eventhub
#azure #azure-service-fabric #azure-eventhub
Вопрос:
Я пытаюсь заставить service fabric последовательно извлекать сообщения из azure event hub. Кажется, у меня все подключено, но я замечаю, что мой потребитель просто перестает извлекать события.
У меня есть концентратор с парой тысяч событий, которые я к нему подключил. Настроил концентратор с 1 разделом, и у моей службы service fabric также есть только 1 раздел, чтобы упростить отладку.
Служба запускается, создает EventHubClient, оттуда использует его для создания PartitionReceiver. Получатель передается в «EventLoop», который вводит «бесконечный», когда вызывает receiver.ReceiveAsync. Ниже приведен код для EventLoop.
Что я наблюдаю, так это то, что в первый раз в цикле я почти всегда получаю 1 сообщение. Во второй раз я получаю где-то от 103 до 200 сообщений. После этого я не получаю никаких сообщений. Также кажется, что если я перезапущу службу, я снова получу те же сообщения, но это потому, что при перезапуске службы она запускается с начала потока.
Ожидал бы, что это будет продолжаться до тех пор, пока мои 2000 сообщений не будут использованы, а затем он будет ждать меня (опрос случайно).
Есть ли что-то конкретное, что мне нужно сделать с Azure.Обмен сообщениями.Пакет EventHubs 5.3.0, позволяющий ему продолжать извлекать события?
//Here is how I am creating the EventHubClient:
var connectionString = "something secret";
var connectionStringBuilder = new EventHubsConnectionStringBuilder(connectionString)
{
EntityPath = "NameOfMyEventHub"
};
try
{
m_eventHubClient = EventHubClient.Create(connectionStringBuilder);
}
//Here is how I am getting the partition receiver
var receiver = m_eventHubClient.CreateReceiver("$Default", m_partitionId, EventPosition.FromStart());
//The event loop which the receiver is passed to
private async Task EventLoop(PartitionReceiver receiver)
{
m_started = true;
while (m_keepRunning)
{
var events = await receiver.ReceiveAsync(m_options.BatchSize, TimeSpan.FromSeconds(5));
if (events != null) //First 2/3 times events aren't null. After that, always null and I know there are more in the partition/
{
var eventsArray = events as EventData[] ?? events.ToArray();
m_state.NumProcessedSinceLastSave = eventsArray.Count();
foreach (var evt in eventsArray)
{
//Process the event
await m_options.Processor.ProcessMessageAsync(evt, null);
string lastOffset = evt.SystemProperties.Offset;
if (m_state.NumProcessedSinceLastSave >= m_options.BatchSize)
{
m_state.Offset = lastOffset;
m_state.NumProcessedSinceLastSave = 0;
await m_state.SaveAsync();
}
}
}
}
m_started = false;
}
** РЕДАКТИРОВАТЬ, был задан вопрос о количестве разделов. Концентратор событий имеет один раздел, и служба SF также имеет один раздел.
Намереваюсь использовать состояние service fabric для отслеживания моего смещения в концентратор, но сейчас это не проблема.
Прослушиватели разделов создаются для каждого раздела. Я получаю разделы, подобные этому:
public async Task StartAsync()
{
// slice the pie according to distribution
// this partition can get one or more assigned Event Hub Partition ids
string[] eventHubPartitionIds = (await m_eventHubClient.GetRuntimeInformationAsync()).PartitionIds;
string[] resolvedEventHubPartitionIds = m_options.ResolveAssignedEventHubPartitions(eventHubPartitionIds);
foreach (var resolvedPartition in resolvedEventHubPartitionIds)
{
var partitionReceiver = new EventHubListenerPartitionReceiver(m_eventHubClient, resolvedPartition, m_options);
await partitionReceiver.StartAsync();
m_partitionReceivers.Add(partitionReceiver);
}
}
Когда partitionListener.Вызывается startAsync, он фактически создает PartitionListener, например, так (на самом деле это немного больше, чем это, но выбранная ветвь — это эта:
m_eventHubClient.CreateReceiver(m_options.EventHubConsumerGroupName, m_partitionId, EventPosition.FromStart());
Спасибо за любые советы.
Будет
Комментарии:
1. Можете ли вы помочь мне понять, как
m_keepRunning
это регулируется? Я вижу, что он используется как часть управления циклом, но было бы полезно посмотреть, как приложение управляет его значением.2. Для него установлено значение false при CloseAsync, когда service fabric отключает службу.
3. Я не вижу ничего очевидного в коде, который был передан. Интересно, есть ли ненаблюдаемое исключение, которое вызывает сбой задачи. Цикл не имеет обработки ошибок в версии, которая была общей. В чем заключается стратегия обработки исключений
ProcessMessageAsync
?4. >> Есть ли что-то конкретное, что мне нужно сделать с Azure. Обмен сообщениями. Пакет EventHubs 5.3.0, позволяющий ему продолжать извлекать события? Я не уверен, о чем вы там спрашиваете. Фрагменты, которыми вы поделились, используют устаревший
Microsoft.Azure.EventHubs
пакет, а не текущее поколениеAzure.Messaging.EventHubs
.5. @JesseSquire это интересное наблюдение. Пакет, на который ссылается Azure. Обмен сообщениями. EventHubs (5.3.0-бета-версия. 4) но вы действительно правы, когда я перехожу к определению в EventHubClient, оно находится в // C:UsersUser.nugetpackagesmicrosoft.azure.eventhubs4.3.1libnetstandard2.0Microsoft . Azure. EventHubs.dll , несмотря на то, что этот пакет даже не упоминается в моем проекте. Без сомнения, когда-то это было. Позвольте мне разобраться с этим, и, возможно, проблема разрешится сама собой.
Ответ №1:
Сколько у вас разделов? Я не вижу в вашем коде, как вы убедитесь, что прочитали все разделы в группе потребителей по умолчанию.
Любая конкретная причина, по которой вы используете PartitionReceiver
вместо использования EventProcessorHost?
Мне кажется, SF идеально подходит для использования узла обработки событий. Я вижу, что уже существует интегрированное решение SF, которое использует службы с отслеживанием состояния для контрольных точек.
Комментарии:
1. Чтобы ответить на вопрос о разделе, в event hub есть 1 раздел, а в службе SF также есть только один — думал, что это также может быть причиной, поэтому я исключил это. Я добавлю некоторые из этих деталей в свой основной пост.
2. Упомянутый и связанный пакет интеграции SF использует службу с отслеживанием состояния вместо учетной записи хранилища .
3. Просто чтобы упомянуть, ссылка выше повторно рекомендовала использовать библиотеку текущего поколения,
Azure.Messaging.EventHubs.
EventProcessorHost
Ссылка в ответе указывает на правильную библиотеку, но имя типа будетEventProcessorClient
в этом пакете. Для этого сценария лучшим подходом будет вывод изEventProcesor<T>
, позволяющий использовать состояние Service Fabric в качестве хранилища контрольных точек.4. Спасибо всем, я изучаю эти рекомендации. Следует разобраться.
5. Спасибо всем. После выполнения этого упражнения по обеспечению использования правильных пакетов я обнаружил проблему. Оказывается, это было в моем глупом модульном тесте, который не ожидал. Тест заканчивался и завершал задачу до того, как все сообщения попадали на концентратор, и он постоянно заканчивался после того, как точно такая же сумма просто случайно… Рад быть в новых пакетах, хотя ваша помощь в целом улучшила ситуацию для меня.