#apache-kafka-streams
Вопрос:
Я реализую приложение с использованием потоков Кафки, и у меня есть следующий сценарий:
Тематические заказы
{"id":100}:{"id":100, "clientId":1, "statusId"=1}
{"id":101}:{"id":101, "clientId":2, "statusId"=1}
{"id":102}:{"id":102, "clientId":3, "statusId"=1}
Создайте KStream из темы
final KStream<Integer, OrderSource> orderStream = builder
.stream("orders", Consumed.with(CustomSerdesFactory.keySerde(), CustomSerdesFactory.orderSerde()))
.map((key, value) -> new KeyValue<>(value.getId(), value));
Результат выполнения заказов KStream
[KStream]: 100, OrderSource(id=100, statusId=1, clientId=1)
[KStream]: 101, OrderSource(id=101, statusId=1, clientId=2)
[KStream]: 102, OrderSource(id=102, statusId=1, clientId=3)
Я хотел бы обогатить этот поток данными, хранящимися в таблице K.
Эта таблица KTable создана из другой темы:
Тематические клиенты
{"id":1}:{"id":1, "name":"name1", "email"="name1@test.com"}
{"id":2}:{"id":2, "name":"name2", "email"="name2@test.com"}
{"id":3}:{"id":3, "name":"name3", "email"="name3@test.com"}
Создайте таблицу K из темы
final KTable<Integer, ClientSource> clientTable = builder
.stream("clients", Consumed.with(CustomSerdesFactory.keySerde(), CustomSerdesFactory.clientSerde()))
.map((key, value) -> new KeyValue<>(value.getId(), value))
.toTable(Materialized.<Integer, ClientSource, KeyValueStore<Bytes, byte[]>>as("client-table")
.withKeySerde(Serdes.Integer())
.withValueSerde(CustomSerdesFactory.clientSerde())
);
Результат клиента KTable
[KTable]: 1, ClientSource(id=1, name=name1, email=name1@test.com)
[KTable]: 2, ClientSource(id=2, name=name2, email=name2@test.com)
[KTable]: 3, ClientSource(id=3, name=name3, email=name3@test.com)
Ожидаемым результатом будет KStream, подобный этому:
[KStream]: 100, OrderEnrichedSource(id=100, statusId=1, name=name1, email=name1@test.com)
[KStream]: 101, OrderEnrichedSource(id=101, statusId=1, name=name2, email=name2@test.com)
[KStream]: 102, OrderEnrichedSource(id=102, statusId=1, name=name3, email=name3@test.com)
Прежде всего, можно ли создать соединение (или левое соединение), которое позволяет это сделать? Я прочитал документацию и не вижу подобного примера, хотя, возможно, я плохо понимаю концепции.
Если возможно, как это соединение будет выполнено в коде Java?
С уважением, друзья =)