#elasticsearch #logstash #logstash-grok
#elasticsearch #logstash #logstash-grok
Вопрос:
Я тестировал конвейер Logstash для обработки многострочного журнала аудита Oracle, который имеет следующий формат:
Audit file /u01/app/oracle/admin/DEVINST/adump/DEVINST_ora_15460_20201001230100743853143795.aud
Oracle Database 12c Standard Edition Release 12.2.0.1.0 - 64bit DEV
Build label: RDBMS_12.2.0.1.0_LINUX.X64_170125
ORACLE_HOME: /u01/app/oracle/product/12.2.0/dbhome_1
System name: Linux
Node name: testdevserver
Release: 3.10.0-862.14.4.el7.x86_64
Version: #1 SMP Fri Sep 21 09:07:21 UTC 2018
Machine: x86_64
Instance name: DEVINST
Redo thread mounted by this instance: 1
Oracle process number: 57
Unix process pid: 15460, image: oracle@testdevserver (TNS V1-V3)
Thu Oct 1 23:01:00 2020 00:00
LENGTH : '275'
ACTION :[7] 'CONNECT'
DATABASE USER:[1] '/'
PRIVILEGE :[6] 'SYSDBA'
CLIENT USER:[9] 'test_user'
CLIENT TERMINAL:[5] 'pts/0'
STATUS:[1] '0'
DBID:[10] '1762369616'
SESSIONID:[10] '4294967295'
USERHOST:[21] 'testdevserver'
CLIENT ADDRESS:[0] ''
ACTION NUMBER:[3] '100'
Thu Oct 1 23:01:00 2020 00:00
LENGTH : '296'
ACTION :[29] 'SELECT STATUS FROM V$INSTANCE'
DATABASE USER:[1] '/'
PRIVILEGE :[6] 'SYSDBA'
CLIENT USER:[9] 'test_user'
CLIENT TERMINAL:[5] 'pts/0'
STATUS:[1] '0'
DBID:[10] '1762369616'
SESSIONID:[10] '4294967295'
USERHOST:[21] 'testdevserver'
CLIENT ADDRESS:[0] ''
ACTION NUMBER:[1] '3'
Мой /etc/logstash/conf.d/25-filter.conf
:
filter {
grok {
match => { "message" => "(?<day>^[A-Za-z]{3}) (?<month>[A-Za-z]{3}) (?<monthday>[0-9]{2}) (?<hour>[0-9]{2}):(?<min>[0-9]{2}):(?<sec>[0-9]{2}) (?<year>[0-9]{4}) %{GREEDYDATA:message}" }
overwrite => [ "message" ]
add_tag => [ "oracle_audit" ]
}
grok {
match => { "ACTION :[[0-9]*] '(?<ora_audit_action>.*)'.*DATABASE USER:[[0-9]*] '(?<ora_audit_dbuser>.*)'.*PRIVILEGE :[[0-9]*] '(?<ora_audit_priv>.*)'.*CLIENT USER:[[0-9]*] '(?<ora_audit_osuser>.*)'.*CLIENT TERMINAL:[[0-9]*] '(?<ora_audit_term>.*)'.*STATUS:[[0-9]*] '(?<ora_audit_status>.*)'.*DBID:[[0-9]*] '(?<ora_audit_dbid>.*)'.*SESSIONID:[[0-9]*] '(?<ora_audit_sessionid>.*)'.*USERHOST:[[0-9]*] '(?<ora_audit_dbhost>.*)'.*CLIENT ADDRESS:[[0-9]*] '(?<ora_audit_clientaddr>.*)'.*ACTION NUMBER:[[0-9]*] '(?<ora_audit_actionnum>.*)'" }
}
grok {
match => { "source" => [ ".*/[a-zA-Z0-9_#$]*_[a-z0-9]*_(?<ora_audit_derived_pid>[0-9]*)_[0-9]*.aud" ] }
}
mutate {
add_field => { "timestamp" => "%{year}-%{day}-%{monthday} %{hour}:%{min}:%{sec}" }
}
date {
locale => "en"
match => [ "timestamp", "YYYY-MMM-dd HH:mm:ss" ]
}
mutate {
remove_field => [ "timestamp", "year", "day", "monthday", "hour", "min", "sec" ]
}
}
Мой /etc/logstash/conf.d/000-file-in.conf
файл :
input {
file {
path => [ "/tmp/testora" ]
start_position => "beginning"
codec => multiline
{
pattern => "^[A-Za-z]{3} [A-Za-z]{3} [0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2} [0-9]{4}"
negate => "true"
what => "previous"
}
}
}
Затем я протестировал это, запустив :
/usr/share/logstash/bin/logstash -r -f /etc/logstash/conf.d/
:
....
[INFO ] 2020-10-05 11:16:30.656 [Converge PipelineAction::Reload<main>] reload - Reloading pipeline {"pipeline.id"=>:main}
[INFO ] 2020-10-05 11:16:30.662 [Converge PipelineAction::Reload<main>] observingtail - QUIT - closing all files and shutting down.
{
"ora_audit_dbid" => "1762369616",
"ora_audit_actionnum" => "3",
"ora_audit_sessionid" => "4294967295",
"@version" => "1",
"tags" => [
[0] "multiline",
[1] "_grokparsefailure",
[2] "_dateparsefailure"
],
"path" => "/tmp/testora",
"ora_audit_action" => [
[0] "275'nACTION :[7] 'CONNECT'nDATABASE USER:[1] '/'nPRIVILEGE :[6] 'SYSDBA'nCLIENT USER:[9] 'test_user'nCLIENT TERMINAL:[5] 'pts/0'nSTATUS:[1] '0'nDBID:[10] '1762369616'nSESSIONID:[10] '4294967295'nUSERHOST:[21] 'testdevserver'nCLIENT ADDRESS:[0] ''nACTION NUMBER:[3] '100'nThu Oct 1 23:01:00 2020 00:00nLENGTH : '296",
[1] "SELECT STATUS FROM V$INSTANCE"
],
"@timestamp" => 2020-10-05T01:16:30.889Z,
"ora_audit_priv" => "SYSDBA",
"message" => "Audit file /u01/app/oracle/admin/DEVINST/adump/DEVINST_ora_15460_20201001230100743853143795.audnOracle Database 12c Standard Edition Release 12.2.0.1.0 - 64bit DEVnBuild label: RDBMS_12.2.0.1.0_LINUX.X64_170125nORACLE_HOME: /u01/app/oracle/product/12.2.0/dbhome_1nSystem name: LinuxnNode name: testdevservernRelease: 3.10.0-862.14.4.el7.x86_64nVersion: #1 SMP Fri Sep 21 09:07:21 UTC 2018nMachine: x86_64nInstance name: DEVINSTnRedo thread mounted by this instance: 1nOracle process number: 57nUnix process pid: 15460, image: oracle@testdevserver (TNS V1-V3)nThu Oct 1 23:01:00 2020 00:00nLENGTH : '275'nACTION :[7] 'CONNECT'nDATABASE USER:[1] '/'nPRIVILEGE :[6] 'SYSDBA'nCLIENT USER:[9] 'test_user'nCLIENT TERMINAL:[5] 'pts/0'nSTATUS:[1] '0'nDBID:[10] '1762369616'nSESSIONID:[10] '4294967295'nUSERHOST:[21] 'testdevserver'nCLIENT ADDRESS:[0] ''nACTION NUMBER:[3] '100'nThu Oct 1 23:01:00 2020 00:00nLENGTH : '296'nACTION :[29] 'SELECT STATUS FROM V$INSTANCE'nDATABASE USER:[1] '/'nPRIVILEGE :[6] 'SYSDBA'nCLIENT USER:[9] 'test_user'nCLIENT TERMINAL:[5] 'pts/0'nSTATUS:[1] '0'nDBID:[10] '1762369616'nSESSIONID:[10] '4294967295'nUSERHOST:[21] 'testdevserver'nCLIENT ADDRESS:[0] ''nACTION NUMBER:[1] '3'",
"ora_audit_dbhost" => "testdevserver",
"host" => "myhost",
"ora_audit_dbuser" => "/",
"ora_audit_osuser" => "test_user",
"ora_audit_term" => "pts/0",
"ora_audit_status" => "0"
}
К сожалению, это не то, чего я ожидал. Каким-то образом он не разбивает сообщения на части и не анализирует сообщения правильно. Я ожидал чего-то вроде :
"ora_audit_action" => "CONNECT",
"ora_audit_dbuser" => "/",
"ora_audit_dbid" => "1762369616",
"ora_audit_status" => "0",
"ora_audit_osuser" => "test_user",
"ora_audit_priv" => "SYSDBA",
"ora_audit_term" => "pts/0",
"ora_audit_sessionid" => "4294967295",
"ora_audit_dbhost" => "testdevserver",
"ora_audit_clientaddr" => "",
"ora_audit_actionnum" => "3",
"host" => "myhost",
"@version" => "1",
"@timestamp" => 2020-10-05T01:16:30.889Z,
"message" => "Thu Oct 1 23:01:00 2020 00:00nLENGTH : '275'nACTION :[7] 'CONNECT'nDATABASE USER:[1] '/'nPRIVILEGE :[6] 'SYSDBA'nCLIENT USER:[9] 'test_user'nCLIENT TERMINAL:[5] 'pts/0'nSTATUS:[1] '0'nDBID:[10] '1762369616'nSESSIONID:[10] '4294967295'nUSERHOST:[21] 'testdevserver'nCLIENT ADDRESS:[0] ''nACTION NUMBER:[3] '100'nThu Oct 1 23:01:00 2020 00:00nLENGTH : '296'nACTION :[29] 'SELECT STATUS FROM V$INSTANCE'nDATABASE USER:[1] '/'nPRIVILEGE :[6] 'SYSDBA'nCLIENT USER:[9] 'test_user'nCLIENT TERMINAL:[5] 'pts/0'nSTATUS:[1] '0'nDBID:[10] '1762369616'nSESSIONID:[10] '4294967295'nUSERHOST:[21] 'testdevserver'nCLIENT ADDRESS:[0] ''nACTION NUMBER:[1] '3'"
И я вижу некоторые из этих сведений, однако я также ожидаю, что он выдаст 2 «фрагмента» сообщений (например, соответствующие шаблону в ^[A-Za-z]{3} [A-Za-z]{3} [0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2} [0-9]{4}
качестве начала строки Thu Oct 1 23:01:00 2020 00:00
) — вместо этого похоже, что он видит только один? Я думаю, что здесь может быть проблема с моим сопоставлением с шаблоном, признателен, если кто-нибудь может предложить какие-либо подсказки.
Кроме того, я не уверен, что породило эти ошибки [1] "_grokparsefailure", [2] "_dateparsefailure"
— очевидно, что он неправильно анализирует вещи, но я просто не знаю, как это сделать.
Справка: (
Спасибо J
Ответ №1:
Я думаю, что приведенный ниже шаблон должен работать с вашим многострочным кодеком и давать требуемую группировку.
pattern => "^([A-Za-z]{3})(s*)([A-Za-z]{3})(s*)([0-9]{1,2})(s*)([0-9]{2}:[0-9]{2}:[0-9]{2} [0-9]{4})"
Кроме того, приведенный ниже фильтр grok должен дать вам желаемую разбивку многострочного. Возможно, вы захотите переименовать ключи по своему вкусу.
input {
file {
path => [ "/tmp/testora" ]
start_position => "beginning"
codec => multiline
{
pattern => "^([A-Za-z]{3})(s*)([A-Za-z]{3})(s*)([0-9]{1,2})(s*)([0-9]{2}:[0-9]{2}:[0-9]{2} [0-9]{4})"
negate => "true"
what => "previous"
}
}
}
filter {
grok {
match => { "message" => "(?<day>[A-Za-z]{3})%{SPACE}(?<month>[A-Za-z]{3})%{SPACE}(?<monthday>[0-9]{1,2})%{SPACE}(?<hour>[0-9]{1,2}):(?<min>[0-9]{1,2}):(?<sec>[0-9]{2})%{SPACE}(?<year>[0-9]{4})%{SPACE}%{GREEDYDATA:message}" }
overwrite => [ "message" ]
add_tag => [ "oracle_audit" ]
}
kv {
field_split => "n"
value_split_pattern => "s*:s*[[0-9]*]s*"
source => "message"
}
mutate {
add_field => { "timestamp" => "%{year}-%{month}-%{monthday} %{hour}:%{min}:%{sec}" }
}
date {
locale => "en"
match => [ "timestamp", "YYYY-MMM-dd HH:mm:ss" ]
}
mutate {
remove_field => [ "timestamp", "year", "day", "monthday", "month", "hour", "min", "sec" ]
}
}
output {
stdout {}
}
Пример вывода конвейера выглядит следующим образом
{
"PRIVILEGE" => "SYSDBA",
"DBID" => "1762369616",
"ACTION NUMBER" => "100",
"USERHOST" => "testdevserver",
"@version" => "1",
"path" => "/usr/share/logstash/stack/data/file.txt",
"SESSIONID" => "4294967295",
"CLIENT USER" => "test_user",
"CLIENT TERMINAL" => "pts/0",
"STATUS" => "0",
"host" => "e8f97b3e53ff",
"tags" => [
[0] "multiline",
[1] "oracle_audit"
],
"ACTION" => "CONNECT",
"DATABASE USER" => "/",
"@timestamp" => 2020-10-01T23:01:00.000Z,
"message" => " 00:00nLENGTH : '275'nACTION :[7] 'CONNECT'nDATABASE USER:[1] '/'nPRIVILEGE :[6] 'SYSDBA'nCLIENT USER:[9] 'test_user'nCLIENT TERMINAL:[5] 'pts/0'nSTATUS:[1] '0'nDBID:[10] '1762369616'nSESSIONID:[10] '4294967295'nUSERHOST:[21] 'testdevserver'nCLIENT ADDRESS:[0] ''nACTION NUMBER:[3] '100'"
}
Комментарии:
1. Большое спасибо, это работает как шарм! Мне нравится использование kv там! 🙂 Я многому учусь. Есть идеи, как я могу удалить
00:00
изmessage
? его приятно иметь, но не обязательно. Ваш ответ действительно помог мне, еще раз спасибо.2. Я удалил его таким образом
"message" => "(?<day>[A-Za-z]{3})%{SPACE}(?<month>[A-Za-z]{3})%{SPACE}(?<monthday>[0-9]{1,2})%{SPACE}(?<hour>[0-9]{1,2}):(?<min>[0-9]{1,2}):(?<sec>[0-9]{2})%{SPACE}(?<year>[0-9]{4})%{SPACE}(?<junk1>[ ][0-9]{2}):(?<junk2>[0-9]{2})%{SPACE}%{GREEDYDATA:message}"
, чтобыjunk1
иjunk2
не были включены. Есть ли лучший способ?
Ответ №2:
Найдите ниже шаблон Grok, который соответствует Thu Oct 1 23:01:00 2020 00:00
(?<timestamp>%{DAY} %{MONTH} %{MONTHDAY} %{TIME})
Скриншот вывода:
Комментарии:
1. Спасибо за это, сураватта. К сожалению, это не сработает
Thu Oct 23 23:01:00 2020 00:00
из-за этого дополнительного пробела. Используя этот шаблон, я могу достичь обоих:(?<timestamp>%{DAY:day} %{MONTH:month} *%{MONTHDAY:mday} %{TIME:time} %{YEAR:year})
однако ваш ответ очень полезен, спасибо.2. Добро пожаловать. Поскольку здесь есть одно дополнительное пространство
Thu Oct 1 23:01:00 2020 00:00
. Итак, я создал шаблон таким образом. В любом случае, рад, что это вам помогло. 🙂
Ответ №3:
в зависимости от ваших требований вам необходимо изменить шаблон grok во втором фильтре grok как для параметра LENGH, так и для параметра ACTION вы создаете одно и то же поле ora_audit_action, чтобы оно добавляло данные и формировало массив, лучше создать отдельное поле для обоих параметров
grok {
match => { "msg1" => "LENGTH : '(?<ora_audit_length>.*)'.ACTION :[[0-9]*] '(?<ora_audit_action>.*)'.*DATABASE USER:[[0-9]*] '(?<ora_audit_dbuser>.*)'.*PRIVILEGE :[[0-9]*] '(?<ora_audit_priv>.*)'.*CLIENT USER:[[0-9]*] '(?<ora_audit_osuser>.*)'.*CLIENT TERMINAL:[[0-9]*] '(?<ora_audit_term>.*)'.*STATUS:[[0-9]*] '(?<ora_audit_status>.*)'.*DBID:[[0-9]*] '(?<ora_audit_dbid>.*)'.*SESSIONID:[[0-9]*] '(?<ora_audit_sessionid>.*)'.*USERHOST:[[0-9]*] '(?<ora_audit_dbhost>.*)'.*CLIENT ADDRESS:[[0-9]*] '(?<ora_audit_clientaddr>.*)'.*ACTION NUMBER:[[0-9]*] '(?<ora_audit_actionnum>.*)'" }
}
вместо удаления полей вы можете внести необходимые поля в белый список с помощью prune filter.
prune {
whitelist_names => ["ora_audit_action", "ora_audit_dbuser", "ora_audit_dbid", "ora_audit_status", "ora_audit_osuser", "ora_audit_priv", "ora_audit_term", "ora_audit_sessionid", "ora_audit_dbhost", "ora_audit_clientaddr", "ora_audit_actionnum", "host", "@timestamp", "@version", "message"]
}
в вашем коде вы разделяете временную метку и изменяете ее, но в части фильтра вы удаляете ее и используете поле @timestamp по умолчанию, если у вас нет требований, вы можете удалить ненужный код.
Комментарии:
1. Спасибо за это, читреш! Да, я допустил опечатку
LENGTH
, я не должен использовать одно и то же поле дляACTION
indeed. Я исправил это, но я все еще получаю весь журнал в виде одного сообщения вместо 2 фрагментов сообщений. Я думаю, что шаблон^[A-Za-z]{3} [A-Za-z]{3} [0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2} [0-9]{4}
не работает. Как я могу изменить это, чтобы оно соответствовалоThu Oct 1 23:01:00 2020 00:00
? Ценю вашу помощь 🙂