Как запустить kafka consumer перед производителем

#java #spring-boot #apache-kafka

#java #весенняя загрузка #apache-kafka

Вопрос:

У меня есть множество микросервисов, взаимодействующих с темой kafka. Один из микросервисов должен использовать два целых числа, после чего суммировать их и отправлять в тему. Проблема в том, что я не могу настроить микросервис так, чтобы потребитель запускался перед производителем. Мой код выглядит следующим образом:

 @SpringBootApplication
public class AdderApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(AdderApplication.class, args);
        AdderConsumer consumer = context.getBean(AdderConsumer.class);
        AdderProducer producer = context.getBean(AdderProducer.class);

        producer.sumTwoIntegers();
    }
  
 @Component
public class AdderConsumer extends Controller {

    private CountDownLatch latch = new CountDownLatch(3);

    @KafkaListener(topics = "${kafka.topic.name}")
    public void listenToPartitionWithOffset(@Payload Integer message) {
        if (message != null) {
            list.add(message);
            isProduce = true;
            System.out.println(list);
        }   
    }
  
 @Component
public class AdderProducer extends Controller {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${kafka.topic.name}")
    private String topicName;


    public void sumTwoIntegers() {
       // logic
    }

    private void sendMessage(String message) {
        // logic
    }
  
 @Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${kafka.boot.server}")
    private String kafkaServer;

    @Value("${kafka.consumer.group.id}")
    private String kafkaGroupId;

    @Bean
    public LoggingErrorHandler errorHandler(){
        return new LoggingErrorHandler();
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerConfig());
    }

    @Bean
    public ProducerFactory<String, String> producerConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public ConsumerFactory<String, Integer> consumerConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(config);
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Integer>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Integer> listener = new ConcurrentKafkaListenerContainerFactory<>();
        listener.setConsumerFactory(consumerConfig());
        listener.setErrorHandler(errorHandler());
        return listener;
    }
  

Я отладил код, и он вызывает как производителя, так и слушателя, однако мне нужно, чтобы слушатель сначала получал оба целых числа и только после этого вызывал метод производителя.

Я буду признателен за ваши мысли.

Комментарии:

1. микросервис использует и производит ту же тему

2. Используя Spring, я не думаю, что это возможно. Я бы также обычно отделял потребителя от производителя, если вам действительно нужны «микро» сервисы

3. Кроме того, используя ту же тему или логику, подобную этой, вы обычно используете потоки Kafka, а не API производителя / потребителя

Ответ №1:

Я нашел решение. Я реализовал таймер для производителя, и он периодически вызывается при запуске приложения. Поэтому, когда он впервые вызывается без данных для обработки, он ничего не публикует, затем клиент потребляет данные, а второй вызов producer отправляет необходимые данные в тему.

Другим решением является вызов метода производителя из потребителя.