Группа потребителей Кафки стабильна, но не потребляется при сообщении в теме при использовании TestContainer с embeddedZookeeper

#java #spring-boot #apache-kafka #spring-kafka #testcontainers

#Ява #пружинный ботинок #апач-кафка #весна-кафка #тестовые контейнеры

Вопрос:

у нас есть интеграционные тесты на springboot, написанные с использованием TestContainers. Приложение зависит от кафки и имеет различных производителей и потребителей для реализации обработки, управляемой событиями, поэтому использует контейнер со следующей средой:

 @ClassRule  public static final KafkaContainer kafkaContainer= new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"))  .withEnv("KAFKA_MAX_POLL_INTERVAL_MS", "600000")  .withEnv("KAFKA_SESSION_TIMEOUT_MS","660000")  .withEnv("KAFKA_REQUEST_TIMEOUT_MS", "120000")  .withEnv("KAFKA_AUTO_OFFSET_RESET", "earliest")  .withEnv("KAFKA_MAX_POLL_RECORDS", "1")  .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")  .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1");  

Пожалуйста, обратите внимание, что каждая реализация потребителя занимает менее минуты для получения сообщения, если слушатель получает запись. Проверка наличия активных потребителей в группе:

 docker exec -it c3a824644d40 kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group my-test-group  

Проверка наличия каких-либо сообщений в теме :

 docker exec -it c3a824644d40 /bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic testTopic --from-beginning H10283e4c-a1f3-4d78-9926-91d4404bf2f1H9bd9905f-0e7a-4f22-b1eb-892f615f67e6~lt;someMessagegt;  

Итак, в теме есть сообщение, которое было создано предыдущим производителем:

 2021-12-11 12:46:20.723 INFO 20229 --- [ntainer#2-0-C-1] c.n.e.n.s.k.p.TestTopicProducer : {"level":"INFO","log":{"classname":"xxxxx","message":"request: xxxxx Sent SuccessFully on Partition: 0","threadcontext":{},"threadname":"org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1"},"time":"2021-12-11T07:16:20.722Z","timezone":"xxxx","type":"log"}  

но ни один потребительский слушатель не получает это сообщение:

 $ docker exec -it c3a824644d40 kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group my-test-group $   

Мой Слушатель имплицирует:

 public class TestServiceConsumer implements AcknowledgingMessageListenerlt;String, TestPojoRequestgt; { @Autowired private TestService testService;  @Override @KafkaListener(  topicPattern = "testTopic",  containerFactory = "testGroupListenerContainerFactory",  groupId = "my-test-group") public void onMessage(ConsumerRecordlt;String, TestPojoRequestgt; record, Acknowledgment acknowledgment) {  log.info("request: {}. listening To TestPojoRequest Thread ID: {}",  record.value().getRequestOperationId(),  Thread.currentThread().getId());  try {  testService.consumerMyMessage(record.value());  } catch (JDBCConnectionException e) {  log.error("request: {} Thread ID: {} for TestPojoRequest has an Unhandled error:",  record.value().getRequestOperationId(),  Thread.currentThread().getId(), e);  throw new RecoverableDataAccessException(e.getMessage(), e);  } finally {  acknowledgment.acknowledge();  }  } }  

It takes about 4 minutes for the consumer to become active and consume the message after which when checking console:

 2021-12-11 12:50:42.655 INFO 20229 --- [ntainer#3-0-C-1] o.s.k.l.KafkaMessageListenerContainer : my-test-group: partitions revoked: []  2021-12-11 12:50:42.698 INFO 20229 --- [ntainer#3-0-C-1] o.s.k.l.KafkaMessageListenerContainer : my-test-group: partitions assigned: [testTopic-0]  2021-12-11 12:50:42.706 DEBUG 20229 --- [ntainer#3-0-C-1] essageListenerContainer$ListenerConsumer : Received: 1 records  2021-12-11 12:50:42.708 DEBUG 20229 --- [ntainer#3-0-C-1] o.s.retry.support.RetryTemplate : Retry: count=0  2021-12-11 12:50:42.709 DEBUG 20229 --- [ntainer#3-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=org.springframework.kafka.support.KafkaNull@77d67a19, headers={id=087068d5-dc08-ab1b-cc33-7b92cdb82d16, timestamp=1639206940507}]]    GROUP CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT my-test-group consumer-my-test-group-6-ecf3ba58-4577-4b7c-9037-f5cdb7c00717 /172.17.0.1 consumer-my-test-group-6 0 -   GROUP COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS my-test-group 457cd1f2680e:9092 (1) range Stable 1  GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID my-test-group testTopic 0 - - - - - -  

Running single broker , embedded zookeeper , with single partitions and concurrency set to 1.

My producer config:

 @Bean public Maplt;String, Objectgt; producerConfigs() {  Maplt;String, Objectgt; props = new HashMaplt;gt;();  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServiceHostWithPort);  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());  props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryHostWithPort);  props.put(ProducerConfig.RETRIES_CONFIG, 5);  props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 5000);  props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);  props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");  props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);  props.put(ProducerConfig.LINGER_MS_CONFIG, 1);  props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);  props.put(ProducerConfig.ACKS_CONFIG, "all");  props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 4194304);  return props; }  

My consumer config:

 public Maplt;String, Objectgt; setCommonConsumerConfigs() {  Maplt;String, Objectgt; propsMap = new HashMaplt;gt;();  propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServiceHostWithPort);  propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000);  propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);  propsMap.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());  propsMap.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryHostWithPort);  propsMap.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);  propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);  propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, (1000 * 60 * 60 * getIntegerValue(maxPollingDurationHours)));  propsMap.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");  propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, EARLIEST);  propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);  propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);  propsMap.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);  propsMap.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class.getName());  return propsMap; }   @Bean KafkaListenerContainerFactorylt;ConcurrentMessageListenerContainerlt;String, TestPojoRequestgt;gt; testGroupListenerContainerFactory() {  ConcurrentKafkaListenerContainerFactorylt;String, TestPojoRequestgt; factory = new ConcurrentKafkaListenerContainerFactorylt;gt;();  Maplt;String, Objectgt; propsMap = setCommonConsumerConfigs();  propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "my-test-group");  factory.setConsumerFactory(DefaultKafkaConsumerFactorylt;gt;(propsMap));  factory.setConcurrency(1);  factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);  factory.setAutoStartup(true);  factory.setAckDiscarded(true);  factory.setErrorHandler(((thrownException, consumerRecord) -gt; log.error("request: {} - Exception occurred when processing TestPojoRequest event {} ", ((TestPojoRequest) Objects.requireNonNull(consumerRecord).value()).getRequestOperationId(), consumerRecord, thrownException)));  final Maplt;Classlt;? extends Throwablegt;, Booleangt; exceptionsMap = new HashMaplt;gt;();  exceptionsMap.put(RecoverableDataAccessException.class, true);  factory.setRetryTemplate(getRetryTemplate(exceptionsMap));  addFailureCallback(factory);  return factory; }  

Дополнительно установили конфигурацию во всех потребителях:

 factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);  factory.setAutoStartup(true);  factory.setAckDiscarded(true);  

И среди многих моих групп потребителей по различным темам этот слушатель-единственный, в котором я вижу 4-минутную задержку в получении сообщения. Я вижу , что в журналах zookeper для этой группы происходит некоторое изменение баланса, даже несмотря на то, что мы установили время опроса, интервал сердцебиения и тайм-ауты сеанса на очень высокие значения.

В тестовом примере создается 1 сообщение на тему, и у меня есть 1 группа потребителей на тему. В идеале, если сообщения используются правильно, общий тестовый случай, в котором 6 сообщений в 6 разных темах последовательно используются разными группами, займет всего около 4-5 минут, но из-за задержки(задержка 4 минуты) в потреблении сообщений тест выполняется в течение 9 минут. Нужна помощь, чтобы выяснить, почему перебалансировка разделов и почему сообщение не используется мгновенно и только после 4-минутной задержки.

Комментарии:

1. Вам нужно будет показать свой потребительский код. Вы отключили автоматическую фиксацию, поэтому kafka-consumer-groups не будет отображаться никаких смещений. Также, возможно, связаны, но KAFKA_OFFSETS.TOPIC* не являются допустимой переменной env (нужны все подчеркивания). Максимальные записи опроса и автоматический сброс смещения не являются конфигурациями брокера, следовательно, также не являются настройками брокера

2. Кстати, показанные вами журналы происходят в течение одной и той же минуты

3. @OneCricketeer Я зарегистрировал журналы производителя и потребителя, когда было получено сообщение (2021-12-11 12:46:20.723), и когда слушатель может его получить(2021-12-11 12:50:42.655), это всегда составляет 4 минуты. На самом деле я также запускаю всю настройку в режиме отладки, но все еще не могу понять, почему происходит перебалансировка, так как для их предотвращения были установлены максимальные записи опроса, максимальная сессия, максимальное количество мс опроса , интервал биения сердца.

4. @OneCricketeer я могу обновить код потребителя, а также подтвердить его вручную, и что касается конфигураций, похоже, что это была опечатка, она исправит их

5. В целом я не уверен в продолжительности времени, но, поскольку похоже, что вы передаете запись в какую-то внешнюю службу JDBC, вы пробовали вместо этого использовать приемник Kafka Connect JDBC?