Kafka транслирует DSL через Kafka Consumer API

#apache-kafka #kafka-consumer-api #apache-kafka-streams

#apache-kafka #kafka-consumer-api #apache-kafka-streams

Вопрос:

Недавно в интервью мне задали вопросы о потоках Kafka, более конкретно, интервьюер хотел знать, почему / когда вы будете использовать потоки Kafka DSL поверх простого пользовательского API Kafka для чтения и обработки потоков сообщений? Я не смог дать убедительный ответ и задаюсь вопросом, могут ли другие, использующие эти два стиля потоковой обработки, поделиться своими мыслями / мнениями. Спасибо.

Комментарии:

1. Простой ответ заключается в том, что потребители не могут легко присоединиться к разным темам.

Ответ №1:

Как обычно, это зависит от варианта использования, когда использовать KafkaStreams API, а когда использовать обычный KafkaProducer / Consumer. Я бы не осмелился выбирать один над другим в общих чертах.

Прежде всего, KafkaStreams построен поверх KafkaProducers / Consumers, поэтому все, что возможно с KafkaStreams, также возможно с обычными потребителями / производителями.

Я бы сказал, что API KafkaStreams менее сложный, но и менее гибкий по сравнению с обычными потребителями / производителями. Теперь мы могли бы начать долгие дискуссии о том, что означает «меньше».

Когда дело доходит до разработки Kafka Streams API, вы можете напрямую перейти к своей бизнес-логике, применяя такие методы, как filter , map , join или aggregate , потому что вся потребляющая и производящая часть абстрагируется за кулисами.

Когда вы разрабатываете приложения с использованием простых потребителей / производителей, вам нужно подумать о том, как вы создаете своих клиентов на уровне subscribe , poll , send flush и т.д.

Если вы хотите иметь еще меньшую сложность (но и меньшую гибкость), ksqldb — это еще один вариант, который вы можете выбрать для создания своих приложений Kafka.

Ответ №2:

Вот некоторые из сценариев, в которых вы можете предпочесть потоки Kafka базовому API производителя / потребителя:

  1. Это позволяет вам с легкостью создавать сложный конвейер обработки. Итак. давайте предположим (надуманный пример), что у вас есть раздел, содержащий заказы клиентов, и вы хотите отфильтровать заказы на основе города доставки и сохранить их в таблице DB для сохранения и индексе Elasticsearch для быстрого поиска. В таком сценарии вы должны использовать сообщения из исходной темы, отфильтровывать ненужные заказы на основе города с помощью функции Streams DSL filter , сохранять данные фильтра в отдельной теме Kafka (используя KStream.to() или KTable.to() ) и, наконец, используя Kafka Connect, сообщения будут сохранены в таблице базы данных и Elasticsearch. Вы также можете сделать то же самое, используя core Producer / Consumer API, но это потребует гораздо больше кодирования.

  2. В конвейере обработки данных вы можете выполнять потреблять, обрабатывать и производить в одной транзакции. Итак, в приведенном выше примере Kafka обеспечит однократную семантику и транзакцию от исходного раздела до базы данных и Elasticsearch. Из-за сбоев в сети и повторных попыток не будет никаких повторяющихся сообщений. Эта функция особенно полезна, когда вы выполняете агрегированные данные, такие как количество заказов на уровне отдельного продукта. В таких сценариях дубликаты всегда будут давать неверный результат.

  3. Вы также можете обогатить свои входящие данные с гораздо меньшей задержкой. Давайте предположим, что в приведенном выше примере вы хотите дополнить данные заказа адресом электронной почты клиента из ваших сохраненных данных клиента. Что бы вы сделали в отсутствие потоков Kafka? Вероятно, вы бы вызывали REST API для каждого входящего заказа по сети, что, безусловно, будет дорогостоящей операцией, влияющей на вашу пропускную способность. В таком случае вы можете захотеть сохранить требуемые данные клиента в сжатом разделе Kafka и загрузить их в потоковое приложение с помощью KTable или GlobalKTable . И теперь все, что вам нужно, это выполнить простой локальный поиск в KTable для адреса электронной почты клиента. Обратите внимание, что данные KTable здесь будут храниться во встроенной базе данных RocksDB, которая поставляется с Kafka Streams, а также, поскольку KTable поддерживается разделом Kafka, ваши данные в потоковом приложении будут постоянно обновляться в режиме реального времени. Другими словами, устаревших данных не будет. По сути, это пример шаблона материализованного представления.

  4. Допустим, вы хотите объединить два разных потока данных. Итак, в приведенном выше примере вы хотите обрабатывать только заказы с успешными платежами, а платежные данные поступают через другой раздел Kafka. Теперь может случиться так, что платеж задерживается или событие платежа происходит до события заказа. В таком случае вы можете захотеть выполнить оконное объединение на один час. Таким образом, если заказ и соответствующие платежные события поступают в течение одного часа, заказ будет отправлен по конвейеру для дальнейшей обработки. Как вы можете видеть, вам нужно сохранить состояние для одночасового окна, и это состояние будет сохранено в Rocks DB Kafka Streams.