Функция внутренней карты в Spark

#scala #apache-spark

#scala #apache-spark

Вопрос:

У меня есть два RDD :

 RDD1[String, Double]
  

пример данных :

 ("a" , 1.0)
("b" , 2.0)
("c" , 3.0)
("d" , 4.0)
  

Это соответствует паре ключ-значение.


 RDD2[String , (String , String)
  

пример данных :

 ("a" , ("b" , "c"))
("b" , ("a" , "b"))
("c" , ("a" , "d"))
("d" , ("a" , "b"))
  

RDD1 содержит значения, требуемые RDD2

Поэтому я хочу иметь возможность доступа к значениям из RDD2 в RDD1, таким как :

 ("a" , ("b" , "c")) will map to ("a" , (2.0 , 3.0))
  

2.0 и 3.0 — соответствующие значения в RDD1

Как я могу добиться этого с помощью Scala Spark? Возможные решения — преобразовать RDD1 в HashMap, а затем просто «получить» значения в операции отображения RDD2 :

 RDD2.map(m => RDD1HashMap.get(m._2._1))
  

Есть ли альтернативный метод для достижения этой цели?

Комментарии:

1. это зависит от того, насколько велик RDD1, если он маленький, это должна быть хэш-карта, если нет, вы, вероятно, можете достичь этого с помощью объединения

Ответ №1:

Если RDD1 она мала, вы обязательно должны иметь ее в хэш-карте, которую вы используете в качестве широковещательной переменной (предположим, что все, что меньше 10 миллионов, должно быть в порядке). Если нет, у вас есть два варианта.

  1. используйте поиск PairRDDFunction, это может быть крайне неэффективным / незаконным (хотя локально он работал нормально).

    RDD1.cache()
    RDD2.map(m => RDD1.lookup(m._2._1))

  2. Второй вариант несколько сложнее, вам нужно выполнить два объединения (spark по-прежнему не поддерживает одновременное объединение более 2 наборов данных)

    val joinedDataSet = RDD2.map((k,v)=> (v._1,(k,v._2))).
    join(RDD1).map((k,v)=>(v._1._2,(v._2,v._1._1))).
    join(RDD2).map((k,v)=>(v._1._2(v._1._1,v._2)))

Это должен быть набор данных, который вы хотели, я понимаю, что RDD чрезвычайно запутан, вы можете захотеть использовать классы case и / или выполнить два объединения отдельно, а затем объединить эти RDD вместе, чтобы сделать его более понятным (если немного менее эффективным). Также заметил, что по какой-то причине scala не может выполнить вывод типа для лямбд, я думаю, я бы попробовал один из 2 других вариантов, прежде чем прибегать к этому.