Потребитель концентратора событий в сервисной сети

#azure #azure-service-fabric #azure-eventhub

#azure #azure-service-fabric #azure-eventhub

Вопрос:

Я пытаюсь заставить service fabric последовательно извлекать сообщения из azure event hub. Кажется, у меня все подключено, но я замечаю, что мой потребитель просто перестает извлекать события.

У меня есть концентратор с парой тысяч событий, которые я к нему подключил. Настроил концентратор с 1 разделом, и у моей службы service fabric также есть только 1 раздел, чтобы упростить отладку.

Служба запускается, создает EventHubClient, оттуда использует его для создания PartitionReceiver. Получатель передается в «EventLoop», который вводит «бесконечный», когда вызывает receiver.ReceiveAsync. Ниже приведен код для EventLoop.

Что я наблюдаю, так это то, что в первый раз в цикле я почти всегда получаю 1 сообщение. Во второй раз я получаю где-то от 103 до 200 сообщений. После этого я не получаю никаких сообщений. Также кажется, что если я перезапущу службу, я снова получу те же сообщения, но это потому, что при перезапуске службы она запускается с начала потока.

Ожидал бы, что это будет продолжаться до тех пор, пока мои 2000 сообщений не будут использованы, а затем он будет ждать меня (опрос случайно).

Есть ли что-то конкретное, что мне нужно сделать с Azure.Обмен сообщениями.Пакет EventHubs 5.3.0, позволяющий ему продолжать извлекать события?

 //Here is how I am creating the EventHubClient:
var connectionString = "something secret";
var connectionStringBuilder = new EventHubsConnectionStringBuilder(connectionString)
{
   EntityPath = "NameOfMyEventHub"
};
try
{
   m_eventHubClient = EventHubClient.Create(connectionStringBuilder);
}

//Here is how I am getting the partition receiver
var receiver = m_eventHubClient.CreateReceiver("$Default", m_partitionId, EventPosition.FromStart());

//The event loop which the receiver is passed to
private async Task EventLoop(PartitionReceiver receiver)
  {
     m_started = true;
     while (m_keepRunning)
     {
        var events = await receiver.ReceiveAsync(m_options.BatchSize, TimeSpan.FromSeconds(5));
        if (events != null) //First 2/3 times events aren't null. After that, always null and I know there are more in the partition/
        {
           var eventsArray = events as EventData[] ?? events.ToArray();
           m_state.NumProcessedSinceLastSave  = eventsArray.Count();

           foreach (var evt in eventsArray)
           {
              //Process the event
              await m_options.Processor.ProcessMessageAsync(evt, null);

              string lastOffset = evt.SystemProperties.Offset;

              if (m_state.NumProcessedSinceLastSave >= m_options.BatchSize)
              {
                 m_state.Offset = lastOffset;
                 m_state.NumProcessedSinceLastSave = 0;
                 await m_state.SaveAsync();
              }
           }
        }
     }

     m_started = false;
  }
 

** РЕДАКТИРОВАТЬ, был задан вопрос о количестве разделов. Концентратор событий имеет один раздел, и служба SF также имеет один раздел.

Намереваюсь использовать состояние service fabric для отслеживания моего смещения в концентратор, но сейчас это не проблема.

Прослушиватели разделов создаются для каждого раздела. Я получаю разделы, подобные этому:

 public async Task StartAsync()
  {
     // slice the pie according to distribution
     // this partition can get one or more assigned Event Hub Partition ids
     string[] eventHubPartitionIds = (await m_eventHubClient.GetRuntimeInformationAsync()).PartitionIds;
     string[] resolvedEventHubPartitionIds = m_options.ResolveAssignedEventHubPartitions(eventHubPartitionIds);

     foreach (var resolvedPartition in resolvedEventHubPartitionIds)
     {
        var partitionReceiver = new EventHubListenerPartitionReceiver(m_eventHubClient, resolvedPartition, m_options);
        await partitionReceiver.StartAsync();
        m_partitionReceivers.Add(partitionReceiver);
     }
  }
 

Когда partitionListener.Вызывается startAsync, он фактически создает PartitionListener, например, так (на самом деле это немного больше, чем это, но выбранная ветвь — это эта:

 m_eventHubClient.CreateReceiver(m_options.EventHubConsumerGroupName, m_partitionId, EventPosition.FromStart());
 

Спасибо за любые советы.
Будет

Комментарии:

1. Можете ли вы помочь мне понять, как m_keepRunning это регулируется? Я вижу, что он используется как часть управления циклом, но было бы полезно посмотреть, как приложение управляет его значением.

2. Для него установлено значение false при CloseAsync, когда service fabric отключает службу.

3. Я не вижу ничего очевидного в коде, который был передан. Интересно, есть ли ненаблюдаемое исключение, которое вызывает сбой задачи. Цикл не имеет обработки ошибок в версии, которая была общей. В чем заключается стратегия обработки исключений ProcessMessageAsync ?

4. >> Есть ли что-то конкретное, что мне нужно сделать с Azure. Обмен сообщениями. Пакет EventHubs 5.3.0, позволяющий ему продолжать извлекать события? Я не уверен, о чем вы там спрашиваете. Фрагменты, которыми вы поделились, используют устаревший Microsoft.Azure.EventHubs пакет, а не текущее поколение Azure.Messaging.EventHubs .

5. @JesseSquire это интересное наблюдение. Пакет, на который ссылается Azure. Обмен сообщениями. EventHubs (5.3.0-бета-версия. 4) но вы действительно правы, когда я перехожу к определению в EventHubClient, оно находится в // C:UsersUser.nugetpackagesmicrosoft.azure.eventhubs4.3.1libnetstandard2.0Microsoft . Azure. EventHubs.dll , несмотря на то, что этот пакет даже не упоминается в моем проекте. Без сомнения, когда-то это было. Позвольте мне разобраться с этим, и, возможно, проблема разрешится сама собой.

Ответ №1:

Сколько у вас разделов? Я не вижу в вашем коде, как вы убедитесь, что прочитали все разделы в группе потребителей по умолчанию.

Любая конкретная причина, по которой вы используете PartitionReceiver вместо использования EventProcessorHost?

Мне кажется, SF идеально подходит для использования узла обработки событий. Я вижу, что уже существует интегрированное решение SF, которое использует службы с отслеживанием состояния для контрольных точек.

Комментарии:

1. Чтобы ответить на вопрос о разделе, в event hub есть 1 раздел, а в службе SF также есть только один — думал, что это также может быть причиной, поэтому я исключил это. Я добавлю некоторые из этих деталей в свой основной пост.

2. Упомянутый и связанный пакет интеграции SF использует службу с отслеживанием состояния вместо учетной записи хранилища .

3. Просто чтобы упомянуть, ссылка выше повторно рекомендовала использовать библиотеку текущего поколения, Azure.Messaging.EventHubs. EventProcessorHost Ссылка в ответе указывает на правильную библиотеку, но имя типа будет EventProcessorClient в этом пакете. Для этого сценария лучшим подходом будет вывод из EventProcesor<T> , позволяющий использовать состояние Service Fabric в качестве хранилища контрольных точек.

4. Спасибо всем, я изучаю эти рекомендации. Следует разобраться.

5. Спасибо всем. После выполнения этого упражнения по обеспечению использования правильных пакетов я обнаружил проблему. Оказывается, это было в моем глупом модульном тесте, который не ожидал. Тест заканчивался и завершал задачу до того, как все сообщения попадали на концентратор, и он постоянно заканчивался после того, как точно такая же сумма просто случайно… Рад быть в новых пакетах, хотя ваша помощь в целом улучшила ситуацию для меня.