Ровно один раз продюсер замедлил

#spring-boot #apache-kafka #kafka-producer-api

Вопрос:

У меня есть продюсер кафки, который должен быть в состоянии производить около 1 млн событий в минуту. События не могут быть продублированы или пропасть без вести

Ниже приведены мои настройки

 @Configuration
public class KafkaProducerConfig {

    @Value("${kafka.server.producer}")
    String server;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,120000);
        config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,60000);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        //config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "KafkaSpeedProducer");

        config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        config.put(ProducerConfig.LINGER_MS_CONFIG, "200");
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(256 * 1024));
        config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, Integer.toString(8192*4 * 1024));
        return new DefaultKafkaProducerFactory<>(config);

    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {

        return new KafkaTemplate<>(producerFactory());
    }

}



@Service
public class KafkaTopicProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void topicProducer(String payload, String topic) {

        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, null, null,
                payload);

        kafkaTemplate.send(producerRecord);
 
    }
}
 

с помощью этой настройки я могу достигать 3,9 млн событий в минуту, что идеально.
Однако от чтения до достижения правильного ровно один раз мне нужно реализовать транзакции.

добавлен

 config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "KafkaSpeedProducer");
 

перейдите в настройки и измените производителя темы следующим образом

 @Service
public class KafkaTopicProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void topicProducer(String payload, String topic) {
      kafkaTemplate.executeInTransaction(kt->kt.send(topic, payload));
        
         
    }
}
 

С такой настройкой я теперь получаю 1800 событий в минуту, что ужасно.

1.Необходимы ли транзакции для данного варианта использования? 2.Is в любом случае, есть ли способ улучшить это, чтобы повысить скорость, если это необходимо?

Комментарии:

1. 1. что такое usecase?

2. Это финансовые транзакции, которые разносятся в книгу учета в режиме реального времени для отчетности и т.д.