#amazon-web-services #amazon-s3 #pyspark #amazon-emr
#amazon-web-services #amazon-s3 #pyspark #amazon-emr
Вопрос:
Получение ошибки OutOfMemory для следующего кода PySpark: (сбой после записи определенного количества строк. Этого не произойдет, если я попытаюсь выполнить запись в файловую систему hadoop вместо использования s3a, поэтому я думаю, что я сузил проблему до s3a. ) — конечная цель для записи в s3a. Было интересно, существует ли оптимальная конфигурация s3a, в которой у меня не будет нехватки памяти для чрезвычайно больших таблиц.
df = spark.sql("SELECT * FROM my_big_table")
df.repartition(1).write.option("header", "true").csv("s3a://mycsvlocation/folder/")
мои конфигурации s3a (emr по умолчанию) :
('fs.s3a.attempts.maximum', '10')
('fs.s3a.buffer.dir', '${hadoop.tmp.dir}/s3a')
('fs.s3a.connection.establish.timeout', '5000')
('fs.s3a.connection.maximum', '15')
('fs.s3a.connection.ssl.enabled', 'true')
('fs.s3a.connection.timeout', '50000')
('fs.s3a.fast.buffer.size', '1048576')
('fs.s3a.fast.upload', 'true')
('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
('fs.s3a.max.total.tasks', '1000')
('fs.s3a.multipart.purge', 'false')
('fs.s3a.multipart.purge.age', '86400')
('fs.s3a.multipart.size', '104857600')
('fs.s3a.multipart.threshold', '2147483647')
('fs.s3a.paging.maximum', '5000')
('fs.s3a.threads.core', '15')
('fs.s3a.threads.keepalivetime', '60')
('fs.s3a.threads.max', '256')
('mapreduce.fileoutputcommitter.algorithm.version', '2')
('spark.authenticate', 'true')
('spark.network.crypto.enabled', 'true')
('spark.network.crypto.saslFallback', 'true')
('spark.speculation', 'false')
база трассировки стека:
Caused by: java.lang.OutOfMemoryError
at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at org.apache.hadoop.fs.s3a.S3AFastOutputStream.write(S3AFastOutputStream.java:194)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:60)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
at java.io.OutputStreamWriter.write(OutputStreamWriter.java:207)
at com.univocity.parsers.common.input.WriterCharAppender.writeCharsAndReset(WriterCharAppender.java:152)
at com.univocity.parsers.common.AbstractWriter.writeRow(AbstractWriter.java:808)
... 16 more
Комментарии:
1. Я предполагаю, что это из-за того,
repartition(1)
что вся нагрузка идет на одного работника. Можете ли вы попробовать безrepartition
и подтвердить, что это работает или нет? Также, если вы можете записывать в hdfs, вы можете использоватьs3distcp
для копирования изhdfs
вs3
.2. spark намного лучше работает с несколькими файлами, а не с одним.
3. EMR не поддерживается
S3A
. ИспользуйтеS3
. Aws doc Да, вы можете использовать его, но может возникнуть множество проблем.
Ответ №1:
Проблема здесь в том, что загрузка s3a по умолчанию не поддерживает загрузку единственного файла большого размера, превышающего 2 ГБ или 2147483647 байт.
('fs.s3a.multipart.threshold', '2147483647')
Моя версия EMR старше, чем более поздние, поэтому параметр multipart.threshold является всего лишь целым числом, поэтому предел составляет 2147483647 байт для одной «части» или файла. Более поздние версии используют long вместо int и могут поддерживать большее ограничение на размер одного файла.
Я буду использовать обходной путь для записи файла в локальный HDFS, а затем перемещать его в s3 с помощью отдельной Java-программы.