#spring-boot #apache-kafka #apache-kafka-streams #spring-kafka
#spring-boot #apache-kafka #apache-kafka-streams #spring-kafka
Вопрос:
У меня есть требование извлекать временную метку (время события) при создании сообщения в приложении-потребителе kafka. Я знаю о timestampExtractor, который можно использовать с kafka stream , но мои требования отличаются, поскольку я не использую stream для получения сообщения.
Мой производитель kafka выглядит следующим образом :
@Override
public void run(ApplicationArguments args) throws Exception {
List<String> names = Arrays.asList("priya", "dyser", "Ray", "Mark", "Oman", "Larry");
List<String> pages = Arrays.asList("blog", "facebook", "instagram", "news", "youtube", "about");
Runnable runnable = () -> {
String rPage = pages.get(new Random().nextInt(pages.size()));
String rName = pages.get(new Random().nextInt(names.size()));
PageViewEvent pageViewEvent = new PageViewEvent(rName, rPage, Math.random() > .5 ? 10 : 1000);
Message<PageViewEvent> message = MessageBuilder
.withPayload(pageViewEvent).
setHeader(KafkaHeaders.MESSAGE_KEY, pageViewEvent.getUserId().getBytes())
.build();
try {
this.pageViewsOut.send(message);
log.info("sent " message);
} catch (Exception e) {
log.error(e);
}
};
Потребитель Kafka реализован с использованием Spring kafka @KafkaListener.
@KafkaListener(topics = "test1" , groupId = "json", containerFactory = "kafkaListenerContainerFactory")
public void receive(@Payload PageViewEvent data,@Headers MessageHeaders headers) {
LOG.info("Message received");
LOG.info("received data='{}'", data);
}
Конфигурация завода-изготовителя контейнеров
@Bean
public ConsumerFactory<String,PageViewEvent > priceEventConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(PageViewEvent.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, PageViewEvent> priceEventsKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, PageViewEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(priceEventConsumerFactory());
return factory;
}
Производитель, который отправляет сообщение, когда я печатаю, дает мне следующие данные :
[полезная нагрузка=просмотр страницы (идентификатор пользователя = блог, страница = о программе, продолжительность = 10), заголовки={id=8ebdad85-e2f7-958f-500e-4560ac0970e5, kafka_messageKey=[B@71975e1a, ContentType=application/json, timestamp = 1553041963803}]
У этого действительно есть созданная временная метка. Как я могу получить временную метку, созданную сообщением, с помощью Spring kafka?
Комментарии:
1.
headers.get(KafkaHeaders.RECEIVED_TIMESTAMP)
В@KafkaListener
у вас не работает?2. @Gary Russell Это работает… но это не временная метка, созданная сообщением. Насколько мне известно, это временная метка, когда потребитель ее получил. Дайте мне знать, если я неправильно понимаю.
3. Но KafkaHeaders.TimestampType — это ВРЕМЯ СОЗДАНИЯ, значит ли это, что в RECEIVED_TIMESTAMP время создания сообщения??
Ответ №1:
RECEIVED_TIMESTAMP означает, что это временная метка из полученной записи, а не время ее получения.. Мы избегаем указывать это во временной метке, чтобы избежать случайного распространения в исходящее сообщение.
Комментарии:
1. Спасибо @Gary Russell. Это означает, что все зависит от TimestampType… какое значение будет введено.
2. Временная метка чего?
3.
ConsumerRecord.timestamp()
— она может быть установлена либо производителем, либо брокером. Существует также поле TimestampType, которое описывает временную меткуNO_TIMESTAMP_TYPE(-1, "NoTimestampType"), CREATE_TIME(0, "CreateTime"), LOG_APPEND_TIME(1, "LogAppendTime");
.
Ответ №2:
You can use something like below:
final Producer<String, String> producer = new KafkaProducer<String, String>(properties);
long time = System.currentTimeMillis();
final CountDownLatch countDownLatch = new CountDownLatch(5);
int count=0;
try {
for (long index = time; index < time 10; index ) {
String key = null;
count ;
if(count<=5)
key = "id_" Integer.toString(1);
else
key = "id_" Integer.toString(2);
final ProducerRecord<String, String> record =
new ProducerRecord<>(TOPIC, key, "B2B Sample Message: " count);
producer.send(record, (metadata, exception) -> {
long elapsedTime = System.currentTimeMillis() - time;
if (metadata != null) {
System.out.printf("sent record(key=%s value=%s) "
"meta(partition=%d, offset=%d) time=%d timestamp=%dn",
record.key(), record.value(), metadata.partition(),
metadata.offset(), elapsedTime, metadata.timestamp());
System.out.println("Timestamp:: " metadata.timestamp() );
} else {
exception.printStackTrace();
}
countDownLatch.countDown();
});
}
try {
countDownLatch.await(25, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}finally {
producer.flush();
producer.close();
}
}