оптимизация набора данных внешнего соединения shuffle spark

#scala #apache-spark #join

#scala #apache-spark #Присоединиться

Вопрос:

Я использую Spark 2.1 с API DataFrames для выполнения :

 import org.apache.spark.sql.Encoders
import java.security.MessageDigest
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.{Dataset, Encoders, SaveMode, SparkSession}

case class C(id_1: String, id_2: String, a: Option[Int], b: String)

val schema = Encoders.product[C]

val data1 = Seq(
  ("d1_r0", "d1_t0", 1, "yyy"),
  ("d1_r1", "d1_t1", 2, "xxx"),
  ("d2_r2", "d2_t2", 3, "ppp"),
  ("d1_r3", "d1_t3", 4, "iii")
)

val df1 = data1.toDF("id_1", "id_2", "a", "b")
val ds1: Dataset[C] = df1.as(schema)

val data2 = Seq(
  ("d2_r0", "d2_t0", 1, "lll"),
  ("d1_r1", "d1_t1", 2, "mmm"),
  ("d2_r2", "d2_t2", 3, "ppp"),
  ("d2_r3", "d2_t3", 4, "nnn")
)

val df2 = data2.toDF("id_1", "id_2", "a", "b")
val ds2: Dataset[C] = df2.as(schema)

def getMD5Hash(x: C): String = {
  val str = (x.id_1   x.id_2   x.a   x.b)
  val msgDigest: MessageDigest = MessageDigest.getInstance("MD5")
  val MD5Hash = msgDigest
    .digest(str.getBytes())
    .map(0xff amp; _)
    .map { "x".format(_) }
    .foldLeft("") { _   _ }
  MD5Hash
}
def u(newV: C, oldV: C): Seq[C] = {
  Seq(C(oldV.id_1, oldV.id_2, oldV.a, newV.b))
}
def uOrI(b: String)(row: (C, C)): Seq[C] = {
  row match {
    case (newV, null) => Seq(newV)
    case (null, oldV) => Seq(C(oldV.id_1, oldV.id_2, oldV.a, b))
    case (newV, oldV) => {
      if (getMD5Hash(newV) == getMD5Hash(oldV)) Seq(oldV)
      else u(newV, oldV)
    }
  }
}

val df3 = ds1
  .joinWith(
    ds2,
    $"_1.id_1" === $"_2.id_1" amp;amp; $"_1.id_2" === $"_2.id_2","full_outer"
  ).flatMap(uOrI("jjjjjjjj"))

 

программа работает и выдает то, что я ожидаю, но в реальном наборе данных (более 1 миллиона строк для df1 и df2) решение выполняется очень медленно, 30 минут для завершения в кластерной цепочке с 10 узлами (16 cpu 128G ram каждый).

есть другое решение / идея сделать это для оптимизации перемешивания и времени?

Комментарии:

1. видите ли вы какие-либо возможности для широковещательных соединений?

2. насколько я знаю, трансляция рекомендуется, когда у нас есть маленький df и большой df для объединения, нет?

3. вы правы. По сути, мой вопрос касался ваших размеров фреймов данных и посмотрите, можете ли вы использовать широковещательное соединение, если позволяет размер вашего фрейма данных.