Слияние событий KSQL — объединение событий из одного потока на основе метки времени

#apache-kafka #ksqldb

#apache-kafka #ksqldb

Вопрос:

Я пытаюсь объединить несколько событий из одного входного потока в одно выходное событие, сгруппированное по метке времени, используя ksql. Я бы также хотел, чтобы выходное событие содержало среднее значение входных событий, хотя это не совсем несерьезно и более приятно иметь.

Входной поток: температура

 event1: {location: "hallway", value: 23, property_Id: "123", timestamp: "1551645625878"} 
event2: {location: "bedroom", value: 21, property_Id: "123", timestamp: "1551645625878"}
event3: {location: "kitchen", value: 20, property_Id: "123", timestamp: "1551645625878"}
event4: {location: "hallway", value: 19, property_Id: "123", timestamp: "9991645925878"} 
event5: {location: "bedroom", value: 18, property_Id: "123", timestamp: "9991645925878"}
event6: {location: "kitchen", value: 18, property_Id: "123", timestamp: "9991645925878"}
  

(желаемый) выходной поток:

 event1:
{
    "property_id": "123",
    "timestamp": "1551645625878",
    "average_temperature": 21,   
    "temperature": [
        {
            "location": "hallway",
            "value": 23
        },
        {
            "location": "bedroom",
            "value": 21
        },
        {
            "location": "kitchen",
            "value": 20
        }
    ]
}

event2:
{
    "property_id": "123",
    "timestamp": "9991645925878",
    "average_temperature": 18,   
    "temperature": [
        {
            "location": "hallway",
            "value": 19
        },
        {
            "location": "bedroom",
            "value": 18
        },
        {
            "location": "kitchen",
            "value": 18
        }
    ]
}
  

Насколько я могу судить, это просто невозможно с использованием ksql, кто-нибудь может подтвердить?

Ответ №1:

Правильно, вы не можете сделать это в KSQL в настоящее время. Начиная с версии 5.1.1 / март 2019 KSQL может считывать вложенные объекты, но не создавать их: https://github.com/confluentinc/ksql/issues/2147 (пожалуйста, поддержите / прокомментируйте, если вам это нужно)

Вы могли бы выполнить среднее вычисление, используя что-то вроде:

 SELECT timestamp, SUM(value)/COUNT(*) AS avg_temp 
  FROM input_stream 
  GROUP BY timestamp;
  

Комментарии:

1. Спасибо, итак, можно ли создать процессор с использованием steamsapi или мне просто записать данные в базу данных, а затем создать новый поток оттуда?

2. Уверен, вы можете сделать это с потоками Kafka — проблема, которую я вижу, заключается только в манипулировании структурой сообщений, и потоки Java / Kafka предоставят вам гибкость для этого.

3. @SamShiles Интересуется, как у вас дела. Предъявляйте те же / схожие требования. Чем-нибудь вы можете поделиться?

4. @DarVar В итоге мы отказались от kafka, просто слишком много вещей не совсем закончено для вариантов использования, которые мы изучали