Перенаправление Внутренних Тем Потоков Кафки

#apache-kafka #apache-kafka-streams #kafka-topic

Вопрос:

В настоящее время работает с потоками Кафки для агрегирования событий в системе клиента. При запуске нашего прототипа с поддельными событиями все работает идеально. Однако при использовании фактических данных мы заметили, что в процессе агрегирования Потоки автоматически создают внутренние темы. Хотя теоретически это нормально, наш клиент обладает необходимой, сверхпрочной безопасностью и не желает предоставлять моей команде разработчиков привилегии на создание тем. Это означает, что мы не можем запускать нашу программу потоков как есть.

Однако мы можем создавать темы для себя и использовать их вместо потоков, создающих свои собственные темы Кафки. Возможно ли/как бы начать перенаправлять создание внутренней темы потоков для использования существующих тем?

Примечание: Мы можем называть внутренние темы как угодно. Он просто должен быть создан командой, у которой есть эти привилегии.

Комментарии:

1. Что такое ваш потоковый клиент? Используете ли вы какие-либо концепции объединения потоков?

Ответ №1:

В потоках Кафки теперь есть перегруженные методы как для KStream, так и для KTable, которые принимают новый параметр с именем. Используя именованный класс DSL, пользователи могут предоставлять значимые имена процессорам в их топологии.

 KStream<String,String> stream =
builder.stream("input", Consumed.as("Customer_transactions_input_topic"));
stream.filter((k,v) -> !v.equals("invalid_txn"), Named.as("filter_out_invalid_txns"))
      .mapValues((v) -> v.substring(0,5), Named.as("Map_values_to_first_6_characters"))
      .to("output", Produced.as("Mapped_transactions_output_topic"));
Topologies:
  Sub-topology: 0
   Source: Customer_transactions_input_topic (topics: [input])
     --> filter_out_invalid_txns
   Processor: filter_out_invalid_txns (stores: [])
     --> Map_values_to_first_6_characters
     <-- Customer_transactions_input_topic
   Processor: Map_values_to_first_6_characters (stores: [])
     --> Mapped_transactions_output_topic
     <-- filter_out_invalid_txns
   Sink: Mapped_transactions_output_topic (topic: output)
     <-- Map_values_to_first_6_characters
 

Теперь взгляните на свою топологию со всеми процессорами, названными:

Теперь вы можете посмотреть описание топологии и легко понять, какую роль в топологии играет каждый процессор. Но есть еще одна причина для присвоения имен узлам процессора, когда у вас есть операторы с отслеживанием состояния, которые остаются между перезапусками приложений потоков Кафки, хранилищ состояний, разделов журнала изменений и разделов перераспределения, что связано с потенциальным изменением имен узлов процессора, которые используют сгенерированные имена.