Как сгенерировать событие logstash в пользовательском плагине фильтра java после тайм-аута?

#java #elasticsearch #events #logstash #pipeline

Вопрос:

Я хочу сопоставить сообщения, отправляемые в logstash в фильтре, и отправить их в следующий фильтр logstash внутри конвейера.

Однако я успешно настроил пользовательский плагин фильтра логов в соответствии с документацией elastic.

Мой код плагина фильтра выглядит следующим образом:

 package org.logstashplugins;

import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.Event;
import co.elastic.logstash.api.Filter;
import co.elastic.logstash.api.FilterMatchListener;
import co.elastic.logstash.api.LogstashPlugin;
import co.elastic.logstash.api.PluginConfigSpec;
import org.apache.commons.lang3.StringUtils;

import java.util.Collection;
import java.util.Collections;
import java.util.*;

// class name must match plugin name
@LogstashPlugin(name = "java_filter_example")
public class JavaFilterExample implements Filter {

    public static final PluginConfigSpec<String> SOURCE_CONFIG =
            PluginConfigSpec.stringSetting("source", "message");

    private String id;
    private String sourceField;
    private List<String> buffer;
    

    public JavaFilterExample(String id, Configuration config, Context context)
    {
        // constructors should validate configuration options
        this.id = id;
        this.sourceField = config.get(SOURCE_CONFIG);
        this.buffer = new ArrayList<String>();
    }

    @Override
    public Collection<Event> filter(Collection<Event> events, FilterMatchListener matchListener)
    {
        for (Event e : events)
        {
            Object f = e.getField(sourceField);
            if (f instanceof String)
            {
                buffer.add((String) f);
                matchListener.filterMatched(e);
            }
        }
        return events;
    }

    @Override
    public Collection<PluginConfigSpec<?>> configSchema()
    {
        // should return a list of all configuration options for this plugin
        return Collections.singletonList(SOURCE_CONFIG);
    }

    @Override
    public String getId() 
    {
        return this.id;
    }
}
 

Теперь я хочу добавить поток, который выполняется в фоновом режиме, который проходит через буфер, содержащий сообщения, и отправляет соответствующее событие после истечения времени ожидания или если приходит новое событие, содержащее сообщение «готово».

Теперь мой вопрос в том, как я могу запустить новое событие и отправить его следующим фильтрам в конвейере logstash? Что такое java API или класс для этого?

Я знаю, что есть плагин Aggregate Filter, который частично отвечает моим требованиям, но мне нужно реализовать различные парсеры для сообщений, поэтому лучше реализовать собственный плагин самостоятельно. Но в соответствии с документацией неясно, как отправить новое событие, не возвращая коллекцию измененных событий внутри метода фильтра. Можете ли вы помочь мне достичь этого?

Мне нужно что-то вроде этого (в псевдокоде):

 public void anotherFunctionInsideAnotherJavaClass(...)
{
    Logstash.sendEvent(new Event(...));
}
 

Мне нужно сгенерировать и отправить событие в другом контексте, чем внутри метода фильтра. Возможно ли это сделать?

Надеюсь, вам ясно, в чем моя проблема.

Спасибо.