#scala #apache-spark #rdd
Вопрос:
Используя Scala, я пытаюсь объединить 2 RDDS следующего типа:
case class Posting(postingType: Int, id: Int, acceptedAnswer: Option[Int], parentId: Option[QID], tags: Option[String]) extends Serializable
и следующие определения типов, используемые для реализации:
type Question = Posting
type Answer = Posting
type QID = Int
На первом этапе я фильтрую RDD[Публикацию] для выделения подмножеств вопросов и ответов, опираясь на то, что вопросы идентифицируются с помощью идентификатора postTypeId == 1. Ответы с идентификатором == QID имеют (а) Идентификатор postTypeId == 2 и (б) Родительский идентификатор == QID.
Затем я сопоставляю результаты с парами ключ-значение и пытаюсь объединить результаты:
val answersSeq = postings.filter(p => p.postingType == 2)
val answersMap = answersSeq.map(a => (a.parentId.get, a))
val questionsSeq = postings.filter(p => p.postingType == 1)
val questionsMap = questionsSeq.map(p => (p.id, p))
val resultMap = questionsMap.join(answersMap)
И для операции соединения я получаю «StackOverflow»-исключение.
Я подозреваю, что это связано с опцией[QID]. Если это так, то я пока понятия не имею, как это исправить. Или есть другие причины?
Комментарии:
1. Можете ли вы разместить код настройки для данных?