#wso2 #siddhi #stream-processing #wso2sp #event-stream-processing
#wso2 #siddhi #потоковая обработка #wso2-streaming-integrator #обработка потока событий
Вопрос:
Я пытаюсь использовать один поток, обрабатывать входящий формат json и записывать в разные потоки на основе атрибута в событии. Например, если входной поток состоит из чего-то подобного:
{ "event_type" : "temperature",
"json" : {
"type": "Temperature",
"DeviceID":"xyz",
"temperature": "32",
"timestamp" : "2019-03-19T12:37:43.356119Z"
}
}
Другое событие выглядит следующим образом:
{ "event_type" : "location",
"json" : {
"type": "GPS",
"DeviceID":"xyz",
"location": {"coordinates": [-73.856077, 40.848447]},
"timestamp" : "2019-09-22T00:00:00 05:30"
}
}
Оба события передаются в одну конечную точку http (это ограничение, с которым я сталкиваюсь)
Как я могу использовать единый поток http-источника, обработать эти события и, если они event_type
есть temperature
, вставить в temperature_collection
в mongo db, и если они event_type
есть location
, вставить в location_collection в mongo db?
-
Возможно ли сделать это с помощью одного потока?
-
Если нет, то как я могу избежать записи нескольких конечных точек, по одной для каждого типа события?
Ответ №1:
Да, можно определить только один поток и маршрутизировать каждый поток с помощью фильтров Siddhi,
@source(type='http' , receiver.url='http://localhost:8000/SensorStream',
@map(type='json', fail.on.missing.attribute='false',
@attributes(eventType='$.event_type', type='$.json.type',deviceID='$.json.DeviceID', temperature='$.json.temperature', location='$.json.location', timestamp='$.json.timestamp' ) ) )
define stream SensorStream(eventType string, type string, deviceID string, temperature string, location string, timestamp string);
from SensorStream[eventType=='temperature']
select deviceID, temperature, timestamp
insert into TemperatureStream;
from SensorStream[eventType=='location']
select deviceID, location, timestamp
insert into LocationStream;
Как видно из приведенного выше примера, свойство исходной карты ‘fail.on.missing.attribute’ используется для обеспечения того, чтобы различные форматы могли быть сопоставлены с одним потоком вместе с пользовательским отображением. После того, как события прибыли в конечную точку, поток затем разделяется на основе значения атрибута с использованием фильтров.