Confluent Cloud — конечная точка REST потребителя Spring Boot?

#java #spring-boot #apache-kafka #confluent-platform

#java #весенняя загрузка #apache-kafka #confluent-платформа

Вопрос:

Я пытаюсь создать приложение Java Spring Boot, которое будет отправлять и получать сообщения от Confluent Cloud Kafka.

Я следил за статьей для публикации сообщения Kafka в Confluent Cloud, и это работает.

Ниже приведена реализация

KafkaController.java

 package com.seroter.confluentboot.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.seroter.confluentboot.dto.Product;
import com.seroter.confluentboot.engine.Producer;

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    private final Producer producer;
    
    private final com.seroter.confluentboot.engine.Consumer consumer;

    @Autowired
    KafkaController(Producer producer,com.seroter.confluentboot.engine.Consumer consumer) {
        this.producer = producer;
        this.consumer=consumer;
    }

    @PostMapping(value = "/publish")
    public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
        this.producer.sendMessage(message);
    }
   
    
    @PostMapping(value="/publishJson")
    public ResponseEntity<Product> publishJsonMessage(@RequestBody Product product) {
        producer.sendJsonMessage(product);
        ResponseEntity<Product> responseEntity=new ResponseEntity<>(product,HttpStatus.CREATED);
        return responseEntity;
    }
    
    
}
  

Product.java

 package com.seroter.confluentboot.dto;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
@JsonPropertyOrder(value = {"product_id","product_name","quantity","price"})
public class Product {

    @JsonProperty(value = "product_id")
    private int productId;
    @JsonProperty(value="product_name")
    private String productName;
    
    private int quantity;
    
    private double price;
    
}
  

Producer.java

 package com.seroter.confluentboot.engine;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Service;

import com.seroter.confluentboot.dto.Product;

@Service
@EnableBinding(Source.class)
public class Producer {

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    private static final String TOPIC = "users";

    @Autowired
    private Source source;

    public void sendMessage(String message) {
        logger.info(String.format("#### -> Producing message -> %s", message));
        this.source.output().send(new GenericMessage<>(message));
    }
    
    
    public void sendJsonMessage(Product product)
    {
        logger.info(String.format("#### -> Producing message -> %s",product.toString()));
        this.source.output().send(new GenericMessage<>(product));
    }
    
}
  

ConfluentBootApplication.java

 package com.seroter.confluentboot;

import org.apache.tomcat.util.net.WriteBuffer.Sink;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.seroter.confluentboot.dto.Product;


@SpringBootApplication
@EnableBinding(Source.class)
@RestController
@RequestMapping(value = "/confluent")

public class ConfluentBootApplication {
    @Autowired
    private  com.seroter.confluentboot.engine.Consumer consumer;
    
    public static void main(String[] args) {
        SpringApplication.run(ConfluentBootApplication.class, args);
    }
    
     
}
  

application.properties

 spring.cloud.stream.kafka.binder.brokers=pkc-epwny.eastus.azure.confluent.cloud:9092
spring.cloud.stream.bindings.output.destination=test
  
spring.cloud.stream.kafka.binder.configuration.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password";
spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=PLAIN
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL

server.port=9000
  

Это работает

введите описание изображения здесь

и я мог бы проверить

введите описание изображения здесь

Я хочу создать конечную точку REST потребителя Spring Boot? Как мне это сделать?

Обновить:

ConfluentConsumer.java

 package com.seroter.confluentboot.controller;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

import com.seroter.confluentboot.dto.Product;

//@RestController
@EnableBinding(Sink.class)
public class ConfluentConsumer {
    
    @StreamListener(Sink.INPUT)
    public void consumeMessage(Product product)
    {
        System.out.println("******************************");
        System.out.println("============= " product.getProductId() " ================");
        System.out.println("******************************");
    }

}
  

Consumer.java

 package com.seroter.confluentboot.engine;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Service;

@Service
@PropertySource("classpath:application.properties")
public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(Producer.class);
}
  

Комментарии:

1. Это может помочь, если вы отредактируете свой вопрос, чтобы включить то, что вы ожидаете. Например, тема содержит тысячи сообщений. Вы получаете их все, только последнюю или первую? Если вы получаете более одного сообщения — вы ожидаете, что они будут сериализованы в список? Когда вы получаете несколько сообщений и отправляете один и тот же запрос, фиксирует ли потребитель смещения, и поэтому вы получаете разные сообщения? Я имею в виду, что Kafka действительно не предназначен для такого типа шаблона доступа

Ответ №1:

Я полагаю, что вы пытаетесь сделать здесь, выбрать последнее сообщение с Kafka consumer помощью REST конечной точки, т.Е. Вы хотите вручную опросить Kafka тему. Публикация сообщения через REST конечную точку логична, но использование сообщений через конечную точку не кажется хорошей идеей. Если вам нужно поведение очереди, вы должны использовать RabbitMQ вместо Kafka .

Но все же, если вы хотите использовать Kafka и опрашивать сообщение вручную. Вы можете использовать один из следующих 2 подходов.

Подход 1: создайте ConsumerFactory и получите a Consumer с фабрики, а затем опросите Kafka с помощью Consumer

 @Configuration
class KafkaConsumerConfig {

    private static final String TOPIC_NAME = "test";
    private final String userName = "username";
    private final String password = "password";

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"pkc-epwny.eastus.azure.confluent.cloud:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"conumer-gp-1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username="   userName   " password="   password);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public Consumer<String, String> createConsumer(ConsumerFactory consumerFactory) {
        Consumer consumer = consumerFactory.createConsumer("consumer-group-1", "client-1");
        consumer.subscribe(List.of(TOPIC_NAME));
        return consumer;
    }
}
  

Вы можете прочитать название темы, идентификатор группы, серверы начальной загрузки, конфигурации SSL и т. Д. Из application.properties

Теперь вы можете использовать сообщения, вводя потребителя в RestController.

 private final Consumer<String, String> consumer;

@Autowired
ConsumerController(Consumer<String, String> consumer) {
    this.consumer = consumer;
}

@GetMapping("retrieveMessage")
public String getMessage() {
    // Kafka might return more than 1 events so be careful
    ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
    if (!consumerRecords.isEmpty()) {
        Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
        String value = iterator.next().value();
        consumer.commitSync();
        return value;
    } else {
        return "no message";
    }
}
  

Подход 2. сохраните сообщения в очереди в памяти, а затем опросите очередь в памяти

 spring.cloud.stream.bindings.input.destination=test
  

Затем сохраните сообщения в a Queue и извлеките их через REST конечную точку

 @RestController
@EnableBinding(Sink.class)
class ConsumerController {

    private final Queue<String> queue;

    ConsumerController() {
        this.queue = new ConcurrentLinkedQueue<>();
    }


    @StreamListener(target = Sink.INPUT)
    public void consume(String message) {
        this.queue.add(message);
    }

    @GetMapping("getMessage")
    public String retrieveMessage() {
        return this.queue.poll();
    }
}
  

Минусы: вы потеряете все сообщения в памяти, если ваше приложение перезапустится. Таким образом, хранение сообщений в распределенном кэше, например, Redis было бы лучшим решением.