#java #jdbc #logstash #ojdbc
Вопрос:
Java OutOfMemory
Во время начальной загрузки мы получаем огромный набор данных из ~70 миллионов записей с помощью Logstash и jdbc в Elastic. Размер кучи Java составляет 4 ГБ, но даже с 8 или 16 ГБ ситуация та же.
Тот же самый набор данных может быть успешно загружен без use_prepared_statements
(с использованием литералов).
Мы успешно загрузили все представление один раз use_prepared_statements=>false
, но чтобы оно было более эффективным даже для дельта-нагрузок, мы хотели бы использовать подготовленные операторы. Наша идея состоит в том jdbc_fetch_size=>1000
, чтобы ограничить объем данных в пределах одной выборки. Согласно документации, использование jdbc_page_size
или jdbc_paging_enabled
не действует при использовании prepared_statements. Это было бы даже неэффективно из-за сложного представления за сценой, которое было бы неэффективным при выполнении нескольких запросов, каждый раз использующих смещения для начальной загрузки.
Использование подготовленных операторов хорошо работает в Oracle, и мы видим, что выборка составила около 1 миллиона записей из ~70 миллионов, выполненных при сбое Logstash с ошибкой OutOfMemory.
Наша текущая конфигурация logstash:
input {
jdbc {
type => "sim_list"
jdbc_validate_connection => true
jdbc_driver_library => "/pkg/moip/otc/apps/ls-otc-tmsp_tua2/conf/ojdbc6.jar"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
jdbc_connection_string => "jdbc:oracle:thin:@db-server:1521:TA2A"
jdbc_user => "test"
jdbc_password => "testpwd"
tracking_column => "last_import"
use_column_value => true
tracking_column_type => "timestamp"
prepared_statement_bind_values => [":sql_last_value"]
prepared_statement_name => "get_sim_list_tua2_v5"
use_prepared_statements => true
last_run_metadata_path => "/pkg/moip/otc/data/ls-otc-tmsp_tua2/jdbc/sql_last_run_sim_list_tua2_v5"
statement => "SELECT simlist.*, simlist.LAST_UPDATE as last_import
FROM (SELECT *
FROM V_SIM_LIST_VIEW
WHERE last_update between (?) - 1/86400 and sysdate
ORDER BY last_update ASC) simlist"
# run every 5 seconds
schedule => "*/5 * * * * *"
connection_retry_attempts => "3"
jdbc_fetch_size => 1000
}
}
filter {
mutate {
copy => {
"%{di_id}" => "[@metadata][_id]"
}
split => { "user_ids" => "," }
split => { "option_ids" => "|" }
split => {"apn_ids" => "|"}
split => { "ip_address" => "|" }
}
}
output {
if [type] == "sim_list" {
elasticsearch {
hosts => ["https://ece.mydomain.com:443"]
index => "sim_list_tua2_v5"
document_id => "%{di_id}"
#doc_as_upsert => true
manage_template => false
cacert => "/pkg/moip/otc/apps/ls-otc-tmsp_tua2/conf/certs/ca.crt"
ssl => true
ssl_certificate_verification => true
user => "logstash_write"
password => "${LSWRITEPWD}"
}
}
}
Идея состоит в том, чтобы использовать один большой запрос для начальной загрузки, который следует разделить на jdbc_fetch_size
.
Следующие расписания будут использоваться :sql_last_value
для получения постепенных изменений.
Однако вскоре после первого раунда он прерывается примерно на 1000 попыток.
В настоящее время, похоже, jdbc_fetch_size
это работает не так, как ожидалось. Поэтому мы понятия не имеем, почему он все время терпит неудачу.
Мы протестировали с ojdbc6.jar и odbc10.jar против Oracle 19c, но никакой разницы. Logstash работает с Java 11.0.10.
Ответ №1:
Возможно, попробуйте использовать fetch,limit и offset в запросе, используемом подготовленным оператором. Например, мы выполняем пакетирование на уровне SQL, а не на уровне JDBC. Отправьте смещение с каждой партией, которую вы хотите загрузить. Я не эксперт по Logstash, но это должно быть настраиваемым.
Комментарии:
1. Спасибо, но для этого потребуется включить подкачку, которая запускает один и тот же запрос несколько раз с разным смещением и ограничениями. Как упоминалось в вопросе, запрос синхронизации является сложным и может привести к очень высокой избыточной нагрузке на БД. Мы оптимизировали запрос, чтобы выполнить дельта-загрузку для небольшого количества изменений в течение приемлемого времени.
2. Возможно, напишите прокси-сервер REST API в Spring Boot, который будет получать данные так, как вы хотите, и попросит elastic просто запросить это. Это должно быть легко, так как вам нужна только одна конечная точка и некоторая стандартная библиотека для запроса базы данных. Или тебе нужно найти кого-то, кто очень хорошо разбирается в эластике. Нет
Ответ №2:
Мы нашли следующий хороший рабочий обходной путь:
Использование logstash с двумя разными конфигурациями (в любом случае требуется):
- для начальной синхронизации: Использование литералов
use_prepared_statements=>false
- для Дельта-Синхронизации: Использование
use_prepared_statements=>true
Кроме того, поскольку нам все равно приходится использовать две разные конфигурации logstash, мы использовали подсказки оптимизатора для SQL начальной синхронизации и без этих подсказок для SQL Дельта-синхронизации. Таким образом, у нас есть оптимальная конфигурация для обоих сценариев.