Перезапустить обработку с момента последнего зафиксированного смещения в Кафке

#java #apache-kafka

Вопрос:

Отговорка

У меня есть:

  • Подключенный потребитель Кафки
  • Потребитель является частью группы потребителей.
  • Прослушиватель перебалансировки, подключенный к потребителю
  • Автоматическая фиксация отключена

Кроме того, у меня есть метод, который принимает два параметра: потребитель и прослушиватель перебалансировки, который отслеживает, какие разделы были назначены потребителю

 void aggregateProcessing(ConsumerRecords<String, SomeClass> consumer, RebalanceListener listener)
 
 public class RebalanceListener implements ConsumerRebalanceListener {
    private Set<TopicPartition> assignedPartitions = new LinkedHashSet<>();

    @Override
    public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
       // keep track of assigned partitions
    }   
}
 

Цель

Этот метод запускается по таймеру, и его цель состоит в обработке записей до тех пор, пока не останется ни одной для чтения или до некоторого максимального времени во всех разделах.

Поскольку перебалансировка может произойти в середине потребления (после consumer.poll() того, как она уже сработала несколько раз), я хотел бы обнаружить это, сбросить и перезапустить обработку с последнего зафиксированного смещения для всех назначенных разделов (даже если это уже было назначено).

Есть ли способ сбросить внутреннее смещение потребителя для каждого раздела до последнего зафиксированного смещения для списка назначенных разделов?

Боковая заметка

Я понимаю, что повторная обработка для всех разделов (а не только для тех, которые были изменены) менее эффективна, чем выборочное удаление части обработки, но, вероятно, будет значительно проще, чем отслеживание того, какие данные необходимо удалить при удалении раздела.

Спасибо!

Ответ №1:

Как описано в документации Кафки в KafkaConsumer

Взаимозачеты и Позиция потребителя

Кафка поддерживает числовое смещение для каждой записи в разделе. Это смещение действует как уникальный идентификатор записи в этом разделе, а также обозначает положение потребителя в разделе. Например, потребитель, находящийся в позиции 5, потребил записи со смещениями от 0 до 4 и затем получит запись со смещением 5. На самом деле существует два понятия позиции, имеющих отношение к пользователю потребителя: Позиция потребителя дает смещение следующей записи, которая будет выдана. Это будет на единицу больше, чем максимальное смещение, которое потребитель видел в этом разделе. Он автоматически продвигается каждый раз, когда потребитель получает сообщения при вызове на опрос(продолжительность).

Зафиксированная позиция-это последнее смещение, которое было надежно сохранено. В случае сбоя процесса и перезапуска это смещение, до которого восстановится потребитель. Потребитель может либо периодически автоматически фиксировать смещения; либо он может выбрать управление этой фиксированной позицией вручную, вызвав один из API-интерфейсов фиксации (например, commitSync и commitAsync).

Таким образом, если вам нужно начать с последнего зафиксированного смещения и вы отключили enable.auto.commit его , вы можете вручную зафиксировать смещение вашего сообщения proceesed.

вы можете выбрать управление этой зафиксированной позицией вручную, вызвав один из API-интерфейсов фиксации (например, commitSync и commitAsync).

Затем после перезапуска и перебалансировки Kafka Потребители начнут потреблять с последнего зафиксированного (обработанного) смещения.

Описанный выше сценарий-это когда вы используете хранилище Kafka для смещений потребителей. Если у вас уже есть смещения, которые вы хотите начать использовать, вы можете контролировать начальное смещение потребителя Consumer.Seek() до начала потребления.

Контроль Позиции Потребителя

В большинстве случаев потребитель будет просто потреблять записи от начала до конца, периодически фиксируя свою позицию (автоматически или вручную). Однако Кафка позволяет потребителю вручную контролировать свое положение, перемещаясь вперед или назад в разделе по желанию. Это означает, что потребитель может повторно использовать старые записи или перейти к самым последним записям, фактически не потребляя промежуточные записи. Есть несколько случаев, когда ручное управление положением потребителя может быть полезным.

В одном случае обработка записей с учетом времени может иметь смысл для потребителя, который отстает достаточно далеко, чтобы не пытаться догнать обработку всех записей, а просто перейти к самым последним записям.

Другой вариант использования-для системы, которая поддерживает локальное состояние, как описано в предыдущем разделе. В такой системе потребитель захочет инициализировать свою позицию при запуске для всего, что содержится в локальном магазине. Аналогично, если локальное состояние уничтожено (скажем, из-за потери диска), состояние может быть воссоздано на новой машине путем повторного использования всех данных и воссоздания состояния (при условии, что Кафка сохраняет достаточную историю).

Кафка позволяет указывать позицию с помощью поиска(TopicPartition, long) для указания новой позиции. Также доступны специальные методы поиска самого раннего и последнего смещения, поддерживаемого сервером ( seekToBeginning(Коллекция) и seekToEnd(Коллекция) соответственно).

Ответ №2:

Да, Вы Можете

Для этой цели вам необходимо вручную отслеживать зафиксированное смещение.

  1. Всякий раз, когда разделы повторно вызываются у потребителя, вы должны сохранять разделы и их зафиксированные смещения в БД.
  2. При переназначении разделов потребителю необходимо выполнить поиск по определенному смещению, хранящемуся в вашем хранилище данных.

Ваш слушатель перебалансировки будет прослушивать, когда происходят эти события отзыва и назначения. Пример реализации прослушивателя перебалансировки

 public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
           commitDBTransaction();
    }
    public void onPartitionsAssigned(Collection<TopicPartition>  partitions){
            for(TopicPartition partition: partitions) {
                consumer.seek(partition, getOffsetFromDB(partition));
            }
    }
}