Отправить сообщение в определенный раздел темы

#java #spring #apache-kafka #spring-cloud-stream

#java #весна #apache-kafka #spring-cloud-stream

Вопрос:

Есть ли какой-либо способ, которым производитель может отправлять сообщения в определенный раздел темы в брокере?

На данный момент я могу отправить тему, имеющую 2 раздела, но не могу управлять отправкой в определенный раздел.

 Component
@EnableBinding(Source.class)
public class RsvpsKafkaProducer {
  
    private static final int SENDING_MESSAGE_TIMEOUT_MS = 10000;

    private final Source source;

    public RsvpsKafkaProducer(Source source) {
        this.source = source;
    }

    public void sendRsvpMessage(WebSocketMessage<?> message) {
        
         System.out.println("sendRsvpMessage");
         source.output()
                .send(MessageBuilder.withPayload(message.getPayload())
                        .build(),
                        SENDING_MESSAGE_TIMEOUT_MS);   
    }
}
  

application.properties

 spring.cloud.stream.kafka.binder.zkNodes=localhost:2181
spring.cloud.stream.kafka.binder.brokers=localhost:9093

spring.cloud.stream.bindings.output.destination=meetupTopic
spring.cloud.stream.bindings.output.producer.partitionCount=2

spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.bindings.output.producer.headerMode=raw
  

Есть ли какой-либо способ, которым я мог бы достичь этого, используя spring cloud stream? Я хочу, чтобы некоторые сообщения отправлялись в P1 раздел, а некоторые — в P2 раздел внутри meetupTopic .

Комментарии:

1. Возможно, вам следует рассмотреть возможность использования групп?

2. @jr593, значение?

3. Что это за класс message builder? Не могли бы вы рассказать об этом подробнее? ProducerRecord из kafka может быть напрямую предоставлен раздел, в который вы хотите его отправить, поэтому есть вероятность, что MessageBuilder должен каким-то образом предоставлять ту же функциональность.

4. @RishabhSharma, docs.spring.io/spring-framework/docs/current/javadoc-api/org /…

Ответ №1:

 MessageBuilder.withPayload(message.getPayload())
                        .setHeader(KafkaHeaders.PARTITION_ID, 23)
                        .build()
  

Комментарии:

1. Могу ли я каким-либо образом передать какое-либо пользовательское имя раздела, и если оно не существует, будет создано?

2. См. docs.spring.io/spring-cloud-stream/docs/3.0.8.RELEASE/reference/…

Ответ №2:

Я не пробовал это, но из документов похоже, что это может сработать.

 spring.cloud.stream.bindings.output.producer.partitionSelectorExpression=headers['partitionKey']
  

А затем вы добавляете заголовок partitionKey при отправке сообщения.

Комментарии:

1. Это будет работать, но это необходимо partitioned=true , и оно предназначено для более сложного разделения на уровне приложений — проще просто установить KafkaHeaders.PARTITION_ID заголовок. Который является механизмом по умолчанию, используемым базовой инфраструктурой spring-integration-messaging.

2. Хорошее решение, Гэри, я искал что-то подобное, но не смог его найти.