Как сравнить статический фрейм данных с потоковым в Spark Structured Streaming?

#apache-spark #spark-streaming

#apache-spark #spark-streaming

Вопрос:

Мне нужно сравнить два фрейма данных. Один из них является статическим, а другой — потоковым. Пример статического фрейма данных выглядит следующим образом:

  id, value
2786,  5
7252,  3
2525,  4
8038,  1
  

Пример потокового фрейма данных выглядит следующим образом:

  id, value
2786,  9
7252,  8
2525,  7
  

Результирующий фрейм данных должен выглядеть следующим образом:

 id, value
8038, 1
  

Значение вообще не важно. Мне просто нужно найти, что для этого мини-пакета у меня нет значения с указанным идентификатором 8038. Я пытался использовать для этого функции joins и subtract(), но проблема в том, что потоково -статические соединения не поддерживают те типы соединений, которые мне нужны, а вычитание не работает, когда слева находится статический фрейм данных. Например, эти выражения вернут ошибку:

 staticDF.subtract(streamingDF)
staticDF.join(streamingDF, staticDF.id = streamingDF.id, "left_anti")
  

Есть ли какой-либо способ получить идентификатор, который есть в staticDF, но не в streamingDF в Spark Structured Streaming?

Ответ №1:

Вы можете использовать приемник foreachBatch, а затем использовать левое антисоединение для статического фрейма данных и микропакета.

 streamingDf.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  println("------------------------")
  println("Batch " batchId  " data")
  println("Total Records "   batchDF.count())
  println("------------------------")
  staticDf.join(batchDF, staticDf("id") === batchDF("id"),"left_anti")
    .select(staticDf("*")).show()

//You can also write your output using any writer
//e.g. df.write.format("csv").save("src/test/resources")

}.start()
  

Входные данные:

 static df
 ---- ----- 
|  id|value|
 ---- ----- 
|2786|    5|
|7252|    3|
|2525|    4|
|8038|    1|
 ---- ----- 

streaming batch 0
2786,9
7252,8
2525,7

streaming batch 1
2786,9
7252,8
  

Вывод:

 ------------------------
Batch 0 data
Total Records 3
------------------------
 ---- ----- 
|  id|value|
 ---- ----- 
|8038|    1|
 ---- ----- 

------------------------
Batch 1 data
Total Records 2
------------------------
 ---- ----- 
|  id|value|
 ---- ----- 
|2525|    4|
|8038|    1|
 ---- -----