3 СЛЕВА -СОЕДИНЕНИЕ в Spark SQL с API JAVA

#java #apache-spark #join #apache-spark-sql #left-join

#java #apache-spark #Присоединиться #apache-spark-sql #левое соединение

Вопрос:

У меня есть 3 набора данных из 3 таблиц:

 Dataset<TABLE1> bbdd_one = map.get("TABLE1").as(Encoders.bean(TABLE1.class)).alias("TABLE1");
Dataset<TABLE2> bbdd_two = map.get("TABLE2").as(Encoders.bean(TABLE2.class)).alias("TABLE2");
Dataset<TABLE3> bbdd_three = map.get("TABLE3").as(Encoders.bean(TABLE3.class)).alias("TABLE3");
 

и я хочу выполнить тройное левое соединение с ним и записать его в output .parquet

Оператор sql JOIN похож на этот:

 SELECT one.field, ........, two.field ....., three.field, ... four.field
FROM TABLE1 one
LEFT JOIN TABLE2 two ON two.field = one.field
LEFT JOIN TABLE3 three ON three.field = one.field AND three.field = one.field
LEFT JOIN TABLE3 four ON four.field = one.field AND four.field = one.otherfield
WHERE one.field = 'whatever'
 

Как это можно сделать с помощью JAVA API? Возможно ли это? Я сделал пример только с одним соединением, но с 3 кажется сложным.

PS: Мое другое соединение с JAVA API:

 Dataset<TJOINED> ds_joined = ds_table1
                        .join(ds_table2,
                                JavaConversions.asScalaBuffer(Arrays.asList("fieldInCommon1", "fieldInCommon2", "fieldInCommon3", "fieldInCommon4"))
                                        .seq(),
                                "inner")
                        .select("a lot of fields", ... "more fields")                                                               
                        .as(Encoders.bean(TJOINED.class));
 

Спасибо!

Ответ №1:

Вы пробовали связывать операторы соединения? Я не часто пишу код на Java, так что это всего лишь предположение

 Dataset<TJOINED> ds_joined = ds_table1
    .join(
        ds_table2,
        JavaConversions.asScalaBuffer(Arrays.asList(...)).seq(),
        "left"
    )
    .join(
        ds_table3,
        JavaConversions.asScalaBuffer(Arrays.asList(...)).seq(),
        "left"
    )
    .join(
        ds_table4,
        JavaConversions.asScalaBuffer(Arrays.asList(...)).seq(),
        "left"
    )
    .select(...)
    .as(Encoders.bean(TJOINED.class))
 

Обновление: если я правильно понимаю, ds_table3 и ds_table4 они одинаковы, и они объединены в разных полях. Тогда, возможно, этот обновленный ответ, который дан в Scala, поскольку это то, с чем я привык работать, может достичь того, чего вы хотите. Вот полный рабочий пример:

 import spark.implicits._

case class TABLE1(f1: Int, f2: Int, f3: Int, f4: Int, f5:Int)
case class TABLE2(f1: Int, f2: Int, vTable2: Int)
case class TABLE3(f3: Int, f4: Int, vTable3: Int)

val one = spark.createDataset[TABLE1](Seq(TABLE1(1,2,3,4,5), TABLE1(1,3,4,5,6)))
//one.show()
// --- --- --- --- --- 
//| f1| f2| f3| f4| f5|
// --- --- --- --- --- 
//|  1|  2|  3|  4|  5|
//|  1|  3|  4|  5|  6|
// --- --- --- --- --- 

val two = spark.createDataset[TABLE2](Seq(TABLE2(1,2,20)))
//two.show()
// --- --- ------- 
//| f1| f2|vTable2|
// --- --- ------- 
//|  1|  2|     20|
// --- --- ------- 

val three = spark.createDataset[TABLE3](Seq(TABLE3(3,4,20), TABLE3(3,5,50)))
//three.show()
// --- --- ------- 
//| f3| f4|vTable3|
// --- --- ------- 
//|  3|  4|     20|
//|  3|  5|     50|
// --- --- ------- 

val result = one
.join(two, Seq("f1", "f2"), "left")
.join(three, Seq("f3", "f4"), "left")
.join(
  three.withColumnRenamed("f4", "f5").withColumnRenamed("vTable3", "vTable4"),
  Seq("f3", "f5"),
  "left"
)
//result.show()
// --- --- --- --- --- ------- ------- ------- 
//| f3| f5| f4| f1| f2|vTable2|vTable3|vTable4|
// --- --- --- --- --- ------- ------- ------- 
//|  3|  5|  4|  1|  2|     20|     20|     50|
//|  4|  6|  5|  1|  3|   null|   null|   null|
// --- --- --- --- --- ------- ------- ------- 
 

Комментарии:

1. ваш подход хорош, но проблема заключается во 2-м и 3-м СОЕДИНЕНИИ (одна и та же таблица) и, в частности, в следующем: LEFT JOIN TABLE3 три НА три. поле = один.поле И три. поле = один.поле (имя совпадает) ЛЕВОЕ СОЕДИНЕНИЕ ТАБЛИЦА3 четыре НА четыре. поле = один.поле И четыре. field = one.otherfield (имя главной таблицы не совпадает с именем ТАБЛИЦЫ3)

2. выдает следующее исключение: БЕЗЫМЯННЫЙ с исключением org.apache.spark.sql.AnalysisException: ИСПОЛЬЗОВАНИЕ столбца one.otherfield не может быть разрешено в правой части соединения

3. Я предполагаю, что у нас будет что-то вроде этого: ТАБЛИЦА1 (f1, f2, f3, f4, f5) ТАБЛИЦА2 (f1, f2) ТАБЛИЦА3 (f3, f4) затем при 3-м объединении вы хотите сделать что-то вроде: LEFT JOIN TABLE3 four on four.f3 = one.f3 and four.f4 = one.f5 я думаю, этого можно достичь путем временного сохранения результата первых 2 объединенийи выполните последнее. Я привык работать с Dataset<Строка>, поэтому я не совсем понимаю, как все работает с этими типами данных

4. Ваши ответы сработали!, извините за опоздание, но большое спасибо!!