Вызовите ошибку OutOfMemoryError при получении большого входного файла

#apache-spark

#apache-spark

Вопрос:

У меня есть приложение spark, которое считывает файл со 100 миллионами строк (каждая строка содержит код, например US1.234.567B1 ) и извлекает из него некоторые шаблоны следующим образом:

   val codes = sc.textFile("/data/codes.txt")

  def getPattern(code: String) = code.replaceAll("\d", "d")

  val patterns: RDD[(String, Int)] = codes
    .groupBy(getPattern)
    .mapValues(_.size)
    .sortBy(- _._2)

  patterns
    .map { case (pattern, size) => s"$sizet$pattern" }
    .saveAsTextFile("/tmp/patterns")
  

Я запускаю это на master=local[*], и он завершается с java.lang.OutOfMemoryError: GC overhead limit exceeded ошибкой.

Почему это так?

Я думал, что Spark может обрабатывать любой размер входных данных, если на нем достаточно места на жестком диске.

Комментарии:

1.два документа, объясняющих, почему следует избегать groupBy: databricks.gitbooks.io/databricks-spark-knowledge-base/content/… github.com/awesome-spark/spark-gotchas

Ответ №1:

Короче говоря, вы пытаетесь использовать анти-шаблон Spark:

 .groupBy(getPattern)
.mapValues(_.size)
  

это можно легко выразить, например, как:

 codes.keyBy(getPattern).mapValues(_ => 1L).reduceByKey(_   _).sortBy(_._2, false)
  

Я думал, что Spark может обрабатывать любой размер ввода.

Обычно это может масштабироваться до тех пор, пока вы не сделаете это невозможным. group / groupByKey на RDDS создайте локальные коллекции для каждого ключа. Каждый из них должен находиться в памяти одного исполнителя.

Ответ №2:

Да, spark может обрабатывать очень большие файлы, но единицей параллелизма является исполнитель. «Ошибка нехватки памяти» вызвана тем, что памяти spark executor или памяти spark driver недостаточно. Пожалуйста, попробуйте увеличить spark.executor.memory и spark.driver.memory, а также настройте количество исполнителей перед отправкой задания.

Вы можете установить эти значения в файле свойств или в SparkConf или непосредственно в командной строке во время отправки spark-submit. Ссылка http://spark.apache.org/docs/latest/configuration.html