#apache-kafka-streams
#apache-kafka-streams
Вопрос:
Есть ли способ добавить глобальное хранилище для использования Transformer? В документах для transformer говорится:
«Преобразуйте каждую запись входного потока в ноль или более записей в выходном потоке (как ключ, так и тип значения могут быть изменены произвольно). Трансформатор (предоставляемый данным TransformerSupplier) применяется к каждой входной записи и вычисляет ноль или более выходных записей. Чтобы назначить состояние, состояние должно быть создано и зарегистрировано заранее через хранилища, добавленные через addStateStore или addGlobalStore, прежде чем их можно будет подключить к трансформатору»
тем не менее, API для addGlobalStore использует processssupplier?
addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore],
topic: String,
consumed: Consumed[_, _],
stateUpdateSupplier: ProcessorSupplier[_, _])
Моя конечная цель — использовать DSL Kafka Streams с transformer, поскольку мне нужна flatMap и преобразовать оба ключа и значения в мою тему вывода. Хотя в моей топологии нет процессора.
Я бы ожидал чего-то подобного:
addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore], topic: String, consumed: Consumed[_, ], stateUpdateSupplier: TransformerSupplier[, _])
Ответ №1:
Processor
То, что передается в addGlobalStore()
, используется для обслуживания (т. Е. записи) хранилища. Обратите внимание, что ожидается, что при этом Processor
данные как есть будут скопированы в хранилище (см. https://issues.apache.org/jira/browse/KAFKA-7663).
После того, как вы добавили глобальное хранилище, вы также можете добавить a, Transformer
и Transformer
сможете получить доступ к хранилищу. Обратите внимание, что для его доступности не требуется подключать глобальное хранилище (потребуется добавить только «обычные» хранилища). Также обратите внимание, что a Transformer
получает доступ на чтение только к глобальным хранилищам.
Комментарии:
1. Есть прогресс по этому запросу? Нашел это в kafka-streams 2.0.1.
2. Кажется, никто не подобрал его. Не стесняйтесь использовать его 🙂 — В конце концов, Apache Kafka — это проект сообщества 🙂
3. @MatthiasJ.Sax Когда можно использовать GlobalStateStore вместо GlobalKTable? Я хочу загрузить данные темы kafka в память, чтобы я мог посмотреть некоторые сопоставления статических идентификаторов в приложении kafka stream.
4. Оба в основном одинаковы. Кажется, что GlobalKTable предпочтительнее, поскольку он проще в использовании.
5. Как получить доступ к globalStateStore изнутри transformer, любой пример? При попытке доступа к тому же, он выдает исключение с не удалось заблокировать глобальное хранилище состояний, вероятно, несколько потоков, запущенных на одном хосте.
Ответ №2:
Используйте Processor вместо Transformer для всех преобразований, которые вы хотите выполнить в разделе ввода, всякий раз, когда есть возможность поиска данных из GlobalStateStore . Используется context.forward(key,value,childName)
для отправки данных на нижестоящие узлы. context.forward(key,value,childName)
может вызываться несколько раз в a process()
и punctuate()
, чтобы отправить несколько записей на нижестоящий узел. Если требуется обновить GlobalStateStore, делайте это только в процессоре, переданном addGlobalStore(..)
, поскольку с GlobalStateStore связан GlobalStreamThread, который поддерживает согласованное состояние хранилища во всех запущенных экземплярах kstream.