#python #scala #apache-spark #pyspark #apache-spark-sql
#python #scala #apache-spark #pyspark #apache-spark-sql
Вопрос:
Я выполнил одно преобразование, используя sqlcontext в spark, но тот же запрос, который я хочу написать, используя только фрейм данных Spark. Этот запрос включает в себя операцию объединения плюс оператор case SQL. SQL-запрос, написанный следующим образом:
refereshLandingData=spark.sql( "select a.Sale_ID, a.Product_ID,"
"CASE "
"WHEN (a.Quantity_Sold IS NULL) THEN b.Quantity_Sold "
"ELSE a.Quantity_Sold "
"END AS Quantity_Sold, "
"CASE "
"WHEN (a.Vendor_ID IS NULL) THEN b.Vendor_ID "
"ELSE a.Vendor_ID "
"END AS Vendor_ID, "
"a.Sale_Date, a.Sale_Amount, a.Sale_Currency "
"from landingData a left outer join preHoldData b on a.Sale_ID = b.Sale_ID" )
теперь мне нужен равнозначный код в spark dataframe как в scala, так и в python. Я попробовал некоторый код, но он
не работает. мой проверенный код выглядит следующим образом:
joinDf=landingData.join(preHoldData,landingData['Sale_ID']==preHoldData['Sale_ID'],'left_outer')
joinDf.withColumn
('QuantitySold',pf.when(pf.col(landingData('Quantity_Sold')).isNull(),pf.col(preHoldData('Quantity_Sold')))
.otherwise(pf.when(pf.col(preHoldData('Quantity_Sold')).isNull())),
pf.col(landingData('Quantity_Sold'))).show()
В приведенном выше коде соединение выполнено отлично, но условие case не работает.
Я получаю —> TypeError: объект ‘DataFrame’ не вызывается
Я использую версию spark 2.3.2 и python 3.7 и аналогично scala 2.11 в случае spark-scala
Пожалуйста, кто-нибудь предложит мне какой-либо эквивалентный код или руководство!
Комментарии:
1. проверьте свой код Python, потому что вы пытаетесь вызвать функцию из экземпляра dataframe
Ответ №1:
Вот решение scala: предполагая landingData
, что и preHoldData
являются вашими фреймами данных
val landingDataDf = landingData.withColumnRenamed("Quantity_Sold","Quantity_Sold_ld")
val preHoldDataDf = preHoldData.withColumnRenamed("Quantity_Sold","Quantity_Sold_phd")
val joinDf = landingDataDf.join(preHoldDataDf, Seq("Sale_ID"))
joinDf
.withColumn("Quantity_Sold",
when(col("Quantity_Sold_ld").isNull , col("Quantity_Sold_phd")).otherwise(col("Quantity_Sold_ld"))
). drop("Quantity_Sold_ld","Quantity_Sold_phd")
Вы можете сделать то же самое для Vendor_id
Проблема с вашим кодом заключается в том, что вы не можете ссылаться на другие / старые имена фреймов данных в withColumn
процессе работы. Это должно быть из фрейма данных, с которым вы работаете.
Комментарии:
1. этот код работает нормально, но проблема в том, что оба фрейма данных имеют одинаковые имена столбцов, и мы должны переименовать каждый из этих столбцов, иначе не сможем выбрать требуемый столбец. нам нужен только выбранный столбец, в этом случае не все.
2. @AliBinmazi вы можете легко удалять столбцы. Отредактировал ответ, чтобы удалить ненужные столбцы.
3. теперь этот код работает нормально, когда мы добавляем drop() . спасибо @Sanket9394
4. @AliBinmazi круто. Подумайте о том, чтобы принять его в качестве ответа, если вы нашли его полезным 🙂
Ответ №2:
Приведенный ниже код будет работать на scala, а для python вы можете немного настроить.
val preHoldData = spark.table("preHoldData").alias("a")
val landingData = spark.table("landingData").alias("b")
landingData.join(preHoldData,Seq("Sale_ID"),"leftouter")
.withColumn("Quantity_Sold",when(col("a.Quantity_Sold").isNull, col("b.Quantity_Sold")).otherwise(col("a.Quantity_Sold")))
.withColumn("Vendor_ID",when(col("a.Vendor_ID").isNull, col("b.Vendor_ID")).otherwise(col("a.Vendor_ID")))
.select(col("a.Sale_ID"),col("a.Product_ID"),col("Quantity_Sold"),col("Vendor_ID"),col("a.Sale_Date"),col("a.Sale_Amount"),col("a.Sale_Currency"))
Комментарии:
1. Он работает нормально до функции withcolumn, но когда мы применяем функцию .select, возникает проблема неоднозначности с col («Quantity_Sold»). Я пытался переименовать его, но все еще не работает.
2. Хорошо, проверьте @Sanket9394 его решение.. он четко объяснил … 🙂