Как обработать сбой публикации kafka надежным способом

#java #cassandra #redis #apache-kafka #kafka-consumer-api

#java #cassandra #redis #apache-kafka #kafka-consumer-api

Вопрос:

Я использую Kafka, и у нас есть вариант использования для создания отказоустойчивой системы, в которой не должно быть пропущено ни одного сообщения. Итак, вот проблема: если публикация в Kafka завершается неудачей по какой-либо причине (не работает ZooKeeper, Kafka broker и т.д.), Как мы можем надежно обрабатывать эти сообщения и воспроизводить их, как только все снова будет восстановлено. Опять же, как я уже говорил, мы не можем позволить себе сбой даже одного сообщения. Другой вариант использования заключается в том, что нам также необходимо знать в любой данный момент времени, сколько сообщений не удалось опубликовать в Kafka по какой-либо причине, т. Е. что-то вроде функции счетчика, и теперь эти сообщения необходимо повторно опубликовать снова.

Одним из решений является отправка этих сообщений в какую-либо базу данных (например, Cassandra, где записи выполняются очень быстро, но нам также нужна функциональность счетчика, и я предполагаю, что функциональность счетчика Cassandra не так уж велика, и мы не хотим это использовать.), которая может обрабатывать такого рода нагрузку, а также предоставлять нам средство счетчика, которое является очень точным.

Этот вопрос больше с точки зрения архитектуры, а затем какую технологию использовать, чтобы это произошло.

PS: Мы обрабатываем некоторые из них, например, 3000TPS. Таким образом, при сбое запуска системы эти сообщения с ошибкой могут расти очень быстро за очень короткое время. Мы используем фреймворки на основе Java.

Спасибо за вашу помощь!

Комментарии:

1. Привет @ Nishant, ты нашел «решение»? Хотите поделиться с сообществом? Заранее спасибо.

2. Возможно, вам нужна база данных только для добавления, например timescaledb или influxdb. Для них 3 тыс. событий в секунду не имеет большого значения.

3. Я мало что знаю об этой теме, но, кажется, проще сделать это с помощью подхода вытягивания, а не выталкивания. Таким образом, вы можете добавить веб-сервис на стороне отправителя, и вы можете опросить веб-сервис со стороны получателя. Таким образом, получатель будет отвечать за получение сообщения, а не отправитель или другой компонент в середине будет отвечать за доставку его всем получателям, ведение списка получателей, повторные попытки и т.д… Но я думаю, что это не всегда вариант, потому что это недостаточно быстро или, возможно, по другим причинам, о которых я не знаю.

Ответ №1:

Причина, по которой Kafka был создан распределенным, отказоустойчивым способом, заключается в том, что для решения проблем, точно подобных вашей, множественные сбои основных компонентов должны избегать прерываний обслуживания. Чтобы избежать сбоя Zookeeper, разверните как минимум 3 экземпляра Zookeepers (если это в AWS, разверните их в зонах доступности). Чтобы избежать сбоев брокера, разверните несколько брокеров и убедитесь, что вы указали несколько брокеров в своем свойстве producer bootstrap.servers . Чтобы убедиться, что кластер Kafka записал ваше сообщение в надежное хранилище, убедитесь, что acks=all свойство установлено в производителе. Это подтвердит запись клиента, когда все синхронизированные реплики подтвердят получение сообщения (за счет пропускной способности). Вы также можете установить ограничения на очередь, чтобы гарантировать, что если записи в брокер начнут резервное копирование, вы сможете перехватить исключение, обработать его и, возможно, повторить попытку.

Использование Cassandra (другой хорошо продуманной распределенной отказоустойчивой системы) для «stage» ваших записей, похоже, не добавляет надежности вашей архитектуре, но увеличивает сложность, плюс Cassandra не была написана как очередь сообщений для очереди сообщений, я бы этого избежал.

Правильно настроенный Kafka должен быть доступен для обработки всех записей вашего сообщения и предоставлять подходящие гарантии.

Комментарии:

1. Спасибо, Крис! Я понимаю, что Kafka был разработан таким образом, чтобы справиться с такой ситуацией, но приводить это в качестве аргумента, чтобы сказать, что все всегда будет работать так, как должно, — это немного смелое заявление, и для меня оно рано или поздно обречено на провал. Просто чтобы привести вам пример, как, даже если у вас достаточно экземпляров broker и zookeeper, ситуация все равно может выйти из-под контроля. Например: если в одной теме есть 3 реплики и значение min.insync.replicas равно 2, т.е. запись в broker будет успешной только тогда, когда 2 из 3 реплик синхронизированы. Теперь, в этом случае, если реплики не синхронизированы, они не примут новый запрос.

2. @Coder это может быть полезный блог о том, как убедиться, что ваш кластер правильно настроен, чтобы помочь сохранить ваши запаздывающие реплики в качестве членов ISR: confluent.io/blog /…

3. Спасибо @Chris, это полезно!

4. как насчет сбоя сети, и kafka становится недоступной.

5. @Lovin этого можно избежать, развернув по крайней мере в трех зонах доступности

Ответ №2:

Я очень опаздываю на вечеринку. Но я вижу, что в приведенных выше ответах чего-то не хватает 🙂

Стратегия выбора некоторой распределенной системы, такой как Cassandra, является достойной идеей. Как только Kafka будет запущена и станет нормальной, вы можете повторить все сообщения, которые были записаны в это.

Я хотел бы ответить со стороны «зная, сколько сообщений не удалось опубликовать в данный момент времени»

Из тегов я вижу, что вы используете apache-kafka и kafka-consumer-api .Вы можете написать пользовательский обратный вызов для вашего производителя, и этот обратный вызов может сообщить вам, произошел сбой сообщения или оно было успешно опубликовано. При сбое запишите метаданные для сообщения.

Теперь вы можете использовать инструменты анализа журналов для анализа своих сбоев. Одним из таких достойных инструментов является Splunk.

Ниже приведен небольшой фрагмент кода, который может лучше объяснить обратный вызов, о котором я говорил:

 public class ProduceToKafka {

  private ProducerRecord<String, String> message = null;

 // TracerBulletProducer class has producer properties
  private KafkaProducer<String, String> myProducer = TracerBulletProducer
      .createProducer();

  public void publishMessage(String string) {

    ProducerRecord<String, String> message = new ProducerRecord<>(
        "topicName", string);

    myProducer.send(message, new MyCallback(message.key(), message.value()));
  }

  class MyCallback implements Callback {

    private final String key;
    private final String value;

    public MyCallback(String key, String value) {
      this.key = key;
      this.value = value;
    }


    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
      if (exception == null) {
        log.info("--------> All good !!");
      } else {
        log.info("--------> not so good  !!");
        log.info(metadata.toString());
        log.info(""   metadata.serializedValueSize());
        log.info(exception.getMessage());

      }
    }
  }

}
  

Если вы проанализируете количество "--------> not so good !!" журналов за единицу времени, вы можете получить необходимую информацию.

Боже, скорость!

Комментарии:

1. Я думаю, что в строке if (exception != null) { нужно сказать if (exception == null) {

Ответ №3:

Крис уже рассказывал о том, как поддерживать отказоустойчивость системы.

Kafka по умолчанию поддерживает at-least once семантику доставки сообщений, это означает, что при попытке отправить сообщение что-то происходит, оно попытается отправить его повторно.

При создании Kafka Producer свойств вы можете настроить это, установив retries параметр больше 0.

  Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:4242");
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("batch.size", 16384);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer<String, String> producer = new KafkaProducer<>(props);
  

Для получения дополнительной информации проверьте это.

Комментарии:

1. Спасибо @Shankar. По сути, существует два вида сбоев, которые могут быть повторены и не могут быть повторены. Это свойство для повторной попытки полезно только при повторяемом сбое. Например, при ошибке брокера, когда лидер отключился, а zooKeeper занят назначением нового лидера и т.д. Такие виды сбоев могут быть повторены, и вышеуказанное свойство будет работать. Но если существует непроверяемое значение, то независимо от того, насколько выше мы установили это свойство, оно не будет работать. Хотя спасибо за информацию!

2. @Coder : Спасибо за информацию.. не могли бы вы, пожалуйста, сообщить мне, что это за неисправимые сбои?