#java #apache-kafka #apache-kafka-streams
#java #apache-kafka #apache-kafka-streams
Вопрос:
По умолчанию .windowedBy(SessionWindows.with(Duration.ofSeconds(60))
возвращает запись для каждой входящей записи.
В сочетании с .count()
и a .filter()
легко получить первую запись.
С помощью .suppress(Suppressed.untilWindowCloses(unbounded()))
также легко восстановить последнюю запись.
Итак… Я выполняю обработку дважды, как вы можете видеть в адаптированном примере подсчета слов:
final KStream<String, String> streamsBranches = builder.<String,String>stream("streams-plaintext-input");
streamsBranches
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\W ")))
.groupBy((key, value) -> "" value)
.windowedBy(SessionWindows.with(Duration.ofSeconds(60)).grace(Duration.ofSeconds(2)))
.count(Materialized.with(Serdes.String(), Serdes.Long()))
.toStream()
.map((wk, v) -> new KeyValue<>(wk.key(), v == null ? -1l : v))
.filter((wk, v) -> v == 1)
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
streamsBranches
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\W ")))
.groupBy((key, value) -> "" value)
.windowedBy(SessionWindows.with(Duration.ofSeconds(60)).grace(Duration.ofSeconds(2)))
.count(Materialized.with(Serdes.String(), Serdes.Long()))
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
.map((wk, v) -> new KeyValue<>(wk.key(), v))
.filter((wk, v) -> v != null)
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
Но мне интересно, есть ли более простой и красивый способ сделать то же самое.
Комментарии:
1. Какую первую и последнюю запись вы хотели бы получить? Вы используете группировку на основе разных ключей , поэтому я думаю, что это не сработает так, как вы ожидали.
2. Разные ключи используются для иллюстрации того, что я хочу. Я отредактировал сообщение, чтобы удалить их, чтобы сделать его более понятным. Я просто хочу получить первую и последнюю запись из sessionwindow.
3. Я думаю, что ваш код по-прежнему не выполняет то, что вы хотите.
Ответ №1:
Я думаю, вам следует использовать SessionWindowedKStream::aggregate(...)
и, основываясь на вашей логике, накапливать результат в агрегаторе (первое и последнее значение)
Пример кода может выглядеть следующим образом:
streamsBranches.groupByKey()
.windowedBy(SessionWindows.with(Duration.ofSeconds(60)).grace(Duration.ofSeconds(2)))
.aggregate(
AggClass::new,
(key, value, oldAgg) -> oldAgg.update(value),
(key, agg1, agg2) -> agg1.merge(agg2),
Materialized.with(Serdes.String(), new AggClassSerdes())
).suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream().map((wk, v) -> new KeyValue<>(wk.key(), v))
.to("streams-wordcount-output", Produced.with(Serdes.String(), new AggClassSerdes()));
Где AggClass
находится аккумулятор и AggClassSerdes
есть Serdes для этого аккумулятора
public class AggClass {
private String first;
private String last;
public AggClass() {}
public AggClass(String first, String last) {
this.first = first;
this.last = last;
}
public AggClass update(String value) {
if (first == null)
first = value;
last = value;
return this;
}
public AggClass merge(AggClass other) {
if (this.first == null)
return other;
else return new AggClass(this.first, other.last);
}
}