#java #spring-boot #apache-kafka #spring-kafka #testcontainers
#Ява #пружинный ботинок #апач-кафка #весна-кафка #тестовые контейнеры
Вопрос:
у нас есть интеграционные тесты на springboot, написанные с использованием TestContainers. Приложение зависит от кафки и имеет различных производителей и потребителей для реализации обработки, управляемой событиями, поэтому использует контейнер со следующей средой:
@ClassRule public static final KafkaContainer kafkaContainer= new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")) .withEnv("KAFKA_MAX_POLL_INTERVAL_MS", "600000") .withEnv("KAFKA_SESSION_TIMEOUT_MS","660000") .withEnv("KAFKA_REQUEST_TIMEOUT_MS", "120000") .withEnv("KAFKA_AUTO_OFFSET_RESET", "earliest") .withEnv("KAFKA_MAX_POLL_RECORDS", "1") .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1") .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1");
Пожалуйста, обратите внимание, что каждая реализация потребителя занимает менее минуты для получения сообщения, если слушатель получает запись. Проверка наличия активных потребителей в группе:
docker exec -it c3a824644d40 kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group my-test-group
Проверка наличия каких-либо сообщений в теме :
docker exec -it c3a824644d40 /bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic testTopic --from-beginning H10283e4c-a1f3-4d78-9926-91d4404bf2f1H9bd9905f-0e7a-4f22-b1eb-892f615f67e6~lt;someMessagegt;
Итак, в теме есть сообщение, которое было создано предыдущим производителем:
2021-12-11 12:46:20.723 INFO 20229 --- [ntainer#2-0-C-1] c.n.e.n.s.k.p.TestTopicProducer : {"level":"INFO","log":{"classname":"xxxxx","message":"request: xxxxx Sent SuccessFully on Partition: 0","threadcontext":{},"threadname":"org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1"},"time":"2021-12-11T07:16:20.722Z","timezone":"xxxx","type":"log"}
но ни один потребительский слушатель не получает это сообщение:
$ docker exec -it c3a824644d40 kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group my-test-group $
Мой Слушатель имплицирует:
public class TestServiceConsumer implements AcknowledgingMessageListenerlt;String, TestPojoRequestgt; { @Autowired private TestService testService; @Override @KafkaListener( topicPattern = "testTopic", containerFactory = "testGroupListenerContainerFactory", groupId = "my-test-group") public void onMessage(ConsumerRecordlt;String, TestPojoRequestgt; record, Acknowledgment acknowledgment) { log.info("request: {}. listening To TestPojoRequest Thread ID: {}", record.value().getRequestOperationId(), Thread.currentThread().getId()); try { testService.consumerMyMessage(record.value()); } catch (JDBCConnectionException e) { log.error("request: {} Thread ID: {} for TestPojoRequest has an Unhandled error:", record.value().getRequestOperationId(), Thread.currentThread().getId(), e); throw new RecoverableDataAccessException(e.getMessage(), e); } finally { acknowledgment.acknowledge(); } } }
It takes about 4 minutes for the consumer to become active and consume the message after which when checking console:
2021-12-11 12:50:42.655 INFO 20229 --- [ntainer#3-0-C-1] o.s.k.l.KafkaMessageListenerContainer : my-test-group: partitions revoked: [] 2021-12-11 12:50:42.698 INFO 20229 --- [ntainer#3-0-C-1] o.s.k.l.KafkaMessageListenerContainer : my-test-group: partitions assigned: [testTopic-0] 2021-12-11 12:50:42.706 DEBUG 20229 --- [ntainer#3-0-C-1] essageListenerContainer$ListenerConsumer : Received: 1 records 2021-12-11 12:50:42.708 DEBUG 20229 --- [ntainer#3-0-C-1] o.s.retry.support.RetryTemplate : Retry: count=0 2021-12-11 12:50:42.709 DEBUG 20229 --- [ntainer#3-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=org.springframework.kafka.support.KafkaNull@77d67a19, headers={id=087068d5-dc08-ab1b-cc33-7b92cdb82d16, timestamp=1639206940507}]] GROUP CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT my-test-group consumer-my-test-group-6-ecf3ba58-4577-4b7c-9037-f5cdb7c00717 /172.17.0.1 consumer-my-test-group-6 0 - GROUP COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS my-test-group 457cd1f2680e:9092 (1) range Stable 1 GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID my-test-group testTopic 0 - - - - - -
Running single broker , embedded zookeeper , with single partitions and concurrency set to 1.
My producer config:
@Bean public Maplt;String, Objectgt; producerConfigs() { Maplt;String, Objectgt; props = new HashMaplt;gt;(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServiceHostWithPort); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName()); props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryHostWithPort); props.put(ProducerConfig.RETRIES_CONFIG, 5); props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 5000); props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); props.put(ProducerConfig.LINGER_MS_CONFIG, 1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 4194304); return props; }
My consumer config:
public Maplt;String, Objectgt; setCommonConsumerConfigs() { Maplt;String, Objectgt; propsMap = new HashMaplt;gt;(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServiceHostWithPort); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000); propsMap.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName()); propsMap.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryHostWithPort); propsMap.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, (1000 * 60 * 60 * getIntegerValue(maxPollingDurationHours))); propsMap.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted"); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, EARLIEST); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class); propsMap.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class); propsMap.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class.getName()); return propsMap; } @Bean KafkaListenerContainerFactorylt;ConcurrentMessageListenerContainerlt;String, TestPojoRequestgt;gt; testGroupListenerContainerFactory() { ConcurrentKafkaListenerContainerFactorylt;String, TestPojoRequestgt; factory = new ConcurrentKafkaListenerContainerFactorylt;gt;(); Maplt;String, Objectgt; propsMap = setCommonConsumerConfigs(); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "my-test-group"); factory.setConsumerFactory(DefaultKafkaConsumerFactorylt;gt;(propsMap)); factory.setConcurrency(1); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); factory.setAutoStartup(true); factory.setAckDiscarded(true); factory.setErrorHandler(((thrownException, consumerRecord) -gt; log.error("request: {} - Exception occurred when processing TestPojoRequest event {} ", ((TestPojoRequest) Objects.requireNonNull(consumerRecord).value()).getRequestOperationId(), consumerRecord, thrownException))); final Maplt;Classlt;? extends Throwablegt;, Booleangt; exceptionsMap = new HashMaplt;gt;(); exceptionsMap.put(RecoverableDataAccessException.class, true); factory.setRetryTemplate(getRetryTemplate(exceptionsMap)); addFailureCallback(factory); return factory; }
Дополнительно установили конфигурацию во всех потребителях:
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); factory.setAutoStartup(true); factory.setAckDiscarded(true);
И среди многих моих групп потребителей по различным темам этот слушатель-единственный, в котором я вижу 4-минутную задержку в получении сообщения. Я вижу , что в журналах zookeper для этой группы происходит некоторое изменение баланса, даже несмотря на то, что мы установили время опроса, интервал сердцебиения и тайм-ауты сеанса на очень высокие значения.
В тестовом примере создается 1 сообщение на тему, и у меня есть 1 группа потребителей на тему. В идеале, если сообщения используются правильно, общий тестовый случай, в котором 6 сообщений в 6 разных темах последовательно используются разными группами, займет всего около 4-5 минут, но из-за задержки(задержка 4 минуты) в потреблении сообщений тест выполняется в течение 9 минут. Нужна помощь, чтобы выяснить, почему перебалансировка разделов и почему сообщение не используется мгновенно и только после 4-минутной задержки.
Комментарии:
1. Вам нужно будет показать свой потребительский код. Вы отключили автоматическую фиксацию, поэтому
kafka-consumer-groups
не будет отображаться никаких смещений. Также, возможно, связаны, ноKAFKA_OFFSETS.TOPIC*
не являются допустимой переменной env (нужны все подчеркивания). Максимальные записи опроса и автоматический сброс смещения не являются конфигурациями брокера, следовательно, также не являются настройками брокера2. Кстати, показанные вами журналы происходят в течение одной и той же минуты
3. @OneCricketeer Я зарегистрировал журналы производителя и потребителя, когда было получено сообщение (2021-12-11 12:46:20.723), и когда слушатель может его получить(2021-12-11 12:50:42.655), это всегда составляет 4 минуты. На самом деле я также запускаю всю настройку в режиме отладки, но все еще не могу понять, почему происходит перебалансировка, так как для их предотвращения были установлены максимальные записи опроса, максимальная сессия, максимальное количество мс опроса , интервал биения сердца.
4. @OneCricketeer я могу обновить код потребителя, а также подтвердить его вручную, и что касается конфигураций, похоже, что это была опечатка, она исправит их
5. В целом я не уверен в продолжительности времени, но, поскольку похоже, что вы передаете запись в какую-то внешнюю службу JDBC, вы пробовали вместо этого использовать приемник Kafka Connect JDBC?