ошибка записи в pyspark с ошибкой StackOverflowError

#apache-spark #pyspark #parquet #fixed-width

Вопрос:

Я планировал преобразовать фиксированную ширину в паркет в AWS Glue, мои данные содержат около 1600 столбцов и около 3000 строк. Похоже, когда я пытаюсь написать фрейм данных spark (в паркете), у меня возникает проблема «стекового потока».
Проблема видна даже тогда, когда я считаю (), показываю() и т. Д. Я попытался вызвать cache(),repartition (), но все равно вижу эту ошибку.

Код работает, если я уменьшу количество столбцов до 500.

Пожалуйста, помогите

ниже приведен мой код

 data_df = spark.read.text(input_path)    schema_df = pd.read_json(schema_path)  df = data_df   for r in schema_df.itertuples():  df = df.withColumn(  str(r.name), df.value.substr(int(r.start), int(r.length))  )  df = df.drop("value")   df.write.mode("overwrite").option("compression", "gzip").parquet(output_path) # FAILING HERE  

Трассировка стека ниже.

 gt;  2021-11-10 05:00:13,542 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(70)): Error from Python:Traceback (most recent call last):  File "/tmp/conv_fw_2_pq.py", line 148, in lt;modulegt;  partition_ts=parsed_args.partition_timestamp,  File "/tmp/conv_fw_2_pq.py", line 125, in process_file  df.write.mode("overwrite").option("compression", "gzip").parquet(output_path)  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 839, in parquet  self._jwrite.parquet(path)  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__  answer, self.gateway_client, self.target_id, self.name)  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco  return f(*a, **kw)  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value  format(target_id, ".", name), value) **py4j.protocol.Py4JJavaError: An error occurred while calling o7066.parquet. : java.lang.StackOverflowError**  at scala.collection.TraversableLike$anonfun$flatMap$1.apply(TraversableLike.scala:241)  at scala.collection.TraversableLike$anonfun$flatMap$1.apply(TraversableLike.scala:241)  at scala.collection.immutable.List.foreach(List.scala:392)  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)  at scala.collection.immutable.List.flatMap(List.scala:355)  at org.apache.spark.sql.catalyst.expressions.Expression.references(Expression.scala:88)  at org.apache.spark.sql.catalyst.expressions.Expression$anonfun$references$1.apply(Expression.scala:88)  at org.apache.spark.sql.catalyst.expressions.Expression$anonfun$references$1.apply(Expression.scala:88)  at scala.collection.TraversableLike$anonfun$flatMap$1.apply(TraversableLike.scala:241)  at scala.collection.TraversableLike$anonfun$flatMap$1.apply(TraversableLike.scala:241)  at scala.collection.immutable.List.foreach(List.scala:392)  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)  at scala.collection.immutable.List.flatMap(List.scala:355)  at org.apache.spark.sql.catalyst.expressions.Expression.references(Expression.scala:88)  at org.apache.spark.sql.catalyst.plans.QueryPlan$anonfun$references$1.apply(QueryPlan.scala:45)  at org.apache.spark.sql.catalyst.plans.QueryPlan$anonfun$references$1.apply(QueryPlan.scala:45)  at scala.collection.immutable.Stream$anonfun$flatMap$1.apply(Stream.scala:497)  at scala.collection.immutable.Stream$anonfun$flatMap$1.apply(Stream.scala:497)  

Ответ №1:

Официальная документация Spark содержит следующее описание: Этот метод( withColumn ) представляет внутреннюю проекцию. Поэтому, вызывая его несколько раз, например, через циклы, чтобы добавить несколько столбцов, можно создать большие планы, которые могут вызвать проблемы с производительностью и даже исключение StackOverflowException. Чтобы избежать этого, используйте **select()** сразу несколько столбцов.

Рекомендуется сначала создать список выбора, а затем использовать метод выбора для создания нового фрейма данных.