Twitter streaming application error on Windows 10 when submitting a python file to a local server

#python-2.7 #apache-spark #apache-kafka #spark-streaming


Question:

I am trying to run a streaming application that counts tweets for specific users. The producer code:

 # -*- coding: utf-8 -*-
 import tweepy
 import json
 import base64
 from kafka import KafkaProducer
 import kafka.errors

 # Twitter API credentials
 CONSUMER_KEY = "***"
 CONSUMER_SECRET = "***"
 ACCESS_TOKEN = "***"
 ACCESS_TOKEN_SECRET = "***"

 # Kafka topic name
 TOPIC_NAME = "tweet-kafka"

 # Kafka server
 KAFKA_HOST = "localhost"
 KAFKA_PORT = "9092"

 # a list of ids, the actual ids have been hidden in this question
 ids = ["11111", "222222"]

 auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
 auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)


 class KafkaCommunicator:

     def __init__(self, producer, topic):
         self.producer = producer
         self.topic = topic

     def send(self, message):
         self.producer.send(self.topic, message.encode("utf-8"))

     def close(self):
         self.producer.close()


 class MyStreamListener(tweepy.StreamListener):
     """Listener to tweet stream from twitter."""

     def __init__(self, communicator, api=None):
         super(MyStreamListener, self).__init__()
         self.communicator = communicator
         self.num_tweets = 0

     def on_data(self, raw_data):
         data = json.loads(raw_data)
         # print(data)
         if "user" in data:
             user_id = data["user"]["id_str"]
             if user_id in ids:
                 print("Time: " + data["created_at"] + "; id: " + user_id +
                       "; screen_name: " + data["user"]["screen_name"])
                 # put message into Kafka
                 self.communicator.send(data["user"]["screen_name"])
         return True

     def on_error(self, status):
         print(status)
         return True


 def create_communicator():
     """Create Kafka producer."""
     producer = KafkaProducer(bootstrap_servers=KAFKA_HOST + ":" + KAFKA_PORT)
     return KafkaCommunicator(producer, TOPIC_NAME)


 def create_stream(communicator):
     """Set stream for twitter api with custom listener."""
     listener = MyStreamListener(communicator=communicator)
     stream = tweepy.Stream(auth, listener)
     return stream


 def run_processing(stream):
     # Start filtering messages
     stream.filter(follow=ids)


 def main():
     communicator = None
     tweet_stream = None
     try:
         communicator = create_communicator()
         tweet_stream = create_stream(communicator)
         run_processing(tweet_stream)
     except KeyboardInterrupt:
         pass
     except kafka.errors.NoBrokersAvailable:
         print("Kafka broker not found.")
     finally:
         if communicator:
             communicator.close()
         if tweet_stream:
             tweet_stream.disconnect()


 if __name__ == "__main__":
     main()
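
Before wiring Spark in, it can help to confirm that messages actually reach the topic. The following standalone check is not part of the original post; it is a minimal sketch that assumes the same kafka-python package as the producer and the broker and topic names used above.

 # -*- coding: utf-8 -*-
 # Standalone check (not part of the question's code): read back from the
 # "tweet-kafka" topic to confirm the producer actually delivers messages.
 from kafka import KafkaConsumer

 consumer = KafkaConsumer(
     "tweet-kafka",
     bootstrap_servers="localhost:9092",
     auto_offset_reset="earliest",   # also show messages sent before this script started
     consumer_timeout_ms=10000,      # stop iterating after 10 s of silence
 )

 for message in consumer:
     # message.value holds the raw bytes sent by KafkaCommunicator.send()
     print(message.value.decode("utf-8"))

 consumer.close()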

The streaming application code:

 # -*- coding: utf-8 -*-
 import sys
 import os

 spark_path = "D:/spark/spark-2.4.7-bin-hadoop2.7"  # spark installed folder
 os.environ['SPARK_HOME'] = spark_path
 os.environ['HADOOP_HOME'] = spark_path
 sys.path.insert(0, spark_path + "/bin")
 sys.path.insert(0, spark_path + "/python/pyspark/")
 sys.path.insert(0, spark_path + "/python/lib/pyspark.zip")
 sys.path.insert(0, spark_path + "/python/lib/py4j-0.10.7-src.zip")

 os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 pyspark-shell'
 os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = "notebook"
 os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
 os.environ['PYSPARK_PYTHON'] = sys.executable
 os.environ['PYTHONHASHSEED'] = "0"
 os.environ['SPARK_YARN_USER_ENV'] = PYTHONHASHSEED = "0"

 from pyspark import SparkContext, SparkConf
 from pyspark.streaming import StreamingContext
 from pyspark.streaming.kafka import KafkaUtils

 SPARK_APP_NAME = "SparkStreamingKafkaTwitter"
 SPARK_CHECKPOINT_TMP_DIR = "D:/tmp"
 SPARK_BATCH_INTERVAL = 10
 SPARK_LOG_LEVEL = "OFF"

 KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"  # Default Zookeeper Consumer Address
 KAFKA_TOPIC = "tweet-kafka"

 import json


 def create_streaming_context():
     """Create Spark streaming context."""
     conf = SparkConf().set("spark.executor.memory", "2g") \
         .set("spark.driver.memory", "2g") \
         .set("spark.driver.bindAddress", "0.0.0.0")
     # Create Spark Context
     sc = SparkContext(master="local[2]", appName=SPARK_APP_NAME, conf=conf)
     # Set log level
     sc.setLogLevel(SPARK_LOG_LEVEL)
     # Create Streaming Context
     ssc = StreamingContext(sc, SPARK_BATCH_INTERVAL)
     # Sets the context to periodically checkpoint the DStream operations for master
     # fault-tolerance. The graph will be checkpointed every batch interval.
     # It is used to update results of stateful transformations as well
     ssc.checkpoint(SPARK_CHECKPOINT_TMP_DIR)
     return ssc


 def create_stream(ssc):
     """
     Create subscriber (consumer) to the Kafka topic (works on RDD that is mini-batch).
     """
     return (
         KafkaUtils.createDirectStream(
             ssc, topics=[KAFKA_TOPIC],
             kafkaParams={"bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS})
         .map(lambda x: x[1])
     )


 def main():
     # Init Spark streaming context
     ssc = create_streaming_context()

     # Get tweets stream
     kafka_stream = create_stream(ssc)

     # using reduce, count the number of user's tweets for x minute every 30 seconds
     # descending sort the result
     # Print result

     # for 1 minute
     tweets_for_1_min = kafka_stream.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, windowDuration=60, slideDuration=30)
     sorted_tweets_for_1_min = tweets_for_1_min.transform(lambda x_rdd: x_rdd.sortBy(lambda x: x[1], ascending=False))
     sorted_tweets_for_1_min.pprint()

     # for 10 minute
     tweets_for_10_min = kafka_stream.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, windowDuration=600, slideDuration=30)
     sorted_tweets_for_10_min = tweets_for_10_min.transform(lambda x_rdd: x_rdd.sortBy(lambda x: [1], ascending=False))
     sorted_tweets_for_10_min.pprint()

     # Start Spark Streaming
     ssc.start()

     # Waiting for termination
     ssc.awaitTermination()


 if __name__ == "__main__":
     main()
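
As a note on the API this code relies on: reduceByKeyAndWindow operates on a DStream of (key, value) pairs, so the windowed count is normally written against pairs such as (screen_name, 1). The snippet below is an illustrative sketch only, not the poster's code; it assumes the same kafka_stream of plain screen-name strings produced by create_stream above.

 # Illustrative sketch (not from the original post): reduceByKeyAndWindow expects
 # (key, value) pairs, so map each screen name to a (name, 1) pair first.
 pairs = kafka_stream.map(lambda name: (name, 1))

 counts_for_1_min = pairs.reduceByKeyAndWindow(
     lambda x, y: x + y,   # add counts that enter the window
     lambda x, y: x - y,   # subtract counts that leave the window
     windowDuration=60,
     slideDuration=30,
 )

 # Sort each mini-batch by count, descending, and print it.
 sorted_counts_for_1_min = counts_for_1_min.transform(
     lambda rdd: rdd.sortBy(lambda kv: kv[1], ascending=False)
 )
 sorted_counts_for_1_min.pprint()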

I have installed the following:

  1. jdk1.8.0_311 and jre1.8.0_311
  2. python 2.7
  3. hadoop-2.7.1 which works properly
  4. spark-2.4.7-bin-hadoop2.7
  5. kafka_2.13-3.0.0

I have set the environment variables correctly, but at runtime, after executing the submit command below, I get an exception:
 spark-submit --master local[2] --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 d:\task1\tweet_kafka_streaming_app.py

The exception is raised while processing the stream:

 -------------------------------------------
 Time: 2021-12-06 15:28:30
 -------------------------------------------

 -------------------------------------------
 Time: 2021-12-06 15:28:30
 -------------------------------------------

 Traceback (most recent call last):
   File "d:/task1/tweet_kafka_streaming_app.py", line 95, in <module>
     main()
   File "d:/task1/tweet_kafka_streaming_app.py", line 91, in main
     ssc.awaitTermination()
   File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\streaming\context.py", line 192, in awaitTermination
     varName = k[len("spark.executorEnv."):]
   File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1257, in __call__
   File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py", line 328, in get_return_value
 py4j.protocol.Py4JJavaError: An error occurred while calling o32.awaitTermination.
 : org.apache.spark.SparkException: An exception was raised by Python:
 Traceback (most recent call last):
   File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\streaming\util.py", line 68, in call
     r = self.func(t, *rdds)
   File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\streaming\dstream.py", line 297, in <lambda>
     func = lambda t, rdd: oldfunc(rdd)
   File "d:/task1/tweet_kafka_streaming_app.py", line 79, in <lambda>
     sorted_tweets_for_1_min = tweets_for_1_min.transform(lambda x_rdd: x_rdd.sortBy(lambda x: x[1], ascending=False))
   File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 699, in sortBy
     return self.keyBy(keyfunc).sortByKey(ascending, numPartitions).values()
   File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 667, in sortByKey
     rddSize = self.count()
   File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1055, in count
     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
   File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1046, in sum
     return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
   File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 917, in fold
     vals = self.mapPartitions(func).collect()
   File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 816, in collect
     sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
   File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1257, in __call__
     answer, self.gateway_client, self.target_id, self.name)
   File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py", line 328, in get_return_value
     format(target_id, ".", name), value)
 Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 20, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
   File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
   File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
   File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 2499, in pipeline_func
   File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 352, in func
   File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1861, in combineLocally
   File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
     for k, v in iterator:
 ValueError: too many values to unpack

     at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
     at org.apache.spark.api.python.PythonRunner$anon$1.read(PythonRunner.scala:592)
     at org.apache.spark.api.python.PythonRunner$anon$1.read(PythonRunner.scala:575)
     at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
     at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
     at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
     at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:409)
     at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
     at org.apache.spark.scheduler.Task.run(Task.scala:123)
     at org.apache.spark.executor.Executor$TaskRunner$anonfun$10.apply(Executor.scala:408)
     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
     at java.lang.Thread.run(Unknown Source)

 Driver stacktrace:
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1925)
     at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
     at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
     at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
     at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
     at scala.Option.foreach(Option.scala:257)
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
     at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:49)
     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
     at org.apache.spark.rdd.RDD$anonfun$collect$1.apply(RDD.scala:990)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
     at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
     at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
     at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
     at java.lang.reflect.Method.invoke(Unknown Source)
     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
     at py4j.Gateway.invoke(Gateway.java:282)
     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
     at py4j.commands.CallCommand.execute(CallCommand.java:79)
     at py4j.GatewayConnection.run(GatewayConnection.java:238)
     at java.lang.Thread.run(Unknown Source)
 Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
   File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
   File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
   File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 2499, in pipeline_func
   File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 352, in func
   File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1861, in combineLocally
   File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
     for k, v in iterator:
 ValueError: too many values to unpack

     at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
     at org.apache.spark.api.python.PythonRunner$anon$1.read(PythonRunner.scala:592)
     at org.apache.spark.api.python.PythonRunner$anon$1.read(PythonRunner.scala:575)
     at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
     at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
     at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
     at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:409)
     at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
     at org.apache.spark.scheduler.Task.run(Task.scala:123)
     at org.apache.spark.executor.Executor$TaskRunner$anonfun$10.apply(Executor.scala:408)
     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
     ... 1 more

     at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
     at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
     at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:246)
     at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1$anonfun$1$anonfun$apply$7.apply(DStream.scala:342)
     at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1$anonfun$1$anonfun$apply$7.apply(DStream.scala:342)
     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
     at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1$anonfun$1.apply(DStream.scala:341)
     at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1$anonfun$1.apply(DStream.scala:341)
     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
     at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1.apply(DStream.scala:336)
     at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1.apply(DStream.scala:334)
     at scala.Option.orElse(Option.scala:289)
     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
     at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
     at org.apache.spark.streaming.DStreamGraph$anonfun$1.apply(DStreamGraph.scala:122)
     at org.apache.spark.streaming.DStreamGraph$anonfun$1.apply(DStreamGraph.scala:121)
     at scala.collection.TraversableLike$anonfun$flatMap$1.apply(TraversableLike.scala:241)
     at scala.collection.TraversableLike$anonfun$flatMap$1.apply(TraversableLike.scala:241)
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
     at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
     at org.apache.spark.streaming.scheduler.JobGenerator$anonfun$3.apply(JobGenerator.scala:249)
     at org.apache.spark.streaming.scheduler.JobGenerator$anonfun$3.apply(JobGenerator.scala:247)
     at scala.util.Try$.apply(Try.scala:192)
     at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$processEvent(JobGenerator.scala:183)
     at org.apache.spark.streaming.scheduler.JobGenerator$anon$1.onReceive(JobGenerator.scala:89)
     at org.apache.spark.streaming.scheduler.JobGenerator$anon$1.onReceive(JobGenerator.scala:88)
     at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:49)

 Exception in thread Thread-4 (most likely raised during interpreter shutdown):

 C:\WINDOWS\system32>

Comments:

1. Ideally you should use python3, but the error looks like it is in the sort after the window

2. Without the sort it worked just fine, but I still want to sort the results

3. @OneCricketeer could you please tell me which python3 version specifically?

4. Spark supports Python 3.4 and above, so the latest version should be fine. Besides, if you are just doing everything locally, you could also try newer Spark versions. Personally, I tend to use Structured Streaming, so I have no suggestions beyond that
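
For reference, a minimal sketch of the Structured Streaming route the commenter mentions (not from the original post): it assumes the spark-sql-kafka-0-10 package matching the Spark build is supplied via --packages, and it reuses the broker and topic names from the question.

 # Sketch only: read the tweet-kafka topic with Structured Streaming and count
 # tweets per screen name over a sliding 1-minute window.
 from pyspark.sql import SparkSession
 from pyspark.sql.functions import col, window

 spark = SparkSession.builder.appName("StructuredTweetCount").getOrCreate()

 tweets = (
     spark.readStream
     .format("kafka")
     .option("kafka.bootstrap.servers", "localhost:9092")
     .option("subscribe", "tweet-kafka")
     .load()
     # "value" arrives as bytes; "timestamp" is the Kafka message timestamp
     .select(col("value").cast("string").alias("screen_name"), col("timestamp"))
 )

 counts = (
     tweets
     .groupBy(window(col("timestamp"), "1 minute", "30 seconds"), col("screen_name"))
     .count()
 )

 query = counts.writeStream.outputMode("complete").format("console").start()
 query.awaitTermination()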

Answer #1:

I solved this problem thanks to the hint given by @OneCricketeer. I upgraded Python to version 3.8 but ran into other errors. After downgrading to Python 3.7, which Spark 2.4.8 and Spark 2.4.7 with Hadoop 2.7 do support, my world shines again.
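
For anyone applying the same fix, both the driver and the worker interpreters have to point at that Python. Below is a minimal sketch of pinning them before the SparkContext is created, assuming a hypothetical install at C:/Python37/python.exe (adjust to the real location):

 # Sketch only: make Spark use the Python 3.7 interpreter on both the driver and
 # the workers. The path below is a hypothetical example.
 import os

 PYTHON37 = "C:/Python37/python.exe"
 os.environ["PYSPARK_PYTHON"] = PYTHON37
 os.environ["PYSPARK_DRIVER_PYTHON"] = PYTHON37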