#java #elasticsearch #events #logstash #pipeline
Вопрос:
Я хочу сопоставить сообщения, отправляемые в logstash в фильтре, и отправить их в следующий фильтр logstash внутри конвейера.
Однако я успешно настроил пользовательский плагин фильтра логов в соответствии с документацией elastic.
Мой код плагина фильтра выглядит следующим образом:
package org.logstashplugins;
import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.Event;
import co.elastic.logstash.api.Filter;
import co.elastic.logstash.api.FilterMatchListener;
import co.elastic.logstash.api.LogstashPlugin;
import co.elastic.logstash.api.PluginConfigSpec;
import org.apache.commons.lang3.StringUtils;
import java.util.Collection;
import java.util.Collections;
import java.util.*;
// class name must match plugin name
@LogstashPlugin(name = "java_filter_example")
public class JavaFilterExample implements Filter {
public static final PluginConfigSpec<String> SOURCE_CONFIG =
PluginConfigSpec.stringSetting("source", "message");
private String id;
private String sourceField;
private List<String> buffer;
public JavaFilterExample(String id, Configuration config, Context context)
{
// constructors should validate configuration options
this.id = id;
this.sourceField = config.get(SOURCE_CONFIG);
this.buffer = new ArrayList<String>();
}
@Override
public Collection<Event> filter(Collection<Event> events, FilterMatchListener matchListener)
{
for (Event e : events)
{
Object f = e.getField(sourceField);
if (f instanceof String)
{
buffer.add((String) f);
matchListener.filterMatched(e);
}
}
return events;
}
@Override
public Collection<PluginConfigSpec<?>> configSchema()
{
// should return a list of all configuration options for this plugin
return Collections.singletonList(SOURCE_CONFIG);
}
@Override
public String getId()
{
return this.id;
}
}
Теперь я хочу добавить поток, который выполняется в фоновом режиме, который проходит через буфер, содержащий сообщения, и отправляет соответствующее событие после истечения времени ожидания или если приходит новое событие, содержащее сообщение «готово».
Теперь мой вопрос в том, как я могу запустить новое событие и отправить его следующим фильтрам в конвейере logstash? Что такое java API или класс для этого?
Я знаю, что есть плагин Aggregate Filter, который частично отвечает моим требованиям, но мне нужно реализовать различные парсеры для сообщений, поэтому лучше реализовать собственный плагин самостоятельно. Но в соответствии с документацией неясно, как отправить новое событие, не возвращая коллекцию измененных событий внутри метода фильтра. Можете ли вы помочь мне достичь этого?
Мне нужно что-то вроде этого (в псевдокоде):
public void anotherFunctionInsideAnotherJavaClass(...)
{
Logstash.sendEvent(new Event(...));
}
Мне нужно сгенерировать и отправить событие в другом контексте, чем внутри метода фильтра. Возможно ли это сделать?
Надеюсь, вам ясно, в чем моя проблема.
Спасибо.