Как преобразовать сложный SQL-запрос в spark-dataframe с помощью python или Scala

#python #scala #apache-spark #pyspark #apache-spark-sql

#python #scala #apache-spark #pyspark #apache-spark-sql

Вопрос:

Я выполнил одно преобразование, используя sqlcontext в spark, но тот же запрос, который я хочу написать, используя только фрейм данных Spark. Этот запрос включает в себя операцию объединения плюс оператор case SQL. SQL-запрос, написанный следующим образом:

 refereshLandingData=spark.sql( "select a.Sale_ID, a.Product_ID,"
                           "CASE "
                           "WHEN (a.Quantity_Sold IS NULL) THEN b.Quantity_Sold "
                           "ELSE a.Quantity_Sold "
                           "END AS Quantity_Sold, "
                           "CASE "
                           "WHEN (a.Vendor_ID IS NULL) THEN b.Vendor_ID "
                           "ELSE a.Vendor_ID "
                           "END AS Vendor_ID, "
                           "a.Sale_Date, a.Sale_Amount, a.Sale_Currency "
                           "from landingData a left outer join preHoldData b on a.Sale_ID = b.Sale_ID" )
  

теперь мне нужен равнозначный код в spark dataframe как в scala, так и в python. Я попробовал некоторый код, но он
не работает. мой проверенный код выглядит следующим образом:

 joinDf=landingData.join(preHoldData,landingData['Sale_ID']==preHoldData['Sale_ID'],'left_outer')

joinDf.withColumn
('QuantitySold',pf.when(pf.col(landingData('Quantity_Sold')).isNull(),pf.col(preHoldData('Quantity_Sold')))
.otherwise(pf.when(pf.col(preHoldData('Quantity_Sold')).isNull())),
 pf.col(landingData('Quantity_Sold'))).show()
  

В приведенном выше коде соединение выполнено отлично, но условие case не работает.
Я получаю —> TypeError: объект ‘DataFrame’ не вызывается
Я использую версию spark 2.3.2 и python 3.7 и аналогично scala 2.11 в случае spark-scala
Пожалуйста, кто-нибудь предложит мне какой-либо эквивалентный код или руководство!

Комментарии:

1. проверьте свой код Python, потому что вы пытаетесь вызвать функцию из экземпляра dataframe

Ответ №1:

Вот решение scala: предполагая landingData , что и preHoldData являются вашими фреймами данных

 
 val landingDataDf = landingData.withColumnRenamed("Quantity_Sold","Quantity_Sold_ld")
 val preHoldDataDf = preHoldData.withColumnRenamed("Quantity_Sold","Quantity_Sold_phd")

 val joinDf = landingDataDf.join(preHoldDataDf, Seq("Sale_ID"))


 joinDf
 .withColumn("Quantity_Sold",
    when(col("Quantity_Sold_ld").isNull , col("Quantity_Sold_phd")).otherwise(col("Quantity_Sold_ld"))
 ). drop("Quantity_Sold_ld","Quantity_Sold_phd")

  

Вы можете сделать то же самое для Vendor_id

Проблема с вашим кодом заключается в том, что вы не можете ссылаться на другие / старые имена фреймов данных в withColumn процессе работы. Это должно быть из фрейма данных, с которым вы работаете.

Комментарии:

1. этот код работает нормально, но проблема в том, что оба фрейма данных имеют одинаковые имена столбцов, и мы должны переименовать каждый из этих столбцов, иначе не сможем выбрать требуемый столбец. нам нужен только выбранный столбец, в этом случае не все.

2. @AliBinmazi вы можете легко удалять столбцы. Отредактировал ответ, чтобы удалить ненужные столбцы.

3. теперь этот код работает нормально, когда мы добавляем drop() . спасибо @Sanket9394

4. @AliBinmazi круто. Подумайте о том, чтобы принять его в качестве ответа, если вы нашли его полезным 🙂

Ответ №2:

Приведенный ниже код будет работать на scala, а для python вы можете немного настроить.

 val preHoldData = spark.table("preHoldData").alias("a")
val landingData = spark.table("landingData").alias("b")

landingData.join(preHoldData,Seq("Sale_ID"),"leftouter")
.withColumn("Quantity_Sold",when(col("a.Quantity_Sold").isNull, col("b.Quantity_Sold")).otherwise(col("a.Quantity_Sold")))
.withColumn("Vendor_ID",when(col("a.Vendor_ID").isNull, col("b.Vendor_ID")).otherwise(col("a.Vendor_ID")))
.select(col("a.Sale_ID"),col("a.Product_ID"),col("Quantity_Sold"),col("Vendor_ID"),col("a.Sale_Date"),col("a.Sale_Amount"),col("a.Sale_Currency"))

  

Комментарии:

1. Он работает нормально до функции withcolumn, но когда мы применяем функцию .select, возникает проблема неоднозначности с col («Quantity_Sold»). Я пытался переименовать его, но все еще не работает.

2. Хорошо, проверьте @Sanket9394 его решение.. он четко объяснил … 🙂