Как определить состояние работоспособности ETL по журналам временных рядов с помощью Elasticsearch?

#elasticsearch #indexing

#elasticsearch #индексирование

Вопрос:

TL; DR: какой эквивалент Elasticsearch для этого запроса Postgres?

     SELECT latest_pipeline_logs.* FROM (
      SELECT pipeline_logs.*, 
      rank() OVER (
          PARTITION BY pipeline_name
          ORDER BY updated_at DESC
      )
      FROM pipeline_logs
    ) latest_pipeline_logs WHERE RANK = 1
  

У меня есть сотни конвейеров ETL с журналами, которые сбрасываются в Elasticsearch. Каждый из них выполняется независимо с разными интервалами. Я хотел бы получить простое состояние работоспособности для каждого из моих конвейеров ETL, используя агрегации Elasticsearch.

Каждый конвейер регистрирует свое состояние при выполнении. Мой текущий мыслительный процесс заключается в определении работоспособности каждого конвейера на основе двух наиболее важных возникающих состояний: succeeded и failed .

Я знаю, что могу выполнить запрос агрегации и сгруппировать по каждому конвейеру с подагрегацией статусов. Например, что-то вроде этого:

 {
  ...

  "aggs": {
    "pipelines": {
      "field": "pipeline_name"
    },
    "aggs": {
      "states": {
        "terms": {
          "field": "pipeline_state"
        }
      }
    }
  }
}

  

Проблема с приведенным выше примером заключается в том, что я мог получить несколько состояний из-за набора данных временных рядов, таких как этот:

 {
  "key": "some-pipeline-name",
  "buckets": [
    {
      "key": "succeeded",
      "doc_count": 123
    },
    {
      "key": "failed",
      "doc_count": 567
    }
  ]
}
  

Теоретически я мог бы отфильтровать результаты на основе даты выполнения конвейера, но поскольку некоторые конвейеры выполняются примерно раз в два месяца, я не думаю, что это вариант.

Конечным состоянием является создание простой панели мониторинга с использованием набора результатов Elasticsearch, который выглядит примерно так:

 [
  {
    "key": "some-pipeline-name",
    "latest-status": "succeeded"
  },
  {
    "key": "some-other-pipeline",
    "latest-status": "failed"
  }
]
  

Следует отметить одну вещь: в этом случае исторические данные не важны. Панель мониторинга просто передаст последнее состояние для каждого конвейера.

Как бы вы добились этого с помощью Elasticsearch?

Ответ №1:

Если вас интересует только последний статус для каждого конвейера, вы могли бы использовать top_hits в качестве подагрегации, а затем сортировать по времени

 {
  "size": 0,
  "aggs": {
    "pipeline": {
      "terms": {
        "field": "pipeline_name",
        "size": 1000
      },
      "aggs": {
        "top_hits_status": {
          "top_hits": {
            "size": 1,
            "sort": [
              {
                "timestamp": {
                  "order": "desc"
                }
              }
            ],
            "_source": {
              "includes": [
                "pipeline_state"
              ]
            }
          }
        }
      }
    }
  }
}