Транзакционный производитель против просто идемпотентного производителя Java (исключение OutOfOrderSequenceException)

#java #apache-kafka #kafka-producer-api #spring-kafka

#java #apache-kafka #kafka-producer-api #spring-kafka

Вопрос:

Я использую spring-kafka с конфигурацией идемпотентного производителя:

это мои реквизиты конфигурации:

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Joiner.on(",").join(appProps.getBrokers()));
  
     //configure the following three settings for SSL Encryption
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, appProps.getJksLocation());
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,  appProps.getJksPassword());
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
    props.put(ProducerConfig.RETRIES_CONFIG, 5);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
  

Мой производитель kafka выдает исключение OutOfOrderSequenceException:

2019-03-06 21:25:47 Отправитель [ОШИБКА] [Producer ClientID=producer-1] Брокер вернул org.apache.kafka.common.errors.Исключение OutOfOrderSequenceException: Брокер получил порядковый номер не по порядку для темы-раздела topic-1 со смещением -1. Это указывает на потерю данных у брокера и должно быть расследовано. 2019-03-06 21:25:47 TransactionManager [INFO] [Producer ClientID=producer-1] ProducerId установлен в -1 с epoch-1 2019-03-06 21:25:47 ProducerKafka [ОШИБКА] мы столкнулись с ошибкой при отправке в kafka, пожалуйста, повторите задание

Я не уверен, почему генерируется это исключение. Я не смог найти конкретного ответа на это. Официальный javadoc для исключения гласит следующее:

Это исключение указывает на то, что брокер получил неожиданный порядковый номер от производителя, что означает, что данные, возможно, были потеряны. Если производитель настроен только на идемпотентность (т. Е. Если enable.idempotence установлен и нет transactional.id настроен), можно продолжить отправку с тем же экземпляром producer, но это чревато изменением порядка отправленных записей. Для транзакционных производителей это неустранимая ошибка, и вам следует закрыть производителя.

Означает ли это, что мне нужно использовать транзакционный производитель, чтобы избежать этой проблемы?

В документе KafkaProducer указано то, что делает приведенное выше утверждение неоднозначным: https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

Чтобы включить идемпотентность, для конфигурации enable.idempotence должно быть установлено значение true. Если задано, конфигурация повторных попыток по умолчанию будет иметь значение Integer.MAX_VALUE, конфигурация max.in.flight.requests.per.connection по умолчанию будет равна 1, а конфигурация acks по умолчанию будет равна all. Для идемпотентного производителя нет изменений в API, поэтому существующие приложения не нужно будет модифицировать, чтобы воспользоваться преимуществами этой функции.

Чтобы воспользоваться преимуществами идемпотентного производителя, крайне важно избегать повторных отправок на уровне приложения, поскольку они не могут быть де-дублированы. Таким образом, если приложение включает идемпотентность, рекомендуется оставить конфигурацию повторных попыток отключенной, поскольку по умолчанию она будет иметь значение Integer .МАКСИМАЛЬНОЕ ЗНАЧЕНИЕ. Кроме того, если отправка (ProducerRecord) возвращает ошибку даже при бесконечных повторных попытках (например, если срок действия сообщения истекает в буфере перед отправкой), тогда рекомендуется завершить работу производителя и проверить содержимое последнего созданного сообщения, чтобы убедиться, что оно не дублируется. Наконец, производитель может гарантировать идемпотентность только для сообщений, отправленных в течение одного сеанса.

В приведенном выше заявлении четко указано, что все, что мне нужно для идемпотентного производителя, это просто использовать enable.idempotence свойство. Однако в исключении указано, что я должен использовать это transactional.id свойство.

Каков правильный способ создать идемпотентный асинхронный производитель без необходимости иметь дело с фатальным OutOfOrderSequenceException .

Ответ №1:

если вы явно задаете повторные попытки, то вы должны установить

 max.in.flight.requests.per.connection=1
  

чтобы избежать проблемы с outoforder

в документе очень четко указано, что:

Установка значения, большего нуля, приведет к повторной отправке клиентом любой записи, отправка которой завершается ошибкой с потенциально временной ошибкой. Обратите внимание, что эта повторная попытка ничем не отличается от повторной отправки записи клиентом при получении ошибки. Разрешение повторных попыток без установки значения MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION равным 1 потенциально изменит порядок записей, потому что, если два пакета отправляются в один раздел, и первый завершается неудачей и повторяется, но второй выполняется успешно, тогда записи во втором пакете могут появиться первыми.

Комментарии:

1. Однако в документах также указано, что если у вас есть enable.idempotence = true и acks = all, вы можете разрешить значение от 1 до 5 для max.in.flight.requests.per.connection и по-прежнему сохранять порядок при повторных попытках > 0.

Ответ №2:

Мне кажется, это совершенно ясно; из вашей второй цитаты…

Чтобы воспользоваться преимуществами идемпотентного производителя, крайне важно избегать повторных отправок на уровне приложения, поскольку они не могут быть де-дублированы. Таким образом, если приложение включает идемпотентность, рекомендуется оставить конфигурацию повторных попыток отключенной, поскольку по умолчанию она будет иметь значение Integer .МАКСИМАЛЬНОЕ ЗНАЧЕНИЕ.

И у вас есть

 props.put(ProducerConfig.RETRIES_CONFIG, 5);