#apache-kafka #ksqldb
#apache-kafka #ksqldb
Вопрос:
Я пытаюсь объединить несколько событий из одного входного потока в одно выходное событие, сгруппированное по метке времени, используя ksql. Я бы также хотел, чтобы выходное событие содержало среднее значение входных событий, хотя это не совсем несерьезно и более приятно иметь.
Входной поток: температура
event1: {location: "hallway", value: 23, property_Id: "123", timestamp: "1551645625878"}
event2: {location: "bedroom", value: 21, property_Id: "123", timestamp: "1551645625878"}
event3: {location: "kitchen", value: 20, property_Id: "123", timestamp: "1551645625878"}
event4: {location: "hallway", value: 19, property_Id: "123", timestamp: "9991645925878"}
event5: {location: "bedroom", value: 18, property_Id: "123", timestamp: "9991645925878"}
event6: {location: "kitchen", value: 18, property_Id: "123", timestamp: "9991645925878"}
(желаемый) выходной поток:
event1:
{
"property_id": "123",
"timestamp": "1551645625878",
"average_temperature": 21,
"temperature": [
{
"location": "hallway",
"value": 23
},
{
"location": "bedroom",
"value": 21
},
{
"location": "kitchen",
"value": 20
}
]
}
event2:
{
"property_id": "123",
"timestamp": "9991645925878",
"average_temperature": 18,
"temperature": [
{
"location": "hallway",
"value": 19
},
{
"location": "bedroom",
"value": 18
},
{
"location": "kitchen",
"value": 18
}
]
}
Насколько я могу судить, это просто невозможно с использованием ksql, кто-нибудь может подтвердить?
Ответ №1:
Правильно, вы не можете сделать это в KSQL в настоящее время. Начиная с версии 5.1.1 / март 2019 KSQL может считывать вложенные объекты, но не создавать их: https://github.com/confluentinc/ksql/issues/2147 (пожалуйста, поддержите / прокомментируйте, если вам это нужно)
Вы могли бы выполнить среднее вычисление, используя что-то вроде:
SELECT timestamp, SUM(value)/COUNT(*) AS avg_temp
FROM input_stream
GROUP BY timestamp;
Комментарии:
1. Спасибо, итак, можно ли создать процессор с использованием steamsapi или мне просто записать данные в базу данных, а затем создать новый поток оттуда?
2. Уверен, вы можете сделать это с потоками Kafka — проблема, которую я вижу, заключается только в манипулировании структурой сообщений, и потоки Java / Kafka предоставят вам гибкость для этого.
3. @SamShiles Интересуется, как у вас дела. Предъявляйте те же / схожие требования. Чем-нибудь вы можете поделиться?
4. @DarVar В итоге мы отказались от kafka, просто слишком много вещей не совсем закончено для вариантов использования, которые мы изучали