Повышение производительности медленно работающего процесса потоковой передачи искр, использующего микропакетирование

#scala #performance #apache-spark #spark-streaming

#scala #Производительность #apache-spark #потоковая передача искровых данных

Вопрос:

Я пытаюсь создать приложение для обработки 10 миллионов файлов json, где размер json может варьироваться от 1 МБ до 50 МБ.

Чтобы избежать нагрузки на драйвер, я использую API структурированной потоковой передачи для одновременной обработки 100 000 файлов json, а не для загрузки всех исходных файлов сразу.

mySchema

  val mySchema: StructType = 
      StructType( Array(
        StructField("ID",StringType,true), 
        StructField("StartTime",DoubleType, true),
        StructField("Data", ArrayType(
          StructType( Array(
              StructField("field1",DoubleType,true),
              StructField("field2",LongType,true),
              StructField("field3",LongType,true),
              StructField("field4",DoubleType,true),
              StructField("field5",DoubleType,true),
              StructField("field6",DoubleType,true),
              StructField("field7",LongType,true),          
              StructField("field8",LongType,true)
              )),true),true)))
 

Создавайте потоковый фрейм данных, выбирая 100 000 файлов одновременно

 val readDF = spark.readStream
                    .format("json")
                    .option("MaxFilesPerTrigger", 100000)
                    .option( "pathGlobFilter", "*.json")
                    .option( "recursiveFileLookup", "true")
                    .schema(mySchema)
                    .load("/mnt/source/2020/*")
 

writeStream для запуска потоковых вычислений

 val sensorFileWriter = binaryDF
                      .writeStream
                      .queryName( "myStream")
                      .format("delta")
                      .trigger(Trigger.ProcessingTime("30 seconds"))
                      .outputMode("append")
                      .option( "checkpointLocation", "/mnt/dir/checkpoint")
                      .foreachBatch(
                        (batchDF: DataFrame, batchId: Long) => {
                          
                          batchDF.persist()
                        
                          val parseDF = batchDF
                          .withColumn("New_Set", expr("transform(Data, x -> (x.field1 as f1, x.field2 as f2, x.field3 as field3))"))
                          .withColumn("Data_New",addCol(uuid(),to_json($"New_Set")))
                          .withColumn("data_size", size(col("Data")))
                          .withColumn("requestid", uuid())
                          .withColumn("start_epoch_double", bround($"StartTime").cast("long"))
                          .withColumn("Start_date", from_unixtime($"start_epoch_double", "YYYYMMdd"))
                          .withColumn("request", concat(lit("start"), col("Data_New"), lit("end")))
                          .persist()
                          
                          val requestDF = parseDF
                           .select($"Start_date", $"request")

                           requestDF.write
                            .partitionBy("Start_date")
                            .mode("append")
                            .parquet("/mnt/Target/request")
                      }
                        )
 

В приведенном выше «addCol» — это определяемая пользователем функция, которая добавляет новое StructField в массив StructFields

 val addCol = udf((id:String,json:String) => {
    import org.json4s._
    import org.json4s.jackson.JsonMethods._
    implicit val formats = DefaultFormats
    import org.json4s.JsonDSL._
    compact(parse(json).extract[List[Map[String,String]]].map(m => Map("requestid" -> id)    m))
}) 
 

«uuid» — это еще один udf, который генерирует уникальный идентификатор

 val uuid = udf(() => java.util.UUID.randomUUID().toString)
 

Конфигурация кластера Databricks:-

Apache Spark 2.4.5

70 рабочих: 3920,0 ГБ памяти, 1120 ядер (т.е. 56,0 ГБ памяти и 16 ядер на одного рабочего)

1 Драйвер: 128,0 ГБ памяти, 32 ядра

На рисунке ниже показано общее количество задач для написания каждой партии из 100 000, что занимает более часа. Весь процесс занимает несколько дней, чтобы завершить обработку 10 миллионов файлов json.

введите описание изображения здесь

Как я могу ускорить этот процесс потоковой передачи?

Должен ли я устанавливать свойство для «spark.sql.shuffle.partitions». Если да, то какова хорошая ценность этого объекта недвижимости?

Ответ №1:

В большинстве случаев требуется сопоставление разделов с ядрами 1 к 1 для потоковых приложений…Azure Databricks структурирует потоковую передачу.

Чтобы оптимизировать сопоставление ваших разделов с ядрами, попробуйте следующее:

 spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)
 

Это даст вам сопоставление ваших разделов с ядрами 1 к 1.

Комментарии:

1. Запрос потока по-прежнему выполняется медленно даже после установки этого свойства. Должен ли я также устанавливать spark.sql.files.maxPartitionBytes? Если у вас есть какие-либо другие предложения, дайте мне знать

2. Привет, вы нашли решение для этого? Я сталкиваюсь с аналогичными проблемами, когда синтаксический анализ json делает мой потоковый запрос чрезвычайно медленным.