#python #pandas #apache-spark #pyspark #parquet
Вопрос:
Я пытаюсь прочитать файлы паркета с помощью Pyspark из папки, в которой есть несколько файлов паркета, созданных Пандами. Вот как создавались паркеты
Вот как я создал паркет с использованием Панд
df= pd.DataFrame({'foo':[2,1], "bar": [1,2], 'for': [3,2]})
df.to_parquet('./Dir/filename.parquet')
spark = SparkSession(sc)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
dwh= spark.read.format('parquet').option('header', True).load('./Dir/')
Когда я это делаю, я получаю большую ошибку. Но когда я читаю то же самое от панд read_parquet()
, это работает. Кроме того, паркеты, созданные Pyspark, работают с пандами, но не наоборот. ниже приведена часть ошибки
21/09/23 16:59:46 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)/ 1]
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:375)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readParquetFootersInParallel(ParquetFileFormat.scala:450)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1(ParquetFileFormat.scala:496)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1$adapted(ParquetFileFormat.scala:490)
at org.apache.spark.sql.execution.datasources.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$2(SchemaMergeUtils.scala:75)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Could not read footer for file: FileStatus{path=file:/home/ravi/Documents/outputParq/Hisar_v4pandas/2021-09-23 16:31:24 to 2021-09-23 16:32:24.parquet; isDirectory=false; length=28960; replication=0; blocksize=0; modification_time=0; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false}
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$readParquetFootersInParallel$1(ParquetFileFormat.scala:463)
at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
Комментарии:
1. Как панды создали файлы для паркета, вы можете поделиться ими?
2. Можете ли вы попробовать загрузить конкретный файл напрямую, а не указывать на папку? например
.load(./Dir/my_file.parquet
? Кроме того, опция заголовка не применяется при чтении файлов паркетной доски.3. Привет @pltc спасибо за ваш ответ. Я отредактировал сообщение, пожалуйста, проверьте.
4. Спасибо @ScootCork, я тоже это пробовал. Но та же ошибка.
5. @RaviTejaPavuluri Я протестировал ваш образец паркета, и он работает просто отлично. Можете ли вы повторить попытку с вашим образцом паркета?