Модификация события Flume

#flume

#поток

Вопрос:

Как я могу добавить временную метку в начало каждого события потока? допустим, у меня есть этот журнал, но я также хочу добавить временную метку

127.0.0.1 — фрэнк [10 / Октябрь/2000:13:55:36 -0700] » ПОЛУЧИТЬ /apache_pb.gif HTTP/1.0″ 200 2326 2016-05-95 127.0.0.1 — фрэнк [10/ Октябрь/2000:13:55:36 -0700] » ПОЛУЧИТЬ /apache_pb.gif HTTP/1.0″ 200 2326

Ответ №1:

Предполагая, что вы хотите добавить временную метку в начало байтов тела события, тогда нет предварительно настроенного перехватчика для выполнения того, что вы просите (перехватчик поиска и замены не может заменить динамическое значение, такое как дата).

Однако запись перехватчика для префикса каждого тела события с текущей меткой времени должна быть достаточно тривиальной.

Проверьте исходный код на наличие существующих перехватчиков в качестве примера, скомпилируйте свой код и разверните jar в новую папку с именем $FLUME_HOME/plugins.d/my-plugins/lib/my-plugin.jar

Вот ссылка на текущий источник для стандартных перехватчиков потока:

Но, по сути, вам просто нужно создать класс, который реализует Interceptor интерфейс, и также создать Interceptor.Builder реализацию.

Комментарии:

1. Я вижу, спасибо Крису за комментарий, насколько я знаю, есть два способа сделать это, тот, который вы указали выше, написав мой собственный класс, который реализует Interceptor, а другой способ — использовать morphline.

2. Я создал TimeStampClass и Builder и поместил jar в этот каталог /usr/lib/flume-ng/lib/ . Но он не находит ava.lang.ClassNotFoundException: org.apache.flume.interceptor.TimestampBodyInterceptor$Builder

3. Можете ли вы опубликовать список содержимого вашего jar? unzip -l myjar.jar

Ответ №2:

Вы можете использовать перехватчики потока: https://flume.apache.org/FlumeUserGuide.html#flume-interceptors чтобы изменить события.

Вы можете написать свои собственные классы, которые обеспечивают функциональность, которую вы ищете, или вы также можете использовать перехватчики Morphline.

Если изменение тела не требуется, вы можете просто использовать перехватчик временных меток, чтобы добавить временную метку в заголовки.

Комментарии:

1. Привет, bessbd, спасибо за ответ. Я видел это, и я попытался с этим, но это работает только с добавлением метки времени в заголовок, а не содержимого каждой строки файла. Мне нужно добавить временную метку к каждому событию, но похоже, что у самого flume нет функции для этого, мне либо нужно написать мой morpline conf моего класса interceptor.

2. Привет, Фарслан, в зависимости от вашего варианта использования может быть достаточно иметь эти данные в заголовках: например, если вы используете приемник hdfs, вы можете добавить его в путь, в который вы записываете, и когда вы выполняете обработку, вы используете дополнительные данные, которые у вас есть в имени файла.

3. Я создал TimeStampClass и Builder и поместил jar в этот каталог /usr/lib/flume-ng/lib/ . Но он не находит ava.lang.ClassNotFoundException: org.apache.flume.interceptor.TimestampBodyInterceptor$Builde‌​r , что я чего-то не хватает?

4. Это мой conf, где я вызываю класс, но не смог вставить код, говорящий слишком большой `agent1.sources.webserver-log-source.interceptors.i4.type = org.apache. поток.перехватчик. TimestampBodyInterceptor$Builder #agent1.sources.webserver-log-source.interceptors.i4.separator = ,

5. public static class Builder implements Interceptor.Builder{ private String seperator; public void configure(Context ctx) { this.seperator = ctx.getString(Constants.SEPERATOR); } public Interceptor build() { return new EventTimestampAdder(this.seperator); } }

Ответ №3:

Я решил этот вопрос с помощью Morphline, добавив эти функции ниже, где временная метка добавляется к началу каждого события потока. {
addCurrentTime { field : timestamp }
},
{
setValues {
_result = "@{timestamp}|@{message}"
}
},