#java #spring-boot #apache-kafka #confluent-platform
#java #весенняя загрузка #apache-kafka #confluent-платформа
Вопрос:
Я пытаюсь создать приложение Java Spring Boot, которое будет отправлять и получать сообщения от Confluent Cloud Kafka.
Я следил за статьей для публикации сообщения Kafka в Confluent Cloud, и это работает.
Ниже приведена реализация
KafkaController.java
package com.seroter.confluentboot.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.seroter.confluentboot.dto.Product;
import com.seroter.confluentboot.engine.Producer;
@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {
private final Producer producer;
private final com.seroter.confluentboot.engine.Consumer consumer;
@Autowired
KafkaController(Producer producer,com.seroter.confluentboot.engine.Consumer consumer) {
this.producer = producer;
this.consumer=consumer;
}
@PostMapping(value = "/publish")
public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
this.producer.sendMessage(message);
}
@PostMapping(value="/publishJson")
public ResponseEntity<Product> publishJsonMessage(@RequestBody Product product) {
producer.sendJsonMessage(product);
ResponseEntity<Product> responseEntity=new ResponseEntity<>(product,HttpStatus.CREATED);
return responseEntity;
}
}
Product.java
package com.seroter.confluentboot.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
@JsonPropertyOrder(value = {"product_id","product_name","quantity","price"})
public class Product {
@JsonProperty(value = "product_id")
private int productId;
@JsonProperty(value="product_name")
private String productName;
private int quantity;
private double price;
}
Producer.java
package com.seroter.confluentboot.engine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Service;
import com.seroter.confluentboot.dto.Product;
@Service
@EnableBinding(Source.class)
public class Producer {
private static final Logger logger = LoggerFactory.getLogger(Producer.class);
private static final String TOPIC = "users";
@Autowired
private Source source;
public void sendMessage(String message) {
logger.info(String.format("#### -> Producing message -> %s", message));
this.source.output().send(new GenericMessage<>(message));
}
public void sendJsonMessage(Product product)
{
logger.info(String.format("#### -> Producing message -> %s",product.toString()));
this.source.output().send(new GenericMessage<>(product));
}
}
ConfluentBootApplication.java
package com.seroter.confluentboot;
import org.apache.tomcat.util.net.WriteBuffer.Sink;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.seroter.confluentboot.dto.Product;
@SpringBootApplication
@EnableBinding(Source.class)
@RestController
@RequestMapping(value = "/confluent")
public class ConfluentBootApplication {
@Autowired
private com.seroter.confluentboot.engine.Consumer consumer;
public static void main(String[] args) {
SpringApplication.run(ConfluentBootApplication.class, args);
}
}
application.properties
spring.cloud.stream.kafka.binder.brokers=pkc-epwny.eastus.azure.confluent.cloud:9092
spring.cloud.stream.bindings.output.destination=test
spring.cloud.stream.kafka.binder.configuration.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password";
spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=PLAIN
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
server.port=9000
Это работает
и я мог бы проверить
Я хочу создать конечную точку REST потребителя Spring Boot? Как мне это сделать?
Обновить:
ConfluentConsumer.java
package com.seroter.confluentboot.controller;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import com.seroter.confluentboot.dto.Product;
//@RestController
@EnableBinding(Sink.class)
public class ConfluentConsumer {
@StreamListener(Sink.INPUT)
public void consumeMessage(Product product)
{
System.out.println("******************************");
System.out.println("============= " product.getProductId() " ================");
System.out.println("******************************");
}
}
Consumer.java
package com.seroter.confluentboot.engine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Service;
@Service
@PropertySource("classpath:application.properties")
public class Consumer {
private final Logger logger = LoggerFactory.getLogger(Producer.class);
}
Комментарии:
1. Это может помочь, если вы отредактируете свой вопрос, чтобы включить то, что вы ожидаете. Например, тема содержит тысячи сообщений. Вы получаете их все, только последнюю или первую? Если вы получаете более одного сообщения — вы ожидаете, что они будут сериализованы в список? Когда вы получаете несколько сообщений и отправляете один и тот же запрос, фиксирует ли потребитель смещения, и поэтому вы получаете разные сообщения? Я имею в виду, что Kafka действительно не предназначен для такого типа шаблона доступа
Ответ №1:
Я полагаю, что вы пытаетесь сделать здесь, выбрать последнее сообщение с Kafka consumer
помощью REST
конечной точки, т.Е. Вы хотите вручную опросить Kafka
тему. Публикация сообщения через REST
конечную точку логична, но использование сообщений через конечную точку не кажется хорошей идеей. Если вам нужно поведение очереди, вы должны использовать RabbitMQ
вместо Kafka
.
Но все же, если вы хотите использовать Kafka
и опрашивать сообщение вручную. Вы можете использовать один из следующих 2 подходов.
Подход 1: создайте ConsumerFactory
и получите a Consumer
с фабрики, а затем опросите Kafka с помощью Consumer
@Configuration
class KafkaConsumerConfig {
private static final String TOPIC_NAME = "test";
private final String userName = "username";
private final String password = "password";
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"pkc-epwny.eastus.azure.confluent.cloud:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG,"conumer-gp-1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=" userName " password=" password);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public Consumer<String, String> createConsumer(ConsumerFactory consumerFactory) {
Consumer consumer = consumerFactory.createConsumer("consumer-group-1", "client-1");
consumer.subscribe(List.of(TOPIC_NAME));
return consumer;
}
}
Вы можете прочитать название темы, идентификатор группы, серверы начальной загрузки, конфигурации SSL и т. Д. Из application.properties
Теперь вы можете использовать сообщения, вводя потребителя в RestController.
private final Consumer<String, String> consumer;
@Autowired
ConsumerController(Consumer<String, String> consumer) {
this.consumer = consumer;
}
@GetMapping("retrieveMessage")
public String getMessage() {
// Kafka might return more than 1 events so be careful
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
if (!consumerRecords.isEmpty()) {
Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
String value = iterator.next().value();
consumer.commitSync();
return value;
} else {
return "no message";
}
}
Подход 2. сохраните сообщения в очереди в памяти, а затем опросите очередь в памяти
spring.cloud.stream.bindings.input.destination=test
Затем сохраните сообщения в a Queue
и извлеките их через REST
конечную точку
@RestController
@EnableBinding(Sink.class)
class ConsumerController {
private final Queue<String> queue;
ConsumerController() {
this.queue = new ConcurrentLinkedQueue<>();
}
@StreamListener(target = Sink.INPUT)
public void consume(String message) {
this.queue.add(message);
}
@GetMapping("getMessage")
public String retrieveMessage() {
return this.queue.poll();
}
}
Минусы: вы потеряете все сообщения в памяти, если ваше приложение перезапустится. Таким образом, хранение сообщений в распределенном кэше, например, Redis
было бы лучшим решением.