Фильтр Logstash grok — динамически присваивает полям имена

#logstash

#logstash

Вопрос:

У меня есть строки журнала в следующем формате, и я хочу извлечь поля:

 [field1: content1] [field2: content2] [field3: content3] ...
  

Я не знаю ни имен полей, ни их количества.

Я попробовал это с обратными ссылками и форматом sprintf, но результатов не получил:

 match => [ "message", "(?:[(w ): %{DATA:k<-1>}]) " ] # not working
match => [ "message", "(?:[%{WORD:fieldname}: %{DATA:%{fieldname}}]) " ] # not working
  

Похоже, это работает только для одного поля, но не более:

 match => [ "message", "(?:[%{WORD:field}: %{DATA:content}] ?) " ]
add_field => { "%{field}" => "%{content}" }
  

Фильтр kv также не подходит, поскольку содержимое полей может содержать пробелы.

Существует ли какой-либо плагин / стратегия для решения этой проблемы?

Ответ №1:

Плагин Logstash Ruby может вам помочь. 🙂

Вот конфигурация:

 input {
    stdin {}
}

filter {
    ruby {
        code => "
            fieldArray = event['message'].split('] [')
            for field in fieldArray
                field = field.delete '['
                field = field.delete ']'
                result = field.split(': ')
                event[result[0]] = result[1]
            end
        "
    }
}

output {
    stdout {
        codec => rubydebug
    }
}
  

С вашими журналами:

 [field1: content1] [field2: content2] [field3: content3]
  

Это результат:

 {
   "message" => "[field1: content1] [field2: content2] [field3: content3]",
  "@version" => "1",
"@timestamp" => "2014-07-07T08:49:28.543Z",
      "host" => "abc",
    "field1" => "content1",
    "field2" => "content2",
    "field3" => "content3"
}
  

Я попробовал с 4 полями, это тоже работает.

Пожалуйста, обратите внимание, что event в коде ruby это событие logstash. Вы можете использовать его для получения всех ваших полей событий, таких как message, @timestamp и т.д.

Наслаждайтесь!!!

Ответ №2:

Я нашел другой способ, используя регулярное выражение:

 ruby {
    code => "
        fields = event['message'].scan(/(?<=[)w : .*?(?=](?: |$))/)
        for field in fields
            field = field.split(': ')
            event[field[0]] = field[1]
        end
    "
}
  

Ответ №3:

Я знаю, что это старый пост, но я наткнулся на него только сегодня, поэтому я подумал, что предложу альтернативный метод. Пожалуйста, обратите внимание, что, как правило, я бы почти всегда использовал фильтр ruby, как предложено в любом из двух предыдущих ответов. Однако я подумал, что мог бы предложить это в качестве альтернативы.

Если имеется фиксированное количество полей или максимальное количество полей (т. Е. может быть меньше трех полей, но никогда не будет больше трех полей), это также можно сделать с помощью комбинации фильтров grok и mutate .

 # Test message is: `[fieldname: value]`
# Store values in [@metadata] so we don't have to explicitly delete them.
grok {
    match => {
        "[message]" => [
            "[%{DATA:[@metadata][_field_name_01]}:s %{DATA:[@metadata][_field_value_01]}]( [%{DATA:[@metadata][_field_name_02]}:s %{DATA:[@metadata][_field_value_02]}])?( [%{DATA:[@metadata][_field_name_03]}:s %{DATA:[@metadata][_field_value_03]}])?"
        ]
    }
}

# Rename the fieldname, value combinations. I.e., if the following data is in the message:
#
#     [foo: bar]
#
# It will be saved in the elasticsearch output as:
#
#    {"foo":"bar"}
#
mutate {
    rename => {
        "[@metadata][_field_value_01]" => "[%{[@metadata][_field_name_01]}]"
        "[@metadata][_field_value_02]" => "[%{[@metadata][_field_name_02]}]"
        "[@metadata][_field_value_03]" => "[%{[@metadata][_field_name_03]}]"
    }
    tag_on_failure => []
}
  

Для тех, кто, возможно, не так хорошо знаком с регулярными выражениями, записи в ()? являются необязательными совпадениями регулярных выражений, что означает, что если совпадений нет, выражение не завершится ошибкой. tag_on_failure => [] Опция в mutate фильтре гарантирует, что к tags одному из переименований не будет добавлена ошибка, если произойдет сбой из-за отсутствия данных для захвата, и, как следствие, нет поля для переименования.