#java #apache-kafka #spring-kafka #confluent-platform #confluent-cloud
Вопрос:
Я пытаюсь привести пример публикации события в kafka с помощью двух разных API. Один принимает события создания, а другой принимает события обновления пользователей одной и той же темы.
Оба этих API публикуют сообщения в одной и той же теме кафки с разными схемами.
В основном созданные пользователем, обновленные пользователем схемы публикуются в разделе пользователи.
Потребителя интересуют только события, обновленные пользователем, поэтому ему интересно, как можно настроить потребителя, поддерживающего TopicRecordNameStrategy. Прямо сейчас у меня есть два потребителя, которые слушают тему пользователей, и оба получают сообщения.
Я написал пример приложения spring boot, подключающегося к confluent kafka, где производитель и потребитель находятся в одном и том же сервисе
У меня есть API rest, который поддерживает две конечные точки для создания и обновления. Этот rest API публикует сообщение производителю
Вот мой код.
приложение yml
name: users
partitions-num: 3
replication-factor: 3
server:
port: 9080
spring:
kafka:
properties:
bootstrap.servers: *****
value.subject.name.strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
sasl:
jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username='****' password='****';
mechanism: PLAIN
security.protocol: SASL_SSL
# CCloud Schema Registry Connection parameter
schema.registry.url: ******
basic.auth.credentials.source: USER_INFO
schema.registry.basic.auth.user.info: *****:*****
consumer:
group-id: local-test-user-consumergroup
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
template:
default-topic:
logging:
level:
root: info
RestController.java
import com.aligntech.UserUpdated;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(value = "/user")
public class KafkaController {
private final Producer producer;
@Autowired
KafkaController(Producer producer) {
this.producer = producer;
}
@PostMapping(value = "/create")
public void create(@RequestParam("name") String name, @RequestParam("age") Integer age) {
UserCreated build = UserCreated.newBuilder().setAge(age).setName(name).build();
this.producer.sendMessage(build);
}
@PutMapping(value = "/update")
public void update(@RequestParam("name") String name, @RequestParam("age") Integer age) {
UserUpdated build = UserUpdated.newBuilder().setAge(age).setName(name).setUpdated(true).build();
this.producer.sendMessage(build);
}
}
Producer.java
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class Producer {
private final KafkaTemplate<String, User> kafkaTemplate;
@Value("${topic.name}")
private String topic;
@Autowired
public Producer(KafkaTemplate<String, User> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
void sendMessage(com.aligntech.UserCreated user) {
ProducerRecord record = new ProducerRecord<>(topic, user);
kafkaTemplate.send(record);
}
void sendMessage(com.aligntech.UserUpdated user) {
ProducerRecord record = new ProducerRecord<>(topic, user);
kafkaTemplate.send(record);
}
}
Consumer.java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class Consumer {
@KafkaListener(topics = "users", groupId = "user-updated-group")
public void consumeUserUpdated(ConsumerRecord<String, com.aligntech.UserUpdated> record) {
log.info("Read UserUpdated Record");
log.info(record.topic() ":" record.key() ":" record.headers());
log.info(String.format("Consumed message -> %s", record.value()));
}
@KafkaListener(topics = "users", groupId = "user-created-group")
public void consumeUserCreated(ConsumerRecord<String, com.aligntech.UserCreated> record) {
log.info("Read UserCreated Record");
log.info(record.topic() ":" record.key() ":" record.headers());
log.info(String.format("Consumed message -> %s", record.value()));
}
}
Выход
2021-05-24 14:51:54.220 INFO 3878 --- [ntainer#1-0-C-1] com.****.kafka.kafkatest.Consumer : Read UserCreated Record
2021-05-24 14:51:54.223 INFO 3878 --- [ntainer#1-0-C-1] com.****.kafka.kafkatest.Consumer : users:null:RecordHeaders(headers = [], isReadOnly = false)
2021-05-24 14:51:54.223 INFO 3878 --- [ntainer#1-0-C-1] com.****.kafka.kafkatest.Consumer : Consumed message -> {"name": "UserUpdated", "age": 25, "updated": true}
2021-05-24 14:51:54.508 INFO 3878 --- [ntainer#0-0-C-1] com.****.kafka.kafkatest.Consumer : Read UserUpdated Record
2021-05-24 14:51:54.508 INFO 3878 --- [ntainer#0-0-C-1] com.****.kafka.kafkatest.Consumer : users:null:RecordHeaders(headers = [], isReadOnly = false)
2021-05-24 14:51:54.508 INFO 3878 --- [ntainer#0-0-C-1] com.****.kafka.kafkatest.Consumer : Consumed message -> {"name": "UserUpdated", "age": 25, "updated": true}
Ответ №1:
Вам нужно будет добавить a RecortFilterStrategy
в фабрику контейнеров прослушивателя.
К сожалению, существует только один на фабрику, который применяется ко всем слушателям, поэтому вам понадобится другая фабрика (и фильтр) для каждого слушателя.
Вы используете исполнителя пользовательских задач в каждом контейнере (с помощью настройщика контейнеров) и используете имя потока, чтобы решить, какие записи отфильтровывать.
При необходимости я могу привести пример.
@SpringBootApplication
public class So67669747Application {
public static void main(String[] args) {
SpringApplication.run(So67669747Application.class, args);
}
@Bean
NewTopic topic2() {
return TopicBuilder.name("so67669747").partitions(1).replicas(1).build();
}
@Bean
RecordFilterStrategy<String, String> f1() {
return rec -> Thread.currentThread().getName().contains("exec1")
? rec.value().startsWith("A")
: rec.value().startsWith("B");
}
@Bean
AsyncListenableTaskExecutor exec1() {
return new SimpleAsyncTaskExecutor("exec1-");
}
@Bean
AsyncListenableTaskExecutor exec2() {
return new SimpleAsyncTaskExecutor("exec2-");
}
@KafkaListener(id = "so67669747-1", topics = "so67669747")
public void listen1(String in) {
System.out.println("1:" in);
}
@KafkaListener(id = "so67669747-2", topics = "so67669747")
public void listen2(String in) {
System.out.println("2:" in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
IntStream.range(0, 10).forEach(i -> template.send("so67669747", i % 2 == 0 ? "AAAAAAA" : "BBBBBBB"));
};
}
}
@Component
class FactoryCustomizer {
FactoryCustomizer(ConcurrentKafkaListenerContainerFactory<String, String> factory,
RecordFilterStrategy<String, String> f1,
AsyncListenableTaskExecutor exec1, AsyncListenableTaskExecutor exec2) {
factory.setRecordFilterStrategy(f1);
factory.setContainerCustomizer(container -> {
if (container.getGroupId().equals("so67669747-1")) {
container.getContainerProperties().setConsumerTaskExecutor(exec1);
}
else if (container.getGroupId().equals("so67669747-2")) {
container.getContainerProperties().setConsumerTaskExecutor(exec2);
}
});
}
}
Комментарии:
1. Я добавил пример.