poll() и commitAsync() в Spring-Kafka

#spring-boot #apache-kafka #spring-kafka

#весенняя загрузка #apache-kafka #spring-кафка

Вопрос:

Я пытаюсь написать потребительское приложение kafka на Java на платформе Springboot. Ранее я писал код на обычном java, но теперь конвертирую в spring-kafka, поскольку это может дать некоторое преимущество перед обычной java. У меня есть несколько вопросов, которые я пытаюсь понять.

  • Кажется, мне не нужно явно выполнять цикл poll() в spring-kafka, и он будет автоматически обрабатываться @KafkaListener?

  • Я установил enable.auto.commit=’false’, так как мне нужно выполнить некоторую обработку перед фиксацией смещений, как я могу выполнить commitAsync() в Spring-Kafka?

    ConsumerConfig.java :

     @EnableKafka
    @Configuration
    public class KafkaConsumerConfig {
    
        @Value("${app.kafka_brokers}")
        private String KAFKA_BROKERS;
    
        @Value("${app.topic}")
        private String KAFKA_TOPIC;
    
        @Value("${app.group_id_config}")
        private String GROUP_ID_CONFIG;
    
        @Value("${app.schema_registry_url}")
        private String SCHEMA_REGISTRY_URL;
    
        @Value("${app.offset_reset}")
        private String OFFSET_RESET;
    
        @Value("${app.max_poll_records}")
        private String MAX_POLL_RECORDS;
    
        @Value("${app.security.protocol}")
        private String SSL_PROTOCOL;
    
        @Value("${app.ssl.truststore.password}")
        private String SSL_TRUSTSTORE_SECURE;
    
        @Value("${app.ssl.keystore.password}")
        private String SSL_KEYSTORE_SECURE;
    
        @Value("${app.ssl.key.password}")
        private String SSL_KEY_SECURE;
    
        @Value("${app.ssl.truststore.location}")
        private String SSL_TRUSTSTORE_LOCATION_FILE_NAME;
    
        @Value("${app.ssl.keystore.location}")
        private String SSL_KEYSTORE_LOCATION_FILE_NAME;
    
        @Bean
        public ConsumerFactory<String, String> consumerFactory(){
    
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_BROKERS);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
            props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
            props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SSL_PROTOCOL);
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,SSL_TRUSTSTORE_LOCATION_FILE_NAME);
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, SSL_TRUSTSTORE_SECURE);
            props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,SSL_KEYSTORE_LOCATION_FILE_NAME);
            props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, SSL_KEYSTORE_SECURE);
            props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, SSL_KEY_SECURE);
    
            return new DefaultKafkaConsumerFactory<>(props);
    
        }
    
        @Bean
        ConcurrentKafkaListenerContainerFactory<String, String> 
        kafkaListenerContainerFactory() {
    
          ConcurrentKafkaListenerContainerFactory<String, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
          factory.setConsumerFactory(consumerFactory());
          factory.setConcurrency(3);
          return factory;
      }
    
    }
      

KafkaConsumer.java :

 @Component
public class KafkaConsumer {
    
    @KafkaListener(topics = "topic", groupId = "group")
    public void run(ConsumerRecord<String, GenericRecord> record) {
        
        System.out.println(record);
        
    <-- how to asyncCommit()--> 
    }

}
  

Ответ №1:

Прежде всего, я предлагаю вам использовать свойства и автоконфигурацию, установленные Spring kafka, вместо того, чтобы создавать свои собственные, поскольку они следуют принципу DRY: не повторяйтесь.

 spring:
  kafka:
    bootstrap-servers: ${app.kafka_brokers}
    consumer:
      auto-offset-reset: ${app.offset_reset}
      enable-auto-commit: false   // <---- disable auto committing
    ssl:
      protocol: ${app.security.protocol}
      key-store-location: ${app.ssl.keystore.location}
      key-store-password:  ${app.ssl.keystore.password}
      trust-store-location: ${app.ssl.truststore.location}
      trust-store-password: ${app.ssl.truststore.password}
  // And other properties
    listener:
      ack-mode: manual // This is what you need
  

AckMode Документы: https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ContainerProperties .AckMode.html

По сути, manual это асинхронное подтверждение, в то время manual_immediate как является синхронным.

Затем внутри вашего @KafkaListener компонента вы можете ввести org.springframework.kafka.support.Acknowledgment объект, подтверждающий ваше сообщение.

 @Component
public class KafkaConsumer {
    
    @KafkaListener(topics = "topic", groupId = "group")
    public void run(ConsumerRecord<String, GenericRecord> record, Acknowledgment acknowledgment) {
        
        System.out.println(record);
        
        acknowledgment.acknowledge();
    }

}
  

Вот документация о том, что можно внедрить в @KafkaListener метод: https://docs.spring.io/spring-kafka/reference/html/#message-listeners

Ответ №2:

Контейнер-прослушиватель зафиксирует смещение при обычном завершении прослушивания, в зависимости от свойства контейнера AckMode ; AckMode.BATCH (по умолчанию) означает, что смещения для всех записей, возвращенных опросом, будут зафиксированы после того, как все они будут обработаны, AckMode.RECORD что означает, что каждое смещение будет зафиксировано, как только слушатель завершит работу.

sync Vs. async управляется свойством syncCommits контейнера.