Как динамически определять индекс elasticsearch с помощью logstash?

#elasticsearch #logstash #logstash-configuration #logstash-jdbc

#elasticsearch #logstash #logstash-конфигурация #logstash-jdbc

Вопрос:

Смотрите ниже конфигурационный файл logstash для извлечения записей из базы данных mysql в индекс elasticsearch с помощью плагина jdbc. Как это можно изменить, чтобы на основе company_id значения, найденного в базе данных, создавались отдельные индексы, например: company_%{company_id}_user_events .

Можно ли это сделать динамически или для этого требуется создание отдельных конфигурационных файлов logstash, предварительно настроенных и жестко запрограммированных для каждого идентификатора компании? Есть ли какая-то промежуточная позиция, например, скрипт или шаблон?

Если бы это помогло, company_id поле можно было бы добавить в ahoy_events таблицу в базе данных, вместо того, чтобы «добавлять» через ассоциацию пользователей, как это происходит сейчас.

Текущий logstash.conf

 input {
    jdbc {
        jdbc_driver_library => "/opt/mysql-connector-java-5.1.47-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://mysql:3306/${DB_NAME}"
        jdbc_user => "${DB_USER}"
        jdbc_password => "${DB_PASSWORD}"
        schedule => "* * * * *"
        statement => "select * from ahoy_events where time > :sql_last_value"
    }
}

filter {
    jdbc_streaming {
        jdbc_driver_library => "/opt/mysql-connector-java-5.1.47-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://mysql:3306/${DB_NAME}"
        jdbc_user => "${DB_USER}"
        jdbc_password => "${DB_PASSWORD}"
        statement => "select * from users where id = :user"
        parameters => { "user" => "user_id" }
        target => "user"
    }
    jdbc_streaming {
        jdbc_driver_library => "/opt/mysql-connector-java-5.1.47-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://mysql:3306/${DB_NAME}"
        jdbc_user => "${DB_USER}"
        jdbc_password => "${DB_PASSWORD}"
        statement => "select * from visits where id = :visits"
        parameters => { "visits" => "visit_id" }
        target => "visits"
    }
    mutate {
        add_field => { "company_id" =>  "%{[user][0][company_id]}"}
    }
    jdbc_streaming {
        jdbc_driver_library => "/opt/mysql-connector-java-5.1.47-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://mysql:3306/${DB_NAME}"
        jdbc_user => "${DB_USER}"
        jdbc_password => "${DB_PASSWORD}"
        statement => "select * from companies where id = :company_id"
        parameters => { "company_id" => "company_id" }
        target => "company"
    }
    json {
        source => "properties"
        target => "properties"
    }
    mutate {
        add_field => { "user_name" => "%{[user][0][name]}" }
    }
    mutate {
        add_field => { "company_name" => "%{[company][0][name]}" }
    }
    mutate {
        rename => { "[visits][0]" => "visit" }
    }
    mutate {
        remove_field => ["visits", "company", "user"]
    }
}

output {
    elasticsearch {
        hosts => ["http://elasticsearch:9200"]
        index => "user_events-%{ YYYY.MM.dd}"
        document_id => "%{id}"
    }
}
  

Желаемым результатом является индекс, который имеет пространство имен company_id:
company_%{company_id}_user_events
чтобы я мог добавить другие индексы позже по тому же шаблону
company_%{company_id}_other_records

Ответ №1:

Не уверен на 100%, но технически это должно быть так просто:

 output {
    elasticsearch {
        hosts => ["http://elasticsearch:9200"]
        index => "company_%{company_id}_events-%{ YYYY.MM.dd}"
        document_id => "%{id}"
    }
}
  

Комментарии:

1. Вы правы, похоже, у меня была другая проблема, которая препятствовала правильному выполнению, и я думал, что это просто сбой из-за значения данных.