#java #python-3.x #pyspark
Question:
Question and Observation
I have a simple sparklog.txt file that I read with sc.textFile(), after which I run basic actions such as .count(), .filter(), and .first(). I am using a Jupyter notebook, Python 3.7, and PySpark 2.4 installed under Anaconda. What is the correct setup of Spark, PySpark, and Python so that .count(), .filter(), and .first() work? It looks as though the Java-side PythonRDD.collectAndServe is not being invoked correctly from this basic Python code.
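For reference, here is a minimal sketch of the kind of interpreter wiring I believe the "setup" involves, assuming Anaconda on Windows; the paths are hypothetical placeholders, not my actual install:

```python
import os

# Hypothetical Anaconda paths: adjust to the local installation.
# Both variables typically point at the same interpreter that runs
# this notebook, and must be set before the SparkContext is created.
os.environ["PYSPARK_PYTHON"] = r"C:\Users\<user>\Anaconda3\python.exe"
os.environ["PYSPARK_DRIVER_PYTHON"] = r"C:\Users\<user>\Anaconda3\python.exe"

from pyspark import SparkContext
sc = SparkContext()
```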
Working Code
My code works for .collect().
```python
from pyspark import SparkContext

sc = SparkContext()
logFile = "file:\\Users\\matth\\Desktop\\sparklog.txt"
logData = sc.textFile(logFile).cache()
logData.collect()
```
This works and lists the basic log data, shown below.
```
['17/06/09 20:10:40 INFO executor.CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]',
 '17/06/09 20:10:40 INFO spark.SecurityManager: Changing view acls to: yarn,curi',
 '17/06/09 20:10:40 INFO spark.SecurityManager: Changing modify acls to: yarn,curi',
 '17/06/09 20:10:40 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, curi); users with modify permissions: Set(yarn, curi)',
 '17/06/09 20:10:41 INFO spark.SecurityManager: Changing view acls to: yarn,curi',
 '17/06/09 20:10:41 INFO spark.SecurityManager: Changing modify acls to: yarn,curi',
 '17/06/09 20:10:41 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, curi); users with modify permissions: Set(yarn, curi)',
 '17/06/09 20:10:41 INFO slf4j.Slf4jLogger: Slf4jLogger started',
 '17/06/09 20:10:41 INFO Remoting: Starting remoting',
 '17/06/09 20:10:41 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutorActorSystem@mesos-slave-07:55904]',
 "17/06/09 20:10:41 INFO util.Utils: Successfully started service 'sparkExecutorActorSystem' on port 55904.",
 '17/06/09 20:10:41 INFO storage.DiskBlockManager: Created local directory at /opt/hdfs/nodemanager/usercache/curi/appcache/application_1485248649253_0147/blockmgr-70293f72-844a-4b39-9ad6-fb0ad7e364e4',
 '17/06/09 20:10:41 INFO storage.MemoryStore: MemoryStore started with capacity 17.7 GB',
 '17/06/09 20:10:42 INFO executor.CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@10.10.34.11:48069',
 '17/06/09 20:10:42 INFO executor.CoarseGrainedExecutorBackend: Successfully registered with driver',
 '17/06/09 20:10:42 INFO executor.Executor: Starting executor ID 5 on host mesos-slave-07',
 "17/06/09 20:10:42 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40984.",
```
Code That Does Not Work
The same RDD does not work for .count(), .filter(), or .first().
```python
numAs = logData.filter(lambda s: 'a' in s).count()
logData.count()
logData.first()
```
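My understanding, which may be wrong, is that these three calls all go through Python-side partition functions and therefore need Spark to launch Python worker processes, whereas .collect() on an untransformed text RDD is served entirely by the JVM. As a sanity check, here is a small diagnostic sketch comparing the notebook's interpreter with the one Spark will launch for workers; note that pythonExec is an internal SparkContext attribute in PySpark 2.x, not a public API:

```python
import sys

# Interpreter running this Jupyter notebook:
print(sys.executable)

# Interpreter Spark launches for its Python workers
# (pythonExec is an internal, undocumented SparkContext attribute,
# so this is diagnostic only; sc is the context created above):
print(sc.pythonExec)
```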
Error Message
Each of the last commands, .filter(), .count(), and .first(), produced the following error message:
```
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 1 times, most recent failure: Lost task 0.0 in stage 7.0 (TID 14, localhost, executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:170)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:97)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
    at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketTimeoutException: Accept timed out
    at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
    at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
    at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
    at java.net.PlainSocketImpl.accept(Unknown Source)
    at java.net.ServerSocket.implAccept(Unknown Source)
    at java.net.ServerSocket.accept(Unknown Source)
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:164)
    ... 14 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:170)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:97)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
    at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    ... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
    at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
    at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
    at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
    at java.net.PlainSocketImpl.accept(Unknown Source)
    at java.net.ServerSocket.implAccept(Unknown Source)
    at java.net.ServerSocket.accept(Unknown Source)
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:164)
    ...
```
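To isolate whether the failure is specific to launching Python workers, one diagnostic sketch I can think of is counting through the underlying Java RDD, which stays entirely inside the JVM; note that _jrdd is an internal PySpark attribute, not a public API:

```python
# Diagnostic sketch: _jrdd is PySpark's internal handle on the underlying
# JavaRDD, and counting through it never spawns a Python worker process.
# If this succeeds while logData.count() fails, the problem is confined
# to Python worker startup rather than to reading the file.
print(logData._jrdd.count())
```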