#apache-spark #join #group-by #apache-spark-sql #spark-streaming
#apache-spark #Присоединиться #группировка по #apache-spark-sql #spark-streaming
Вопрос:
Я пытаюсь написать код, который сначала выполняет объединение, а затем агрегацию (groupby и count).
Я хочу, чтобы выходные данные моего этапа агрегирования были обновляемыми. Ниже приведен код, который я использую:
val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "testerIn")
.load().selectExpr("CAST(value AS STRING)").as[String]
val interimDF = df.join(df,"value")
val newDF = interimDF.groupBy("value").count().toJSON
newDF.writeStream.format("kafka").outputMode("update") .option("kafka.bootstrap.servers", "localhost:9092") . option("checkpointLocation","/path/to/directory")
.option("topic", "tester").start()
spark.streams.awaitAnyTermination()
Этот код выдает ошибку, поскольку режим обновления не поддерживается объединениями потоков в spark:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Inner join between two streaming DataFrames/Datasets is not supported in Update output mode, only in Append output mode;;
Теперь я полностью понимаю, почему spark выдает эту ошибку, потому что, когда мы объединяемся; режим обновления вряд ли имеет какой-либо смысл (потому что мы просто получаем новую строку на выходе всякий раз, когда на входе появляется какая-либо новая строка, следовательно, append).
Если бы я вывел фрейм данных после моего объединения (interimDF) в Kafka в режиме добавления, а затем прочитал из него и выполнил этап агрегации (newDF) и записал его обратно в какой-либо другой поток в режиме обновления, моя проблема была бы решена. Это именно то, что я хочу сделать, но я хочу избежать стадии записи в Kafka в середине. Возможно ли это каким-либо образом? Я также готов принять хакерские решения или ссылку на запрос на извлечение, который кто-то, возможно, сделал в отношении подобных материалов.
Комментарии:
1. Аналогичная проблема. Я пытаюсь выполнить агрегацию, за которой следует объединение. Это, по-видимому, невозможно, поскольку запросы объединения поддерживают только Append, а запросы агрегирования не поддерживают Append.
2. С тех пор я над этим не работал. Но если вы хотите сотрудничать, я более чем уверен, что мы сможем сделать патч для spark, чтобы сделать это возможным.
3. К сожалению, в настоящее время я не в состоянии уделить этому время, но я действительно ценю предложение.