Потребитель концентратора событий Camel Azure с помощью контрольной точки

#apache-camel #azure-eventhub #spring-camel

Вопрос:

Я пытался использовать события из концентратора событий Azure с помощью Camel. В соответствии с документацией camel https://camel.apache.org/components/latest/azure-eventhubs-component.html#_consumer_example, есть ли у нас какие-либо рекомендации о том, как реализовать расположение событий или хранилище контрольных точек в памяти ? Используя приведенный ниже код (AzureEventHubRouteBuilder), я смог опубликовать сообщение в концентраторе событий, но при использовании смещения я получаю предупреждение ниже в консоли.

[ИНФОРМАЦИЯ ] 2021-10-04 14:36:12.472 — [пул-7-поток-1] Подсистема распределения нагрузки по разделам — — — Подсистема балансировки нагрузки уже запущена [ПРЕДУПРЕДИТЬ ] 2021-10-04 14:36:22.400 — [параллель-3] PartitionBasedLoadBalancer — — — Сбой балансировки нагрузки для процессора событий — Не наблюдалось ни одного элемента или сигнала терминала в течение 60000 мс в «фильтре» (и резерв не был настроен) Не наблюдал никакого сигнала элемента или терминала в течение 60000 мс в «фильтре» (и не было настроено никаких резервных возможностей) [ПРЕДУПРЕЖДЕНИЕ ] 2021-10-04 14:36:22.400 — [параллельный-3] Пользователь событий — — — Обмен обработкой ошибок. Обмен[]. Вызвано: [java.util.concurrent.Исключение TimeoutException — Не наблюдал ни одного элемента или сигнала терминала в течение 60000 мс в «фильтре» (и не был настроен запасной вариант)] java.util.параллельный.Исключение TimeoutException: Не наблюдал никакого элемента или сигнала терминала в течение 60000 мс в «фильтре» (и запасной вариант не был настроен) в reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.Обработчик времени(FluxTimeout.java:295)

Любые указания о том, как использовать события из Azure Event Hub из Camel? Если мы используем хранилище BlobCheckpoint, есть ли у нас какие-либо указатели/фрагменты кода для использования того же самого (как обновить контрольную точку и т.д.) ?

Фрагмент кода для маршрутов производителя и потребителя приведен ниже:

 @Component
 

открытый класс AzureEventHubRouteBuilder расширяет RouteBuilder {

 @Override
public void configure() throws Exception {

    //producer
    from("timer://runOnce?repeatCount=1")
            .log("Event hub route")
            .process(exchange -> {
                exchange.getIn().setHeader(EventHubsConstants.PARTITION_ID, "0");
                exchange.getIn().setHeader(EventHubsConstants.OFFSET, 1);
                exchange.getIn().setBody("test event");
            })
            .to("azure-eventhubs://?connectionString=Endpoint=sb://XXX.servicebus.windows.net/;SharedAccessKeyName=ABC;SharedAccessKey=ZZZ;EntityPath=yyy")
            .log("Success from event hub");

     //consumer
    from("azure-eventhubs://?connectionString=Endpoint=sb://XXX.servicebus.windows.net/;SharedAccessKeyName=ABC;SharedAccessKey=ZZZ;EntityPath=yyy" 
            "amp;blobAccountName=QQQamp;blobAccessKey=ABC=amp;blobContainerName=yyyamp;eventPosition=#eventPosition")
                   .log("${body}")
                    .log("SUCCESS");

}

@Bean
public Map<String,EventPosition> eventPosition() {
   return Map.of("testPartition",EventPosition.fromOffset(1));
}
 

}