Добавление глобального хранилища для использования трансформатором

#apache-kafka-streams

#apache-kafka-streams

Вопрос:

Есть ли способ добавить глобальное хранилище для использования Transformer? В документах для transformer говорится:

«Преобразуйте каждую запись входного потока в ноль или более записей в выходном потоке (как ключ, так и тип значения могут быть изменены произвольно). Трансформатор (предоставляемый данным TransformerSupplier) применяется к каждой входной записи и вычисляет ноль или более выходных записей. Чтобы назначить состояние, состояние должно быть создано и зарегистрировано заранее через хранилища, добавленные через addStateStore или addGlobalStore, прежде чем их можно будет подключить к трансформатору»

тем не менее, API для addGlobalStore использует processssupplier?

 addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore],
                     topic: String,
                     consumed: Consumed[_, _],
                     stateUpdateSupplier: ProcessorSupplier[_, _])
  

Моя конечная цель — использовать DSL Kafka Streams с transformer, поскольку мне нужна flatMap и преобразовать оба ключа и значения в мою тему вывода. Хотя в моей топологии нет процессора.

Я бы ожидал чего-то подобного:

 addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore], topic: String, consumed: Consumed[_, ], stateUpdateSupplier: TransformerSupplier[, _])
  

Ответ №1:

Processor То, что передается в addGlobalStore() , используется для обслуживания (т. Е. записи) хранилища. Обратите внимание, что ожидается, что при этом Processor данные как есть будут скопированы в хранилище (см. https://issues.apache.org/jira/browse/KAFKA-7663).

После того, как вы добавили глобальное хранилище, вы также можете добавить a, Transformer и Transformer сможете получить доступ к хранилищу. Обратите внимание, что для его доступности не требуется подключать глобальное хранилище (потребуется добавить только «обычные» хранилища). Также обратите внимание, что a Transformer получает доступ на чтение только к глобальным хранилищам.

Комментарии:

1. Есть прогресс по этому запросу? Нашел это в kafka-streams 2.0.1.

2. Кажется, никто не подобрал его. Не стесняйтесь использовать его 🙂 — В конце концов, Apache Kafka — это проект сообщества 🙂

3. @MatthiasJ.Sax Когда можно использовать GlobalStateStore вместо GlobalKTable? Я хочу загрузить данные темы kafka в память, чтобы я мог посмотреть некоторые сопоставления статических идентификаторов в приложении kafka stream.

4. Оба в основном одинаковы. Кажется, что GlobalKTable предпочтительнее, поскольку он проще в использовании.

5. Как получить доступ к globalStateStore изнутри transformer, любой пример? При попытке доступа к тому же, он выдает исключение с не удалось заблокировать глобальное хранилище состояний, вероятно, несколько потоков, запущенных на одном хосте.

Ответ №2:

Используйте Processor вместо Transformer для всех преобразований, которые вы хотите выполнить в разделе ввода, всякий раз, когда есть возможность поиска данных из GlobalStateStore . Используется context.forward(key,value,childName) для отправки данных на нижестоящие узлы. context.forward(key,value,childName) может вызываться несколько раз в a process() и punctuate() , чтобы отправить несколько записей на нижестоящий узел. Если требуется обновить GlobalStateStore, делайте это только в процессоре, переданном addGlobalStore(..) , поскольку с GlobalStateStore связан GlobalStreamThread, который поддерживает согласованное состояние хранилища во всех запущенных экземплярах kstream.