#apache-kafka #kafka-consumer-api
#apache-kafka #kafka-consumer-api
Вопрос:
Мы предполагаем, что у нас есть потребитель, который отправляет запрос на фиксацию смещения 10. Если возникла проблема со связью, и брокер не получил запрос и, конечно, не ответил. После этого у нас есть другой потребитель, обрабатывающий другой пакет и успешно зафиксировавший смещение 20.
В: Я хочу знать, есть ли способ или свойство для обработки, чтобы мы могли проверить, зафиксировано ли предыдущее смещение в журнале или нет, прежде чем фиксировать в нашем случае смещение 20?
Ответ №1:
Сценарий, который вы описываете, может произойти только при использовании асинхронных коммитов.
Имейте в виду, что один конкретный раздел TopicPartition может использоваться только одним потребителем в пределах одной и той же группы потребителей. Если у вас есть два потребителя, читающих одну и ту же TopicPartition, это возможно только
- если у них разные группы потребителей или
- если у них одинаковая группа потребителей и происходит перебалансировка. Но, тем не менее, только один потребитель будет читать эту TopicPartition одновременно, никогда оба параллельно.
Случай № 1 довольно ясен: если у них разные группы потребителей, они используют раздел параллельно и независимо. Также их установленные смещения управляются отдельно.
Случай № 2: если первому потребителю не удается зафиксировать смещение 10, потому что потребитель потерпел неудачу / умер и не восстанавливается, произойдет перебалансировка потребителя, и другой активный потребитель получит этот раздел. Поскольку смещение 10 не было зафиксировано, новый потребитель снова начнет считывать смещение 10 перед переходом к следующему пакету и, возможно, зафиксирует смещение 20. Это приводит к семантике «по крайней мере один раз» и может привести к дублированию.
Теперь перейдем к единственному сценарию, в котором вы могли бы зафиксировать меньшее смещение после фиксации большего смещения. Как было сказано в начале, это действительно может произойти, если вы асинхронно фиксируете смещения (используя commitAsync
). Представьте следующий сценарий, упорядоченный по времени:
- Потребитель считывает смещение 0 (фоновый поток пытается зафиксировать смещение 0)
- успешно зафиксировано смещение 0
- Потребитель считывает смещение 1 (фоновый поток пытается зафиксировать смещение 1)
- ошибка фиксации смещения 1, повторите попытку позже
- Потребитель считывает смещение 2 (фоновый поток пытается зафиксировать смещение 2)
- фиксация смещения 2 выполнена успешно
- Теперь, что делать (повторная попытка фиксации смещения 1?)
Если вы позволите механизму повторной попытки снова зафиксировать смещение 1, похоже, что ваш потребитель зафиксировал только до смещения 1. Это связано с тем, что информация для каждой группы потребителей о последнем параметре смещения TopicPartition хранится во внутренней сжатой теме Kafka __consumer_offsets, которая предназначена для хранения только последнего значения (в нашем случае: смещение 1) для нашей группы потребителей.
В книге «Kafka — The Definitive Guide» есть подсказка о том, как смягчить эту проблему:
Повторение асинхронных коммитов: простой способ получить правильный порядок фиксации для асинхронных попыток — использовать монотонно увеличивающийся порядковый номер. Увеличивайте порядковый номер каждый раз при фиксации и добавляйте порядковый номер во время фиксации к обратному вызову commitAsync. Когда вы будете готовы отправить повторную попытку, проверьте, равен ли порядковый номер фиксации, полученный обратным вызовом, переменной экземпляра; если это так, новой фиксации не было, и повторная попытка безопасна. Если порядковый номер экземпляра выше, не повторяйте попытку, потому что более новая фиксация уже была отправлена.
В качестве примера вы можете увидеть реализацию этой идеи в Scala ниже:
import java.util._
import java.time.Duration
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import collection.JavaConverters._
object AsyncCommitWithCallback extends App {
// define topic
val topic = "myOutputTopic"
// set properties
val props = new Properties()
props.put(ConsumerConfig.GROUP_ID_CONFIG, "AsyncCommitter5")
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
// [set more properties...]
// create KafkaConsumer and subscribe
val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(List(topic).asJavaCollection)
// initialize global counter
val atomicLong = new AtomicLong(0)
// consume message
try {
while(true) {
val records = consumer.poll(Duration.ofMillis(1)).asScala
if(records.nonEmpty) {
for (data <- records) {
// do something with the records
}
consumer.commitAsync(new KeepOrderAsyncCommit)
}
}
} catch {
case ex: KafkaException => ex.printStackTrace()
} finally {
consumer.commitSync()
consumer.close()
}
class KeepOrderAsyncCommit extends OffsetCommitCallback {
// keeping position of this callback instance
val position = atomicLong.incrementAndGet()
override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
// retrying only if no other commit incremented the global counter
if(exception != null){
if(position == atomicLong.get) {
consumer.commitAsync(this)
}
}
}
}
}