Как распараллелить операцию загрузки Spark для Elasticsearch?

#apache-spark #dataframe #elasticsearch

#apache-spark #фрейм данных #elasticsearch

Вопрос:

У меня есть трехузловой кластер Spark (8 ядер и 16 ГБ оперативной памяти каждый) в автономном режиме. Я использую соединитель Elasticsearch-hadoop для чтения индекса ES. Индекс действительно огромен, содержит более 100 миллионов документов и имеет 5 сегментов и 2 репликации. Когда я создаю фрейм данных с помощью Spark, я хочу, чтобы эта операция загрузки была распараллелена, но я вижу, что она обрабатывается только в драйвере, а не в исполнителях. Загрузка только этой операции занимает более 8 часов. Как я могу оптимизировать это и позволить всем рабочим узлам загружать данные параллельно?

Я отправляю задание, используя 16 исполнителей, каждый с 1 ядром и 2 ГБ памяти, и драйвер с 4 ГБ памяти.

 df = sqlContext.read.format("org.elasticsearch.spark.sql").option("es.nodes", es_ip).load(es_index)
df.coalesce(16).write.option("compression","gzip").parquet(pq_filename)
  

После загрузки я конвертирую это в Parquet и сохраняю в HDFS.