Ошибка OutOfMemory при записи в s3a через EMR

#amazon-web-services #amazon-s3 #pyspark #amazon-emr

#amazon-web-services #amazon-s3 #pyspark #amazon-emr

Вопрос:

Получение ошибки OutOfMemory для следующего кода PySpark: (сбой после записи определенного количества строк. Этого не произойдет, если я попытаюсь выполнить запись в файловую систему hadoop вместо использования s3a, поэтому я думаю, что я сузил проблему до s3a. ) — конечная цель для записи в s3a. Было интересно, существует ли оптимальная конфигурация s3a, в которой у меня не будет нехватки памяти для чрезвычайно больших таблиц.

 df = spark.sql("SELECT * FROM my_big_table")
df.repartition(1).write.option("header", "true").csv("s3a://mycsvlocation/folder/")
  

мои конфигурации s3a (emr по умолчанию) :

 ('fs.s3a.attempts.maximum', '10')
('fs.s3a.buffer.dir', '${hadoop.tmp.dir}/s3a')
('fs.s3a.connection.establish.timeout', '5000')
('fs.s3a.connection.maximum', '15')
('fs.s3a.connection.ssl.enabled', 'true')
('fs.s3a.connection.timeout', '50000')
('fs.s3a.fast.buffer.size', '1048576')
('fs.s3a.fast.upload', 'true')
('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
('fs.s3a.max.total.tasks', '1000')
('fs.s3a.multipart.purge', 'false')
('fs.s3a.multipart.purge.age', '86400')
('fs.s3a.multipart.size', '104857600')
('fs.s3a.multipart.threshold', '2147483647')
('fs.s3a.paging.maximum', '5000')
('fs.s3a.threads.core', '15')
('fs.s3a.threads.keepalivetime', '60')
('fs.s3a.threads.max', '256')
('mapreduce.fileoutputcommitter.algorithm.version', '2')
('spark.authenticate', 'true')
('spark.network.crypto.enabled', 'true')
('spark.network.crypto.saslFallback', 'true')
('spark.speculation', 'false')
  

база трассировки стека:

 Caused by: java.lang.OutOfMemoryError
        at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at org.apache.hadoop.fs.s3a.S3AFastOutputStream.write(S3AFastOutputStream.java:194)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:60)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
        at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
        at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
        at java.io.OutputStreamWriter.write(OutputStreamWriter.java:207)
        at com.univocity.parsers.common.input.WriterCharAppender.writeCharsAndReset(WriterCharAppender.java:152)
        at com.univocity.parsers.common.AbstractWriter.writeRow(AbstractWriter.java:808)
        ... 16 more
  

Комментарии:

1. Я предполагаю, что это из-за того, repartition(1) что вся нагрузка идет на одного работника. Можете ли вы попробовать без repartition и подтвердить, что это работает или нет? Также, если вы можете записывать в hdfs, вы можете использовать s3distcp для копирования из hdfs в s3 .

2. spark намного лучше работает с несколькими файлами, а не с одним.

3. EMR не поддерживается S3A . Используйте S3 . Aws doc Да, вы можете использовать его, но может возникнуть множество проблем.

Ответ №1:

Проблема здесь в том, что загрузка s3a по умолчанию не поддерживает загрузку единственного файла большого размера, превышающего 2 ГБ или 2147483647 байт.

 ('fs.s3a.multipart.threshold', '2147483647')
  

Моя версия EMR старше, чем более поздние, поэтому параметр multipart.threshold является всего лишь целым числом, поэтому предел составляет 2147483647 байт для одной «части» или файла. Более поздние версии используют long вместо int и могут поддерживать большее ограничение на размер одного файла.

Я буду использовать обходной путь для записи файла в локальный HDFS, а затем перемещать его в s3 с помощью отдельной Java-программы.