#regex #logging #logstash
#регулярное выражение #ведение журнала #logstash
Вопрос:
В настоящее время я работаю с журналами, часть содержимого которых выглядит следующим образом:
00:19:59.771 (07120/evtThread ) TRC> Cem< [Core1] CALL_STATE...
00:20:00.199 (05768/BCMApplicationThread) INF>
#S#|Call stats, ongoing calls: 8, handled_calls: 7304
#S# ---------------------------- ---------- ---------- ---------- ---------- ----------
#S#|Peer | From| To| MinTime| MaxTime| AvgTime|
#S# ---------------------------- ---------- ---------- ---------- ---------- ----------
#S#| CallDispatcher:Core2| 0| 0| 0| 0| 0|
#S#| CallDispatcher:Core3| 0| 0| 0| 0| 0|
#S#| Cem:Core1| 1632| 6207| 0| 5996522| 311685|
Я проанализировал строку, содержащую время, следующим образом:
grok {
match => [ "message", "%{TIME:time} (?<bcm_comp>(d{5}/w{4,}:* *w*)) (?<loglevel>w{3}>{1}) %{GREEDYDATA:message}" ]
overwrite => [ "message" ]
add_field => [ "BCM_System", "PROD" ]
}
Строки, содержащие #S # в начале, были проанализированы следующим образом. Не обращайте внимания на строки, содержащие ——— и строки, содержащие заголовок таблицы и строку статистики вызовов.
grok {
match => [ "message", "(?<start>#S#|)s* (?<peer>w*:w*)(?<div2>|)s* %{NUMBER:From}(?<div3>|)s* %{NUMBER:To}(?<div4>|)s* %{NUMBER:MinTime}(?<div5>|)s* %{NUMBER:MaxTime}(?<div6>|)s* %{NUMBER:AvgTime}(?<div7>|)" ]
remove_field => [ "start", "div2", "div3", "div4", "div5", "div6", "div7" ]
overwrite => [ "message"]
add_field => [ "reference_time", "%{@time}"]
}
То, что я пытаюсь сделать, это взять время из предыдущей строки и добавить его в качестве поля для того, где я скопировал строки #s #. Я пытаюсь использовать синтаксис add_field из logstash, как показано, но это не так work…it просто буквально выводит%{@time}.
Могу ли я каким-либо образом извлечь это время из предыдущей строки и поместить его в поле для другого события?
Ответ №1:
Насколько я знаю, вам нужно было бы написать плагин фильтра, чтобы сделать что-то подобное. Вот простой плагин, который я собрал, чтобы сделать что-то подобное — он запоминает поле, когда оно его видит, а затем использует последнее значение, которое он видел, если его нет.
# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
require "set"
#
# This filter will look for a field from an event and record the last value
# of it. If it's not present, it will add the last value to the event
#
# The config looks like this:
#
# filter {
# memorize {
# field => "time"
# default => "00:00:00.000"
# }
# }
#
# The `field` is the name of the field that you want to memorize
# The `default` is the value to use for the field if you haven't seen it yet
# in the file (this is optional)
class LogStash::Filters::Memorize < LogStash::Filters::Base
config_name "memorize"
milestone 1
# The field to memorize
config :field, :validate => :string, :required => true
# the default value to use for the field if it's not seen before we need it
config :default, :validate => :string, :required => false
# The stream identity is how the multiline filter determines which stream an
# event belongs to. See the multiline plugin if you want more details on how
# this might work
config :stream_identity , :validate => :string, :default => "%{host}.%{path}.%{type}"
public
def initialize(config = {})
super
@threadsafe = false
# This filter needs to keep state.
@memorized = Hash.new
end # def initialize
public
def register
# nothing needed
end # def register
public
def filter(event)
return unless filter?(event)
if event[@field].nil?
val = @memorized[@stream_identity]
if val.nil?
val = @default
end
event[@field] = val
filter_matched(event)
else
@memorized[@stream_identity] = event[@field]
end
end
end
Комментарии:
1. Я не знаю, что вы закодировали, его сгенерированный по умолчанию плагин. И не работает для меня
2. Я переписал это как плагин logstash для logstash 5… См github.com/alcanzar/logstash-filter-memorize