#apache-kafka #apache-kafka-streams #spring-kafka
#apache-kafka #apache-kafka-streams #spring-kafka
Вопрос:
Для событий за последние 24 часа я хочу создать статистическое событие, чтобы отобразить его где-нибудь в режиме реального времени. Итак, текущее поведение таково:
- Каждую минуту я собираю последнее 24-часовое событие для добавления объекта списка
- Я временно вычисляю полный список и создаю конечный статистический объект
- Я переношу этот окончательный статистический объект в новую тему
Я использую для этого поток Kafka и Spring Boot.
Это работает хорошо, у меня хороший расчет, и событие хорошо подготовлено в разработке. Проблема в том, что я нахожусь в рабочей среде, а тема исходного события содержит слишком много данных.
Если мое приложение остановится на один день или несколько минут. При перезапуске приложения мое приложение пытается восстановить историю. Поток Kafka продолжает обрабатываться с последнего смещения, и ему требуется огромное время, чтобы наверстать его задержку. На самом деле, меня не волнует история. Мне не нужен объект статистики со вчерашнего дня или за последние 24 часа минус 1 час, я просто хочу пересчитать с ЭТОГО момента до последних 24 часов, и все.
То же самое, если приложение работает правильно, но с некоторой задержкой для обработки статистического события. Задержка увеличивается и увеличивается. Я бы Kafka Stream автоматически пропустил временные окна и просто вычислил последнее, если задержка становится слишком важной.
Как вы думаете, Kafka Stream может справиться с этим? Заранее спасибо.
/**
* Every minute, we collect all events on the last day and we publish a new statistic event.
*
* @param streamsBuilder
* @return
*/
@Bean
public KStream<String, MySourceEvent> kstreamMySourceEventStatistique(final StreamsBuilder streamsBuilder) {
// We create the stream to consume machine-state topic.
KStream<String, MySourceEvent> kstreamStat = streamsBuilder
.<String, MySourceEvent>stream("my-source-topic", Consumed
.with(Serdes.String(), KafkaUtils
.jsonSerdeForClass(MySourceEvent.class)));
// For this stream, every minute, we take all events in the last 24h, and we aggregate them into TemporaryStatistiqueEvent
KTable<Windowed<String>, TemporaryStatistiqueEvent> aggregatedStream = kstreamStat
.groupByKey(Grouped
.with(Serdes.String(), KafkaUtils
.jsonSerdeForClass(MySourceEvent.class)))
.windowedBy(TimeWindows
.of(Duration.ofDays(1))
.advanceBy(Duration.ofMinutes(1))
.grace(Duration.ofSeconds(0)))
.<TemporaryStatistiqueEvent>aggregate(() -> new TemporaryStatistiqueEvent(), (key, value, logAgg) -> {
logAgg.add(value); //I add the event in my TemporaryStatistiqueEvent object
return logAgg;
}, Materialized
.<String, TemporaryStatistiqueEvent, WindowStore<Bytes, byte[]>>as("temporary-stats-store")
.withKeySerde(Serdes.String())
.withValueSerde(KafkaUtils.jsonSerdeForClass(TemporaryStatistiqueEvent.class))
.withRetention(Duration.ofDays(1)))
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()));
// Now, we gave an aggregate on last 24h, we compute the statistic and push FinalStatisticEvent object in a new topic
aggregatedStream
.toStream()
.map(new KeyValueMapper<Windowed<String>, TemporaryStatistiqueEvent, KeyValue<String, PlcStatMachineState>>() {
@Override
public KeyValue<String, FinalStatisticEvent> apply(final Windowed<String> key, final TemporaryStatistiqueEvent temporaryStatistiqueEvent) {
ZonedDateTime zdt = ZonedDateTime.ofInstant(Instant.ofEpochMilli(key.window().end()), ZoneOffset.UTC);
return new KeyValue<>(key.key(), temporaryStatistiqueEvent.computeFinalStatisticEvent(zdt));
}
})
.to("final-stat-topic", Produced.with(Serdes.String(), KafkaUtils.jsonSerdeForClass(FinalStatisticEvent .class)));
return kstreamStat;
}
Ответ №1:
Это сложный вопрос…
В случае автономного режима и перезапуска вы можете попробовать манипулировать начальными смещениями (т. Е. фиксированными смещениями) перед перезапуском приложения. Используя bin/kafka-consumer-group.sh
, вы можете «искать по времени» и, таким образом, «пропустить вперед до сейчас-минус-24 часа».
В случае, когда приложение отстает, это сложнее. Может быть, вы могли бы использовать «динамический фильтр» (для доступа к метаданным записи, таким как ее временная метка, которую вы могли бы использовать flatTransformValues
для реализации фильтра) в качестве первого шага в вашей программе, который удаляет слишком старые записи?