Как проверить показатели производительности StreamingQuery в структурированной потоковой передаче?

#apache-spark #spark-streaming #spark-structured-streaming

#apache-spark #spark-streaming #spark-structured-streaming

Вопрос:

Я хочу получить информацию, например triggerExecution, inputRowsPerSecond, numInputRows, processedRowsPerSecond , из потокового запроса.

Я использую rate формат для генерации 10 rows per second и QueryProgressEvent получения всех показателей.

Однако в консоли при печати QueryProgressEvent.inputRowsPerSecond я получаю неправильные значения, такие как : 625.0 666.66

Может кто-нибудь объяснить, почему он генерирует такое значение?

Код и пример вывода ниже:

  spark.streams.addListener(new EventMetric())

val df = spark.readStream
.format("rate")
  .option("rowsPerSecond",10)
  .option("numPartitions",1)
  .load()
  .select($"value",$"timestamp")

df.writeStream
.outputMode("append")
.option("checkpointLocation", "/testjob")
.foreachBatch((batchDf: DataFrame, batchId: Long) =>{
  println("rowcount value >>>> "   rowCountAcc.value)
  val outputDf = batchDf
  outputDf.write
    .format("console")
    .mode("append")
    .save()
})
.start()
.awaitTermination()
 

StreamingQueryListener:

 class EventMetric extends StreamingQueryListener{
  override def onQueryStarted(event: QueryStartedEvent): Unit = {
  }

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    val p = event.progress
//    println("id : "   p.id)
    println("runId : "    p.runId)
//    println("name : "   p.name)
    println("batchid : "   p.batchId)
    println("timestamp : "   p.timestamp)
    println("triggerExecution"   p.durationMs.get("triggerExecution"))
    println(p.eventTime)
    println("inputRowsPerSecond : "   p.inputRowsPerSecond)
    println("numInputRows : "   p.numInputRows)
    println("processedRowsPerSecond : "   p.processedRowsPerSecond)
    println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {

  }
}
 

ВЫВОД 1:

 runId : bc7f97c1-687f-4125-806a-dc573e006dcd
batchid : 164
timestamp : 2020-12-12T12:31:14.323Z
triggerExecution453
{}
inputRowsPerSecond : 625.0
numInputRows : 10
processedRowsPerSecond : 22.075055187637968
 

ВЫВОД 2:

 runId : bc7f97c1-687f-4125-806a-dc573e006dcd
batchid : 168
timestamp : 2020-12-12T12:31:18.326Z
triggerExecution453
{}
inputRowsPerSecond : 666.6666666666667
numInputRows : 10
processedRowsPerSecond : 22.075055187637968
 

Редактировать:

Кроме того, если 625 — это скорость ввода, то почему processedRowsPerSecond такой низкий для этой работы, которая на самом деле не выполняет преобразования?


UPDATE :: OUTPUT WITH PRETTY JSON:

Пакет 1:

 runId : 16c82066-dea0-4e0d-8a1e-ad1df55ad516
batchid : 198
timestamp : 2020-12-13T16:23:14.331Z
triggerExecution422
{}
inputRowsPerSecond : 666.6666666666667
numInputRows : 10
processedRowsPerSecond : 23.696682464454977
json : {
  "id" : "f8af5400-533c-4f7f-8b01-b365dc736735",
  "runId" : "16c82066-dea0-4e0d-8a1e-ad1df55ad516",
  "name" : null,
  "timestamp" : "2020-12-13T16:23:14.331Z",
  "batchId" : 198,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 666.6666666666667,
  "processedRowsPerSecond" : 23.696682464454977,
  "durationMs" : {
    "addBatch" : 47,
    "getBatch" : 0,
    "getEndOffset" : 0,
    "queryPlanning" : 0,
    "setOffsetRange" : 0,
    "triggerExecution" : 422,
    "walCommit" : 234
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=10, rampUpTimeSeconds=0, numPartitions=1",
    "startOffset" : 212599,
    "endOffset" : 212600,
    "numInputRows" : 10,
    "inputRowsPerSecond" : 666.6666666666667,
    "processedRowsPerSecond" : 23.696682464454977
  } ],
  "sink" : {
    "description" : "ForeachBatchSink"
  }
}
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
 

Пакет 2:

 runId : 16c82066-dea0-4e0d-8a1e-ad1df55ad516
batchid : 191
timestamp : 2020-12-13T16:23:07.328Z
triggerExecution421
{}
inputRowsPerSecond : 625.0
numInputRows : 10
processedRowsPerSecond : 23.752969121140143
json : {
  "id" : "f8af5400-533c-4f7f-8b01-b365dc736735",
  "runId" : "16c82066-dea0-4e0d-8a1e-ad1df55ad516",
  "name" : null,
  "timestamp" : "2020-12-13T16:23:07.328Z",
  "batchId" : 191,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 625.0,
  "processedRowsPerSecond" : 23.752969121140143,
  "durationMs" : {
    "addBatch" : 62,
    "getBatch" : 0,
    "getEndOffset" : 0,
    "queryPlanning" : 16,
    "setOffsetRange" : 0,
    "triggerExecution" : 421,
    "walCommit" : 187
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=10, rampUpTimeSeconds=0, numPartitions=1",
    "startOffset" : 212592,
    "endOffset" : 212593,
    "numInputRows" : 10,
    "inputRowsPerSecond" : 625.0,
    "processedRowsPerSecond" : 23.752969121140143
  } ],
  "sink" : {
    "description" : "ForeachBatchSink"
  }
}
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
 

Ответ №1:

Имейте в виду, что генерация 10 строк в секунду ничего не говорит о скорости ввода в вашем общем потоковом запросе.

В вашем writeStream вызове вы не задаете a Trigger , что означает, что потоковый запрос запускается, когда он выполнен и доступны новые данные.

Теперь, похоже, что потоковому запросу требуется не целая секунда, чтобы прочитать эти 10 секунд, а скорее ее часть. «inputRowsPerSecond» — это скорее показатель скорости чтения входных данных. Вы можете видеть разные значения в разных пакетах также из-за округления значений. Проверьте поле «временная метка» в вашем выводе, это не ровно одна секунда, а обычно — несколько миллисекунд.

Для чтения данных заданию требуется всего несколько миллисекунд, и это может незначительно отличаться от пакета к пакету. В пакете 164 на выполнение задания ушло 16 мс, а в пакете 168 на чтение 10 сообщений ушло 15 мс.

 Batch 164 => 10 / 0,016sec = 625 messages per second

Batch 168 => 10 / 0,015ses = 666.6667 messages per second

 

Вычисляется processedRowsPerSecond на основе triggerExecution

 1000 / triggerExecution x 10msg = 1000 / 421 x 10msg = 23.752969