#apache-kafka #apache-kafka-streams #kafka-topic
Вопрос:
В настоящее время работает с потоками Кафки для агрегирования событий в системе клиента. При запуске нашего прототипа с поддельными событиями все работает идеально. Однако при использовании фактических данных мы заметили, что в процессе агрегирования Потоки автоматически создают внутренние темы. Хотя теоретически это нормально, наш клиент обладает необходимой, сверхпрочной безопасностью и не желает предоставлять моей команде разработчиков привилегии на создание тем. Это означает, что мы не можем запускать нашу программу потоков как есть.
Однако мы можем создавать темы для себя и использовать их вместо потоков, создающих свои собственные темы Кафки. Возможно ли/как бы начать перенаправлять создание внутренней темы потоков для использования существующих тем?
Примечание: Мы можем называть внутренние темы как угодно. Он просто должен быть создан командой, у которой есть эти привилегии.
Комментарии:
1. Что такое ваш потоковый клиент? Используете ли вы какие-либо концепции объединения потоков?
Ответ №1:
В потоках Кафки теперь есть перегруженные методы как для KStream, так и для KTable, которые принимают новый параметр с именем. Используя именованный класс DSL, пользователи могут предоставлять значимые имена процессорам в их топологии.
KStream<String,String> stream =
builder.stream("input", Consumed.as("Customer_transactions_input_topic"));
stream.filter((k,v) -> !v.equals("invalid_txn"), Named.as("filter_out_invalid_txns"))
.mapValues((v) -> v.substring(0,5), Named.as("Map_values_to_first_6_characters"))
.to("output", Produced.as("Mapped_transactions_output_topic"));
Topologies:
Sub-topology: 0
Source: Customer_transactions_input_topic (topics: [input])
--> filter_out_invalid_txns
Processor: filter_out_invalid_txns (stores: [])
--> Map_values_to_first_6_characters
<-- Customer_transactions_input_topic
Processor: Map_values_to_first_6_characters (stores: [])
--> Mapped_transactions_output_topic
<-- filter_out_invalid_txns
Sink: Mapped_transactions_output_topic (topic: output)
<-- Map_values_to_first_6_characters
Теперь взгляните на свою топологию со всеми процессорами, названными:
Теперь вы можете посмотреть описание топологии и легко понять, какую роль в топологии играет каждый процессор. Но есть еще одна причина для присвоения имен узлам процессора, когда у вас есть операторы с отслеживанием состояния, которые остаются между перезапусками приложений потоков Кафки, хранилищ состояний, разделов журнала изменений и разделов перераспределения, что связано с потенциальным изменением имен узлов процессора, которые используют сгенерированные имена.