Kafka как предотвратить использование старых записей при объединении нескольких потоков

#java #apache-kafka #kafka-consumer-api #confluent-platform

#java #apache-kafka #kafka-consumer-api #confluent-платформа

Вопрос:

я довольно новичок в kafka и пытаюсь объединить несколько потоков по уникальному ключу. У меня есть два разных потока, которые я получаю от прослушивания двух разных тем. Я перераспределяю и объединяю их по соответствующим ключам. Затем я слушаю другую тему C и соединяю предыдущие объединенные данные со свежим streamC. Итак, подведем итог

 A-B=>AB
C->
AB-C>ABC
 

Это работает и объединяет данные именно так, как я хочу, и я доволен этим. По какой-то причине, если, скажем, в раздел C поступают только данные, он будет извлекать данные из хранилища состояний (я полагаю, что он записывает предыдущие данные в хранилище состояний). Таким образом, он выполнит объединение со старой записью. Насколько я понимаю, у kafka есть собственный механизм кэширования с хранилищем состояний, что делает его более эффективным, но в моем случае мои данные довольно динамичны и меняются, поэтому я не хочу, чтобы он использовал старые записи для объединения. По сути, я хочу объединить эти данные только в том случае, если все эти разделы получили новую полезную нагрузку отдельно, и каждый из них также завершает свое собственное объединение. — ни в коем случае не использовать какую-либо предыдущую запись.

Вот соответствующий код

 protected Topology createStreamTopology(){

        StreamsBuilder builder = new StreamsBuilder();


        //Stream A
        KStream<String, A> streamA = builder.stream(
                aTopic,
                Consumed.with(Serdes.String(), aSerde)
        ).selectKey((k, v) -> A.getOrderId(v));


        //Stream B
        //Make a table of the salesforce order line item events
        KStream<String, B> streamB = builder.stream(
                bTopic,
                Consumed.with(Serdes.String(), bSerde)
        ).filter(
                (k1, v1) -> v1.getPayload().getPlatformC() != null
        ).selectKey(
                (k, v) -> v.getOrder()
        );


        //Join stream a and stream b
        KStream<String, AB> abData = streamA.join(
                streamB,
                joinedAB::new,
                JoinWindows.of(Duration.ofHours(2)),
                Joined.with(Serdes.String(), aSerde, bSerde)
        ).selectKey(
                (k, v) -> v.getPayload.getUniqueID());  


        // Stream c
        KStream<String, C>stream streamC=builder.stream(cTopic, Consumed.with(Serdes.String(), serde))
                .mapValues((k,v)->transformToC(v))
                .filter((key, value) -> Objects.nonNull(value))
                .selectKey((key, value) -> value.getID());


        //join stream c and joined abData
        KStream<String, abcData> abcData = streamC.join(
                abData,
                joinedABC::new,
                JoinWindows.of(Duration.ofHours(2)),
                Joined.with(Serdes.String(), cSerde, abSerde)
        );
}