#python #apache-spark #pyspark #databricks #parquet
Вопрос:
У меня есть s3, настроенный с клеем в качестве озера данных. В озере данных у меня есть таблица (паркетные файлы), расположенная внутри определенного пути. Ежедневно я читаю информацию из одного источника (также s3), добавляю столбцы разделов (и никаких других преобразований) и записываю ее следующим образом:
# 2 years of data. One day of data ~5.5 GB.
data.write.format('parquet')
.mode('overwrite')
.option('path', 's3_path')
.option('mergeSchema', 'true')
.partitionBy(*partition_by)
.saveAsTable('schema.table')
Ведьма, дай мне следующую древовидную структуру:
{s3_path}/table
|-dt=2021-08-04
|-field=a
|-field=b
|-dt=2021-07-03
|-field=a
|-field=b
[..]
Иногда мой процесс умирает со следующей ошибкой:
При вызове o305 произошла ошибка.saveAsTable. : Исключение org.apache.spark.SparkException: Задание прервано. в org.apache.spark.sql.выполнение.источники данных.Файл formatwriter$.запись(файл formatwriter.scala:230) в org.apache.spark.sql.выполнение.источники данных.Команда insertinhadoopfsrelationcommand.выполнить(команда insertohadoopfsrelationcommand.scala:187) в org.apache.spark.sql.выполнение.источники данных.Источник данных.writeAndRead(источник данных.scala:575) в команде org.apache.spark.sql.выполнение..CreateDataSourceTableAsSelectCommand.Сохраненный файл(createDataSourceTables.scala:218) в организации.apache.spark.sql.выполнение.команды.CreateDataSourceTableAsSelectCommand.выполнить(createDataSourceTables.scala:176) в org.apache.spark.sql.выполнение.команда.Запись данных commandexec.sideffectresult$lzycompute(команды.scala:116) в org.apache.spark.sql.выполнение.команда.Запись данных командыexec.sideэффектрезультат(команды.scala:114) в org.apache.spark.sql.выполнение.команды.Запись данных.Выполнение команды. выполнение(команды.scala:139) в org.apache.spark.sql.выполнение.SparkPlan.$anonfun$выполнить$1(SparkPlan.scala:200) в org.apache.spark.sql.выполнение.SparkPlan.$anonfun$ExecuteQuery$3(SparkPlan.scala:252) в организации.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165) в организации. apache.spark.sql.выполнение.SparkPlan.ExecuteQuery(SparkPlan.scala:248) в org.apache.spark.sql.выполнение.SparkPlan.выполнить(SparkPlan.scala:192) в org.apache.spark.sql.выполнение.Выполнение запроса.toRdd$lzycompute(QueryExecution.scala:158) в org.apache.spark.sql.выполнение.Выполнение запроса.toRdd(QueryExecution.scala:157) в org.apache.spark.sql.DataFrameWriter.$anonfun$Команда выполнения$1(DataFrameWriter.scala:999) в org.apache.spark.sql.выполнение.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:116) в org.apache.spark.sql.выполнение.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:249) в org.apache.spark.sql.выполнение.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:101) в сеансе org.apache.spark.sql.SparkSession.С помощью withActive(SparkSession.scala:845) в org.apache.spark.sql.выполнение.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77) в org.apache.spark.sql.выполнение.SQLExecution$.withNewExecutionId(SQLExecution.scala:199) в организации.apache.spark.sql.DataFrameWriter.runcomand(DataFrameWriter.scala:999) в организации apache.spark.sql.DataFrameWriter.CreateTable(DataFrameWriter.scala:763) в организации apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:741) в организации apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:625) на sun.reflect.NativeMethodAccessorImpl.invoke0(Собственный метод) на sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) на sun.reflect.Делегирование methodaccessorimpl.invoke(делегирование methodaccessorimpl.java:43) в java.lang.reflect.Метод.вызов(Метод.java:498) в py4j.отражение.Методинвокер.вызов(методинвокер.java:244) в py4j.отражение.ReflectionEngine.вызовите(ReflectionEngine.java:380) в py4j.Gateway.invoke(Gateway.java:295) в командах py4j.AbstractCommand.Вызов метода(AbstractCommand.java:132) в py4j.команды.Вызов команды.выполнить(CallCommand.java:79) в py4j.GatewayConnection.run(GatewayConnection.java:251) в java.lang.Thread.run(Поток. java:748) Вызван: организацией.исключение apache.spark.SparkException: Задание прервано из-за сбоя этапа: Задача 90 на этапе 2.0 завершилась неудачно 4 раза, последний сбой: Потерянная задача 90.3 на этапе 2.0 (TID 400, 10.172.61.198, исполнитель 23): Ошибка ExecutorLostFailure (исполнитель 23 вышел из-за одной из запущенных задач) Причина: Удаленный клиент RPC отключен. Вероятно, из-за контейнеров, превышающих пороговые значения, или проблем с сетью. Проверьте журналы драйверов на наличие предупреждающих сообщений. Трассировка стека драйверов:
I think my cluster is big enough to manage that transformation:
num_workers: 8
spark_version: 7.3.x-cpu-ml-scala2.12
node_type_id: i3.2xlarge
driver_node_type_id: i3.2xlarge
Поэтому я думаю, что одна из проблем может заключаться в том, как я в первую очередь читаю данные. Насколько я знаю, это несколько эквивалентных способов (может быть, другие лучше?)
[1] spark.read.parquet('{s3_path}/table').filter('dt={date}')
[2] spark.read.parquet('{s3_path}/table/dt={date}/')
[3] spark.sql('select * from schema.table where dt={date}')
Когда я попробовал план объяснения, каждый запрос давал мне один и тот же физический план.
Так что же могло произойти?