#apache-spark #pyspark #spark-streaming
#apache-spark #pyspark #spark-streaming
Вопрос:
У меня есть статический файл (log_file) с некоторыми записями 10K на моем локальном диске (Windows). Структура следующая.
"date","time","size","r_version","r_arch","r_os","package","version","country","ip_id"
"2012-10-01","00:30:13",35165,"2.15.1","i686","linux-gnu","quadprog","1.5-4","AU",1
Я хочу прочитать записи этого журнала, используя текстовый поток сокета с пакетным интервалом в 10 секунд, а позже мне нужно выполнить несколько операций spark либо с вычислением RDD, либо с вычислением DF. Я прочитал приведенный ниже код только для того, чтобы прочитать данные за интервал времени, разделить их в форме RDD и показать.
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
conf = SparkConf().setMaster("local[*]").setAppName("Assignment4")
sc = SparkContext(conf = conf)
ssc = StreamingContext(sc, 10)
data = ssc.socketTextStream("file:///SparkL2/log_file.txt",2222)
linesrdd = data.map(lambda x: x.split(","))
linesrdd.pprint()
ssc.start()
ssc.awaitTermination()
Я сохранил этот код и выполнил отправку spark из командной строки Anaconda. Я столкнулся с ошибкой в функции socketTextStream, вероятно, потому, что я неправильно ее использую.
(base) PS C:UsersHP> cd c:SparkL2
(base) PS C:SparkL2> spark-submit Assignment5.py
20/09/09 21:42:42 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.net.UnknownHostException: file:///SparkL2/log_file.txt
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:196)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:162)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:394)
at java.net.Socket.connect(Socket.java:606)
at java.net.Socket.connect(Socket.java:555)
at java.net.Socket.<init>(Socket.java:451)
at java.net.Socket.<init>(Socket.java:228)
at org.apache.spark.streaming.dstream.SocketReceiver.onStart(SocketInputDStream.scala:61)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint.$anonfun$startReceiver$1(ReceiverTracker.scala:596)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint.$anonfun$startReceiver$1$adapted(ReceiverTracker.scala:586)
at org.apache.spark.SparkContext.$anonfun$submitJob$1(SparkContext.scala:2242)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Может ли кто-нибудь помочь мне в этом. Я очень новичок в pyspark и хочу изучить его самостоятельно.
Комментарии:
1. В коде есть пара проблем: 1) Заменить
.setMaster("local")
на.setMaster("local[*]")
, чтобы использовать как минимум 2 ядра процессора, 2) Вместо этого использовать Spark Structured Streaming . Можете ли вы отредактировать свой вопрос и поделиться ошибкой?2. Привет @JacekLaskowski, я добавил код ошибки. Я думаю, что приведенная ниже строка написана неправильно. У вас есть какие-либо подсказки, чтобы помочь мне. data = ssc.socketTextStream(«file:///SparkL2/log_file.txt «,2222)