#apache-spark #pyspark
#apache-spark #pyspark
Вопрос:
У меня есть задание, которое анализирует приблизительно терабайт данных в формате json, разделенных на файлы размером 20 Мб (это потому, что каждая минута по существу получает набор данных объемом 1 ГБ).
Задание анализирует, фильтрует и преобразует эти данные и записывает их обратно в другой путь. Однако, выполняется ли он, зависит от конфигурации spark.
Кластер состоит из 46 узлов с 96 ядрами и 768 ГБ памяти на узел. Драйвер имеет те же характеристики.
Я отправляю задание в автономном режиме и:
- При использовании 22g и 3 ядер на исполнителя задание завершается неудачно из-за gc и OOM
File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/opt/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError19/04/13 01:35:32 WARN TransportChannelHandler: Exception in connection from /10.0.118.151:34014
java.lang.OutOfMemoryError: GC overhead limit exceeded
at com.sun.security.sasl.digest.DigestMD5Base$DigestIntegrity.getHMAC(DigestMD5Base.java:1060)
at com.sun.security.sasl.digest.DigestMD5Base$DigestPrivacy.unwrap(DigestMD5Base.java:1470)
at com.sun.security.sasl.digest.DigestMD5Base.unwrap(DigestMD5Base.java:213)
at org.apache.spark.network.sasl.SparkSaslServer.unwrap(SparkSaslServer.java:150)
at org.apache.spark.network.sasl.SaslEncryption$DecryptionHandler.decode(SaslEncryption.java:126)
at org.apache.spark.network.sasl.SaslEncryption$DecryptionHandler.decode(SaslEncryption.java:101)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:748)
: An error occurred while calling o54.json.
: java.lang.OutOfMemoryError: GC overhead limit exceeded
- Используя 120g и 15 ядер на исполнителя, задание выполняется успешно.
Почему задание завершится неудачей при настройке памяти / ядра меньшего размера?
Примечания: Существует операция разнесения, которая, возможно, также может быть связана. Редактировать: не связано. Протестировал код, выполнил простой spark.read.json().count().show(), и он gc’d и OOM’d .
Моя текущая любимая теория на данный момент заключается в том, что большое количество небольших файлов приводит к высоким накладным расходам на перетасовку. Это то, что происходит, и есть ли способ обойти это (за исключением повторной агрегации файлов по отдельности)?
Код в соответствии с запросом: Launcher
./bin/spark-submit --master spark://0.0.0.0:7077
--conf "spark.executor.memory=90g"
--conf "spark.executor.cores=12"
--conf 'spark.default.parallelism=7200'
--conf 'spark.sql.shuffle.partitions=380'
--conf "spark.network.timeout=900s"
--conf "spark.driver.extraClassPath=$LIB_JARS"
--conf "spark.executor.extraClassPath=$LIB_JARS"
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX: PrintGCDetails -XX: PrintGCTimeStamps"
launcher.py
Код
spark = SparkSession.builder
.appName('Rewrites by Frequency')
.getOrCreate()
spark.read.json("s3://path/to/file").count()
Комментарии:
1. Не могли бы вы уточнить, сколько исполнителей запущено в обоих случаях?
2. Привет, я понимаю, что spark в настоящее время вычисляет количество исполнителей на основе ядер / памяти. В случае с 3 ядрами это будет означать максимум 1472 исполнителя. В случае с 15 ядрами максимум 276 исполнителей.
3. Такого рода ошибки, как правило, возникают, когда ваше разделение неоптимально (т. Е. У вас Есть несколько разделов, которые не могут поместиться в память одного исполнителя). Если вы опубликуете некоторый код, мы сможем помочь вам в дальнейшем. Кроме того, JSON — довольно неэффективный формат данных, в зависимости от того, как выглядят ваши данные, вы можете повысить производительность, используя Parquet.