Не удается запустить потоковую передачу spark-kafka в Windows

#python #apache-spark #pyspark #apache-kafka #spark-streaming

Вопрос:

Я пытаюсь реализовать потоковую передачу spark с помощью kafka в Windows. инициированный zookeeper,сервер кафки,создал тему с помощью следующих команд

 .binwindowszookeeper-server-start.bat .configzookeeper.properties

.binwindowskafka-server-start.bat .configserver.properties

.binwindowskafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Kafka_streaming_data

.binwindowskafka-topics.bat --list --zookeeper localhost:2181
 

Затем инициировал потоковую передачу искры с помощью команды ниже

spark-отправить —пакеты org.apache.spark:потоковая передача искр-кафка-0-8_2.11:2.3.0 pyspark_kafka.py

pyspark_kafka.py

 import findspark

# TODO: your path will likely not have 'matthew' in it. Change it to reflect your path.
findspark.init('C:spark-2.3.0-bin-hadoop2.7')

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'

import sys
import time
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


n_secs = 1
topic = "Kafka_streaming_data"

conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[*]")
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)
    
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
                        'bootstrap.servers':'localhost:9092', 
                        #'group.id':'video-group', 
                        'fetch.message.max.bytes':'15728640'})
                        #'auto.offset.reset':'largest'})
                        # Group ID is completely arbitrary

lines = kafkaStream.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a b)
counts.pprint()

ssc.start()
#time.sleep(600) # Run stream for 10 minutes just in case no detection of producer
# ssc.awaitTermination()
ssc.stop(stopSparkContext=True,stopGraceFully=True)
 

Произошла следующая ошибка

 2021-10-19 23:09:56 ERROR Executor:91 - Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:spark-2.3.0-bin-hadoop2.7pythonpysparkrdd.py", line 1354, in takeUpToNumLeft
    yield next(iterator)
StopIteration

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:spark-2.3.0-bin-hadoop2.7pythonlibpyspark.zippysparkworker.py", line 229, in main
  File "C:spark-2.3.0-bin-hadoop2.7pythonlibpyspark.zippysparkworker.py", line 224, in process
  File "C:spark-2.3.0-bin-hadoop2.7pythonlibpyspark.zippysparkserializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
RuntimeError: generator raised StopIteration
 

Комментарии:

1. Какую версию Кафки вы используете? Какие версии python и версии Java? Это полная стопка? Кроме того, вам нужно поместить строку os.environ перед инициализацией findspark