Проверка фиксации смещения потребителя Kafka, чтобы избежать фиксации меньших смещений

#apache-kafka #kafka-consumer-api

#apache-kafka #kafka-consumer-api

Вопрос:

Мы предполагаем, что у нас есть потребитель, который отправляет запрос на фиксацию смещения 10. Если возникла проблема со связью, и брокер не получил запрос и, конечно, не ответил. После этого у нас есть другой потребитель, обрабатывающий другой пакет и успешно зафиксировавший смещение 20.

В: Я хочу знать, есть ли способ или свойство для обработки, чтобы мы могли проверить, зафиксировано ли предыдущее смещение в журнале или нет, прежде чем фиксировать в нашем случае смещение 20?

Ответ №1:

Сценарий, который вы описываете, может произойти только при использовании асинхронных коммитов.

Имейте в виду, что один конкретный раздел TopicPartition может использоваться только одним потребителем в пределах одной и той же группы потребителей. Если у вас есть два потребителя, читающих одну и ту же TopicPartition, это возможно только

  1. если у них разные группы потребителей или
  2. если у них одинаковая группа потребителей и происходит перебалансировка. Но, тем не менее, только один потребитель будет читать эту TopicPartition одновременно, никогда оба параллельно.

Случай № 1 довольно ясен: если у них разные группы потребителей, они используют раздел параллельно и независимо. Также их установленные смещения управляются отдельно.

Случай № 2: если первому потребителю не удается зафиксировать смещение 10, потому что потребитель потерпел неудачу / умер и не восстанавливается, произойдет перебалансировка потребителя, и другой активный потребитель получит этот раздел. Поскольку смещение 10 не было зафиксировано, новый потребитель снова начнет считывать смещение 10 перед переходом к следующему пакету и, возможно, зафиксирует смещение 20. Это приводит к семантике «по крайней мере один раз» и может привести к дублированию.

Теперь перейдем к единственному сценарию, в котором вы могли бы зафиксировать меньшее смещение после фиксации большего смещения. Как было сказано в начале, это действительно может произойти, если вы асинхронно фиксируете смещения (используя commitAsync ). Представьте следующий сценарий, упорядоченный по времени:

  • Потребитель считывает смещение 0 (фоновый поток пытается зафиксировать смещение 0)
  • успешно зафиксировано смещение 0
  • Потребитель считывает смещение 1 (фоновый поток пытается зафиксировать смещение 1)
  • ошибка фиксации смещения 1, повторите попытку позже
  • Потребитель считывает смещение 2 (фоновый поток пытается зафиксировать смещение 2)
  • фиксация смещения 2 выполнена успешно
  • Теперь, что делать (повторная попытка фиксации смещения 1?)

Если вы позволите механизму повторной попытки снова зафиксировать смещение 1, похоже, что ваш потребитель зафиксировал только до смещения 1. Это связано с тем, что информация для каждой группы потребителей о последнем параметре смещения TopicPartition хранится во внутренней сжатой теме Kafka __consumer_offsets, которая предназначена для хранения только последнего значения (в нашем случае: смещение 1) для нашей группы потребителей.

В книге «Kafka — The Definitive Guide» есть подсказка о том, как смягчить эту проблему:

Повторение асинхронных коммитов: простой способ получить правильный порядок фиксации для асинхронных попыток — использовать монотонно увеличивающийся порядковый номер. Увеличивайте порядковый номер каждый раз при фиксации и добавляйте порядковый номер во время фиксации к обратному вызову commitAsync. Когда вы будете готовы отправить повторную попытку, проверьте, равен ли порядковый номер фиксации, полученный обратным вызовом, переменной экземпляра; если это так, новой фиксации не было, и повторная попытка безопасна. Если порядковый номер экземпляра выше, не повторяйте попытку, потому что более новая фиксация уже была отправлена.

В качестве примера вы можете увидеть реализацию этой идеи в Scala ниже:

 import java.util._
import java.time.Duration
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import collection.JavaConverters._

object AsyncCommitWithCallback extends App {

  // define topic
  val topic = "myOutputTopic"

  // set properties
  val props = new Properties()
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "AsyncCommitter5")
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  // [set more properties...]
  

  // create KafkaConsumer and subscribe
  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(List(topic).asJavaCollection)

  // initialize global counter
  val atomicLong = new AtomicLong(0)

  // consume message
  try {
    while(true) {
      val records = consumer.poll(Duration.ofMillis(1)).asScala

      if(records.nonEmpty) {
        for (data <- records) {
          // do something with the records
        }
        consumer.commitAsync(new KeepOrderAsyncCommit)
      }

    }
  } catch {
    case ex: KafkaException => ex.printStackTrace()
  } finally {
    consumer.commitSync()
    consumer.close()
  }


  class KeepOrderAsyncCommit extends OffsetCommitCallback {
    // keeping position of this callback instance
    val position = atomicLong.incrementAndGet()

    override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
      // retrying only if no other commit incremented the global counter
      if(exception != null){
        if(position == atomicLong.get) {
          consumer.commitAsync(this)
        }
      }
    }
  }

}