#spring-boot #apache-kafka #spring-kafka
#весенняя загрузка #apache-kafka #spring-кафка
Вопрос:
Я пытаюсь написать потребительское приложение kafka на Java на платформе Springboot. Ранее я писал код на обычном java, но теперь конвертирую в spring-kafka, поскольку это может дать некоторое преимущество перед обычной java. У меня есть несколько вопросов, которые я пытаюсь понять.
-
Кажется, мне не нужно явно выполнять цикл poll() в spring-kafka, и он будет автоматически обрабатываться @KafkaListener?
-
Я установил enable.auto.commit=’false’, так как мне нужно выполнить некоторую обработку перед фиксацией смещений, как я могу выполнить commitAsync() в Spring-Kafka?
ConsumerConfig.java :
@EnableKafka @Configuration public class KafkaConsumerConfig { @Value("${app.kafka_brokers}") private String KAFKA_BROKERS; @Value("${app.topic}") private String KAFKA_TOPIC; @Value("${app.group_id_config}") private String GROUP_ID_CONFIG; @Value("${app.schema_registry_url}") private String SCHEMA_REGISTRY_URL; @Value("${app.offset_reset}") private String OFFSET_RESET; @Value("${app.max_poll_records}") private String MAX_POLL_RECORDS; @Value("${app.security.protocol}") private String SSL_PROTOCOL; @Value("${app.ssl.truststore.password}") private String SSL_TRUSTSTORE_SECURE; @Value("${app.ssl.keystore.password}") private String SSL_KEYSTORE_SECURE; @Value("${app.ssl.key.password}") private String SSL_KEY_SECURE; @Value("${app.ssl.truststore.location}") private String SSL_TRUSTSTORE_LOCATION_FILE_NAME; @Value("${app.ssl.keystore.location}") private String SSL_KEYSTORE_LOCATION_FILE_NAME; @Bean public ConsumerFactory<String, String> consumerFactory(){ Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_BROKERS); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName()); props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG); props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SSL_PROTOCOL); props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,SSL_TRUSTSTORE_LOCATION_FILE_NAME); props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, SSL_TRUSTSTORE_SECURE); props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,SSL_KEYSTORE_LOCATION_FILE_NAME); props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, SSL_KEYSTORE_SECURE); props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, SSL_KEY_SECURE); return new DefaultKafkaConsumerFactory<>(props); } @Bean ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); return factory; } }
KafkaConsumer.java :
@Component
public class KafkaConsumer {
@KafkaListener(topics = "topic", groupId = "group")
public void run(ConsumerRecord<String, GenericRecord> record) {
System.out.println(record);
<-- how to asyncCommit()-->
}
}
Ответ №1:
Прежде всего, я предлагаю вам использовать свойства и автоконфигурацию, установленные Spring kafka, вместо того, чтобы создавать свои собственные, поскольку они следуют принципу DRY: не повторяйтесь.
spring:
kafka:
bootstrap-servers: ${app.kafka_brokers}
consumer:
auto-offset-reset: ${app.offset_reset}
enable-auto-commit: false // <---- disable auto committing
ssl:
protocol: ${app.security.protocol}
key-store-location: ${app.ssl.keystore.location}
key-store-password: ${app.ssl.keystore.password}
trust-store-location: ${app.ssl.truststore.location}
trust-store-password: ${app.ssl.truststore.password}
// And other properties
listener:
ack-mode: manual // This is what you need
AckMode
Документы: https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ContainerProperties .AckMode.html
По сути, manual
это асинхронное подтверждение, в то время manual_immediate
как является синхронным.
Затем внутри вашего @KafkaListener
компонента вы можете ввести org.springframework.kafka.support.Acknowledgment
объект, подтверждающий ваше сообщение.
@Component
public class KafkaConsumer {
@KafkaListener(topics = "topic", groupId = "group")
public void run(ConsumerRecord<String, GenericRecord> record, Acknowledgment acknowledgment) {
System.out.println(record);
acknowledgment.acknowledge();
}
}
Вот документация о том, что можно внедрить в @KafkaListener
метод: https://docs.spring.io/spring-kafka/reference/html/#message-listeners
Ответ №2:
Контейнер-прослушиватель зафиксирует смещение при обычном завершении прослушивания, в зависимости от свойства контейнера AckMode
; AckMode.BATCH
(по умолчанию) означает, что смещения для всех записей, возвращенных опросом, будут зафиксированы после того, как все они будут обработаны, AckMode.RECORD
что означает, что каждое смещение будет зафиксировано, как только слушатель завершит работу.
sync
Vs. async
управляется свойством syncCommits
контейнера.