Python PythonRDD.Сбор и сохранение текстового файла ошибки() .фильтр() .счетчик()

#java #python-3.x #pyspark

Вопрос:

Вопрос и наблюдение

Имейте простое sparklog.txt файл для чтения с помощью Spark.textfile(), а затем выполните основные операции, такие как .count() и .filter() и .first(). Используя записную книжку jupyter. Python 3.7. Pyspark 2.4 установлен в Anaconda. Какова правильная настройка для вызова Spark, PySpark и Python .count(), .filter(), .first() ? Похоже, что python PythonRDD Collectandserv из Java неправильно вызывается в этом базовом коде Python.

Рабочий Код

Мой код работает для .collect().

 from pyspark import SparkContext sc = SparkContext() logFile = "file:\UsersmatthDesktopsparklog.txt" logData = sc.textFile(logFile).cache() logData.collect()  

Это работает для перечисления основных данных журнала, см. Ниже.

 ['17/06/09 20:10:40 INFO executor.CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]',  '17/06/09 20:10:40 INFO spark.SecurityManager: Changing view acls to: yarn,curi',  '17/06/09 20:10:40 INFO spark.SecurityManager: Changing modify acls to: yarn,curi',  '17/06/09 20:10:40 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, curi); users with modify permissions: Set(yarn, curi)',  '17/06/09 20:10:41 INFO spark.SecurityManager: Changing view acls to: yarn,curi',  '17/06/09 20:10:41 INFO spark.SecurityManager: Changing modify acls to: yarn,curi',  '17/06/09 20:10:41 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, curi); users with modify permissions: Set(yarn, curi)',  '17/06/09 20:10:41 INFO slf4j.Slf4jLogger: Slf4jLogger started',  '17/06/09 20:10:41 INFO Remoting: Starting remoting',  '17/06/09 20:10:41 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutorActorSystem@mesos-slave-07:55904]',  "17/06/09 20:10:41 INFO util.Utils: Successfully started service 'sparkExecutorActorSystem' on port 55904.",  '17/06/09 20:10:41 INFO storage.DiskBlockManager: Created local directory at /opt/hdfs/nodemanager/usercache/curi/appcache/application_1485248649253_0147/blockmgr-70293f72-844a-4b39-9ad6-fb0ad7e364e4',  '17/06/09 20:10:41 INFO storage.MemoryStore: MemoryStore started with capacity 17.7 GB',  '17/06/09 20:10:42 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@10.10.34.11:48069',  '17/06/09 20:10:42 INFO executor.CoarseGrainedExecutorBackend: Successfully registered with driver',  '17/06/09 20:10:42 INFO executor.Executor: Starting executor ID 5 on host mesos-slave-07',  "17/06/09 20:10:42 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40984.",  

Код Не Работает

Не работает для .count() , .filter() и .first().

 numAs = logData.filter(lambda s: 'a' in s).count() logData.count() logData.first()  

Возникло сообщение об ошибке

Для последних команд .filter (),. count (),. first () появилось следующее сообщение об ошибке:

 Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 14, localhost, executor driver): org.apache.spark.SparkException: Python worker failed to connect back.  at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:170)  at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:97)  at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)  at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)  at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)  at org.apache.spark.scheduler.Task.run(Task.scala:121)  at org.apache.spark.executor.Executor$TaskRunner$anonfun$10.apply(Executor.scala:402)  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)  at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)  at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)  at java.lang.Thread.run(Unknown Source) Caused by: java.net.SocketTimeoutException: Accept timed out  at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)  at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)  at java.net.AbstractPlainSocketImpl.accept(Unknown Source)  at java.net.PlainSocketImpl.accept(Unknown Source)  at java.net.ServerSocket.implAccept(Unknown Source)  at java.net.ServerSocket.accept(Unknown Source)  at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:164)  ... 14 more  Driver stacktrace:  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1887)  at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)  at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)  at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)  at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)  at scala.Option.foreach(Option.scala:257)  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)  at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:49)  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)  at org.apache.spark.rdd.RDD$anonfun$collect$1.apply(RDD.scala:945)  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)  at org.apache.spark.rdd.RDD.collect(RDD.scala:944)  at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)  at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)  at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)  at java.lang.reflect.Method.invoke(Unknown Source)  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)  at py4j.Gateway.invoke(Gateway.java:282)  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)  at py4j.commands.CallCommand.execute(CallCommand.java:79)  at py4j.GatewayConnection.run(GatewayConnection.java:238)  at java.lang.Thread.run(Unknown Source) Caused by: org.apache.spark.SparkException: Python worker failed to connect back.  at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:170)  at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:97)  at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)  at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)  at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)  at org.apache.spark.scheduler.Task.run(Task.scala:121)  at org.apache.spark.executor.Executor$TaskRunner$anonfun$10.apply(Executor.scala:402)  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)  at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)  at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)  ... 1 more Caused by: java.net.SocketTimeoutException: Accept timed out  at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)  at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)  at java.net.AbstractPlainSocketImpl.accept(Unknown Source)  at java.net.PlainSocketImpl.accept(Unknown Source)  at java.net.ServerSocket.implAccept(Unknown Source)  at java.net.ServerSocket.accept(Unknown Source)  at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:164)  ...