#apache-camel #azure-eventhub #spring-camel
Вопрос:
Я пытался использовать события из концентратора событий Azure с помощью Camel. В соответствии с документацией camel https://camel.apache.org/components/latest/azure-eventhubs-component.html#_consumer_example, есть ли у нас какие-либо рекомендации о том, как реализовать расположение событий или хранилище контрольных точек в памяти ? Используя приведенный ниже код (AzureEventHubRouteBuilder), я смог опубликовать сообщение в концентраторе событий, но при использовании смещения я получаю предупреждение ниже в консоли.
[ИНФОРМАЦИЯ ] 2021-10-04 14:36:12.472 — [пул-7-поток-1] Подсистема распределения нагрузки по разделам — — — Подсистема балансировки нагрузки уже запущена [ПРЕДУПРЕДИТЬ ] 2021-10-04 14:36:22.400 — [параллель-3] PartitionBasedLoadBalancer — — — Сбой балансировки нагрузки для процессора событий — Не наблюдалось ни одного элемента или сигнала терминала в течение 60000 мс в «фильтре» (и резерв не был настроен) Не наблюдал никакого сигнала элемента или терминала в течение 60000 мс в «фильтре» (и не было настроено никаких резервных возможностей) [ПРЕДУПРЕЖДЕНИЕ ] 2021-10-04 14:36:22.400 — [параллельный-3] Пользователь событий — — — Обмен обработкой ошибок. Обмен[]. Вызвано: [java.util.concurrent.Исключение TimeoutException — Не наблюдал ни одного элемента или сигнала терминала в течение 60000 мс в «фильтре» (и не был настроен запасной вариант)] java.util.параллельный.Исключение TimeoutException: Не наблюдал никакого элемента или сигнала терминала в течение 60000 мс в «фильтре» (и запасной вариант не был настроен) в reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.Обработчик времени(FluxTimeout.java:295)
Любые указания о том, как использовать события из Azure Event Hub из Camel? Если мы используем хранилище BlobCheckpoint, есть ли у нас какие-либо указатели/фрагменты кода для использования того же самого (как обновить контрольную точку и т.д.) ?
Фрагмент кода для маршрутов производителя и потребителя приведен ниже:
@Component
открытый класс AzureEventHubRouteBuilder расширяет RouteBuilder {
@Override
public void configure() throws Exception {
//producer
from("timer://runOnce?repeatCount=1")
.log("Event hub route")
.process(exchange -> {
exchange.getIn().setHeader(EventHubsConstants.PARTITION_ID, "0");
exchange.getIn().setHeader(EventHubsConstants.OFFSET, 1);
exchange.getIn().setBody("test event");
})
.to("azure-eventhubs://?connectionString=Endpoint=sb://XXX.servicebus.windows.net/;SharedAccessKeyName=ABC;SharedAccessKey=ZZZ;EntityPath=yyy")
.log("Success from event hub");
//consumer
from("azure-eventhubs://?connectionString=Endpoint=sb://XXX.servicebus.windows.net/;SharedAccessKeyName=ABC;SharedAccessKey=ZZZ;EntityPath=yyy"
"amp;blobAccountName=QQQamp;blobAccessKey=ABC=amp;blobContainerName=yyyamp;eventPosition=#eventPosition")
.log("${body}")
.log("SUCCESS");
}
@Bean
public Map<String,EventPosition> eventPosition() {
return Map.of("testPartition",EventPosition.fromOffset(1));
}
}