Запись в журнал Java выходит из памяти только тогда, когда use_prepared_statements=>true в jdbc при синхронизации исходного огромного набора данных

#java #jdbc #logstash #ojdbc

Вопрос:

Java OutOfMemory Во время начальной загрузки мы получаем огромный набор данных из ~70 миллионов записей с помощью Logstash и jdbc в Elastic. Размер кучи Java составляет 4 ГБ, но даже с 8 или 16 ГБ ситуация та же.

Тот же самый набор данных может быть успешно загружен без use_prepared_statements (с использованием литералов).

Мы успешно загрузили все представление один раз use_prepared_statements=>false , но чтобы оно было более эффективным даже для дельта-нагрузок, мы хотели бы использовать подготовленные операторы. Наша идея состоит в том jdbc_fetch_size=>1000 , чтобы ограничить объем данных в пределах одной выборки. Согласно документации, использование jdbc_page_size или jdbc_paging_enabled не действует при использовании prepared_statements. Это было бы даже неэффективно из-за сложного представления за сценой, которое было бы неэффективным при выполнении нескольких запросов, каждый раз использующих смещения для начальной загрузки.

Использование подготовленных операторов хорошо работает в Oracle, и мы видим, что выборка составила около 1 миллиона записей из ~70 миллионов, выполненных при сбое Logstash с ошибкой OutOfMemory.

Наша текущая конфигурация logstash:

 input {
    jdbc {
        type => "sim_list"
        jdbc_validate_connection => true
        jdbc_driver_library => "/pkg/moip/otc/apps/ls-otc-tmsp_tua2/conf/ojdbc6.jar"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        jdbc_connection_string => "jdbc:oracle:thin:@db-server:1521:TA2A"
        jdbc_user => "test"
        jdbc_password => "testpwd"
        tracking_column => "last_import"
        use_column_value => true
        tracking_column_type => "timestamp"
        prepared_statement_bind_values => [":sql_last_value"]
        prepared_statement_name => "get_sim_list_tua2_v5"
        use_prepared_statements => true
        last_run_metadata_path => "/pkg/moip/otc/data/ls-otc-tmsp_tua2/jdbc/sql_last_run_sim_list_tua2_v5"
        statement => "SELECT simlist.*, simlist.LAST_UPDATE as last_import
FROM (SELECT *
      FROM V_SIM_LIST_VIEW
        WHERE last_update between (?) - 1/86400 and sysdate
          ORDER BY last_update ASC) simlist"
        # run every 5 seconds
        schedule => "*/5 * * * * *"
        connection_retry_attempts => "3"
        jdbc_fetch_size => 1000
    }
}

filter {
    mutate {
        copy => {
            "%{di_id}" => "[@metadata][_id]"
        }
        split => { "user_ids" => "," }
        split => { "option_ids" => "|" }
        split => {"apn_ids" => "|"}
        split => { "ip_address" => "|" }
    }
}

output {
    if [type] == "sim_list" {
        elasticsearch {
            hosts => ["https://ece.mydomain.com:443"]
            index => "sim_list_tua2_v5"
            document_id => "%{di_id}"
            #doc_as_upsert => true
            manage_template => false
            cacert => "/pkg/moip/otc/apps/ls-otc-tmsp_tua2/conf/certs/ca.crt"
            ssl => true
            ssl_certificate_verification => true
            user => "logstash_write"
            password => "${LSWRITEPWD}"
        }
    }
}
 

Идея состоит в том, чтобы использовать один большой запрос для начальной загрузки, который следует разделить на jdbc_fetch_size .
Следующие расписания будут использоваться :sql_last_value для получения постепенных изменений.
Однако вскоре после первого раунда он прерывается примерно на 1000 попыток.

В настоящее время, похоже, jdbc_fetch_size это работает не так, как ожидалось. Поэтому мы понятия не имеем, почему он все время терпит неудачу.

Мы протестировали с ojdbc6.jar и odbc10.jar против Oracle 19c, но никакой разницы. Logstash работает с Java 11.0.10.

Ответ №1:

Возможно, попробуйте использовать fetch,limit и offset в запросе, используемом подготовленным оператором. Например, мы выполняем пакетирование на уровне SQL, а не на уровне JDBC. Отправьте смещение с каждой партией, которую вы хотите загрузить. Я не эксперт по Logstash, но это должно быть настраиваемым.

Комментарии:

1. Спасибо, но для этого потребуется включить подкачку, которая запускает один и тот же запрос несколько раз с разным смещением и ограничениями. Как упоминалось в вопросе, запрос синхронизации является сложным и может привести к очень высокой избыточной нагрузке на БД. Мы оптимизировали запрос, чтобы выполнить дельта-загрузку для небольшого количества изменений в течение приемлемого времени.

2. Возможно, напишите прокси-сервер REST API в Spring Boot, который будет получать данные так, как вы хотите, и попросит elastic просто запросить это. Это должно быть легко, так как вам нужна только одна конечная точка и некоторая стандартная библиотека для запроса базы данных. Или тебе нужно найти кого-то, кто очень хорошо разбирается в эластике. Нет

Ответ №2:

Мы нашли следующий хороший рабочий обходной путь:

Использование logstash с двумя разными конфигурациями (в любом случае требуется):

  1. для начальной синхронизации: Использование литералов use_prepared_statements=>false
  2. для Дельта-Синхронизации: Использование use_prepared_statements=>true

Кроме того, поскольку нам все равно приходится использовать две разные конфигурации logstash, мы использовали подсказки оптимизатора для SQL начальной синхронизации и без этих подсказок для SQL Дельта-синхронизации. Таким образом, у нас есть оптимальная конфигурация для обоих сценариев.