#mongodb #aggregation-framework
#mongodb #платформа агрегации
Вопрос:
У меня есть документы, хранящие данные Интернета вещей.
Следуя рекомендациям по разработке схемы MongoDB для IoT, я пришел к документам со следующей структурой:
"_id" : "AQ106_2020-09-12T09",
"date" : "2020-09-12T09:00:00.000Z",
"station" : {
"name" : "AQ106",
"loc" : {
"type" : "Point",
"coordinates" : [
14.339263,
40.814224
]
},
"properties" : {
}
},
"samples" : [
{
"t" : ISODate("2020-09-12T11:02:00.000 02:00"),
"data" : {
"pm1_mg_m3" : 2.7,
"pm2_5_mg_m3" : 4.6,
"pm10_mg_m3" : 12,
"P0" : 152,
"P1" : 16,
"P2" : 4.7,
"P3" : 0.8,
"P4" : 0.86,
"P5" : 0.6,
"P6" : 0.28,
"P7" : 0.152,
"P8" : 0.094,
"P9" : 0.092,
"P10" : 0.019,
"P11" : 0,
"P12" : 0,
"P13" : 0.0188,
"P14" : 0,
"P15" : 0,
"P16" : 0,
"P17" : 0,
"P18" : 0,
"P19" : 0,
"P20" : 0,
"P21" : 0,
"P22" : 0,
"P23" : 0,
"temp_celsius" : 32.59,
"humRelPercent" : 34,
"press_mBar" : 1010.79,
"CO2mA" : 4,
"NO2_WE_mV" : 226.419,
"NO2_AE_mV" : 229.553,
"OX_WE_mV" : 252.287,
"OX_AE_mV" : 220.419,
"CO_WE_mV" : 509.077,
"AE_WE_mV" : 348.51,
"batt_V" : 13.5,
"source_V" : 17.6
}
},
.... additional arrays
}
Теперь я хочу вычислить средние значения за час или за день (или другой показатель), чтобы заполнить новую коллекцию только обобщенными данными.
Я закодировал следующее решение для почасовых средств:
db.collection.aggregate([{$match: {
'station.name':'AQ104'
}}, {$unwind: {
path: "$samples"
}}, {$group: {
_id: "$date",
P0: {
$avg : "$samples.data.P0"
},
temp:{
$avg:"$samples.data.temp_celsius"
}
}}])
Это работает, но мне нужно вручную создать поле для каждого свойства в samples.data
в исходном документе, а это утомительно.
Кроме того, как группировать как по дате, так и station.name ?
Вы можете найти рабочий пример здесь.
Спасибо.
Ответ №1:
Давайте начнем с простого вопроса: как группировать по нескольким полям? С простым изменением синтаксиса:
{
$group: {
_id: {
date: "$date",
station: "$station.name"
}
}
Теперь для второго вопроса это будет немного более утомительно. Mongo не поддерживает «слияние» объектов по их ключам с пользовательской логикой (в данном случае $avg
). Итак, нам придется преобразовать объект в массив. размотайте его, вычислите среднее значение для каждого поля и, в конечном итоге, сгруппируйте, чтобы восстановить требуемую структуру следующим образом:
db.collection.aggregate([
{
$match: {
"station.name": "AQ106"
}
},
{
$unwind: {
path: "$samples"
}
},
{
$addFields: {
objArr: {
"$objectToArray": "$samples.data"
}
}
},
{
$unwind: "$objArr"
},
{
$group: {
_id: {
date: "$date",
station: "$station.name",
objKey: "$objArr.k"
},
value: {
$avg: "$objArr.v"
}
}
},
{
$addFields: {
data: {
"$arrayToObject": [
[
{
k: "$_id.objKey",
v: "$value"
}
]
]
}
}
},
{
$group: {
_id: {
date: "$_id.date",
station: "$_id.station"
},
data: {
"$mergeObjects": "$data"
}
}
},
{
$replaceRoot: {
newRoot: {
"$mergeObjects": [
"$data",
"$_id"
]
}
}
}
])
——- РЕДАКТИРОВАТЬ ———
Для Mongo версии 4.4 вы можете использовать $ accumulator, который позволяет вам выполнять пользовательский код javascript в вашем конвейере. Я не уверен, насколько это будет сопоставимо с собственным конвейером Mongo с точки зрения производительности в масштабе.
Следует отметить, что я добавил начальный $addFields
этап, исходя из предположения, что у разных samples
могут быть разные ключи. если это не так, это не нужно.
db.collection.aggregate([
{
$addFields: {
sampleKeys: {
$reduce: {
input: {
$map: {
input: "$samples",
as: "sample",
in: {
$map: {
input: {
"$objectToArray": "$$sample.data"
},
as: "sampleArrItem",
in: "$$sampleArrItem.k"
}
}
}
},
initialValue: [],
in: {
"$setUnion": [
"$$this",
"$$value"
]
}
}
}
}
},
{
$addFields: {
samples: {
$accumulator: {
init: function(keys){
return keys.map(k => {return {k: {v: 0, c: 0}}});
},
initArgs: ["$sampleKeys"],
accumulateArgs: ["$samples"],
accumulate: function(state, sample){
Object.keys(state).forEach((key) => {
if (key in sample.data) {
state[key].v = sample.data[key];
state[key].c ;
};
});
return state;
},
merge: function(state1, state2){
Object.keys(state1).forEach((key) => {
state1[key].v = state2[key].v;
state1[key].c = state2[key].c;
});
return state1;
},
lang: "js"
}
}
}
},
{
$replaceRoot: {
newRoot: {
$mergeObject: [
"$samples",
{station: "$station.name", date: "$date"},
]
}
}
}
])
Комментарии:
1. Спасибо. Я решил, как показано ниже. Интересно, можно ли упростить общий запрос с помощью
mapReduce
или$function
.2. На самом деле вы не можете использовать
$function
для достижения, и я чувствую, что использованиеmapReduce
в этом случае является излишним, однако, если вы используете версию 4.4, вы можете использовать$accumulator
. Обратите внимание, что в текущем коде предполагается, что полеsamples
существует, даже если это пустой массив. если это не так, вам нужно добавить проверку вinit
функцию для этого.
Ответ №2:
Я частично решил свой вопрос с точки зрения группировки по нескольким полям (документация MongoDB в этом отношении, на мой взгляд, была не такой ясной)
db.collection.aggregate([
{
$unwind: {
path: "$samples"
}
},
{
$group: {
_id: {
date: "$date",
station: "$station.name"
},
P0: {
$avg: "$samples.data.P0"
},
temp: {
$avg: "$samples.data.temp_celsius"
}
}
}
])
Вот обновленный рабочий пример.
Благодаря Тому Слабберту я решил свой вопрос с помощью следующего запроса:
db.collection.aggregate([
{
$unwind: {
path: "$samples"
}
},
{
$addFields: {
objArr: {
"$objectToArray": "$samples.data"
}
}
},
{
$unwind: "$objArr"
},
{
$group: {
_id: {
date: "$date",
station: "$station",
objKey: "$objArr.k"
},
value: {
$avg: "$objArr.v"
}
}
},
{
$addFields: {
data: {
"$arrayToObject": [
[
{
k: "$_id.objKey",
v: "$value"
}
]
]
}
}
},
{
$group: {
_id: {
date: "$_id.date",
station: "$_id.station"
},
data: {
"$mergeObjects": "$data"
}
}
},
{
"$project": {
_id: "$_id.date",
station: "$_id.station",
data: 1
}
}
])
Интересно, можно ли упростить вышеупомянутое решение, используя новый $function
оператор. Спасибо.