#java #apache-spark #join #apache-spark-sql #left-join
#java #apache-spark #Присоединиться #apache-spark-sql #левое соединение
Вопрос:
У меня есть 3 набора данных из 3 таблиц:
Dataset<TABLE1> bbdd_one = map.get("TABLE1").as(Encoders.bean(TABLE1.class)).alias("TABLE1");
Dataset<TABLE2> bbdd_two = map.get("TABLE2").as(Encoders.bean(TABLE2.class)).alias("TABLE2");
Dataset<TABLE3> bbdd_three = map.get("TABLE3").as(Encoders.bean(TABLE3.class)).alias("TABLE3");
и я хочу выполнить тройное левое соединение с ним и записать его в output .parquet
Оператор sql JOIN похож на этот:
SELECT one.field, ........, two.field ....., three.field, ... four.field
FROM TABLE1 one
LEFT JOIN TABLE2 two ON two.field = one.field
LEFT JOIN TABLE3 three ON three.field = one.field AND three.field = one.field
LEFT JOIN TABLE3 four ON four.field = one.field AND four.field = one.otherfield
WHERE one.field = 'whatever'
Как это можно сделать с помощью JAVA API? Возможно ли это? Я сделал пример только с одним соединением, но с 3 кажется сложным.
PS: Мое другое соединение с JAVA API:
Dataset<TJOINED> ds_joined = ds_table1
.join(ds_table2,
JavaConversions.asScalaBuffer(Arrays.asList("fieldInCommon1", "fieldInCommon2", "fieldInCommon3", "fieldInCommon4"))
.seq(),
"inner")
.select("a lot of fields", ... "more fields")
.as(Encoders.bean(TJOINED.class));
Спасибо!
Ответ №1:
Вы пробовали связывать операторы соединения? Я не часто пишу код на Java, так что это всего лишь предположение
Dataset<TJOINED> ds_joined = ds_table1
.join(
ds_table2,
JavaConversions.asScalaBuffer(Arrays.asList(...)).seq(),
"left"
)
.join(
ds_table3,
JavaConversions.asScalaBuffer(Arrays.asList(...)).seq(),
"left"
)
.join(
ds_table4,
JavaConversions.asScalaBuffer(Arrays.asList(...)).seq(),
"left"
)
.select(...)
.as(Encoders.bean(TJOINED.class))
Обновление: если я правильно понимаю, ds_table3
и ds_table4
они одинаковы, и они объединены в разных полях. Тогда, возможно, этот обновленный ответ, который дан в Scala, поскольку это то, с чем я привык работать, может достичь того, чего вы хотите. Вот полный рабочий пример:
import spark.implicits._
case class TABLE1(f1: Int, f2: Int, f3: Int, f4: Int, f5:Int)
case class TABLE2(f1: Int, f2: Int, vTable2: Int)
case class TABLE3(f3: Int, f4: Int, vTable3: Int)
val one = spark.createDataset[TABLE1](Seq(TABLE1(1,2,3,4,5), TABLE1(1,3,4,5,6)))
//one.show()
// --- --- --- --- ---
//| f1| f2| f3| f4| f5|
// --- --- --- --- ---
//| 1| 2| 3| 4| 5|
//| 1| 3| 4| 5| 6|
// --- --- --- --- ---
val two = spark.createDataset[TABLE2](Seq(TABLE2(1,2,20)))
//two.show()
// --- --- -------
//| f1| f2|vTable2|
// --- --- -------
//| 1| 2| 20|
// --- --- -------
val three = spark.createDataset[TABLE3](Seq(TABLE3(3,4,20), TABLE3(3,5,50)))
//three.show()
// --- --- -------
//| f3| f4|vTable3|
// --- --- -------
//| 3| 4| 20|
//| 3| 5| 50|
// --- --- -------
val result = one
.join(two, Seq("f1", "f2"), "left")
.join(three, Seq("f3", "f4"), "left")
.join(
three.withColumnRenamed("f4", "f5").withColumnRenamed("vTable3", "vTable4"),
Seq("f3", "f5"),
"left"
)
//result.show()
// --- --- --- --- --- ------- ------- -------
//| f3| f5| f4| f1| f2|vTable2|vTable3|vTable4|
// --- --- --- --- --- ------- ------- -------
//| 3| 5| 4| 1| 2| 20| 20| 50|
//| 4| 6| 5| 1| 3| null| null| null|
// --- --- --- --- --- ------- ------- -------
Комментарии:
1. ваш подход хорош, но проблема заключается во 2-м и 3-м СОЕДИНЕНИИ (одна и та же таблица) и, в частности, в следующем: LEFT JOIN TABLE3 три НА три. поле = один.поле И три. поле = один.поле (имя совпадает) ЛЕВОЕ СОЕДИНЕНИЕ ТАБЛИЦА3 четыре НА четыре. поле = один.поле И четыре. field = one.otherfield (имя главной таблицы не совпадает с именем ТАБЛИЦЫ3)
2. выдает следующее исключение: БЕЗЫМЯННЫЙ с исключением org.apache.spark.sql.AnalysisException: ИСПОЛЬЗОВАНИЕ столбца
one.otherfield
не может быть разрешено в правой части соединения3. Я предполагаю, что у нас будет что-то вроде этого: ТАБЛИЦА1 (f1, f2, f3, f4, f5) ТАБЛИЦА2 (f1, f2) ТАБЛИЦА3 (f3, f4) затем при 3-м объединении вы хотите сделать что-то вроде:
LEFT JOIN TABLE3 four on four.f3 = one.f3 and four.f4 = one.f5
я думаю, этого можно достичь путем временного сохранения результата первых 2 объединенийи выполните последнее. Я привык работать с Dataset<Строка>, поэтому я не совсем понимаю, как все работает с этими типами данных4. Ваши ответы сработали!, извините за опоздание, но большое спасибо!!