#java #apache-kafka #apache-kafka-streams
#java #apache-kafka #apache-kafka-streams
Вопрос:
У меня есть рабочее приложение Kafka Streams, которое в настоящее время создает два KStreams из двух разных тем. Эта часть работает просто отлично.
Теперь я хочу присоединиться к ним и получить «агрегированную запись» значения в первом и значения во втором. Ключи представляют собой простые строки Java, а значения представляют собой GenericRecords в кодировке avro.
Основываясь на документации, я должен быть в состоянии сделать что-то вроде этого:
KStream<String, GenericAvroSerde> joined =
inputTopicStartKStream.leftJoin(inputTopicEndKStream,
(left, right) -> { ??? }
JoinWindows.of(Duration.ofHours(24)),
Joined.with(
stringSerde,
genericAvroSerde,
genericAvroSerde)
);
Однако из документов или руководств, которые я нашел в Интернете, неясно, что я могу сделать в приведенном выше разделе, в котором говорится { ??? }
. Я пробовал несколько вариантов выше, но безуспешно. Я использую Kakfa Streams версии 2.2.0, если это имеет значение.
Я просто хочу иметь выходной поток <key, merge value1 value2>
для записей, которые поступают в оба потока с одним и тем же ключом. Я могу выполнить объединение значений вручную, но неясно, как даже получить доступ к значениям в правой части лямбды.
Ответ №1:
В ValueJoiner (left, right) -> { ??? }
left представляет значение из левого потока, а right представляет значение из правого потока
Все, что вам нужно сделать, это добавить свой код в ValueJoiner, как показано ниже:
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
KStream<String, GenericAvroSerde> joined =
inputTopicStartKStream.leftJoin(inputTopicEndKStream,
(left, right) -> {
// You can get access to the generic Avro record by
// casting both left and right values
Record leftRecord = (Record) left;
Record rightRecord = (Record) right;
// For the original question, you can simply create a new GenericRecord
// with the contents of left and right records
GenericRecord record = new GenericData.Record(schema);
record.put("left", left);
record.put("right", right);
}
JoinWindows.of(Duration.ofHours(24)),
Joined.with(
stringSerde,
genericAvroSerde,
genericAvroSerde)
);