#spring-boot #apache-kafka #kafka-producer-api
Вопрос:
У меня есть продюсер кафки, который должен быть в состоянии производить около 1 млн событий в минуту. События не могут быть продублированы или пропасть без вести
Ниже приведены мои настройки
@Configuration
public class KafkaProducerConfig {
@Value("${kafka.server.producer}")
String server;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,120000);
config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,60000);
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
config.put(ProducerConfig.ACKS_CONFIG, "all");
//config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "KafkaSpeedProducer");
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
config.put(ProducerConfig.LINGER_MS_CONFIG, "200");
config.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(256 * 1024));
config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, Integer.toString(8192*4 * 1024));
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
@Service
public class KafkaTopicProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void topicProducer(String payload, String topic) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, null, null,
payload);
kafkaTemplate.send(producerRecord);
}
}
с помощью этой настройки я могу достигать 3,9 млн событий в минуту, что идеально.
Однако от чтения до достижения правильного ровно один раз мне нужно реализовать транзакции.
добавлен
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "KafkaSpeedProducer");
перейдите в настройки и измените производителя темы следующим образом
@Service
public class KafkaTopicProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void topicProducer(String payload, String topic) {
kafkaTemplate.executeInTransaction(kt->kt.send(topic, payload));
}
}
С такой настройкой я теперь получаю 1800 событий в минуту, что ужасно.
1.Необходимы ли транзакции для данного варианта использования? 2.Is в любом случае, есть ли способ улучшить это, чтобы повысить скорость, если это необходимо?
Комментарии:
1. 1. что такое usecase?
2. Это финансовые транзакции, которые разносятся в книгу учета в режиме реального времени для отчетности и т.д.