#scala #apache-spark
#scala #apache-spark
Вопрос:
Я работал над ETL для Spark с использованием Scala, в этом ETL я хочу добавить 3 аргумента, чтобы соответственно определить repartionBy
, partitionBy
, orderBy
для записи моего фрейма данных в хранилище. Однако эти аргументы должны быть необязательными.
Я действительно не хочу писать ужасное if...else
утверждение, которое принимало бы любую комбинацию из 8 возможностей.
У меня есть функция:
def writer(
outputFormat: String,
outputFile: String,
outputMode: SaveMode,
outputRepartionBy: String,
outputParitionBy: String,
outputOrderBy: String,
dryRun: Boolean = false
)(df: DataFrame): Unit = {
if (dryRun){
df.show(500, false)
}else{
if (outputFormat == "parquet" || outputFormat == "orc" ) {
df.write.format(outputFormat).mode(outputMode).save( outputFile )
} else {
df.write.format(outputFormat).save(outputFile)
}
}
}
Можно ли было бы сделать что-то вроде :
df.write
.if( outputRepartionBy != null ){ repartitionby( outputRepartionBy ) }
.format( outputFormat )
.mode(outputMode)
.save( outputFile )
Будет ли это правильным способом цепочки функции, если условие выполнено, и если нет, есть ли такие возможности в scala / spark?
Редактировать: я использую Spark 2.3.1 с Scala 2.11.12
Комментарии:
1. Какую версию Spark вы используете?
2. Смотрите мою правку @kfkhalili
Ответ №1:
вы можете сделать что-то вроде
val temp=df.write.format(outputformat)
val writer =if ( outputRepartionBy != null ) temp.repartitionby(outputRepartitionBy) else temp
writer.mode(outputMode).save(outputFile)
Комментарии:
1. это был всего лишь пример, я должен был добавить инструкции orderby и partitionby к моему примеру, я понимаю, что вы имеете в виду. Однако я немного разочарован scala в этом, я бы поспорил, что существовало что-то более приятное
Ответ №2:
Я использовал это сообщение в блоге для достижения желаемой логики, оно выглядит намного приятнее и очень аккуратно.
sealed class ConditionalApplicative[T] private (val value: T) { // if condition class wrapper
class ElseApplicative(value: T, elseCondition: Boolean) extends ConditionalApplicative[T](value) {
// else condition class wrapper extends ConditionalApplicative to avoid double wrapping
// in case: $if(condition1) { .. }. $if(condition2) { .. }
def $else(f: T => T): T = if(elseCondition) f(value) else value
}
// if method for external scope condition
def $if(condition: => Boolean)(f: T => T): ElseApplicative =
if(condition) new ElseApplicative(f(value), false)
else new ElseApplicative(value, true)
// if method for internal scope condition
def $if(condition: T => Boolean) (f: T => T): ElseApplicative =
if(condition(value)) new ElseApplicative(f(value), false)
else new ElseApplicative(value, true)
}
object ConditionalApplicative { // Companion object for using ConditionalApplicative[T] generic
implicit def lift2ConditionalApplicative[T](any: T): ConditionalApplicative[T] =
new ConditionalApplicative(any)
implicit def else2T[T](els: ConditionalApplicative[T]#ElseApplicative): T =
els.value
}
импортируя это в мой метод, я могу сделать что-то подобное :
def writer(outputFormat: String, outputFile: String, outputMode: SaveMode, outputRepartionBy: String,outputParitionBy :String, outputOrderBy :String, dryRun: Boolean = false)(df: DataFrame): Unit = {
import etl.tool.ConditionalApplicative._
if (dryRun){
df.show(500, false)
}else{
if (outputFormat == "parquet" | outputFormat == "orc" ) {
df
.$if(outputOrderBy != null){
_.orderBy(col(outputOrderBy))
}.$if(outputRepartionBy != null){
_.repartition(col(outputRepartionBy))
}.write.format(outputFormat).mode(outputMode)
.$if(outputParitionBy != null){
_.partitionBy(outputParitionBy)
}.save( outputFile )
} else {
df.write.format(outputFormat).save(outputFile)
}
}
}
Результирующий код говорит сам за себя. Хотя мое понимание базовой логики ограничено.
Комментарии:
1. @Yuriy Я использовал ваш код, надеюсь, вы не слишком возражаете