#scala #apache-spark #apache-spark-sql
#scala #apache-spark #apache-spark-sql
Вопрос:
Мне нужно условно изменить значение вложенного поля в фрейме данных (или создать новое поле с вложенными значениями). Я хотел бы сделать это без использования UDF, но я действительно хотел бы избежать RDD / map, поскольку производственные таблицы могут содержать много сотен миллионов записей, и карта в этом состоянии не кажется мне эффективной / быстрой.
Ниже приведен тестовый пример:
case class teste(var testID: Int = 0, var testDesc: String = "", var testValue: String = "")
val DFMain = Seq( ("A",teste(1, "AAA", "10")),("B",teste(2, "BBB", "20")),("C",teste(3, "CCC", "30"))).toDF("F1","F2")
val DFNewData = Seq( ("A",teste(1, "AAA", "40")),("B",teste(2, "BBB", "50")),("C",teste(3, "CCC", "60"))).toDF("F1","F2")
val DFJoined = DFMain.join(DFNewData,DFMain("F2.testID")===DFNewData("F2.testID"),"left").
select(DFMain("F1"), DFMain("F2"), DFNewData("F2.testValue").as("NewValue")).
withColumn("F2.testValue",$"NewValue")
DFJoined.show()
Это добавит новый столбец, но мне нужно, чтобы значение F2.testValue было равно значению newValue внутри структуры, когда оно превышает 50.
Исходные данные:
--- ------------
| F1| F2|
--- ------------
| A|[1, AAA, 10]|
| B|[2, BBB, 20]|
| C|[3, CCC, 30]|
--- ------------
Желаемый результат:
--- ------------
| F1| F2|
--- ------------
| A|[1, AAA, 10]|
| B|[2, BBB, 50]|
| C|[3, CCC, 60]|
--- ------------
Ответ №1:
Не могли бы вы попробовать это.
case class teste(var testID: Int = 0, var testDesc: String = "", var testValue: String = "")
val DFMain = Seq( ("A",teste(1, "AAA", "10")),("B",teste(2, "BBB", "20")),("C",teste(3, "CCC", "30"))).toDF("F1","F2")
DFMain.show(false)
--- ------------
|F1 |F2 |
--- ------------
|A |[1, AAA, 10]|
|B |[2, BBB, 20]|
|C |[3, CCC, 30]|
--- ------------
val DFNewData = Seq( ("A",teste(1, "AAA", "40")),("B",teste(2, "BBB", "50")),("C",teste(3, "CCC", "60"))).toDF("F1","F2")
val DFJoined = DFMain.join(DFNewData,DFMain("F2.testID")===DFNewData("F2.testID"),"left").
select(DFMain("F1"), DFMain("F2"), DFNewData("F2.testValue").as("NewValue"))
.withColumn("F2_testValue",$"NewValue")
DFJoined.show
--- ------------ -------- ------------
| F1| F2|NewValue|F2_testValue|
--- ------------ -------- ------------
| A|[1, AAA, 10]| 40| 40|
| B|[2, BBB, 20]| 50| 50|
| C|[3, CCC, 30]| 60| 60|
--- ------------ -------- ------------
DFJoined.printSchema
root
|-- F1: string (nullable = true)
|-- F2: struct (nullable = true)
| |-- testID: integer (nullable = false)
| |-- testDesc: string (nullable = true)
| |-- testValue: string (nullable = true)
|-- NewValue: string (nullable = true)
|-- F2_testValue: string (nullable = true)
DFJoined.withColumn("f2_new", expr(" case when F2_testValue>=50 then concat_ws('|',F2.testID,F2.testDesc,F2_testValue) else concat_ws('|',F2.testID,F2.testDesc,F2.testValue) end "))
.withColumn("f2_new3",struct(split($"f2_new","[|]")(0),split($"f2_new","[|]")(1),split($"f2_new","[|]")(2) ) )
.show(false)
--- ------------ -------- ------------ -------- ------------
|F1 |F2 |NewValue|F2_testValue|f2_new |f2_new3 |
--- ------------ -------- ------------ -------- ------------
|A |[1, AAA, 10]|40 |40 |1|AAA|10|[1, AAA, 10]|
|B |[2, BBB, 20]|50 |50 |2|BBB|50|[2, BBB, 50]|
|C |[3, CCC, 30]|60 |60 |3|CCC|60|[3, CCC, 60]|
--- ------------ -------- ------------ -------- ------------
f2_new3 — желаемый результат.
Причина обходного пути заключается в том, что приведенный ниже вариант не работает.
DFJoined.withColumn("f2_new", expr(" case when F2_testValue>=50 then struct(F2.testID,F2.testDesc,F2_testValue) else struct(F2.testID,F2.testDesc,F2.testValue) end ")).show()
Комментарии:
1. Большое спасибо, чувак, это похоже на решение, которое я пытался, и я смог смешать оба, чтобы избежать обходного пути. Укажите «выражение» следующим образом: expr(» случай, когда newValue>=50, затем named_struct(‘F2’, named_struct(‘TestID’,F2.TestID,’testDesc’,F2.testDesc,’testValue’,newValue)) else named_struct(‘F2’, named_struct(‘TestID’, F2.TestID,’testDesc’,F2.testDesc,’testValue’,F2.testValue)) конец»)
Ответ №2:
В дополнение к ответу stack0114106, я также нашел это решение проблемы, они более или менее похожи:
val DFFinal = DFJoined.selectExpr("""
named_struct(
'F1', F1,
'F2', named_struct(
'testID', F2.testID,
'testDesc', F2.testDesc,
'testValue', case when NewValue>=50 then NewValue else F2.testValue end
)
) as named_struct
""").select($"named_struct.F1", $"named_struct.F2")