Использование живых сообщений в Kafka

#java #apache-kafka #kafka-consumer-api #kafka-producer-api

#java #apache-kafka #kafka-consumer-api #kafka-producer-api

Вопрос:

Я запустил свой zookeeper и сервер Kafka. Я запустил свой Kafka producer, который отправляет 10 сообщений с темой «xxx». Затем остановил мой производитель Kafka. Теперь я запустил своего потребителя Kafka и подписался с темой «xxx». Мой потребитель использует эти 10 сообщений, отправленных моим производителем Kafka, который сейчас не запущен. Мне нужно, чтобы мой потребитель Kafka должен получать сообщения только с запущенного сервера Kafka. Есть ли какой-нибудь способ добиться этого? Следующие вещи в моих потребительских свойствах.

 props.put("bootstrap.servers", "localhost:9092");
    String consumeGroup = "cg1";
    props.put("group.id", consumeGroup);
    props.put("enable.auto.commit", "true");
    props.put("auto.offset.reset", "earliest");
    props.put("auto.commit.interval.ms", "100");
    props.put("heartbeat.interval.ms", "3000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
  

Комментарии:

1. Вы можете отбросить все ожидающие сообщения и принимать только те, которые поступают после запуска вашей программы. Kafka предназначен для сохранения сообщений (вы используете Kafka, потому что вам нужна эта функция), если вам нужны только непродолжительные сообщения, любое другое решение для обмена сообщениями может быть лучшим выбором.

2. Есть ли какой-либо флаг для идентификации этих сообщений, которые ждали долгое время?

3. Удалите это свойство props.put("auto.offset.reset", "earliest"); , оно такое же, как ConsumerConfig . AUTO_OFFSET_RESET_CONFIG. Сохраняйте только последнее свойство.

4. Да, «auto.offset.reset» совпадает с ConsumerConfig. AUTO_OFFSET_RESET_CONFIG. Я сделал это. Теперь мои свойства содержат «auto.offset.reset», «latest». Все та же проблема существует.

5. Мой конечный потребительский реквизит: bootstrap.servers= localhost:9092 group.id = cg1 enable.auto.commit= true auto.offset.reset= последняя версия auto.commit.interval.ms = 100 сердцебиений.интервал.ms = 3000 сеансов.тайм-аут.ms = 30000 ключ. десериализатор = org.apache. kafka.common.serialization. Значение StringDeserializer. десериализатор = org.apache. kafka.common.serialization. ByteArrayDeserializer

Ответ №1:

Задайте следующее свойство :

 consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
  

Он сообщает потребителю читать только последние сообщения, то есть сообщения, которые были опубликованы после запуска потребителя.

Комментарии:

1. Используется предоставленное свойство для моего потребителя. Все еще я получаю данные неживого производителя.

2. Какую версию API kafka вы используете?

3. Я использовал следующие зависимости. Также я обновил свои потребительские свойства в своем вопросе. <зависимость> <Идентификатор группы> ch.qos.logback</groupId> <artifactId>logback-классический</artifactId> <версия>1.0.13</версия> </dependency> <зависимость> <groupId>org.apache. kafka</groupId> <artifactId>kafka-клиенты</artifactId> <версия> 0.9.0.0</ версия> </ зависимость> <зависимость> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <версия> 1.7.7</версия> </зависимость>

Ответ №2:

Пожалуйста, создайте новую тему и сохраните свойство ConsumerConfig.AUTO_OFFSET_RESET_CONFIG в «последнюю версию». убедитесь, что не зафиксировано смещение. т.е. Мы не должны использовать commitSync()

По умолчанию получатели начинают использовать записи с последнего зафиксированного смещения каждого назначенного раздела. Если зафиксированное смещение недоступно, стратегия сброса смещения ConsumerConfig#AUTO_OFFSET_RESET_CONFIG, настроенная для KafkaConsumer, используется для установки начального смещения на самое раннее или последнее смещение в разделе.

Я думаю, что в вашем случае вы фиксируете смещение или есть смещение фиксации, доступное для данной темы.