#python-2.7 #apache-spark #apache-kafka #spark-streaming
Question:
I am trying to run a streaming application that counts tweets for specific users. The producer code:
```python
# -*- coding: utf-8 -*-
import tweepy
import json
import base64
from kafka import KafkaProducer
import kafka.errors

# Twitter API credentials
CONSUMER_KEY = "***"
CONSUMER_SECRET = "***"
ACCESS_TOKEN = "***"
ACCESS_TOKEN_SECRET = "***"

# Kafka topic name
TOPIC_NAME = "tweet-kafka"

# Kafka server
KAFKA_HOST = "localhost"
KAFKA_PORT = "9092"

# a list of ids, the actual ids have been hidden in this question
ids = ["11111", "222222"]

auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)


class KafkaCommunicator:
    def __init__(self, producer, topic):
        self.producer = producer
        self.topic = topic

    def send(self, message):
        self.producer.send(self.topic, message.encode("utf-8"))

    def close(self):
        self.producer.close()


class MyStreamListener(tweepy.StreamListener):
    """Listener to tweet stream from twitter."""

    def __init__(self, communicator, api=None):
        super(MyStreamListener, self).__init__()
        self.communicator = communicator
        self.num_tweets = 0

    def on_data(self, raw_data):
        data = json.loads(raw_data)
        # print(data)
        if "user" in data:
            user_id = data["user"]["id_str"]
            if user_id in ids:
                print("Time: " + data["created_at"] + "; id: " + user_id +
                      "; screen_name: " + data["user"]["screen_name"])
                # put message into Kafka
                self.communicator.send(data["user"]["screen_name"])
        return True

    def on_error(self, status):
        print(status)
        return True


def create_communicator():
    """Create Kafka producer."""
    producer = KafkaProducer(bootstrap_servers=KAFKA_HOST + ":" + KAFKA_PORT)
    return KafkaCommunicator(producer, TOPIC_NAME)


def create_stream(communicator):
    """Set stream for twitter api with custom listener."""
    listener = MyStreamListener(communicator=communicator)
    stream = tweepy.Stream(auth, listener)
    return stream


def run_processing(stream):
    # Start filtering messages
    stream.filter(follow=ids)


def main():
    communicator = None
    tweet_stream = None
    try:
        communicator = create_communicator()
        tweet_stream = create_stream(communicator)
        run_processing(tweet_stream)
    except KeyboardInterrupt:
        pass
    except kafka.errors.NoBrokersAvailable:
        print("Kafka broker not found.")
    finally:
        if communicator:
            communicator.close()
        if tweet_stream:
            tweet_stream.disconnect()


if __name__ == "__main__":
    main()
```
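Before debugging the Spark side, it can help to confirm that messages are actually reaching the topic. The sketch below is not part of the original question; it uses the same kafka-python package as the producer and assumes the broker address and topic name defined above.

```python
# Optional sanity check (not part of the original question): read back what the
# producer publishes, using the same kafka-python package.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "tweet-kafka",                       # TOPIC_NAME from the producer
    bootstrap_servers="localhost:9092",  # KAFKA_HOST:KAFKA_PORT
    auto_offset_reset="earliest",        # start from the beginning of the topic
)

for message in consumer:
    # message.value holds the raw bytes sent by KafkaProducer.send()
    print(message.value.decode("utf-8"))
```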
The streaming application code:
```python
# -*- coding: utf-8 -*-
import sys
import os

spark_path = "D:/spark/spark-2.4.7-bin-hadoop2.7"  # spark installed folder
os.environ['SPARK_HOME'] = spark_path
os.environ['HADOOP_HOME'] = spark_path

sys.path.insert(0, spark_path + "/bin")
sys.path.insert(0, spark_path + "/python/pyspark/")
sys.path.insert(0, spark_path + "/python/lib/pyspark.zip")
sys.path.insert(0, spark_path + "/python/lib/py4j-0.10.7-src.zip")

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 pyspark-shell'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = "notebook"
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYTHONHASHSEED'] = "0"
os.environ['SPARK_YARN_USER_ENV'] = PYTHONHASHSEED = "0"

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

SPARK_APP_NAME = "SparkStreamingKafkaTwitter"
SPARK_CHECKPOINT_TMP_DIR = "D:/tmp"
SPARK_BATCH_INTERVAL = 10
SPARK_LOG_LEVEL = "OFF"

KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"  # Default Zookeeper Consumer Address
KAFKA_TOPIC = "tweet-kafka"

import json


def create_streaming_context():
    """Create Spark streaming context."""
    conf = SparkConf().set("spark.executor.memory", "2g") \
        .set("spark.driver.memory", "2g") \
        .set("spark.driver.bindAddress", "0.0.0.0")
    # Create Spark Context
    sc = SparkContext(master="local[2]", appName=SPARK_APP_NAME, conf=conf)
    # Set log level
    sc.setLogLevel(SPARK_LOG_LEVEL)
    # Create Streaming Context
    ssc = StreamingContext(sc, SPARK_BATCH_INTERVAL)
    # Sets the context to periodically checkpoint the DStream operations for master
    # fault-tolerance. The graph will be checkpointed every batch interval.
    # It is used to update results of stateful transformations as well
    ssc.checkpoint(SPARK_CHECKPOINT_TMP_DIR)
    return ssc


def create_stream(ssc):
    """
    Create subscriber (consumer) to the Kafka topic (works on RDD that is mini-batch).
    """
    return (
        KafkaUtils.createDirectStream(
            ssc, topics=[KAFKA_TOPIC],
            kafkaParams={"bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS})
        .map(lambda x: x[1])
    )


def main():
    # Init Spark streaming context
    ssc = create_streaming_context()

    # Get tweets stream
    kafka_stream = create_stream(ssc)

    # using reduce, count the number of user's tweets for x minute every 30 seconds
    # descending sort the result
    # Print result

    # for 1 minute
    tweets_for_1_min = kafka_stream.reduceByKeyAndWindow(lambda x, y: x + y,
                                                         lambda x, y: x - y,
                                                         windowDuration=60,
                                                         slideDuration=30)
    sorted_tweets_for_1_min = tweets_for_1_min.transform(
        lambda x_rdd: x_rdd.sortBy(lambda x: x[1], ascending=False))
    sorted_tweets_for_1_min.pprint()

    # for 10 minute
    tweets_for_10_min = kafka_stream.reduceByKeyAndWindow(lambda x, y: x + y,
                                                          lambda x, y: x - y,
                                                          windowDuration=600,
                                                          slideDuration=30)
    sorted_tweets_for_10_min = tweets_for_10_min.transform(
        lambda x_rdd: x_rdd.sortBy(lambda x: [1], ascending=False))
    sorted_tweets_for_10_min.pprint()

    # Start Spark Streaming
    ssc.start()
    # Waiting for termination
    ssc.awaitTermination()


if __name__ == "__main__":
    main()
```
I have installed the following:
- jdk1.8.0_311 and jre1.8.0_311
- python 2.7
- hadoop-2.7.1 which works properly
- spark-2.4.7-bin-hadoop2.7
- kafka_2.13-3.0.0

I have set the environment variables correctly, but when I run the application with the submit command below, I get an exception:
```
spark-submit --master local[2] --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 d:\task1\tweet_kafka_streaming_app.py
```
The exception raised while processing the stream:
```
-------------------------------------------
Time: 2021-12-06 15:28:30
-------------------------------------------

-------------------------------------------
Time: 2021-12-06 15:28:30
-------------------------------------------

Traceback (most recent call last):
  File "d:/task1/tweet_kafka_streaming_app.py", line 95, in <module>
    main()
  File "d:/task1/tweet_kafka_streaming_app.py", line 91, in main
    ssc.awaitTermination()
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\streaming\context.py", line 192, in awaitTermination
    varName = k[len("spark.executorEnv."):]
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1257, in __call__
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o32.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\streaming\util.py", line 68, in call
    r = self.func(t, *rdds)
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\streaming\dstream.py", line 297, in <lambda>
    func = lambda t, rdd: oldfunc(rdd)
  File "d:/task1/tweet_kafka_streaming_app.py", line 79, in <lambda>
    sorted_tweets_for_1_min = tweets_for_1_min.transform(lambda x_rdd: x_rdd.sortBy(lambda x: x[1], ascending=False))
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 699, in sortBy
    return self.keyBy(keyfunc).sortByKey(ascending, numPartitions).values()
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 667, in sortByKey
    rddSize = self.count()
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1055, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1046, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 917, in fold
    vals = self.mapPartitions(func).collect()
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 816, in collect
    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 20, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 2499, in pipeline_func
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 352, in func
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1861, in combineLocally
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
    for k, v in iterator:
ValueError: too many values to unpack

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
    at org.apache.spark.api.python.PythonRunner$anon$1.read(PythonRunner.scala:592)
    at org.apache.spark.api.python.PythonRunner$anon$1.read(PythonRunner.scala:575)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
    at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1925)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1913)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1912)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1912)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:948)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:948)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2146)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
    at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$anonfun$collect$1.apply(RDD.scala:990)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 2499, in pipeline_func
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 352, in func
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\rdd.py", line 1861, in combineLocally
  File "D:\spark\spark-2.4.7-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
    for k, v in iterator:
ValueError: too many values to unpack

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
    at org.apache.spark.api.python.PythonRunner$anon$1.read(PythonRunner.scala:592)
    at org.apache.spark.api.python.PythonRunner$anon$1.read(PythonRunner.scala:575)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
    at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    ... 1 more
    at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
    at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
    at org.apache.spark.streaming.api.python.PythonTransformedDStream.compute(PythonDStream.scala:246)
    at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1$anonfun$1$anonfun$apply$7.apply(DStream.scala:342)
    at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1$anonfun$1$anonfun$apply$7.apply(DStream.scala:342)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1$anonfun$1.apply(DStream.scala:341)
    at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1$anonfun$1.apply(DStream.scala:341)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1.apply(DStream.scala:336)
    at org.apache.spark.streaming.dstream.DStream$anonfun$getOrCompute$1.apply(DStream.scala:334)
    at scala.Option.orElse(Option.scala:289)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
    at org.apache.spark.streaming.DStreamGraph$anonfun$1.apply(DStreamGraph.scala:122)
    at org.apache.spark.streaming.DStreamGraph$anonfun$1.apply(DStreamGraph.scala:121)
    at scala.collection.TraversableLike$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
    at org.apache.spark.streaming.scheduler.JobGenerator$anonfun$3.apply(JobGenerator.scala:249)
    at org.apache.spark.streaming.scheduler.JobGenerator$anonfun$3.apply(JobGenerator.scala:247)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$processEvent(JobGenerator.scala:183)
    at org.apache.spark.streaming.scheduler.JobGenerator$anon$1.onReceive(JobGenerator.scala:89)
    at org.apache.spark.streaming.scheduler.JobGenerator$anon$1.onReceive(JobGenerator.scala:88)
    at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:49)

Exception in thread Thread-4 (most likely raised during interpreter shutdown):

C:\WINDOWS\system32>
```
Comments:
1. Ideally you should be using Python 3, but the error looks like it is in the sorting after the window (see the sketch after these comments)
2. Without the sorting it worked just fine, but I still want to sort the results
3. @OneCricketeer could you please tell me which specific version of Python 3?
4. Spark supports Python 3.4 and above, so the latest version should be fine. Also, if you are just running everything locally, you could also try newer Spark versions. Personally, I tend to use Structured Streaming, so I have no suggestions beyond that
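To make comment 1 a bit more concrete: `reduceByKeyAndWindow` operates on a DStream of (key, value) pairs, and the `ValueError: too many values to unpack` in the traceback is raised by PySpark's shuffle code when it tries to unpack each element as `k, v`. The snippet below is only a plain-Python illustration of that unpacking, under the assumption that the stream elements here are bare screen-name strings; it is not the accepted fix (see the answer below).

```python
# Plain-Python illustration only (no Spark involved, not the accepted fix):
# pyspark/shuffle.py does "for k, v in iterator", so every element fed to
# reduceByKeyAndWindow must be a 2-item (key, value) pair.
element = "some_screen_name"      # the shape .map(lambda x: x[1]) yields above: a bare string
try:
    k, v = element                # the same unpacking that mergeValues performs
except ValueError as err:
    print(err)                    # "too many values to unpack"

pair = ("some_screen_name", 1)    # the shape reduceByKeyAndWindow expects
k, v = pair                       # unpacks cleanly
```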
Answer #1:
I solved this problem thanks to the hint given by @OneCricketeer. I upgraded Python to version 3.8 but ran into other errors. After downgrading to Python 3.7, which is supported by Spark 2.4.8 and Spark 2.4.7 with Hadoop 2.7, my world shines again.
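For reference, this is roughly what the windowed per-user count looks like when the stream is first mapped to explicit (screen_name, 1) pairs before `reduceByKeyAndWindow`. It is a sketch against the same Spark 2.4 / spark-streaming-kafka-0-8 API used in the question, reusing the question's `ssc`, topic, and bootstrap server; it is not a verbatim copy of the final working code.

```python
# Sketch only: count tweets per user over a 60-second window, sliding every 30 seconds,
# and print the counts in descending order. Assumes the same ssc, topic, and broker
# as in the question's streaming application.
from pyspark.streaming.kafka import KafkaUtils

stream = KafkaUtils.createDirectStream(
    ssc,
    topics=["tweet-kafka"],
    kafkaParams={"bootstrap.servers": "localhost:9092"},
)

counts = (
    stream
    .map(lambda kv: (kv[1], 1))        # (screen_name, 1) pairs instead of bare strings
    .reduceByKeyAndWindow(
        lambda a, b: a + b,            # add counts entering the window
        lambda a, b: a - b,            # subtract counts leaving the window
        windowDuration=60,
        slideDuration=30,
    )
)

# Sort each windowed batch by count, descending, then print it.
sorted_counts = counts.transform(
    lambda rdd: rdd.sortBy(lambda pair: pair[1], ascending=False)
)
sorted_counts.pprint()
```

Because an inverse reduce function is used, checkpointing is still required, which the question's `ssc.checkpoint(SPARK_CHECKPOINT_TMP_DIR)` call already provides.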