Принудительная очистка фрейма данных при отображении / подсчете

#scala #apache-spark

#scala #apache-spark

Вопрос:

Я пытаюсь напечатать количество фрейма данных, а затем первые несколько его строк, прежде чем окончательно отправить его для дальнейшей обработки.

Как ни странно, после вызова count() фрейм данных становится пустым.

 val modifiedDF = funcA(sparkDF)
val deltaDF = modifiedDF.except(sparkDF)
  
 println(deltaDF.count()) // prints 10
println(deltaDF.count())  //prints 0, similar behavior with show  
  
 funcB(deltaDF) //gets null dataframe
  

Я смог проверить то же самое с помощью deltaDF.collect.foreach(println) и последующих вызовов count .

Однако, если я не вызываю count or show , а просто отправляю его как есть, funcB получает весь DF с 10 строками.

Ожидается ли это?

Определение funcA() и его зависимостей:

 def funcA(inputDataframe: DataFrame): DataFrame = {
    val col_name = "colA"
    val modified_df = inputDataframe.withColumn(col_name, customUDF(col(col_name)))
    val modifiedDFRaw = modified_df.limit(10)
    modifiedDFRaw.withColumn("colA", modifiedDFRaw.col("colA").cast("decimal(38,10)"))
}


val customUDF = udf[Option[java.math.BigDecimal], java.math.BigDecimal](myUDF)


def myUDF(sval: java.math.BigDecimal): Option[java.math.BigDecimal] = {
        val strg_name = Option(sval).getOrElse(return None)
        if (change_cnt < 20)  { 
                    change_cnt = change_cnt   1
                     Some(strg_name.multiply(new java.math.BigDecimal("1000")))
        } else {
            Some(strg_name)
        } 
    }

  

Ответ №1:

Прежде всего, функция, используемая как UserDefinedFunction , должна быть как минимум идемпотентной, но оптимально чистой. В противном случае результаты просто недетерминированы. Хотя в последних версиях предусмотрена некоторая панель управления (можно намекнуть Spark, что функцию не следует выполнять повторно), здесь это вам не поможет.

Более того, наличие изменяемого стабильного состояния (не совсем ясно, что является источником change_cnt , но оно одновременно записывается и считывается в udf ) as simply no go — Spark не обеспечивает глобальное изменяемое состояние.

В целом ваш код:

  • Изменяет некоторую локальную копию некоторого объекта.
  • Принимает решение на основе такого объекта.

К сожалению, оба компонента просто не подлежат восстановлению. Вам придется вернуться к этапу планирования и переосмыслить свой дизайн.

Ответ №2:

Ваш фрейм данных представляет собой распределенный набор данных, и попытка выполнить count () возвращает непредсказуемые результаты, поскольку count () может отличаться в каждом узле. Прочитайте документацию о RDDS ниже. Это применимо и к фреймам данных.

https://spark.apache.org/docs/2.3.0/rdd-programming-guide.html#understanding-closures-
https://spark.apache.org/docs/2.3.0/rdd-programming-guide.html#printing-elements-of-an-rdd