Pyspark org.apache.http.Исключение ConnectionClosedException: преждевременное завершение тела сообщения с разделителями по длине содержимого

#apache-spark #hadoop #amazon-s3 #pyspark

#apache-spark #hadoop #amazon-s3 #pyspark

Вопрос:

Я пытаюсь прочитать файлы JSON из подкаталога, вызываемого world из корзины S3 с именем hello . Когда я перечисляю все объекты этого каталога с помощью boto3, я вижу несколько файлов деталей (которые, возможно, были созданы с помощью задания spark), как показано ниже.

 world/
world/_SUCCESS
world/part-r-00000-....json
world/part-r-00001-....json
world/part-r-00002-....json
world/part-r-00003-....json
world/part-r-00004-....json
world/part-r-00005-....json
world/part-r-00006-....json
world/part-r-00007-....json
  

Я написал следующий код для чтения всех этих файлов.

 spark_session = SparkSession
            .builder
            .config(
            conf=SparkConf().setAll(spark_config).setAppName(app_name)
        ).getOrCreate()
hadoop_conf = spark_session._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.server-side-encryption-algorithm", "AES256")
hadoop_conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
hadoop_conf.set("fs.s3a.access.key", "my-aws-access-key")
hadoop_conf.set("fs.s3a.secret.key", "my-aws-secret-key")
hadoop_conf.set("com.amazonaws.services.s3a.enableV4", "true")
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

df = spark_session.read.json("s3a://hello/world/")
  

и получение следующей ошибки

 20/10/18 01:13:01 ERROR TaskContextImpl: Error in TaskCompletionListener
org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 31439128; received: 11113005
    at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
    at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:198)
    at org.apache.http.impl.io.ContentLengthInputStream.close(ContentLengthInputStream.java:101)
    at org.apache.http.conn.BasicManagedEntity.streamClosed(BasicManagedEntity.java:166)
    at org.apache.http.conn.EofSensorInputStream.checkClose(EofSensorInputStream.java:228)
    at org.apache.http.conn.EofSensorInputStream.close(EofSensorInputStream.java:172)
    at java.base/java.io.FilterInputStream.close(FilterInputStream.java:180)
    at java.base/java.io.FilterInputStream.close(FilterInputStream.java:180)
    at java.base/java.io.FilterInputStream.close(FilterInputStream.java:180)
    at java.base/java.io.FilterInputStream.close(FilterInputStream.java:180)
    at com.amazonaws.services.s3.model.S3ObjectInputStream.abort(S3ObjectInputStream.java:90)
    at org.apache.hadoop.fs.s3a.S3AInputStream.close(S3AInputStream.java:199)
    at java.base/java.io.FilterInputStream.close(FilterInputStream.java:180)
    at org.apache.hadoop.util.LineReader.close(LineReader.java:150)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.close(LineRecordReader.java:231)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.close(RecordReaderIterator.scala:62)
    at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.close(HadoopFileLinesReader.scala:73)
    at org.apache.spark.sql.execution.datasources.text.TextFileFormat$$anonfun$readToUnsafeMem$1$$anonfun$apply$1$$anonfun$apply$2.apply(TextFileFormat.scala:123)
    at org.apache.spark.sql.execution.datasources.text.TextFileFormat$$anonfun$readToUnsafeMem$1$$anonfun$apply$1$$anonfun$apply$2.apply(TextFileFormat.scala:123)
    at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:131)
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:117)
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:117)
    at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:130)
    at org.apache.spark.TaskContextImpl$$anonfun$invokeListeners$1.apply(TaskContextImpl.scala:128)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:128)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
    at org.apache.spark.scheduler.Task.run(Task.scala:133)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
20/10/18 01:13:01 ERROR Utils: Uncaught exception in thread Executor task launch worker for task 9
java.lang.NullPointerException
    at org.apache.spark.scheduler.Task$$anonfun$run$1.apply$mcV$sp(Task.scala:144)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
    at org.apache.spark.scheduler.Task.run(Task.scala:142)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
20/10/18 01:13:01 INFO ShutdownHookManager: Deleting directory /private/var/folders/_c/gf1xl24d2y7f69vdqjthq4p40000gn/T/spark-2241418f-6797-4d06-85bb-6577b42d5d86/pyspark-58a27a10-8221-489a-b7a8-b04e29e8db60
  

Я также пробовал с "s3a://hello/world/*" и "s3a://hello/world/*.json" , но все равно получаю ту же ошибку.

К вашему сведению, я использую следующие версии инструментов:

 pyspark 2.4.5
com.amazonaws:aws-java-sdk:1.7.4
org.apache.hadoop:hadoop-aws:2.7.1
org.apache.hadoop:hadoop-common:2.7.1
  

Кто-нибудь может мне помочь с этим?

Комментарии:

1. Вы пробовали читать только один файл json в Spark, просто чтобы убедиться, что ваши базовые настройки / aws верны.

2. Да, чтение одного файла также выдает ту же ошибку. Я пробовал считывать данные с помощью boto3, и все работало нормально, поэтому проблема не в учетных данных AWS.

3. Используйте сборку spark с двоичными файлами hadoop-3.1, чтобы получить текущую версию соединителя S3A, посмотрите, что произойдет дальше. Похоже, что это исключение EOF происходит при вызове close(), поэтому оно не является основной причиной проблемы. Отправлено issues.apache.org/jira/browse/HADOOP-17312 чтобы решить эту проблему