Scala изменяет значение вложенного столбца

#scala #apache-spark #apache-spark-sql

#scala #apache-spark #apache-spark-sql

Вопрос:

Мне нужно условно изменить значение вложенного поля в фрейме данных (или создать новое поле с вложенными значениями). Я хотел бы сделать это без использования UDF, но я действительно хотел бы избежать RDD / map, поскольку производственные таблицы могут содержать много сотен миллионов записей, и карта в этом состоянии не кажется мне эффективной / быстрой.

Ниже приведен тестовый пример:

 
case class teste(var testID: Int  = 0, var testDesc: String = "", var testValue: String = "")


val DFMain = Seq( ("A",teste(1, "AAA", "10")),("B",teste(2, "BBB", "20")),("C",teste(3, "CCC", "30"))).toDF("F1","F2")
val DFNewData = Seq( ("A",teste(1, "AAA", "40")),("B",teste(2, "BBB", "50")),("C",teste(3, "CCC", "60"))).toDF("F1","F2")

val DFJoined = DFMain.join(DFNewData,DFMain("F2.testID")===DFNewData("F2.testID"),"left").
        select(DFMain("F1"), DFMain("F2"), DFNewData("F2.testValue").as("NewValue")).
        withColumn("F2.testValue",$"NewValue")

DFJoined.show()

  

Это добавит новый столбец, но мне нужно, чтобы значение F2.testValue было равно значению newValue внутри структуры, когда оно превышает 50.

Исходные данные:

  --- ------------ 
| F1|          F2|
 --- ------------ 
|  A|[1, AAA, 10]|
|  B|[2, BBB, 20]|
|  C|[3, CCC, 30]|
 --- ------------ 
  

Желаемый результат:

  --- ------------ 
| F1|          F2|
 --- ------------ 
|  A|[1, AAA, 10]|
|  B|[2, BBB, 50]|
|  C|[3, CCC, 60]|
 --- ------------ 
  

Ответ №1:

Не могли бы вы попробовать это.

 case class teste(var testID: Int  = 0, var testDesc: String = "", var testValue: String = "")
val DFMain = Seq( ("A",teste(1, "AAA", "10")),("B",teste(2, "BBB", "20")),("C",teste(3, "CCC", "30"))).toDF("F1","F2")
DFMain.show(false)

 --- ------------ 
|F1 |F2          |
 --- ------------ 
|A  |[1, AAA, 10]|
|B  |[2, BBB, 20]|
|C  |[3, CCC, 30]|
 --- ------------ 

val DFNewData = Seq( ("A",teste(1, "AAA", "40")),("B",teste(2, "BBB", "50")),("C",teste(3, "CCC", "60"))).toDF("F1","F2")

val DFJoined = DFMain.join(DFNewData,DFMain("F2.testID")===DFNewData("F2.testID"),"left").
        select(DFMain("F1"), DFMain("F2"), DFNewData("F2.testValue").as("NewValue"))
  .withColumn("F2_testValue",$"NewValue")

DFJoined.show

 --- ------------ -------- ------------ 
| F1|          F2|NewValue|F2_testValue|
 --- ------------ -------- ------------ 
|  A|[1, AAA, 10]|      40|          40|
|  B|[2, BBB, 20]|      50|          50|
|  C|[3, CCC, 30]|      60|          60|
 --- ------------ -------- ------------ 

DFJoined.printSchema

root
 |-- F1: string (nullable = true)
 |-- F2: struct (nullable = true)
 |    |-- testID: integer (nullable = false)
 |    |-- testDesc: string (nullable = true)
 |    |-- testValue: string (nullable = true)
 |-- NewValue: string (nullable = true)
 |-- F2_testValue: string (nullable = true)

DFJoined.withColumn("f2_new", expr(" case when F2_testValue>=50 then concat_ws('|',F2.testID,F2.testDesc,F2_testValue) else concat_ws('|',F2.testID,F2.testDesc,F2.testValue) end "))
.withColumn("f2_new3",struct(split($"f2_new","[|]")(0),split($"f2_new","[|]")(1),split($"f2_new","[|]")(2) ) )
.show(false)

 --- ------------ -------- ------------ -------- ------------ 
|F1 |F2          |NewValue|F2_testValue|f2_new  |f2_new3     |
 --- ------------ -------- ------------ -------- ------------ 
|A  |[1, AAA, 10]|40      |40          |1|AAA|10|[1, AAA, 10]|
|B  |[2, BBB, 20]|50      |50          |2|BBB|50|[2, BBB, 50]|
|C  |[3, CCC, 30]|60      |60          |3|CCC|60|[3, CCC, 60]|
 --- ------------ -------- ------------ -------- ------------ 
  

f2_new3 — желаемый результат.

Причина обходного пути заключается в том, что приведенный ниже вариант не работает.

 DFJoined.withColumn("f2_new", expr(" case when F2_testValue>=50 then struct(F2.testID,F2.testDesc,F2_testValue) else struct(F2.testID,F2.testDesc,F2.testValue) end ")).show()
  

Комментарии:

1. Большое спасибо, чувак, это похоже на решение, которое я пытался, и я смог смешать оба, чтобы избежать обходного пути. Укажите «выражение» следующим образом: expr(» случай, когда newValue>=50, затем named_struct(‘F2’, named_struct(‘TestID’,F2.TestID,’testDesc’,F2.testDesc,’testValue’,newValue)) else named_struct(‘F2’, named_struct(‘TestID’, F2.TestID,’testDesc’,F2.testDesc,’testValue’,F2.testValue)) конец»)

Ответ №2:

В дополнение к ответу stack0114106, я также нашел это решение проблемы, они более или менее похожи:

 
val DFFinal = DFJoined.selectExpr("""
        named_struct(
          'F1', F1,
          'F2', named_struct(
            'testID', F2.testID,
            'testDesc', F2.testDesc,
            'testValue', case when NewValue>=50 then NewValue else F2.testValue end
            )
        ) as named_struct
      """).select($"named_struct.F1", $"named_struct.F2")