#spring-boot #apache-kafka #apache-kafka-streams #spring-cloud-stream
Вопрос:
- Запускаю Кафку на моем локальном компьютере
localhost:9092
- Создал тему
ProductOrders
сReplica 1
иPartition 3
- Создал
spring-boot 2.4.3
spring-kafka 2.6.6
spring-cloud-stream 3.1.1
приложение и с помощью POJO OrderEvent, OrderEventPublisher и OrderEventConsumer - Хотите опубликовать
OrderEvent
в темеProductOrders
и использоватьOrderEvent
из темыOrderEvent
- Запустив приложение, я не вижу, чтобы события заказа публиковались или использовались через приложение. Также проверено, что даже те
kafka-console-consumer
не могут видеть события.
application.yml
spring:
cloud.stream:
bindings:
orderCreated-out-0:
destination: ProductOrders
processOrder-in-0:
destination: ProductOrders
kafka.streams.broker:
configuration:
commit.interval.ms: 1000
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
spring.json.trusted.packages: '*'
spring.json.value.default.type: com.poc.kafka.orderanalytics.events.OrderEvent
brokers: localhost:9092
serdeError: logAndContinue
kafka:
bootstrap-servers: localhost:9092
OrderEventPublisher.java
public class OrderEventPublisher {
@Bean
public Supplier<OrderEvent> orderCreated() {
log.info("Configuring Order Data");
Map<String, String[]> orders = new HashMap<>();
orders.put("home_appliances", new String[] {"Toaster","Grill","Veg Cutter"});
orders.put("gardening", new String[] {"Coco Pit","Vermi Compost","Petunia Seeds"});
orders.put("toys", new String[] {"Dump Truck","Fighter Jet","Fire Truck"});
orders.put("books", new String[] {"Let Us C","Effective Java","Apache Kafka, complete guide"});
orders.put("cloths", new String[] {"T Shirt","Short","Vests"});
return () -> {
log.info("Creating order");
String category = (String) orders.keySet().toArray()[new Random().nextInt(orders.size())];
String name = orders.get(category)[new Random().nextInt(orders.get(category).length)];
OrderEvent orderEvent = new OrderEvent();
orderEvent.setCategory(category);
orderEvent.setName(name);
orderEvent.setQuantity(1);
return orderEvent;
};
}
}
OrderEventConsumer.java
public class OrderEventConsumer {
@Bean
public Consumer<OrderEvent> processOrder() {
log.info("Consuming the order");
return orderEvent -> {
log.info("Order Consumed with category: {}, name: {}, qty: {}",orderEvent.getCategory(), orderEvent.getName(), orderEvent.getQuantity());
};
}
}
Если я разделю производителя и потребителя на два разных приложения spring boot, все будет работать так, как ожидалось. В чем может быть проблема, из-за которой он не работает, когда и производитель, и потребитель существуют в одном приложении spring boot?
Комментарии:
1. Это должно работать совершенно нормально. Я не вижу никаких проблем в вашей конфигурации. Возможно, вы можете опубликовать/продвинуть уменьшенную версию своего проекта — просто минимум, чтобы воспроизвести проблему, чтобы мы могли взглянуть? Кроме того, есть ли трассировка стека или что-то в журнале, что вы, возможно, пропустили?
2. В дополнение к тому, что Олег предложил в приведенном выше комментарии, вам также не нужна никакая конфигурация, так
spring.cloud.stream.kafka.streams..
как я не вижу использования потоков Кафки в вашем приложении.3. Спасибо @Oleg за ответ. Я отправил минимальное приложение по ссылке: ссылка
4. @sobychacko спасибо. Я удалю конфигурацию потоков, но у меня есть план использовать пары Кафки позже, как только приложение будет вести себя так, как ожидалось.
5. Привет, попробуйте поделиться минимально воспроизводимым примером, а затем мы сможем продолжить сортировку.
Ответ №1:
Приношу извинения за задержку с ответом. Возникла проблема с конфигурацией приложения. Поскольку в приложении у вас есть две разные функции, вам необходимо указать платформе, какие функции необходимо активировать. После добавления следующей конфигурации я смог запустить приложение, а затем создать/использовать данные.
spring.cloud.function.definition: orderCreated;processOrder