Динамический расчет коалесценции в потоке с искровой структурой

#scala #apache-spark #apache-kafka #spark-structured-streaming

Вопрос:

Я ищу способ рассчитать наилучший coalesce параметр, используемый в потоковом задании со структурой Spark. Точнее, я работаю над заданием чтения данных из Кафки, выполняю несколько шагов фильтрации, чтобы восстановить результат в выходной теме. Вот основной код :

 val sparkSession = SparkSession
  .builder()
  .appName(configuration.applicationName)
  .getOrCreate()

val kafkaDf: DataFrame = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", confKafkaConsumer.bootstrapServers)
  .option("subscribe", confKafkaConsumer.inputTopic)
  .option("group.id", confKafkaConsumer.groupId)
  .option("startingOffsets", confKafkaConsumer.autoOffsetReset)
  .load()

  kafkaDf
    .writeStream
    .queryName(configuration.applicationName)
    .option("checkpointLocation", configuration.checkpointKafkaDirPath)
    .foreachBatch((batchDF: DataFrame, batchId: Long) => {

     val filterDF1 = batchDf.filter(col("id").isNotNull)
       .dropDuplicates("id")
       .coalesce(10)

     val filterDF2 = ...

     val filterDF3 = ...
     ...
    })
    .start()
 

Как вы можете видеть, coalesce используемый параметр равен 10 . Действительно, задание периодически обрабатывает очень мало данных, но бывает, что некоторые потоки более последовательны (несколько десятков тысяч строк). Так что слияние 10 оказывается слишком малым в случае больших поступающих объемов.

Итак, мой вопрос: есть ли способ эффективно рассчитать наиболее подходящее слияние на основе количества входящих строк? Я подумал о расчете следующего типа:

 .foreachBatch((batchDF: DataFrame, batchId: Long) => {

 val inputLinesNumber = batchDF.count()
 val coalesceParameter = //formula that depends on inputLinesNumber and maybe other parameters

 val filterDF1 = batchDf.filter(col("id").isNotNull)
   .dropDuplicates("id")
   .coalesce(coalesceParameter)

 val filterDF2 = ...

 val filterDF3 = ...
 ...
})
 

Принцип состоял бы в том, чтобы определить простую формулу, которая позволяет регулировать coalesceParameter в соответствии с входящим объемом ( inputLinesNumber ). Ваши предложения и идеи приветствуются.