#java #apache-kafka #kafka-consumer-api #confluent-platform
#java #apache-kafka #kafka-consumer-api #confluent-платформа
Вопрос:
я довольно новичок в kafka и пытаюсь объединить несколько потоков по уникальному ключу. У меня есть два разных потока, которые я получаю от прослушивания двух разных тем. Я перераспределяю и объединяю их по соответствующим ключам. Затем я слушаю другую тему C и соединяю предыдущие объединенные данные со свежим streamC. Итак, подведем итог
A-B=>AB
C->
AB-C>ABC
Это работает и объединяет данные именно так, как я хочу, и я доволен этим. По какой-то причине, если, скажем, в раздел C поступают только данные, он будет извлекать данные из хранилища состояний (я полагаю, что он записывает предыдущие данные в хранилище состояний). Таким образом, он выполнит объединение со старой записью. Насколько я понимаю, у kafka есть собственный механизм кэширования с хранилищем состояний, что делает его более эффективным, но в моем случае мои данные довольно динамичны и меняются, поэтому я не хочу, чтобы он использовал старые записи для объединения. По сути, я хочу объединить эти данные только в том случае, если все эти разделы получили новую полезную нагрузку отдельно, и каждый из них также завершает свое собственное объединение. — ни в коем случае не использовать какую-либо предыдущую запись.
Вот соответствующий код
protected Topology createStreamTopology(){
StreamsBuilder builder = new StreamsBuilder();
//Stream A
KStream<String, A> streamA = builder.stream(
aTopic,
Consumed.with(Serdes.String(), aSerde)
).selectKey((k, v) -> A.getOrderId(v));
//Stream B
//Make a table of the salesforce order line item events
KStream<String, B> streamB = builder.stream(
bTopic,
Consumed.with(Serdes.String(), bSerde)
).filter(
(k1, v1) -> v1.getPayload().getPlatformC() != null
).selectKey(
(k, v) -> v.getOrder()
);
//Join stream a and stream b
KStream<String, AB> abData = streamA.join(
streamB,
joinedAB::new,
JoinWindows.of(Duration.ofHours(2)),
Joined.with(Serdes.String(), aSerde, bSerde)
).selectKey(
(k, v) -> v.getPayload.getUniqueID());
// Stream c
KStream<String, C>stream streamC=builder.stream(cTopic, Consumed.with(Serdes.String(), serde))
.mapValues((k,v)->transformToC(v))
.filter((key, value) -> Objects.nonNull(value))
.selectKey((key, value) -> value.getID());
//join stream c and joined abData
KStream<String, abcData> abcData = streamC.join(
abData,
joinedABC::new,
JoinWindows.of(Duration.ofHours(2)),
Joined.with(Serdes.String(), cSerde, abSerde)
);
}