#apache-kafka #kafka-consumer-api #apache-kafka-streams
#apache-kafka #kafka-consumer-api #apache-kafka-streams
Вопрос:
Недавно в интервью мне задали вопросы о потоках Kafka, более конкретно, интервьюер хотел знать, почему / когда вы будете использовать потоки Kafka DSL поверх простого пользовательского API Kafka для чтения и обработки потоков сообщений? Я не смог дать убедительный ответ и задаюсь вопросом, могут ли другие, использующие эти два стиля потоковой обработки, поделиться своими мыслями / мнениями. Спасибо.
Комментарии:
1. Простой ответ заключается в том, что потребители не могут легко присоединиться к разным темам.
Ответ №1:
Как обычно, это зависит от варианта использования, когда использовать KafkaStreams API, а когда использовать обычный KafkaProducer / Consumer. Я бы не осмелился выбирать один над другим в общих чертах.
Прежде всего, KafkaStreams построен поверх KafkaProducers / Consumers, поэтому все, что возможно с KafkaStreams, также возможно с обычными потребителями / производителями.
Я бы сказал, что API KafkaStreams менее сложный, но и менее гибкий по сравнению с обычными потребителями / производителями. Теперь мы могли бы начать долгие дискуссии о том, что означает «меньше».
Когда дело доходит до разработки Kafka Streams API, вы можете напрямую перейти к своей бизнес-логике, применяя такие методы, как filter
, map
, join
или aggregate
, потому что вся потребляющая и производящая часть абстрагируется за кулисами.
Когда вы разрабатываете приложения с использованием простых потребителей / производителей, вам нужно подумать о том, как вы создаете своих клиентов на уровне subscribe
, poll
, send
flush
и т.д.
Если вы хотите иметь еще меньшую сложность (но и меньшую гибкость), ksqldb — это еще один вариант, который вы можете выбрать для создания своих приложений Kafka.
Ответ №2:
Вот некоторые из сценариев, в которых вы можете предпочесть потоки Kafka базовому API производителя / потребителя:
-
Это позволяет вам с легкостью создавать сложный конвейер обработки. Итак. давайте предположим (надуманный пример), что у вас есть раздел, содержащий заказы клиентов, и вы хотите отфильтровать заказы на основе города доставки и сохранить их в таблице DB для сохранения и индексе Elasticsearch для быстрого поиска. В таком сценарии вы должны использовать сообщения из исходной темы, отфильтровывать ненужные заказы на основе города с помощью функции Streams DSL
filter
, сохранять данные фильтра в отдельной теме Kafka (используяKStream.to()
илиKTable.to()
) и, наконец, используя Kafka Connect, сообщения будут сохранены в таблице базы данных и Elasticsearch. Вы также можете сделать то же самое, используя core Producer / Consumer API, но это потребует гораздо больше кодирования. -
В конвейере обработки данных вы можете выполнять потреблять, обрабатывать и производить в одной транзакции. Итак, в приведенном выше примере Kafka обеспечит однократную семантику и транзакцию от исходного раздела до базы данных и Elasticsearch. Из-за сбоев в сети и повторных попыток не будет никаких повторяющихся сообщений. Эта функция особенно полезна, когда вы выполняете агрегированные данные, такие как количество заказов на уровне отдельного продукта. В таких сценариях дубликаты всегда будут давать неверный результат.
-
Вы также можете обогатить свои входящие данные с гораздо меньшей задержкой. Давайте предположим, что в приведенном выше примере вы хотите дополнить данные заказа адресом электронной почты клиента из ваших сохраненных данных клиента. Что бы вы сделали в отсутствие потоков Kafka? Вероятно, вы бы вызывали REST API для каждого входящего заказа по сети, что, безусловно, будет дорогостоящей операцией, влияющей на вашу пропускную способность. В таком случае вы можете захотеть сохранить требуемые данные клиента в сжатом разделе Kafka и загрузить их в потоковое приложение с помощью
KTable
илиGlobalKTable
. И теперь все, что вам нужно, это выполнить простой локальный поиск в KTable для адреса электронной почты клиента. Обратите внимание, что данные KTable здесь будут храниться во встроенной базе данных RocksDB, которая поставляется с Kafka Streams, а также, поскольку KTable поддерживается разделом Kafka, ваши данные в потоковом приложении будут постоянно обновляться в режиме реального времени. Другими словами, устаревших данных не будет. По сути, это пример шаблона материализованного представления. -
Допустим, вы хотите объединить два разных потока данных. Итак, в приведенном выше примере вы хотите обрабатывать только заказы с успешными платежами, а платежные данные поступают через другой раздел Kafka. Теперь может случиться так, что платеж задерживается или событие платежа происходит до события заказа. В таком случае вы можете захотеть выполнить оконное объединение на один час. Таким образом, если заказ и соответствующие платежные события поступают в течение одного часа, заказ будет отправлен по конвейеру для дальнейшей обработки. Как вы можете видеть, вам нужно сохранить состояние для одночасового окна, и это состояние будет сохранено в Rocks DB Kafka Streams.