Настройка группировки Spark

#apache-spark #apache-spark-sql #spark-dataframe #bigdata

#apache-spark #apache-spark-sql #большие данные

Вопрос:

Даны следующие файлы:

  • сотрудники
  • навыки
  • отчеты
  • и т.д.

Между сотрудниками и каждым из других файлов существует соотношение 1 к N, например, одному сотруднику соответствуют несколько навыков. Размер каждого файла составляет от 500 МБ до 1,5 ГБ, всего около 10 файлов. Для каждого сотрудника я хочу объединить / собрать всю информацию из всех файлов (навыки, отчеты и т.д.) И записать ее в структуру XML:

 <employees>
  <employee>
    <skills>
      <skill>...</skill>
      <skill>...</skill>
      ...
    </skills>
    <reports
      <report>...</report>
      <report>...</report>
      ...
    </reports>
    ...
  </employee>
  ...
</employees>
  

Я делаю что-то вроде:

 val employeesRdd = employeesDf.map(r => (r.getAs[String]("employeeId"), r))
val skillsRdd = skillsDf.map(r => (r.getAs[String]("employeeId"), r)).groupByKey()
val reportsRdd = reportsDf.map(r => (r.getAs[String]("employeeId"), r)).groupByKey()
...

employeesRdd
  .leftOuterJoin(skillsRdd)
  .leftOuterJoin(reportsRdd)
  ...
  .toLocalIterator
  ... // write <employee> nodes one by one
  

Проблема, с которой я сталкиваюсь, заключается в том, что все операции groupByKey () выполняются очень медленно, например, в течение многих часов. И после столь долгого запуска он взрывается из-за java.lang.Ошибка OutOfMemoryError: превышен лимит накладных расходов GC. Я использую Spark 1.5.1 в локальном режиме, при этом для jvm выделено около 20 ГБ.

Ответ №1:

разделение фрейма данных Spark должно быть лучшим выбором для вас.

разбивка на разделы поможет сохранить данные для соответствующей информации в ближайшее время. таким образом, это помогает процессу быстрого доступа к необходимой информации.

официальный документ , документ