Как я могу получить доступ к исходным записям при присоединении к потокам Kafka

#java #apache-kafka #apache-kafka-streams

#java #apache-kafka #apache-kafka-streams

Вопрос:

У меня есть рабочее приложение Kafka Streams, которое в настоящее время создает два KStreams из двух разных тем. Эта часть работает просто отлично.

Теперь я хочу присоединиться к ним и получить «агрегированную запись» значения в первом и значения во втором. Ключи представляют собой простые строки Java, а значения представляют собой GenericRecords в кодировке avro.

Основываясь на документации, я должен быть в состоянии сделать что-то вроде этого:

     KStream<String, GenericAvroSerde> joined =
        inputTopicStartKStream.leftJoin(inputTopicEndKStream,
        (left, right) -> { ??? }
        JoinWindows.of(Duration.ofHours(24)),
        Joined.with(
            stringSerde,
            genericAvroSerde,
            genericAvroSerde)
    );
 

Однако из документов или руководств, которые я нашел в Интернете, неясно, что я могу сделать в приведенном выше разделе, в котором говорится { ??? } . Я пробовал несколько вариантов выше, но безуспешно. Я использую Kakfa Streams версии 2.2.0, если это имеет значение.

Я просто хочу иметь выходной поток <key, merge value1 value2> для записей, которые поступают в оба потока с одним и тем же ключом. Я могу выполнить объединение значений вручную, но неясно, как даже получить доступ к значениям в правой части лямбды.

Ответ №1:

В ValueJoiner (left, right) -> { ??? } left представляет значение из левого потока, а right представляет значение из правого потока

Все, что вам нужно сделать, это добавить свой код в ValueJoiner, как показано ниже:

 import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;

KStream<String, GenericAvroSerde> joined =
    inputTopicStartKStream.leftJoin(inputTopicEndKStream,
    (left, right) -> {
             // You can get access to the generic Avro record by
             // casting both left and right values 
             Record leftRecord = (Record) left;
             Record rightRecord = (Record) right;

             // For the original question, you can simply create a new GenericRecord 
             // with the contents of left and right records
             GenericRecord record = new GenericData.Record(schema);
             record.put("left", left);
             record.put("right", right);
    }
    JoinWindows.of(Duration.ofHours(24)),
    Joined.with(
        stringSerde,
        genericAvroSerde,
        genericAvroSerde)
);