Как читать из статического файла, используя текстовый поток сокета с пакетным интервалом в 10 секунд в Spark с Python?

#apache-spark #pyspark #spark-streaming

#apache-spark #pyspark #spark-streaming

Вопрос:

У меня есть статический файл (log_file) с некоторыми записями 10K на моем локальном диске (Windows). Структура следующая.

 "date","time","size","r_version","r_arch","r_os","package","version","country","ip_id"
"2012-10-01","00:30:13",35165,"2.15.1","i686","linux-gnu","quadprog","1.5-4","AU",1
  

Я хочу прочитать записи этого журнала, используя текстовый поток сокета с пакетным интервалом в 10 секунд, а позже мне нужно выполнить несколько операций spark либо с вычислением RDD, либо с вычислением DF. Я прочитал приведенный ниже код только для того, чтобы прочитать данные за интервал времени, разделить их в форме RDD и показать.

 from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = SparkConf().setMaster("local[*]").setAppName("Assignment4")
sc = SparkContext(conf = conf)
ssc = StreamingContext(sc, 10)

data = ssc.socketTextStream("file:///SparkL2/log_file.txt",2222)
                            
linesrdd = data.map(lambda x: x.split(","))

linesrdd.pprint()
ssc.start()
ssc.awaitTermination()
  

Я сохранил этот код и выполнил отправку spark из командной строки Anaconda. Я столкнулся с ошибкой в функции socketTextStream, вероятно, потому, что я неправильно ее использую.

 (base) PS C:UsersHP> cd c:SparkL2
(base) PS C:SparkL2> spark-submit Assignment5.py
20/09/09 21:42:42 ERROR ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.net.UnknownHostException: file:///SparkL2/log_file.txt
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:196)
        at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:162)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:394)
        at java.net.Socket.connect(Socket.java:606)
        at java.net.Socket.connect(Socket.java:555)
        at java.net.Socket.<init>(Socket.java:451)
        at java.net.Socket.<init>(Socket.java:228)
        at org.apache.spark.streaming.dstream.SocketReceiver.onStart(SocketInputDStream.scala:61)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint.$anonfun$startReceiver$1(ReceiverTracker.scala:596)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint.$anonfun$startReceiver$1$adapted(ReceiverTracker.scala:586)
        at org.apache.spark.SparkContext.$anonfun$submitJob$1(SparkContext.scala:2242)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
  

Может ли кто-нибудь помочь мне в этом. Я очень новичок в pyspark и хочу изучить его самостоятельно.

Комментарии:

1. В коде есть пара проблем: 1) Заменить .setMaster("local") на .setMaster("local[*]") , чтобы использовать как минимум 2 ядра процессора, 2) Вместо этого использовать Spark Structured Streaming . Можете ли вы отредактировать свой вопрос и поделиться ошибкой?

2. Привет @JacekLaskowski, я добавил код ошибки. Я думаю, что приведенная ниже строка написана неправильно. У вас есть какие-либо подсказки, чтобы помочь мне. data = ssc.socketTextStream(«file:///SparkL2/log_file.txt «,2222)