#apache-spark #pyspark #parquet #fixed-width
Вопрос:
Я планировал преобразовать фиксированную ширину в паркет в AWS Glue, мои данные содержат около 1600 столбцов и около 3000 строк. Похоже, когда я пытаюсь написать фрейм данных spark (в паркете), у меня возникает проблема «стекового потока».
Проблема видна даже тогда, когда я считаю (), показываю() и т. Д. Я попытался вызвать cache(),repartition (), но все равно вижу эту ошибку.
Код работает, если я уменьшу количество столбцов до 500.
Пожалуйста, помогите
ниже приведен мой код
data_df = spark.read.text(input_path) schema_df = pd.read_json(schema_path) df = data_df for r in schema_df.itertuples(): df = df.withColumn( str(r.name), df.value.substr(int(r.start), int(r.length)) ) df = df.drop("value") df.write.mode("overwrite").option("compression", "gzip").parquet(output_path) # FAILING HERE
Трассировка стека ниже.
gt; 2021-11-10 05:00:13,542 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(70)): Error from Python:Traceback (most recent call last): File "/tmp/conv_fw_2_pq.py", line 148, in lt;modulegt; partition_ts=parsed_args.partition_timestamp, File "/tmp/conv_fw_2_pq.py", line 125, in process_file df.write.mode("overwrite").option("compression", "gzip").parquet(output_path) File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 839, in parquet self._jwrite.parquet(path) File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__ answer, self.gateway_client, self.target_id, self.name) File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco return f(*a, **kw) File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value format(target_id, ".", name), value) **py4j.protocol.Py4JJavaError: An error occurred while calling o7066.parquet. : java.lang.StackOverflowError** at scala.collection.TraversableLike$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.immutable.List.foreach(List.scala:392) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.immutable.List.flatMap(List.scala:355) at org.apache.spark.sql.catalyst.expressions.Expression.references(Expression.scala:88) at org.apache.spark.sql.catalyst.expressions.Expression$anonfun$references$1.apply(Expression.scala:88) at org.apache.spark.sql.catalyst.expressions.Expression$anonfun$references$1.apply(Expression.scala:88) at scala.collection.TraversableLike$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.immutable.List.foreach(List.scala:392) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.immutable.List.flatMap(List.scala:355) at org.apache.spark.sql.catalyst.expressions.Expression.references(Expression.scala:88) at org.apache.spark.sql.catalyst.plans.QueryPlan$anonfun$references$1.apply(QueryPlan.scala:45) at org.apache.spark.sql.catalyst.plans.QueryPlan$anonfun$references$1.apply(QueryPlan.scala:45) at scala.collection.immutable.Stream$anonfun$flatMap$1.apply(Stream.scala:497) at scala.collection.immutable.Stream$anonfun$flatMap$1.apply(Stream.scala:497)
Ответ №1:
Официальная документация Spark содержит следующее описание: Этот метод( withColumn
) представляет внутреннюю проекцию. Поэтому, вызывая его несколько раз, например, через циклы, чтобы добавить несколько столбцов, можно создать большие планы, которые могут вызвать проблемы с производительностью и даже исключение StackOverflowException. Чтобы избежать этого, используйте **select()**
сразу несколько столбцов.
Рекомендуется сначала создать список выбора, а затем использовать метод выбора для создания нового фрейма данных.