Тайник журнала не загружает точное количество записей в Elasticsearch и при каждом попадании результаты продолжают меняться

#elasticsearch #logstash #logstash-jdbc

#elasticsearch #logstash #logstash-jdbc

Вопрос:

Заявление о проблеме: Logstash не загружает все записи из базы данных в elasticsearch правильно, и каждый раз, когда я нажимаю на один и тот же api, получает разные результаты (однако иногда правильные, но меняются при каждом попадании и показывают только подмножество записей во вложенном поле приветствий). Механизм logstash выглядит спорадически, и результаты загрузки не согласуются, особенно в сценарии «Один ко многим» . http://localhost:9200/staffsalutation/_search Я наблюдаю странное поведение logstash logstash-7.8.0 при загрузке записей из 2 таблиц с запросом и конфигурацией, как показано ниже

Запрос:
выберите s.update_time, s.staff_id как staff_id, birth_date, first_name, last_name, gender, hire_date, st.title КАК title_nm, st.from_date КАК title_frm_dt, st.to_date КАК title_to_dt из staff s СЛЕВА ПРИСОЕДИНИТЬСЯ к приветствию st НА s.staff_id = st. заказ staff_id по s.update_time

     input {
        
        jdbc {

            jdbc_connection_string => "jdbc:postgresql://localhost:5432/postgres"
            jdbc_driver_library => "C:\Users\NS\.m2\repository\org\postgresql\postgresql\42.2.11\postgresql-42.2.11.jar"
            jdbc_user => "postgres"
            jdbc_password => "postgres"
            jdbc_driver_class => "org.postgresql.Driver"
            schedule => "* * * * *"     
            statement => "select  e.update_time, e.emp_no as staff_id, birth_date, first_name, last_name, gender, hire_date, t.title AS title_nm, t.from_date AS title_frm_dt, t.to_date AS title_to_dt 
            from employees e 
            LEFT JOIN titles t 
            ON e.emp_no  = t.emp_no  
            order by e.update_time"

            add_field => { "doctype" => "employee" }
            tracking_column_type => "timestamp"
            use_column_value =>true
            tracking_column => update_time
            
            jdbc_fetch_size => "50000"
        }

    }
    filter {
    aggregate {
            task_id => "%{staff_id}"
                code => "
                    map['staff_id'] = event.get('staff_id')
                    map['birth_date'] = event.get('birth_date')
                    map['first_name'] = event.get('first_name')
                    map['last_name'] = event.get('last_name')
                    map['gender'] = event.get('gender')
                    map['hire_date'] = event.get('hire_date')
                    map['salutations'] ||= []
                    map['salutations'] << {
                    'title_nm' => event.get('title_nm'),'title_frm_dt' => event.get('title_frm_dt'),
                    'title_to_dt' => event.get('title_to_dt')
                    }
                    event.cancel()
                "
            push_previous_map_as_event => true
            timeout => 30
            }
    }
    output {
        elasticsearch {
        document_id => "%{staff_id}"
        index => "staffsalutation"
        }
        file {
        path => "test.log"  
        codec => line
       }
    }
 

Ответ №1:

Найдено решение!

  1. Необходимо использовать предложение order by в запросе, чтобы записи сортировались по emp_no, а logstash мог выполнять поиск и агрегировать зависимые объекты, такие как заголовки (например, от одного ко многим).

выберите e.update_time, e.emp_no как staff_id, birth_date, first_name, last_name, gender, hire_date, т.заголовок КАК title_nm, т.from_date КАК title_frm_dt, т.to_date КАК title_to_dt из employees e СЛЕВА ПРИСОЕДИНИТЕ заголовки t К e.emp_no = т.emp_no порядок по e .эмп_но

  1. Поскольку здесь используется агрегация, для обработки записи должен быть один поток, иначе это вызовет проблемы с агрегацией (и именно здесь случайные результаты, которые вы получите при многократном вызове для поиска по индексу в соответствии с указанным выше URL). Хотя это выглядит как снижение производительности, поскольку только 1 рабочий поток будет обрабатывать записи, но это можно смягчить, вызвав несколько конфигурационных файлов logstash с разнородным набором записей, например, первые 100 emp_no в одном файле и 2-я сотня в другом, чтобы logstash мог выполнять их параллельно. итак, выполните, как показано ниже

logstash -f logstash_config.conf -w 1