Apache spark работает с операторами case

#apache-spark #pyspark #spark-dataframe #rdd #pyspark-sql

#apache-spark #pyspark #spark-фрейм данных #rdd #pyspark-sql

Вопрос:

Я имею дело с преобразованием кода SQL в код PySpark и наткнулся на некоторые инструкции SQL. Я не знаю, как подойти к статьям case в pyspark? Я планирую создать RDD, а затем использовать rdd.map, а затем выполнить некоторые логические проверки. Это правильный подход? Пожалуйста, помогите!

По сути, мне нужно просмотреть каждую строку в RDD или DF и, исходя из некоторой логики, мне нужно отредактировать одно из значений столбца.

      case  
               when (e."a" Like 'a%' Or e."b" Like 'b%') 
                And e."aa"='BW' And cast(e."abc" as decimal(10,4))=75.0 Then 'callitA'

               when (e."a" Like 'b%' Or e."b" Like 'a%') 
                And e."aa"='AW' And cast(e."abc" as decimal(10,4))=75.0 Then 'callitB'

else

'CallitC'
  

Комментарии:

1. Зачем вам нужно что-либо конвертировать? PySpark может отлично запускать SparkSQL

2. Потому что это длинный оператор SQL case (20 строк). я бы предпочел сделать это прагматично, используя некоторую логику.

3. Вы могли бы использовать pyspark.sql.functions.when() . Не уверен, как это обрабатывает несколько случаев, хотя

4. вы могли бы записать все это как логику в функции map. вы пробовали это?

Ответ №1:

Это несколько способов записи If-Else / When-Then-Else / When-Otherwise выражения pyspark .

Пример фрейма данных

 df = spark.createDataFrame([(1,1),(2,2),(3,3)],['id','value'])

df.show()

# --- ----- 
#| id|value|
# --- ----- 
#|  1|    1|
#|  2|    2|
#|  3|    3|
# --- ----- 

#Desired Output:
# --- ----- ---------- 
#| id|value|value_desc|
# --- ----- ---------- 
#|  1|    1|       one|
#|  2|    2|       two|
#|  3|    3|     other|
# --- ----- ---------- 
  

Вариант №1: withColumn() использование when-в противном случае

 from pyspark.sql.functions import when

df.withColumn("value_desc",when(df.value == 1, 'one').when(df.value == 2, 'two').otherwise('other')).show()
  

Вариант №2: select() использование when-в противном случае

 from pyspark.sql.functions import when

df.select("*",when(df.value == 1, 'one').when(df.value == 2, 'two').otherwise('other').alias('value_desc')).show()
  

Вариант 3: selectExpr() использование SQL-эквивалентного выражения CASE

 df.selectExpr("*","CASE WHEN value == 1 THEN  'one' WHEN value == 2 THEN  'two' ELSE 'other' END AS value_desc").show()
  

Выражение, подобное SQL, также может быть записано withColumn() и select() с использованием функции pyspark.sql.functions.expr. Вот примеры.

Вариант 4: select() использование функции expr

 from pyspark.sql.functions import expr 

df.select("*",expr("CASE WHEN value == 1 THEN  'one' WHEN value == 2 THEN  'two' ELSE 'other' END AS value_desc")).show()
  

Вариант 5: withColumn() использование функции expr

 from pyspark.sql.functions import expr 

df.withColumn("value_desc",expr("CASE WHEN value == 1 THEN  'one' WHEN value == 2 THEN  'two' ELSE 'other' END AS value_desc")).show()
  

Вывод:

 # --- ----- ---------- 
#| id|value|value_desc|
# --- ----- ---------- 
#|  1|    1|       one|
#|  2|    2|       two|
#|  3|    3|     other|
# --- ----- ---------- 
  

Ответ №2:

Я не силен в python. Но я попытаюсь дать некоторые указания на то, что я сделал в scala.

Вопрос: rdd.map а затем выполните некоторые логические проверки. Это правильный подход?

Это один из подходов.

withColumn другой подход

DataFrame.withColumn метод в PySpark поддерживает добавление нового столбца или замену существующих столбцов с тем же именем.

В этом контексте вам приходится иметь дело с Column синтаксисом via — spark udf или когда в противном случае

например :

 from pyspark.sql import functions as F
df.select(df.name, F.when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show()


 ----- -------------------------------------------------------- 
| name|CASE WHEN (age > 4) THEN 1 WHEN (age < 3) THEN -1 ELSE 0|
 ----- -------------------------------------------------------- 
|Alice|                                                      -1|
|  Bob|                                                       1|
 ----- -------------------------------------------------------- 


from pyspark.sql import functions as F
df.select(df.name, F.when(df.age > 3, 1).otherwise(0)).show()

 ----- --------------------------------- 
| name|CASE WHEN (age > 3) THEN 1 ELSE 0|
 ----- --------------------------------- 
|Alice|                                0|
|  Bob|                                1|
 ----- --------------------------------- 
  

вы также можете использовать udf вместо when otherwise as.

Комментарии:

1. если вы согласны с ответом! пожалуйста, примите в качестве владельца.

2. если вы в порядке, пожалуйста, примите ответ в качестве владельца и проголосуйте