#apache-spark
#apache-spark
Вопрос:
У меня есть приложение spark, которое считывает файл со 100 миллионами строк (каждая строка содержит код, например US1.234.567B1
) и извлекает из него некоторые шаблоны следующим образом:
val codes = sc.textFile("/data/codes.txt")
def getPattern(code: String) = code.replaceAll("\d", "d")
val patterns: RDD[(String, Int)] = codes
.groupBy(getPattern)
.mapValues(_.size)
.sortBy(- _._2)
patterns
.map { case (pattern, size) => s"$sizet$pattern" }
.saveAsTextFile("/tmp/patterns")
Я запускаю это на master=local[*], и он завершается с java.lang.OutOfMemoryError: GC overhead limit exceeded
ошибкой.
Почему это так?
Я думал, что Spark может обрабатывать любой размер входных данных, если на нем достаточно места на жестком диске.
Комментарии:
1.два документа, объясняющих, почему следует избегать groupBy: databricks.gitbooks.io/databricks-spark-knowledge-base/content/… github.com/awesome-spark/spark-gotchas
Ответ №1:
Короче говоря, вы пытаетесь использовать анти-шаблон Spark:
.groupBy(getPattern)
.mapValues(_.size)
это можно легко выразить, например, как:
codes.keyBy(getPattern).mapValues(_ => 1L).reduceByKey(_ _).sortBy(_._2, false)
Я думал, что Spark может обрабатывать любой размер ввода.
Обычно это может масштабироваться до тех пор, пока вы не сделаете это невозможным. group
/ groupByKey
на RDDS создайте локальные коллекции для каждого ключа. Каждый из них должен находиться в памяти одного исполнителя.
Ответ №2:
Да, spark может обрабатывать очень большие файлы, но единицей параллелизма является исполнитель. «Ошибка нехватки памяти» вызвана тем, что памяти spark executor или памяти spark driver недостаточно. Пожалуйста, попробуйте увеличить spark.executor.memory и spark.driver.memory, а также настройте количество исполнителей перед отправкой задания.
Вы можете установить эти значения в файле свойств или в SparkConf или непосредственно в командной строке во время отправки spark-submit. Ссылка http://spark.apache.org/docs/latest/configuration.html