#elasticsearch #indexing
#elasticsearch #индексирование
Вопрос:
TL; DR: какой эквивалент Elasticsearch для этого запроса Postgres?
SELECT latest_pipeline_logs.* FROM (
SELECT pipeline_logs.*,
rank() OVER (
PARTITION BY pipeline_name
ORDER BY updated_at DESC
)
FROM pipeline_logs
) latest_pipeline_logs WHERE RANK = 1
У меня есть сотни конвейеров ETL с журналами, которые сбрасываются в Elasticsearch. Каждый из них выполняется независимо с разными интервалами. Я хотел бы получить простое состояние работоспособности для каждого из моих конвейеров ETL, используя агрегации Elasticsearch.
Каждый конвейер регистрирует свое состояние при выполнении. Мой текущий мыслительный процесс заключается в определении работоспособности каждого конвейера на основе двух наиболее важных возникающих состояний: succeeded
и failed
.
Я знаю, что могу выполнить запрос агрегации и сгруппировать по каждому конвейеру с подагрегацией статусов. Например, что-то вроде этого:
{
...
"aggs": {
"pipelines": {
"field": "pipeline_name"
},
"aggs": {
"states": {
"terms": {
"field": "pipeline_state"
}
}
}
}
}
Проблема с приведенным выше примером заключается в том, что я мог получить несколько состояний из-за набора данных временных рядов, таких как этот:
{
"key": "some-pipeline-name",
"buckets": [
{
"key": "succeeded",
"doc_count": 123
},
{
"key": "failed",
"doc_count": 567
}
]
}
Теоретически я мог бы отфильтровать результаты на основе даты выполнения конвейера, но поскольку некоторые конвейеры выполняются примерно раз в два месяца, я не думаю, что это вариант.
Конечным состоянием является создание простой панели мониторинга с использованием набора результатов Elasticsearch, который выглядит примерно так:
[
{
"key": "some-pipeline-name",
"latest-status": "succeeded"
},
{
"key": "some-other-pipeline",
"latest-status": "failed"
}
]
Следует отметить одну вещь: в этом случае исторические данные не важны. Панель мониторинга просто передаст последнее состояние для каждого конвейера.
Как бы вы добились этого с помощью Elasticsearch?
Ответ №1:
Если вас интересует только последний статус для каждого конвейера, вы могли бы использовать top_hits в качестве подагрегации, а затем сортировать по времени
{
"size": 0,
"aggs": {
"pipeline": {
"terms": {
"field": "pipeline_name",
"size": 1000
},
"aggs": {
"top_hits_status": {
"top_hits": {
"size": 1,
"sort": [
{
"timestamp": {
"order": "desc"
}
}
],
"_source": {
"includes": [
"pipeline_state"
]
}
}
}
}
}
}
}