Spark и scala: цепочка функций, если

#scala #apache-spark

#scala #apache-spark

Вопрос:

Я работал над ETL для Spark с использованием Scala, в этом ETL я хочу добавить 3 аргумента, чтобы соответственно определить repartionBy , partitionBy , orderBy для записи моего фрейма данных в хранилище. Однако эти аргументы должны быть необязательными.

Я действительно не хочу писать ужасное if...else утверждение, которое принимало бы любую комбинацию из 8 возможностей.
У меня есть функция:

 def writer(
  outputFormat: String,
  outputFile: String,
  outputMode: SaveMode,
  outputRepartionBy: String,
  outputParitionBy: String,
  outputOrderBy: String,
  dryRun: Boolean = false
)(df: DataFrame): Unit = {

        if (dryRun){
            df.show(500, false)
        }else{
            if (outputFormat == "parquet" || outputFormat == "orc" ) {
                df.write.format(outputFormat).mode(outputMode).save( outputFile )
            } else {
                df.write.format(outputFormat).save(outputFile)
            }
        }
    }
 

Можно ли было бы сделать что-то вроде :

 df.write
.if( outputRepartionBy != null ){ repartitionby( outputRepartionBy ) }
.format( outputFormat )
.mode(outputMode)
.save( outputFile )
 

Будет ли это правильным способом цепочки функции, если условие выполнено, и если нет, есть ли такие возможности в scala / spark?

Редактировать: я использую Spark 2.3.1 с Scala 2.11.12

Комментарии:

1. Какую версию Spark вы используете?

2. Смотрите мою правку @kfkhalili

Ответ №1:

вы можете сделать что-то вроде

 val temp=df.write.format(outputformat)
val writer =if ( outputRepartionBy != null )  temp.repartitionby(outputRepartitionBy)  else temp
writer.mode(outputMode).save(outputFile)
 

Комментарии:

1. это был всего лишь пример, я должен был добавить инструкции orderby и partitionby к моему примеру, я понимаю, что вы имеете в виду. Однако я немного разочарован scala в этом, я бы поспорил, что существовало что-то более приятное

Ответ №2:

Я использовал это сообщение в блоге для достижения желаемой логики, оно выглядит намного приятнее и очень аккуратно.

 sealed class ConditionalApplicative[T] private (val value: T) { // if condition class wrapper
   class ElseApplicative(value: T, elseCondition: Boolean) extends ConditionalApplicative[T](value) {
   // else condition class wrapper extends ConditionalApplicative to avoid double wrapping 
   // in case: $if(condition1) { .. }. $if(condition2) { .. }
      def $else(f: T => T): T = if(elseCondition) f(value) else value
   }

   // if method for external scope condition
   def $if(condition: => Boolean)(f: T => T): ElseApplicative =
      if(condition) new ElseApplicative(f(value), false) 
      else new ElseApplicative(value, true)

   // if method for internal scope condition
   def $if(condition: T => Boolean) (f: T => T): ElseApplicative =
      if(condition(value)) new ElseApplicative(f(value), false) 
      else new ElseApplicative(value, true) 
}

object ConditionalApplicative { // Companion object for using ConditionalApplicative[T] generic
   implicit def lift2ConditionalApplicative[T](any: T): ConditionalApplicative[T] =
      new ConditionalApplicative(any)

   implicit def else2T[T](els: ConditionalApplicative[T]#ElseApplicative): T =
      els.value
}  
 

импортируя это в мой метод, я могу сделать что-то подобное :

  def writer(outputFormat: String, outputFile: String, outputMode: SaveMode, outputRepartionBy: String,outputParitionBy :String, outputOrderBy :String, dryRun: Boolean = false)(df: DataFrame): Unit = {
        import etl.tool.ConditionalApplicative._
        if (dryRun){
            df.show(500, false)
        }else{
            if (outputFormat == "parquet" | outputFormat == "orc" ) {
                df
                    .$if(outputOrderBy != null){
                        _.orderBy(col(outputOrderBy))
                    }.$if(outputRepartionBy != null){
                        _.repartition(col(outputRepartionBy))
                    }.write.format(outputFormat).mode(outputMode)
                    .$if(outputParitionBy != null){
                        _.partitionBy(outputParitionBy)
                    }.save( outputFile )

            } else {
                df.write.format(outputFormat).save(outputFile)
            }
        }
    }
 

Результирующий код говорит сам за себя. Хотя мое понимание базовой логики ограничено.

Комментарии:

1. @Yuriy Я использовал ваш код, надеюсь, вы не слишком возражаете