Pyspark: сброс соединения с помощью toLocalIterator

#python #apache-spark #pyspark

#python #apache-spark #pyspark

Вопрос:

Я пытаюсь выполнить локальную итерацию по данным rdd, используя цикл, подобный for row in rdd.toLocalIterator(): в pyspark, и я получаю эту ошибку:

 19/03/21 17:01:36 ERROR PythonRDD: Error while sending iterator
java.net.SocketException: Connection reset
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
        at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
        at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:515)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:527)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:527)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:527)
        at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:728)
        at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:728)
        at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:728)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1340)
        at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:729)
  

Это хорошо работает, когда я использую rdd.collect() для небольшой таблицы, но использует слишком много памяти для больших наборов данных.

Я уже пытался увеличить spark.driver.memory , spark.executor.memory и spark.executor.heartbeatInterval .

версия Spark: 2.2.1

Как я могу это исправить?

Это минимальный код, который я использую:

 for source_row in rdd.toLocalIterator():
    print(source_row)
  

У меня такая же ошибка с:

 iterator = rdd.toLocalIterator()
next(iterator)
  

Комментарии:

1. Сколько разделов у вас на данный момент?

2. 180 разделов.

3. Если вы считаете, что один из 180 разделов может быть достаточно большим, чтобы поместиться в памяти драйвера, тогда вы могли бы увеличить номер раздела до 250 или даже 500.

4. Я попытался увеличить его, но у меня все еще есть ошибка. Соединение сбрасывается через 7 секунд.

5. Не могли бы вы опубликовать полный код или наиболее значительную его часть, в которой задействован toLocalIterator?