Соединение KStream — KTable: для добавления в поток данных Ktable

#apache-kafka-streams

Вопрос:

Я реализую приложение с использованием потоков Кафки, и у меня есть следующий сценарий:

Тематические заказы

 {"id":100}:{"id":100, "clientId":1, "statusId"=1}
{"id":101}:{"id":101, "clientId":2, "statusId"=1}
{"id":102}:{"id":102, "clientId":3, "statusId"=1}
 

Создайте KStream из темы

 final KStream<Integer, OrderSource> orderStream = builder
        .stream("orders", Consumed.with(CustomSerdesFactory.keySerde(), CustomSerdesFactory.orderSerde()))
        .map((key, value) -> new KeyValue<>(value.getId(), value));
 

Результат выполнения заказов KStream

 [KStream]: 100, OrderSource(id=100, statusId=1, clientId=1)
[KStream]: 101, OrderSource(id=101, statusId=1, clientId=2)
[KStream]: 102, OrderSource(id=102, statusId=1, clientId=3)
 

Я хотел бы обогатить этот поток данными, хранящимися в таблице K.

Эта таблица KTable создана из другой темы:

Тематические клиенты

 {"id":1}:{"id":1, "name":"name1", "email"="name1@test.com"}
{"id":2}:{"id":2, "name":"name2", "email"="name2@test.com"}
{"id":3}:{"id":3, "name":"name3", "email"="name3@test.com"}
 

Создайте таблицу K из темы

 final KTable<Integer, ClientSource> clientTable = builder
        .stream("clients", Consumed.with(CustomSerdesFactory.keySerde(), CustomSerdesFactory.clientSerde()))
        .map((key, value) -> new KeyValue<>(value.getId(), value))
        .toTable(Materialized.<Integer, ClientSource, KeyValueStore<Bytes, byte[]>>as("client-table")
          .withKeySerde(Serdes.Integer())
          .withValueSerde(CustomSerdesFactory.clientSerde())
        );
 

Результат клиента KTable

 [KTable]: 1, ClientSource(id=1, name=name1, email=name1@test.com)
[KTable]: 2, ClientSource(id=2, name=name2, email=name2@test.com)
[KTable]: 3, ClientSource(id=3, name=name3, email=name3@test.com)
 

Ожидаемым результатом будет KStream, подобный этому:

 [KStream]: 100, OrderEnrichedSource(id=100, statusId=1, name=name1, email=name1@test.com)
[KStream]: 101, OrderEnrichedSource(id=101, statusId=1, name=name2, email=name2@test.com)
[KStream]: 102, OrderEnrichedSource(id=102, statusId=1, name=name3, email=name3@test.com)
 

Прежде всего, можно ли создать соединение (или левое соединение), которое позволяет это сделать? Я прочитал документацию и не вижу подобного примера, хотя, возможно, я плохо понимаю концепции.

Если возможно, как это соединение будет выполнено в коде Java?

С уважением, друзья =)