Регулирование Kafka connect

#apache-kafka #apache-kafka-connect

#apache-kafka #apache-kafka-connect

Вопрос:

У меня есть требование использовать сообщения от имени группы ленивых потребителей, которые просто предоставляют REST API. Поэтому я планирую использовать соединители приемника, которые извлекают сообщения из разделов Kafka и выполняют операцию HTTP POST для открытых API.

Одним из ключевых факторов для рассмотрения является регулирование. Какой механизм вы предлагаете для регулирования задач приемника в соответствии с уровнем SLA API. Я понимаю, что у Kafka есть функция клиентской квоты, однако, каков оптимальный механизм для отслеживания запросов API в минуту или секунду, который позволил бы динамически корректировать клиентскую квоту?

Ответ №1:

Я думаю, что лучший способ реализовать ограничение скорости для вашего REST API был бы в вашем коде соединителя, заблокировав при необходимости в SinkTask.put() . Возможно, вы захотите подумать о том, достаточно ли ограничения скорости на уровне вашего SinkTask s или вам нужно, чтобы оно было глобальным (более сложным, поскольку требуется координация).

Преимущество использования квот Kafka, которые вы рассматривали, заключается в том, что распределенный аспект обрабатывается за вас, однако я полагаю, что в настоящее время они могут быть настроены только в терминах передаваемых байтов.

Комментарии:

1. Это верно. Клиентские квоты Kafka настраиваются на передаваемые байты, тогда как регулирование REST API осуществляется на количество запросов. В моем случае у меня было бы несколько задач приемника, запущенных в нескольких workers в облаке. Когда вы говорите «глобальный», вы имеете в виду сохранение значения no. запросов в чем-то вроде кэша (redis)? Есть ли элегантный способ реализовать это через Kafka connect?

2. Да, используя что-то вроде Redis или Memcached для поддержания общего состояния, необходимого для запросов API с ограничением скорости. Если вас устраивает потенциально слишком консервативное ограничение скорости, вы могли бы вместо этого просто инициализировать Guava RateLimiter для каждого SinkTask , который вы получаете в put() методе. Количество разрешений, которые вы предоставляете RateLimiter может быть (overall_limit / num_tasks) (вы могли бы увеличить ограничение на уровне задачи при создании конфигурации задачи из SinkConnector.taskConfigs() ).

3. Я полагаю, что при использовании Guava RateLimiter нам понадобится один и тот же экземпляр RateLimiter для всех задач приемника, которые выполняются несколькими рабочими, скажем, на нескольких стойках. То есть задачи 1 и 2 должны иметь возможность доступа к единственному экземпляру RateLimiter, для которого будет вызван метод acquire(). Кстати, я полагаю, вы предлагаете Guava RateLimiter в качестве альтернативы механизму кэширования?

4. Да, использование для каждой задачи RateLimiter было бы альтернативой использованию глобального общего состояния в Memcached / Redis / etc. Вы не должны предполагать, что задачи выполняются на одном и том же работнике, поэтому я бы предложил сохранить RateLimiter для каждого экземпляра задачи разрешения, равномерно распределенные между ними в зависимости от количества задач. Это должно хорошо работать, если работа равномерно распределена по задачам.