Структурированная потоковая передача Spark объединяет поток файлов csv и поток скорости слишком много времени на пакет

#scala #apache-spark #hadoop #spark-structured-streaming

Вопрос:

У меня есть потоки файлов rate и csv, которые объединяются по значениям rat и идентификатору файла csv:

 def readFromCSVFile(path: String)(implicit spark: SparkSession): DataFrame =  {
    val schema = StructType(
        StructField("id", LongType, nullable = false) ::
        StructField("value1", LongType, nullable = false) ::
        StructField("another", DoubleType, nullable = false) :: Nil)
  val spark: SparkSession = SparkSession
  .builder
  .master("local[1]")
  .config(new SparkConf().setIfMissing("spark.master", "local[1]")
  .set("spark.eventLog.dir", "file:///tmp/spark-events")
  ).getOrCreate()

   spark
      .readStream
      .format("csv")
      .option("header", value=true)
      .schema(schema)
      .option("delimiter", ",")
      .option("maxFilesPerTrigger", 1)
      //.option("inferSchema", value = true)
      .load(path)
  }

   val rate = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 1)
      .option("numPartitions", 10)
      .load()
      .withWatermark("timestamp", "1 seconds")

    val cvsStream=readFromCSVFile(tmpPath.toString)
    val cvsStream2 = cvsStream.as("csv").join(rate.as("counter")).where("csv.id == counter.value").withWatermark("timestamp", "1 seconds")

    cvsStream2
      .writeStream
      .trigger(Trigger.ProcessingTime(10))
      .format("console")
      .option("truncate", "false")
      .queryName("kafkaDataGenerator")
      .start().awaitTermination(300000)
 

CSV-файл имеет длину 6 строк, но обработка одной партии занимает около 100 секунд:

 2021-10-15 23:21:29 WARN  ProcessingTimeExecutor:69 - Current batch is falling behind. The trigger interval is 10 milliseconds, but spent 92217 milliseconds
-------------------------------------------
Batch: 1
-------------------------------------------
 --- ------ ------- ----------------------- ----- 
|id |value1|another|timestamp              |value|
 --- ------ ------- ----------------------- ----- 
|6  |2     |3.0    |2021-10-15 20:20:02.507|6    |
|5  |2     |2.0    |2021-10-15 20:20:01.507|5    |
|1  |1     |1.0    |2021-10-15 20:19:57.507|1    |
|3  |1     |3.0    |2021-10-15 20:19:59.507|3    |
|2  |1     |2.0    |2021-10-15 20:19:58.507|2    |
|4  |2     |1.0    |2021-10-15 20:20:00.507|4    |
 --- ------ ------- ----------------------- ----- 
 

Как я могу оптимизировать операцию объединения, чтобы быстрее обработать этот пакет? Это не должно занимать так много вычислений, поэтому похоже, что есть какой-то скрытый водяной знак или что-то еще, заставляющее пакет ждать около 100 секунд. Какие параметры/свойства могут быть применены?

Ответ №1:

Я бы предположил, что у вас пока недостаточно данных для оценки производительности. Почему бы вам не увеличить данные до 500 000 и не посмотреть, есть ли у вас проблемы? Прямо сейчас я обеспокоен тем, что у вас недостаточно данных для эффективного использования производительности вашей системы, а затраты на запуск не амортизируются должным образом объемом данных.

Комментарии:

1. Возможно. Итак, могу ли я поиграть с некоторыми параметрами, чтобы снизить затраты на запуск, или они, похоже, не настраиваются?

2. Я предполагаю, что ваше разочарование-это время разработки? Запуск на самом деле не является проблемой потоковой передачи, речь идет скорее о том, чтобы поддерживать ваш проект в рабочем состоянии и принимать потоковые данные. Я ценю разочарование, вызванное ожиданием результатов. Возможно, вам захочется на самом деле работать в кластере spark, а не запускать локально. Это может помочь сократить время запуска. Это может ухудшить ситуацию, если кластер будет занят. (Это помогло бы выявить хитрые ошибки, которые обнаруживаются только тогда, когда вы не работаете локально.) Там, где это возможно, я всегда предлагаю работать в облаке и развертываться с помощью конвейера кода.

Ответ №2:

Что резко улучшило производительность? Использование spark.read вместо spark.readStream этого и сохранение DataFrame в памяти:

 val dataFrameToBeReturned = spark.read
      .format("csv")
      .schema(schema)
      .option("delimiter", ";")
      .option("maxFilesPerTrigger", 1)
      .csv("hdfs://"   hdfsLocation   homeZeppelinPrefix   "/generator/"   shortPath)
      .persist(StorageLevel.MEMORY_ONLY_SER)