Как мне обработать входящее событие и на основе поля в событии выполнить запись в разные потоки, используя wso2?

#wso2 #siddhi #stream-processing #wso2sp #event-stream-processing

#wso2 #siddhi #потоковая обработка #wso2-streaming-integrator #обработка потока событий

Вопрос:

Я пытаюсь использовать один поток, обрабатывать входящий формат json и записывать в разные потоки на основе атрибута в событии. Например, если входной поток состоит из чего-то подобного:

 { "event_type" : "temperature",
  "json" : {    
            "type": "Temperature",
            "DeviceID":"xyz",
            "temperature": "32",
            "timestamp" :  "2019-03-19T12:37:43.356119Z"
            }
 }
  

Другое событие выглядит следующим образом:

 { "event_type" : "location",
  "json" : {    
        "type": "GPS",
          "DeviceID":"xyz",
         "location": {"coordinates": [-73.856077, 40.848447]},
         "timestamp" :  "2019-09-22T00:00:00 05:30"
           }
 }
  

Оба события передаются в одну конечную точку http (это ограничение, с которым я сталкиваюсь)

Как я могу использовать единый поток http-источника, обработать эти события и, если они event_type есть temperature , вставить в temperature_collection в mongo db, и если они event_type есть location , вставить в location_collection в mongo db?

  1. Возможно ли сделать это с помощью одного потока?

  2. Если нет, то как я могу избежать записи нескольких конечных точек, по одной для каждого типа события?

Ответ №1:

Да, можно определить только один поток и маршрутизировать каждый поток с помощью фильтров Siddhi,

 @source(type='http' , receiver.url='http://localhost:8000/SensorStream', 
    @map(type='json', fail.on.missing.attribute='false', 
        @attributes(eventType='$.event_type', type='$.json.type',deviceID='$.json.DeviceID', temperature='$.json.temperature', location='$.json.location', timestamp='$.json.timestamp' ) ) ) 
define stream SensorStream(eventType string, type string, deviceID string, temperature string, location string, timestamp string);

from SensorStream[eventType=='temperature']
select deviceID, temperature, timestamp  
insert into TemperatureStream;

from SensorStream[eventType=='location']
select deviceID, location, timestamp  
insert into LocationStream;
  

Как видно из приведенного выше примера, свойство исходной карты ‘fail.on.missing.attribute’ используется для обеспечения того, чтобы различные форматы могли быть сопоставлены с одним потоком вместе с пользовательским отображением. После того, как события прибыли в конечную точку, поток затем разделяется на основе значения атрибута с использованием фильтров.