Потоки Kafka: как получить первую и последнюю запись SessionWindow?

#java #apache-kafka #apache-kafka-streams

#java #apache-kafka #apache-kafka-streams

Вопрос:

По умолчанию .windowedBy(SessionWindows.with(Duration.ofSeconds(60)) возвращает запись для каждой входящей записи.

В сочетании с .count() и a .filter() легко получить первую запись.

С помощью .suppress(Suppressed.untilWindowCloses(unbounded())) также легко восстановить последнюю запись.

Итак… Я выполняю обработку дважды, как вы можете видеть в адаптированном примере подсчета слов:

 
final KStream<String, String> streamsBranches = builder.<String,String>stream("streams-plaintext-input");

streamsBranches
  .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\W ")))
  .groupBy((key, value) -> "" value)
  .windowedBy(SessionWindows.with(Duration.ofSeconds(60)).grace(Duration.ofSeconds(2)))
  .count(Materialized.with(Serdes.String(), Serdes.Long()))
  .toStream()
  .map((wk, v) -> new KeyValue<>(wk.key(), v == null ? -1l : v))
  .filter((wk, v) -> v == 1)
  .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

streamsBranches
  .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\W ")))
  .groupBy((key, value) -> "" value)
  .windowedBy(SessionWindows.with(Duration.ofSeconds(60)).grace(Duration.ofSeconds(2)))
  .count(Materialized.with(Serdes.String(), Serdes.Long()))
  .suppress(Suppressed.untilWindowCloses(unbounded()))
  .toStream()
  .map((wk, v) -> new KeyValue<>(wk.key(), v))
  .filter((wk, v) -> v != null)
  .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
  

Но мне интересно, есть ли более простой и красивый способ сделать то же самое.

Комментарии:

1. Какую первую и последнюю запись вы хотели бы получить? Вы используете группировку на основе разных ключей , поэтому я думаю, что это не сработает так, как вы ожидали.

2. Разные ключи используются для иллюстрации того, что я хочу. Я отредактировал сообщение, чтобы удалить их, чтобы сделать его более понятным. Я просто хочу получить первую и последнюю запись из sessionwindow.

3. Я думаю, что ваш код по-прежнему не выполняет то, что вы хотите.

Ответ №1:

Я думаю, вам следует использовать SessionWindowedKStream::aggregate(...) и, основываясь на вашей логике, накапливать результат в агрегаторе (первое и последнее значение)

Пример кода может выглядеть следующим образом:

 streamsBranches.groupByKey()
        .windowedBy(SessionWindows.with(Duration.ofSeconds(60)).grace(Duration.ofSeconds(2)))
        .aggregate(
                AggClass::new,
                (key, value, oldAgg) -> oldAgg.update(value),
                (key, agg1, agg2) -> agg1.merge(agg2),
                Materialized.with(Serdes.String(), new AggClassSerdes())
        ).suppress(Suppressed.untilWindowCloses(unbounded()))
        .toStream().map((wk, v) -> new KeyValue<>(wk.key(), v))
.to("streams-wordcount-output", Produced.with(Serdes.String(), new AggClassSerdes()));
  

Где AggClass находится аккумулятор и AggClassSerdes есть Serdes для этого аккумулятора

 public class AggClass {
    private String first;
    private String last;

    public AggClass() {}

    public AggClass(String first, String last) {
        this.first = first;
        this.last = last;
    }

    public AggClass update(String value) {
        if (first == null)
            first = value;
        last = value;
        return this;
    }

    public AggClass merge(AggClass other) {
        if (this.first == null)
            return other;
        else return new AggClass(this.first, other.last);
    }
}