Классификатор случайного леса Spark выдает java.lang.Ошибка OutOfMemoryError во время обучения

#scala #apache-spark

#scala #apache-spark

Вопрос:

Я пытаюсь выполнить многоклассовую классификацию в spark. У меня есть 300 000 предопределенных наборов классификаций. При преобразовании данных нет никаких проблем, но когда я пытаюсь обучить модель, у меня заканчивается память. Как я могу решить эту проблему?

 object Test {

  var num = 50
  var savePath = "c:/Temp/SparkModel/"
  var stemmer = Resha.Instance

  var STOP_WORDS: Set[String] = Set()

  def cropSentence(s: String) = {
    s.replaceAll("\([^\)]*\)", "")
      .replaceAll(" - ", " ")
      .replaceAll("-", " ")
      .replaceAll("   ", " ")
      .replaceAll(",", " ").trim()
  }

  def main(args: Array[String]): Unit = {

    val sc = new SparkConf().setAppName("Test").setMaster("local[*]")
      .set("spark.sql.warehouse.dir", "D:/Temp/wh")
      .set("spark.executor.memory", "12g")
      .set("spark.driver.memory", "4g")
      .set("spark.hadoop.validateOutputSpecs", "false")

    val spark = SparkSession.builder.appName("Java Spark").config(sc).getOrCreate()
    import spark.implicits._

    val mainDataset = spark.sparkContext.textFile("file:///C:/Temp/classifications.csv")
      .map( _.split(";"))
      .map(tokens => {      
         var list=new ListBuffer[String]()
      var token0=cropSentence(tokens(0).toLowerCase(Locale.forLanguageTag("TR-tr")));      
      token0.split("\s ").map {list =stemmer.stem(_)}   
      (tokens(1), tokens(0),list.toList.mkString(" "))
      }).toDF("className","productNameOrg","productName")


    val classIndexer = new StringIndexer()
      .setInputCol("className")
      .setOutputCol("label")

    val classIndexerModel = classIndexer.fit(mainDataset)
    var mainDS=classIndexerModel.transform(mainDataset)
    classIndexerModel.write.overwrite.save(savePath   "ClassIndexer")

    //Tokenizer
              val tokenizer = new Tokenizer()                                
                           .setInputCol("productName")                     
                           .setOutputCol("words_nonfiltered")
    //StopWords
              val remover = new StopWordsRemover()
                             .setInputCol("words_nonfiltered")
                             .setOutputCol("words")
                             .setStopWords( Array[String]("stop1","stop2","stop3"))
    //CountVectorize

              val countVectorizer = new CountVectorizer()
                             .setInputCol("words")
                             .setOutputCol("features")

              val  rfc = new RandomForestClassifier ()                          
                      .setLabelCol("label")
                      .setNumTrees(50)
                      .setMaxDepth(15)
                      .setFeatureSubsetStrategy("auto")
                      .setFeaturesCol("features")
                      .setImpurity("gini")
                      .setMaxBins(32)


           val pipeline = new Pipeline().setStages(Array(tokenizer,remover,countVectorizer,rfc))
           val train =mainDS
           val model = pipeline.fit(train) <============= OOM
           model.write.overwrite.save(savePath "RandomForestClassifier")

  }
}
  

Ошибка:

 16/10/21 00:54:23 INFO ExternalAppendOnlyMap: Thread 101 spilling in-memory map of 2.9 GB to disk (1 time so far)
16/10/21 00:56:58 INFO ExternalAppendOnlyMap: Thread 98 spilling in-memory map of 2.7 GB to disk (2 times so far)
16/10/21 00:57:05 INFO ExternalAppendOnlyMap: Thread 101 spilling in-memory map of 2.7 GB to disk (2 times so far)
Exception in thread "shuffle-server-0" java.lang.OutOfMemoryError: Java heap space
16/10/21 01:02:37 WARN SingleThreadEventExecutor: Unexpected exception from an event executor: 
java.lang.OutOfMemoryError: Java heap space
16/10/21 01:02:43 WARN TaskMemoryManager: leak 1575.8 MB memory from org.apache.spark.util.collection.ExternalAppendOnlyMap@18f269e7
16/10/21 01:02:42 ERROR Utils: uncaught error in thread Spark Context Cleaner, stopping SparkContext
java.lang.OutOfMemoryError: Java heap space
    at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:176)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1249)
    at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:172)
    at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:67)
16/10/21 01:02:37 WARN NioEventLoop: Unexpected exception in the selector loop.
java.lang.OutOfMemoryError: Java heap space
16/10/21 01:02:44 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 126580 ms exceeds timeout 120000 ms
16/10/21 01:03:56 ERROR TaskSchedulerImpl: Lost executor driver on localhost: Executor heartbeat timed out after 126580 ms
16/10/21 01:03:58 ERROR Executor: Exception in task 0.0 in stage 12.0 (TID 25)
java.lang.OutOfMemoryError: Java heap space
Exception in thread "Spark Context Cleaner" java.lang.OutOfMemoryError: Java heap space
Exception in thread "Executor task launch worker-4" java.lang.OutOfMemoryError: Java heap space
16/10/21 01:06:00 WARN TaskSetManager: Lost task 1.0 in stage 12.0 (TID 26, localhost): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 126580 ms
16/10/21 01:06:00 ERROR TaskSetManager: Task 1 in stage 12.0 failed 1 times; aborting job
  

Комментарии:

1. Вы запускаете его в режиме кластера или в режиме клиента?

2. Работает в клиентском режиме с 8 ядрами.

Ответ №1:

Обычно это происходит, когда память вашего драйвера не настроена должным образом.

Что вы делаете неправильно, так это то, что вы передаете 4g в качестве памяти драйвера и устанавливаете его из Spark Conf , но, как указано в документации, это не будет работать в режиме клиента, следовательно, вы должны передать это объяснение при отправке приложения.

Конфигурацию смотрите здесь:https://spark.apache.org/docs/1.6.1/configuration.html#available-properties

Комментарии:

1. Я передаю этот параметр с помощью spark-submit и получаю ту же ошибку в строке: val model = pipeline.fit(train); <== ООМ