Проблемы с кластером spark при чтении/записи паркета

#python #apache-spark #pyspark #databricks #parquet

Вопрос:

У меня есть s3, настроенный с клеем в качестве озера данных. В озере данных у меня есть таблица (паркетные файлы), расположенная внутри определенного пути. Ежедневно я читаю информацию из одного источника (также s3), добавляю столбцы разделов (и никаких других преобразований) и записываю ее следующим образом:

 # 2 years of data. One day of data ~5.5 GB.
data.write.format('parquet')
        .mode('overwrite')
        .option('path', 's3_path')
        .option('mergeSchema', 'true')
        .partitionBy(*partition_by)
        .saveAsTable('schema.table')
 

Ведьма, дай мне следующую древовидную структуру:

 {s3_path}/table
     |-dt=2021-08-04
       |-field=a
       |-field=b
     |-dt=2021-07-03
       |-field=a
       |-field=b
    [..]
 

Иногда мой процесс умирает со следующей ошибкой:

При вызове o305 произошла ошибка.saveAsTable. : Исключение org.apache.spark.SparkException: Задание прервано. в org.apache.spark.sql.выполнение.источники данных.Файл formatwriter$.запись(файл formatwriter.scala:230) в org.apache.spark.sql.выполнение.источники данных.Команда insertinhadoopfsrelationcommand.выполнить(команда insertohadoopfsrelationcommand.scala:187) в org.apache.spark.sql.выполнение.источники данных.Источник данных.writeAndRead(источник данных.scala:575) в команде org.apache.spark.sql.выполнение..CreateDataSourceTableAsSelectCommand.Сохраненный файл(createDataSourceTables.scala:218) в организации.apache.spark.sql.выполнение.команды.CreateDataSourceTableAsSelectCommand.выполнить(createDataSourceTables.scala:176) в org.apache.spark.sql.выполнение.команда.Запись данных commandexec.sideffectresult$lzycompute(команды.scala:116) в org.apache.spark.sql.выполнение.команда.Запись данных командыexec.sideэффектрезультат(команды.scala:114) в org.apache.spark.sql.выполнение.команды.Запись данных.Выполнение команды. выполнение(команды.scala:139) в org.apache.spark.sql.выполнение.SparkPlan.$anonfun$выполнить$1(SparkPlan.scala:200) в org.apache.spark.sql.выполнение.SparkPlan.$anonfun$ExecuteQuery$3(SparkPlan.scala:252) в организации.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165) в организации. apache.spark.sql.выполнение.SparkPlan.ExecuteQuery(SparkPlan.scala:248) в org.apache.spark.sql.выполнение.SparkPlan.выполнить(SparkPlan.scala:192) в org.apache.spark.sql.выполнение.Выполнение запроса.toRdd$lzycompute(QueryExecution.scala:158) в org.apache.spark.sql.выполнение.Выполнение запроса.toRdd(QueryExecution.scala:157) в org.apache.spark.sql.DataFrameWriter.$anonfun$Команда выполнения$1(DataFrameWriter.scala:999) в org.apache.spark.sql.выполнение.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:116) в org.apache.spark.sql.выполнение.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:249) в org.apache.spark.sql.выполнение.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:101) в сеансе org.apache.spark.sql.SparkSession.С помощью withActive(SparkSession.scala:845) в org.apache.spark.sql.выполнение.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77) в org.apache.spark.sql.выполнение.SQLExecution$.withNewExecutionId(SQLExecution.scala:199) в организации.apache.spark.sql.DataFrameWriter.runcomand(DataFrameWriter.scala:999) в организации apache.spark.sql.DataFrameWriter.CreateTable(DataFrameWriter.scala:763) в организации apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:741) в организации apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:625) на sun.reflect.NativeMethodAccessorImpl.invoke0(Собственный метод) на sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) на sun.reflect.Делегирование methodaccessorimpl.invoke(делегирование methodaccessorimpl.java:43) в java.lang.reflect.Метод.вызов(Метод.java:498) в py4j.отражение.Методинвокер.вызов(методинвокер.java:244) в py4j.отражение.ReflectionEngine.вызовите(ReflectionEngine.java:380) в py4j.Gateway.invoke(Gateway.java:295) в командах py4j.AbstractCommand.Вызов метода(AbstractCommand.java:132) в py4j.команды.Вызов команды.выполнить(CallCommand.java:79) в py4j.GatewayConnection.run(GatewayConnection.java:251) в java.lang.Thread.run(Поток. java:748) Вызван: организацией.исключение apache.spark.SparkException: Задание прервано из-за сбоя этапа: Задача 90 на этапе 2.0 завершилась неудачно 4 раза, последний сбой: Потерянная задача 90.3 на этапе 2.0 (TID 400, 10.172.61.198, исполнитель 23): Ошибка ExecutorLostFailure (исполнитель 23 вышел из-за одной из запущенных задач) Причина: Удаленный клиент RPC отключен. Вероятно, из-за контейнеров, превышающих пороговые значения, или проблем с сетью. Проверьте журналы драйверов на наличие предупреждающих сообщений. Трассировка стека драйверов:

I think my cluster is big enough to manage that transformation:

     num_workers: 8
    spark_version: 7.3.x-cpu-ml-scala2.12
    node_type_id: i3.2xlarge
    driver_node_type_id: i3.2xlarge
 

Поэтому я думаю, что одна из проблем может заключаться в том, как я в первую очередь читаю данные. Насколько я знаю, это несколько эквивалентных способов (может быть, другие лучше?)

 [1] spark.read.parquet('{s3_path}/table').filter('dt={date}')
[2] spark.read.parquet('{s3_path}/table/dt={date}/')
[3] spark.sql('select * from schema.table where dt={date}')

 

Когда я попробовал план объяснения, каждый запрос давал мне один и тот же физический план.

Так что же могло произойти?