Как присоединиться к потоку, созданному из одной темы, к KTable, производному (как агрегатная операция) из другой темы

#apache-kafka-streams

#apache-kafka-streams

Вопрос:

Проблема: Как присоединиться к потоку, созданному из TOPIC_2(на шаге 2), к KTable stateTable (на шаге 1 формата).

Цель: после операции объединения, если мы изменим состояние объекта AlarmState (значение KTable stateTable), то то же состояние должно быть отражено в таблице состояний (часть шага 1)

Существует KTable (как stateTable), описанный в Step1 (созданный из TOPIC_1) Существует еще одна тема TOPIC_2, в которой генерируются данные (на шаге 2) Ключ stateTable и сгенерированные данные в TOPIC_2 одинаковы

Шаг 1.

 final KStream<String, MetricBasicMessage> basicMsgStream = builder.stream("TOPIC_1",
                Consumed.with(Serdes.String(), new JSONSerde<>()));

KTable <String, AlarmState> stateTable = 
         builder.stream("TOPIC_1",Consumed.with(Serdes.String(), new JSONSerde<>()))
                .flatMapValues(...)
                .filter(...)
                .map(...)
                .groupByKey(...)
                .aggregate(...);

final KafkaStreams streams = new KafkaStreams(builder.build(), <streamsConfiguration>);
        streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
 

Шаг2.

 String keyToJoinWithState = key.substring(0, index);

producer.send("TOPIC_2", keyToJoinWithState, new NotificationMessage(taskType, thresh),"NOTIIFCATION_MESSAGE");
 

Ответ №1:

Если вы хотите присоединиться к потоку с некоторой таблицей, вам просто нужно вызвать

KStream::join(final KTable<K, VT> table, final ValueJoiner<? super V, ? super VT, ? extends VR> joiner);

Это будет что-то вроде этого:

 KStream<String, String> stream2 = builder.<String, NotificationMessage >stream("TOPIC_2", Consumed.with(Serdes.String(), new NotificationMessageSerdes()));
stream2.join(stateTable, (v1, v2) -> ??? /* How to join values from Stream and KTable */).to("output2");
 

Комментарии:

1. Ваше предложение, которое я уже пробовал, и оно работает, если мы объединим KTable и Stream, созданные из темы, но здесь KTable является производным от другой темы как агрегатная операция. Получение следующего исключения «org.apache. kafka.streams.errors. Исключение TopologyException: недопустимая топология: StateStore AlarmState еще не добавлен.» если я хочу реализовать свой вариант использования.

2. @OmSpatel, я думаю, что это не связано с объединением. Кажется, что ваши процессоры используют хранилища состояний, которые не зарегистрированы, не могли бы вы прикрепить исходный код?

3. Спасибо Wardziniak за ваш ответ… Это сработало после добавления объекта хранилища состояний. конечный конструктор StreamsBuilder = новый StreamsBuilder(); KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(«Состояние тревоги»); StoreBuilder<KeyValueStore<Строка, JSONSERD несовместимая>> storeBuilder = Stores.keyValueStoreBuilder(storeSupplier, Serdes. String(), new JSONSerde<>()); builder.addStateStore(storeBuilder);

4. @OmSpatel, если это поможет, вы можете проголосовать и / или принять ответ.