Понимание того, почему меньшие исполнители терпят неудачу, а большие преуспевают в spark

#apache-spark #pyspark

#apache-spark #pyspark

Вопрос:

У меня есть задание, которое анализирует приблизительно терабайт данных в формате json, разделенных на файлы размером 20 Мб (это потому, что каждая минута по существу получает набор данных объемом 1 ГБ).

Задание анализирует, фильтрует и преобразует эти данные и записывает их обратно в другой путь. Однако, выполняется ли он, зависит от конфигурации spark.

Кластер состоит из 46 узлов с 96 ядрами и 768 ГБ памяти на узел. Драйвер имеет те же характеристики.

Я отправляю задание в автономном режиме и:

  1. При использовании 22g и 3 ядер на исполнителя задание завершается неудачно из-за gc и OOM
   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco                                              
  File "/opt/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value                              
py4j.protocol.Py4JJavaError19/04/13 01:35:32 WARN TransportChannelHandler: Exception in connection from /10.0.118.151:34014      
java.lang.OutOfMemoryError: GC overhead limit exceeded                                                                          
        at com.sun.security.sasl.digest.DigestMD5Base$DigestIntegrity.getHMAC(DigestMD5Base.java:1060)                           
        at com.sun.security.sasl.digest.DigestMD5Base$DigestPrivacy.unwrap(DigestMD5Base.java:1470)                             
        at com.sun.security.sasl.digest.DigestMD5Base.unwrap(DigestMD5Base.java:213)                                           
        at org.apache.spark.network.sasl.SparkSaslServer.unwrap(SparkSaslServer.java:150)                                        
        at org.apache.spark.network.sasl.SaslEncryption$DecryptionHandler.decode(SaslEncryption.java:126)                       
        at org.apache.spark.network.sasl.SaslEncryption$DecryptionHandler.decode(SaslEncryption.java:101)                        
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)                          
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)              
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)             
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)                
        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)                       
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)              
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)              
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)                
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)                     
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)             
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)             
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)                              
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)                      
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:643)                                                              
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:566)                                 
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)                                                                                              
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)                                                                                     
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)                         
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)             
        at java.lang.Thread.run(Thread.java:748)                                                                                 
: An error occurred while calling o54.json.                                                                                                                                                                                              
: java.lang.OutOfMemoryError: GC overhead limit exceeded  
 
  1. Используя 120g и 15 ядер на исполнителя, задание выполняется успешно.

Почему задание завершится неудачей при настройке памяти / ядра меньшего размера?

Примечания: Существует операция разнесения, которая, возможно, также может быть связана. Редактировать: не связано. Протестировал код, выполнил простой spark.read.json().count().show(), и он gc’d и OOM’d .

Моя текущая любимая теория на данный момент заключается в том, что большое количество небольших файлов приводит к высоким накладным расходам на перетасовку. Это то, что происходит, и есть ли способ обойти это (за исключением повторной агрегации файлов по отдельности)?

Код в соответствии с запросом: Launcher

 ./bin/spark-submit --master spark://0.0.0.0:7077 
--conf "spark.executor.memory=90g" 
--conf "spark.executor.cores=12" 
--conf 'spark.default.parallelism=7200' 
--conf 'spark.sql.shuffle.partitions=380' 
--conf "spark.network.timeout=900s" 
--conf "spark.driver.extraClassPath=$LIB_JARS" 
--conf "spark.executor.extraClassPath=$LIB_JARS" 
--conf "spark.executor.extraJavaOptions=-verbose:gc -XX: PrintGCDetails -XX: PrintGCTimeStamps" 
launcher.py
 

Код

     spark = SparkSession.builder 
        .appName('Rewrites by Frequency') 
        .getOrCreate()
    spark.read.json("s3://path/to/file").count()
 

Комментарии:

1. Не могли бы вы уточнить, сколько исполнителей запущено в обоих случаях?

2. Привет, я понимаю, что spark в настоящее время вычисляет количество исполнителей на основе ядер / памяти. В случае с 3 ядрами это будет означать максимум 1472 исполнителя. В случае с 15 ядрами максимум 276 исполнителей.

3. Такого рода ошибки, как правило, возникают, когда ваше разделение неоптимально (т. Е. У вас Есть несколько разделов, которые не могут поместиться в память одного исполнителя). Если вы опубликуете некоторый код, мы сможем помочь вам в дальнейшем. Кроме того, JSON — довольно неэффективный формат данных, в зависимости от того, как выглядят ваши данные, вы можете повысить производительность, используя Parquet.