Apache Spark: обновите режим вывода после операции объединения для потоковых наборов данных

#apache-spark #join #group-by #apache-spark-sql #spark-streaming

#apache-spark #Присоединиться #группировка по #apache-spark-sql #spark-streaming

Вопрос:

Я пытаюсь написать код, который сначала выполняет объединение, а затем агрегацию (groupby и count).

Я хочу, чтобы выходные данные моего этапа агрегирования были обновляемыми. Ниже приведен код, который я использую:

     val spark = SparkSession.builder().master("local").getOrCreate()

    import spark.implicits._


    val df = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "testerIn")
      .load().selectExpr("CAST(value AS STRING)").as[String]


    val interimDF = df.join(df,"value")

    val newDF = interimDF.groupBy("value").count().toJSON

    newDF.writeStream.format("kafka").outputMode("update") .option("kafka.bootstrap.servers", "localhost:9092") . option("checkpointLocation","/path/to/directory")
      .option("topic", "tester").start()

    spark.streams.awaitAnyTermination()
  

Этот код выдает ошибку, поскольку режим обновления не поддерживается объединениями потоков в spark:

 Exception in thread "main" org.apache.spark.sql.AnalysisException: Inner join between two streaming DataFrames/Datasets is not supported in Update output mode, only in Append output mode;;
  

Теперь я полностью понимаю, почему spark выдает эту ошибку, потому что, когда мы объединяемся; режим обновления вряд ли имеет какой-либо смысл (потому что мы просто получаем новую строку на выходе всякий раз, когда на входе появляется какая-либо новая строка, следовательно, append).

Если бы я вывел фрейм данных после моего объединения (interimDF) в Kafka в режиме добавления, а затем прочитал из него и выполнил этап агрегации (newDF) и записал его обратно в какой-либо другой поток в режиме обновления, моя проблема была бы решена. Это именно то, что я хочу сделать, но я хочу избежать стадии записи в Kafka в середине. Возможно ли это каким-либо образом? Я также готов принять хакерские решения или ссылку на запрос на извлечение, который кто-то, возможно, сделал в отношении подобных материалов.

Комментарии:

1. Аналогичная проблема. Я пытаюсь выполнить агрегацию, за которой следует объединение. Это, по-видимому, невозможно, поскольку запросы объединения поддерживают только Append, а запросы агрегирования не поддерживают Append.

2. С тех пор я над этим не работал. Но если вы хотите сотрудничать, я более чем уверен, что мы сможем сделать патч для spark, чтобы сделать это возможным.

3. К сожалению, в настоящее время я не в состоянии уделить этому время, но я действительно ценю предложение.