#apache-kafka #apache-kafka-connect
#apache-kafka #apache-kafka-connect
Вопрос:
У меня есть требование использовать сообщения от имени группы ленивых потребителей, которые просто предоставляют REST API. Поэтому я планирую использовать соединители приемника, которые извлекают сообщения из разделов Kafka и выполняют операцию HTTP POST для открытых API.
Одним из ключевых факторов для рассмотрения является регулирование. Какой механизм вы предлагаете для регулирования задач приемника в соответствии с уровнем SLA API. Я понимаю, что у Kafka есть функция клиентской квоты, однако, каков оптимальный механизм для отслеживания запросов API в минуту или секунду, который позволил бы динамически корректировать клиентскую квоту?
Ответ №1:
Я думаю, что лучший способ реализовать ограничение скорости для вашего REST API был бы в вашем коде соединителя, заблокировав при необходимости в SinkTask.put()
. Возможно, вы захотите подумать о том, достаточно ли ограничения скорости на уровне вашего SinkTask
s или вам нужно, чтобы оно было глобальным (более сложным, поскольку требуется координация).
Преимущество использования квот Kafka, которые вы рассматривали, заключается в том, что распределенный аспект обрабатывается за вас, однако я полагаю, что в настоящее время они могут быть настроены только в терминах передаваемых байтов.
Комментарии:
1. Это верно. Клиентские квоты Kafka настраиваются на передаваемые байты, тогда как регулирование REST API осуществляется на количество запросов. В моем случае у меня было бы несколько задач приемника, запущенных в нескольких workers в облаке. Когда вы говорите «глобальный», вы имеете в виду сохранение значения no. запросов в чем-то вроде кэша (redis)? Есть ли элегантный способ реализовать это через Kafka connect?
2. Да, используя что-то вроде Redis или Memcached для поддержания общего состояния, необходимого для запросов API с ограничением скорости. Если вас устраивает потенциально слишком консервативное ограничение скорости, вы могли бы вместо этого просто инициализировать Guava
RateLimiter
для каждогоSinkTask
, который вы получаете вput()
методе. Количество разрешений, которые вы предоставляетеRateLimiter
может быть(overall_limit / num_tasks)
(вы могли бы увеличить ограничение на уровне задачи при создании конфигурации задачи изSinkConnector.taskConfigs()
).3. Я полагаю, что при использовании Guava RateLimiter нам понадобится один и тот же экземпляр RateLimiter для всех задач приемника, которые выполняются несколькими рабочими, скажем, на нескольких стойках. То есть задачи 1 и 2 должны иметь возможность доступа к единственному экземпляру RateLimiter, для которого будет вызван метод acquire(). Кстати, я полагаю, вы предлагаете Guava RateLimiter в качестве альтернативы механизму кэширования?
4. Да, использование для каждой задачи
RateLimiter
было бы альтернативой использованию глобального общего состояния в Memcached / Redis / etc. Вы не должны предполагать, что задачи выполняются на одном и том же работнике, поэтому я бы предложил сохранитьRateLimiter
для каждого экземпляра задачи разрешения, равномерно распределенные между ними в зависимости от количества задач. Это должно хорошо работать, если работа равномерно распределена по задачам.