Создайте статистическое событие с потоком Kafka и оконным потоком, но пропустите какое-то оконное, если есть какое-то более недавнее

#apache-kafka #apache-kafka-streams #spring-kafka

#apache-kafka #apache-kafka-streams #spring-kafka

Вопрос:

Для событий за последние 24 часа я хочу создать статистическое событие, чтобы отобразить его где-нибудь в режиме реального времени. Итак, текущее поведение таково:

  1. Каждую минуту я собираю последнее 24-часовое событие для добавления объекта списка
  2. Я временно вычисляю полный список и создаю конечный статистический объект
  3. Я переношу этот окончательный статистический объект в новую тему

Я использую для этого поток Kafka и Spring Boot.

Это работает хорошо, у меня хороший расчет, и событие хорошо подготовлено в разработке. Проблема в том, что я нахожусь в рабочей среде, а тема исходного события содержит слишком много данных.

Если мое приложение остановится на один день или несколько минут. При перезапуске приложения мое приложение пытается восстановить историю. Поток Kafka продолжает обрабатываться с последнего смещения, и ему требуется огромное время, чтобы наверстать его задержку. На самом деле, меня не волнует история. Мне не нужен объект статистики со вчерашнего дня или за последние 24 часа минус 1 час, я просто хочу пересчитать с ЭТОГО момента до последних 24 часов, и все.

То же самое, если приложение работает правильно, но с некоторой задержкой для обработки статистического события. Задержка увеличивается и увеличивается. Я бы Kafka Stream автоматически пропустил временные окна и просто вычислил последнее, если задержка становится слишком важной.

Как вы думаете, Kafka Stream может справиться с этим? Заранее спасибо.

     /**
 * Every minute, we collect all events on the last day and we publish a new statistic event.
 * 
 * @param streamsBuilder
 * @return
 */
@Bean
public KStream<String, MySourceEvent> kstreamMySourceEventStatistique(final StreamsBuilder streamsBuilder) {

    // We create the stream to consume machine-state topic.
    KStream<String, MySourceEvent> kstreamStat = streamsBuilder
            .<String, MySourceEvent>stream("my-source-topic", Consumed
                    .with(Serdes.String(), KafkaUtils
                            .jsonSerdeForClass(MySourceEvent.class)));

    // For this stream, every minute, we take all events in the last 24h, and we aggregate them into TemporaryStatistiqueEvent
    KTable<Windowed<String>, TemporaryStatistiqueEvent> aggregatedStream = kstreamStat 
            .groupByKey(Grouped
                    .with(Serdes.String(), KafkaUtils
                            .jsonSerdeForClass(MySourceEvent.class)))
            .windowedBy(TimeWindows
                    .of(Duration.ofDays(1))
                    .advanceBy(Duration.ofMinutes(1))
                    .grace(Duration.ofSeconds(0)))
            .<TemporaryStatistiqueEvent>aggregate(() -> new TemporaryStatistiqueEvent(), (key, value, logAgg) -> {
                logAgg.add(value); //I add the event in my TemporaryStatistiqueEvent object
                return logAgg;
            }, Materialized
                    .<String, TemporaryStatistiqueEvent, WindowStore<Bytes, byte[]>>as("temporary-stats-store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(KafkaUtils.jsonSerdeForClass(TemporaryStatistiqueEvent.class))
                    .withRetention(Duration.ofDays(1)))
            .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()));
    // Now, we gave an aggregate on last 24h, we compute the statistic and push FinalStatisticEvent object in a new topic
    aggregatedStream
            .toStream()
            .map(new KeyValueMapper<Windowed<String>, TemporaryStatistiqueEvent, KeyValue<String, PlcStatMachineState>>() {
                @Override
                public KeyValue<String, FinalStatisticEvent> apply(final Windowed<String> key, final TemporaryStatistiqueEvent temporaryStatistiqueEvent) {
                    ZonedDateTime zdt = ZonedDateTime.ofInstant(Instant.ofEpochMilli(key.window().end()), ZoneOffset.UTC);
                    return new KeyValue<>(key.key(), temporaryStatistiqueEvent.computeFinalStatisticEvent(zdt));
                }
            })
            .to("final-stat-topic", Produced.with(Serdes.String(), KafkaUtils.jsonSerdeForClass(FinalStatisticEvent .class)));

    return kstreamStat;
}
  

Ответ №1:

Это сложный вопрос…

В случае автономного режима и перезапуска вы можете попробовать манипулировать начальными смещениями (т. Е. фиксированными смещениями) перед перезапуском приложения. Используя bin/kafka-consumer-group.sh , вы можете «искать по времени» и, таким образом, «пропустить вперед до сейчас-минус-24 часа».

В случае, когда приложение отстает, это сложнее. Может быть, вы могли бы использовать «динамический фильтр» (для доступа к метаданным записи, таким как ее временная метка, которую вы могли бы использовать flatTransformValues для реализации фильтра) в качестве первого шага в вашей программе, который удаляет слишком старые записи?