#logstash
#logstash
Вопрос:
У меня есть строки журнала в следующем формате, и я хочу извлечь поля:
[field1: content1] [field2: content2] [field3: content3] ...
Я не знаю ни имен полей, ни их количества.
Я попробовал это с обратными ссылками и форматом sprintf, но результатов не получил:
match => [ "message", "(?:[(w ): %{DATA:k<-1>}]) " ] # not working
match => [ "message", "(?:[%{WORD:fieldname}: %{DATA:%{fieldname}}]) " ] # not working
Похоже, это работает только для одного поля, но не более:
match => [ "message", "(?:[%{WORD:field}: %{DATA:content}] ?) " ]
add_field => { "%{field}" => "%{content}" }
Фильтр kv также не подходит, поскольку содержимое полей может содержать пробелы.
Существует ли какой-либо плагин / стратегия для решения этой проблемы?
Ответ №1:
Плагин Logstash Ruby может вам помочь. 🙂
Вот конфигурация:
input {
stdin {}
}
filter {
ruby {
code => "
fieldArray = event['message'].split('] [')
for field in fieldArray
field = field.delete '['
field = field.delete ']'
result = field.split(': ')
event[result[0]] = result[1]
end
"
}
}
output {
stdout {
codec => rubydebug
}
}
С вашими журналами:
[field1: content1] [field2: content2] [field3: content3]
Это результат:
{
"message" => "[field1: content1] [field2: content2] [field3: content3]",
"@version" => "1",
"@timestamp" => "2014-07-07T08:49:28.543Z",
"host" => "abc",
"field1" => "content1",
"field2" => "content2",
"field3" => "content3"
}
Я попробовал с 4 полями, это тоже работает.
Пожалуйста, обратите внимание, что event
в коде ruby это событие logstash. Вы можете использовать его для получения всех ваших полей событий, таких как message, @timestamp
и т.д.
Наслаждайтесь!!!
Ответ №2:
Я нашел другой способ, используя регулярное выражение:
ruby {
code => "
fields = event['message'].scan(/(?<=[)w : .*?(?=](?: |$))/)
for field in fields
field = field.split(': ')
event[field[0]] = field[1]
end
"
}
Ответ №3:
Я знаю, что это старый пост, но я наткнулся на него только сегодня, поэтому я подумал, что предложу альтернативный метод. Пожалуйста, обратите внимание, что, как правило, я бы почти всегда использовал фильтр ruby, как предложено в любом из двух предыдущих ответов. Однако я подумал, что мог бы предложить это в качестве альтернативы.
Если имеется фиксированное количество полей или максимальное количество полей (т. Е. может быть меньше трех полей, но никогда не будет больше трех полей), это также можно сделать с помощью комбинации фильтров grok
и mutate
.
# Test message is: `[fieldname: value]`
# Store values in [@metadata] so we don't have to explicitly delete them.
grok {
match => {
"[message]" => [
"[%{DATA:[@metadata][_field_name_01]}:s %{DATA:[@metadata][_field_value_01]}]( [%{DATA:[@metadata][_field_name_02]}:s %{DATA:[@metadata][_field_value_02]}])?( [%{DATA:[@metadata][_field_name_03]}:s %{DATA:[@metadata][_field_value_03]}])?"
]
}
}
# Rename the fieldname, value combinations. I.e., if the following data is in the message:
#
# [foo: bar]
#
# It will be saved in the elasticsearch output as:
#
# {"foo":"bar"}
#
mutate {
rename => {
"[@metadata][_field_value_01]" => "[%{[@metadata][_field_name_01]}]"
"[@metadata][_field_value_02]" => "[%{[@metadata][_field_name_02]}]"
"[@metadata][_field_value_03]" => "[%{[@metadata][_field_name_03]}]"
}
tag_on_failure => []
}
Для тех, кто, возможно, не так хорошо знаком с регулярными выражениями, записи в ()?
являются необязательными совпадениями регулярных выражений, что означает, что если совпадений нет, выражение не завершится ошибкой. tag_on_failure => []
Опция в mutate
фильтре гарантирует, что к tags
одному из переименований не будет добавлена ошибка, если произойдет сбой из-за отсутствия данных для захвата, и, как следствие, нет поля для переименования.