#java #spring #apache-kafka #spring-cloud-stream
#java #весна #apache-kafka #spring-cloud-stream
Вопрос:
Есть ли какой-либо способ, которым производитель может отправлять сообщения в определенный раздел темы в брокере?
На данный момент я могу отправить тему, имеющую 2 раздела, но не могу управлять отправкой в определенный раздел.
Component
@EnableBinding(Source.class)
public class RsvpsKafkaProducer {
private static final int SENDING_MESSAGE_TIMEOUT_MS = 10000;
private final Source source;
public RsvpsKafkaProducer(Source source) {
this.source = source;
}
public void sendRsvpMessage(WebSocketMessage<?> message) {
System.out.println("sendRsvpMessage");
source.output()
.send(MessageBuilder.withPayload(message.getPayload())
.build(),
SENDING_MESSAGE_TIMEOUT_MS);
}
}
application.properties
spring.cloud.stream.kafka.binder.zkNodes=localhost:2181
spring.cloud.stream.kafka.binder.brokers=localhost:9093
spring.cloud.stream.bindings.output.destination=meetupTopic
spring.cloud.stream.bindings.output.producer.partitionCount=2
spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.bindings.output.producer.headerMode=raw
Есть ли какой-либо способ, которым я мог бы достичь этого, используя spring cloud stream? Я хочу, чтобы некоторые сообщения отправлялись в P1
раздел, а некоторые — в P2
раздел внутри meetupTopic
.
Комментарии:
1. Возможно, вам следует рассмотреть возможность использования групп?
2. @jr593, значение?
3. Что это за класс message builder? Не могли бы вы рассказать об этом подробнее? ProducerRecord из kafka может быть напрямую предоставлен раздел, в который вы хотите его отправить, поэтому есть вероятность, что MessageBuilder должен каким-то образом предоставлять ту же функциональность.
4. @RishabhSharma, docs.spring.io/spring-framework/docs/current/javadoc-api/org /…
Ответ №1:
MessageBuilder.withPayload(message.getPayload())
.setHeader(KafkaHeaders.PARTITION_ID, 23)
.build()
Комментарии:
1. Могу ли я каким-либо образом передать какое-либо пользовательское имя раздела, и если оно не существует, будет создано?
2. См. docs.spring.io/spring-cloud-stream/docs/3.0.8.RELEASE/reference/…
Ответ №2:
Я не пробовал это, но из документов похоже, что это может сработать.
spring.cloud.stream.bindings.output.producer.partitionSelectorExpression=headers['partitionKey']
А затем вы добавляете заголовок partitionKey
при отправке сообщения.
Комментарии:
1. Это будет работать, но это необходимо
partitioned=true
, и оно предназначено для более сложного разделения на уровне приложений — проще просто установитьKafkaHeaders.PARTITION_ID
заголовок. Который является механизмом по умолчанию, используемым базовой инфраструктурой spring-integration-messaging.2. Хорошее решение, Гэри, я искал что-то подобное, но не смог его найти.