#apache-spark #spark-streaming
#apache-spark #spark-streaming
Вопрос:
Мне нужно сравнить два фрейма данных. Один из них является статическим, а другой — потоковым. Пример статического фрейма данных выглядит следующим образом:
id, value
2786, 5
7252, 3
2525, 4
8038, 1
Пример потокового фрейма данных выглядит следующим образом:
id, value
2786, 9
7252, 8
2525, 7
Результирующий фрейм данных должен выглядеть следующим образом:
id, value
8038, 1
Значение вообще не важно. Мне просто нужно найти, что для этого мини-пакета у меня нет значения с указанным идентификатором 8038. Я пытался использовать для этого функции joins и subtract(), но проблема в том, что потоково -статические соединения не поддерживают те типы соединений, которые мне нужны, а вычитание не работает, когда слева находится статический фрейм данных. Например, эти выражения вернут ошибку:
staticDF.subtract(streamingDF)
staticDF.join(streamingDF, staticDF.id = streamingDF.id, "left_anti")
Есть ли какой-либо способ получить идентификатор, который есть в staticDF, но не в streamingDF в Spark Structured Streaming?
Ответ №1:
Вы можете использовать приемник foreachBatch, а затем использовать левое антисоединение для статического фрейма данных и микропакета.
streamingDf.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
println("------------------------")
println("Batch " batchId " data")
println("Total Records " batchDF.count())
println("------------------------")
staticDf.join(batchDF, staticDf("id") === batchDF("id"),"left_anti")
.select(staticDf("*")).show()
//You can also write your output using any writer
//e.g. df.write.format("csv").save("src/test/resources")
}.start()
Входные данные:
static df
---- -----
| id|value|
---- -----
|2786| 5|
|7252| 3|
|2525| 4|
|8038| 1|
---- -----
streaming batch 0
2786,9
7252,8
2525,7
streaming batch 1
2786,9
7252,8
Вывод:
------------------------
Batch 0 data
Total Records 3
------------------------
---- -----
| id|value|
---- -----
|8038| 1|
---- -----
------------------------
Batch 1 data
Total Records 2
------------------------
---- -----
| id|value|
---- -----
|2525| 4|
|8038| 1|
---- -----