Не удается создавать и использовать события для Кафки и из Кафки в одном и том же приложении spring boot

#spring-boot #apache-kafka #apache-kafka-streams #spring-cloud-stream

Вопрос:

  1. Запускаю Кафку на моем локальном компьютере localhost:9092
  2. Создал тему ProductOrders с Replica 1 и Partition 3
  3. Создал spring-boot 2.4.3 spring-kafka 2.6.6 spring-cloud-stream 3.1.1 приложение и с помощью POJO OrderEvent, OrderEventPublisher и OrderEventConsumer
  4. Хотите опубликовать OrderEvent в теме ProductOrders и использовать OrderEvent из темы OrderEvent
  5. Запустив приложение, я не вижу, чтобы события заказа публиковались или использовались через приложение. Также проверено, что даже те kafka-console-consumer не могут видеть события.

application.yml

 spring:
  cloud.stream:
    bindings:
      orderCreated-out-0:
        destination: ProductOrders
      processOrder-in-0:
        destination: ProductOrders
    kafka.streams.broker:
      configuration:
        commit.interval.ms: 1000
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
        spring.json.trusted.packages: '*'
        spring.json.value.default.type: com.poc.kafka.orderanalytics.events.OrderEvent
      brokers: localhost:9092
      serdeError: logAndContinue

  kafka:
    bootstrap-servers: localhost:9092
 

OrderEventPublisher.java

 public class OrderEventPublisher {
    @Bean
    public Supplier<OrderEvent> orderCreated() {
        log.info("Configuring Order Data");
        Map<String, String[]> orders = new HashMap<>();
        orders.put("home_appliances", new String[] {"Toaster","Grill","Veg Cutter"});
        orders.put("gardening", new String[] {"Coco Pit","Vermi Compost","Petunia Seeds"});
        orders.put("toys", new String[] {"Dump Truck","Fighter Jet","Fire Truck"});
        orders.put("books", new String[] {"Let Us C","Effective Java","Apache Kafka, complete guide"});
        orders.put("cloths", new String[] {"T Shirt","Short","Vests"});
        return () -> {
            log.info("Creating order");
            String category = (String) orders.keySet().toArray()[new Random().nextInt(orders.size())];
            String name = orders.get(category)[new Random().nextInt(orders.get(category).length)];
            OrderEvent orderEvent = new OrderEvent();
            orderEvent.setCategory(category);
            orderEvent.setName(name);
            orderEvent.setQuantity(1);
            return orderEvent;
        };
    }
}
 

OrderEventConsumer.java

 public class OrderEventConsumer {
    @Bean
    public Consumer<OrderEvent> processOrder() {
        log.info("Consuming the order");
        return orderEvent -> {
          log.info("Order Consumed with category: {}, name: {}, qty: {}",orderEvent.getCategory(), orderEvent.getName(), orderEvent.getQuantity());
        };
    }
}
 

Если я разделю производителя и потребителя на два разных приложения spring boot, все будет работать так, как ожидалось. В чем может быть проблема, из-за которой он не работает, когда и производитель, и потребитель существуют в одном приложении spring boot?

Комментарии:

1. Это должно работать совершенно нормально. Я не вижу никаких проблем в вашей конфигурации. Возможно, вы можете опубликовать/продвинуть уменьшенную версию своего проекта — просто минимум, чтобы воспроизвести проблему, чтобы мы могли взглянуть? Кроме того, есть ли трассировка стека или что-то в журнале, что вы, возможно, пропустили?

2. В дополнение к тому, что Олег предложил в приведенном выше комментарии, вам также не нужна никакая конфигурация, так spring.cloud.stream.kafka.streams.. как я не вижу использования потоков Кафки в вашем приложении.

3. Спасибо @Oleg за ответ. Я отправил минимальное приложение по ссылке: ссылка

4. @sobychacko спасибо. Я удалю конфигурацию потоков, но у меня есть план использовать пары Кафки позже, как только приложение будет вести себя так, как ожидалось.

5. Привет, попробуйте поделиться минимально воспроизводимым примером, а затем мы сможем продолжить сортировку.

Ответ №1:

Приношу извинения за задержку с ответом. Возникла проблема с конфигурацией приложения. Поскольку в приложении у вас есть две разные функции, вам необходимо указать платформе, какие функции необходимо активировать. После добавления следующей конфигурации я смог запустить приложение, а затем создать/использовать данные.

 spring.cloud.function.definition: orderCreated;processOrder