#scala #apache-spark #join
#scala #apache-spark #Присоединиться
Вопрос:
Я использую Spark 2.1 с API DataFrames для выполнения :
import org.apache.spark.sql.Encoders
import java.security.MessageDigest
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.{Dataset, Encoders, SaveMode, SparkSession}
case class C(id_1: String, id_2: String, a: Option[Int], b: String)
val schema = Encoders.product[C]
val data1 = Seq(
("d1_r0", "d1_t0", 1, "yyy"),
("d1_r1", "d1_t1", 2, "xxx"),
("d2_r2", "d2_t2", 3, "ppp"),
("d1_r3", "d1_t3", 4, "iii")
)
val df1 = data1.toDF("id_1", "id_2", "a", "b")
val ds1: Dataset[C] = df1.as(schema)
val data2 = Seq(
("d2_r0", "d2_t0", 1, "lll"),
("d1_r1", "d1_t1", 2, "mmm"),
("d2_r2", "d2_t2", 3, "ppp"),
("d2_r3", "d2_t3", 4, "nnn")
)
val df2 = data2.toDF("id_1", "id_2", "a", "b")
val ds2: Dataset[C] = df2.as(schema)
def getMD5Hash(x: C): String = {
val str = (x.id_1 x.id_2 x.a x.b)
val msgDigest: MessageDigest = MessageDigest.getInstance("MD5")
val MD5Hash = msgDigest
.digest(str.getBytes())
.map(0xff amp; _)
.map { "x".format(_) }
.foldLeft("") { _ _ }
MD5Hash
}
def u(newV: C, oldV: C): Seq[C] = {
Seq(C(oldV.id_1, oldV.id_2, oldV.a, newV.b))
}
def uOrI(b: String)(row: (C, C)): Seq[C] = {
row match {
case (newV, null) => Seq(newV)
case (null, oldV) => Seq(C(oldV.id_1, oldV.id_2, oldV.a, b))
case (newV, oldV) => {
if (getMD5Hash(newV) == getMD5Hash(oldV)) Seq(oldV)
else u(newV, oldV)
}
}
}
val df3 = ds1
.joinWith(
ds2,
$"_1.id_1" === $"_2.id_1" amp;amp; $"_1.id_2" === $"_2.id_2","full_outer"
).flatMap(uOrI("jjjjjjjj"))
программа работает и выдает то, что я ожидаю, но в реальном наборе данных (более 1 миллиона строк для df1 и df2) решение выполняется очень медленно, 30 минут для завершения в кластерной цепочке с 10 узлами (16 cpu 128G ram каждый).
есть другое решение / идея сделать это для оптимизации перемешивания и времени?
Комментарии:
1. видите ли вы какие-либо возможности для широковещательных соединений?
2. насколько я знаю, трансляция рекомендуется, когда у нас есть маленький df и большой df для объединения, нет?
3. вы правы. По сути, мой вопрос касался ваших размеров фреймов данных и посмотрите, можете ли вы использовать широковещательное соединение, если позволяет размер вашего фрейма данных.