Невозможно получить данные от Kafka для запуска потоковой передачи

#python #apache-spark #pyspark #apache-kafka #spark-streaming

#python #apache-spark #pyspark #apache-kafka #потоковая передача spark

Вопрос:

Я пытаюсь сгенерировать некоторые случайные данные через kafka producer, используя java-код в eclipse IDE. Я получаю те же данные в kafka consumer, которые также созданы с использованием кода Java в той же IDE. Моя работа зависит от потоковых данных. Итак, мне нужна потоковая передача spark для получения случайных данных, сгенерированных kafka. Для потоковой передачи spark я использую код python в jupyter-notebook. Чтобы интегрировать kafka с spark, «spark-streaming-kafka-0-10_2.12-3.0.0.jar » файл должен быть добавлен в spark jar. Я также попытался добавить файл jar в pyspark. Вот мой код spark

 import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
n_secs = 3
topic = "generate"
spark = SparkSession.builder.master("local[*]") 
        .appName("kafkaStreaming") 
        .config("/home/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/pyspark/spark-streaming-kafka-0-10_2.12-3.0.0.jar") 
        .getOrCreate()
sc = spark.sparkContext

ssc = StreamingContext(sc, n_secs)
kStream = KafkaUtils.createDirectStream(ssc, [topic], {
                        'bootstrap.servers':'localhost:9092',
                        'group.id':'test-group',
                        'auto.offset.reset':'latest'})
lines = kStream.map(lambda x: x[1])
words = lines.flatmap(lambda line: line.split(" "))
print(words)
ssc.start()
time.sleep(100)
ssc.stop(stopSparkContext=True,stopGraceFully=True)
  

В приведенном выше коде я добавил файл jar с помощью метода SparkSession.config(). После создания DStream я пытаюсь получить данные от kafka с помощью KafkaUtils.createDirectStream(), указав название темы, серверы начальной загрузки и так далее. После этого я конвертирую данные в rdd и печатаю результат. Это общий поток моей работы. Сначала я выполнял код производителя kafka на Java, и он генерирует некоторые данные и потребляется потребителем kafka. До этого он работал правильно. При выполнении кода spark streaming в python отображается некоторая ошибка, подобная этой

 ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/home/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving

Py4JError                                 Traceback (most recent call last)
<ipython-input-17-873ece723182> in <module>
     36                         'bootstrap.servers':'localhost:9092',
     37                         'group.id':'test-group',
---> 38                         'auto.offset.reset':'latest'})
     39 
     40 lines = kStream.map(lambda x: x[1])

~/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/pyspark/streaming/kafka.py in createDirectStream(ssc, topics, kafkaParams, fromOffsets, keyDecoder, valueDecoder, messageHandler)
    144             func = funcWithoutMessageHandler
    145             jstream = helper.createDirectStreamWithoutMessageHandler(
--> 146                 ssc._jssc, kafkaParams, set(topics), jfromOffsets)
    147         else:
    148             ser = AutoBatchedSerializer(PickleSerializer())

~/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

~/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

~/Downloads/Spark/spark-2.4.6-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    334             raise Py4JError(
    335                 "An error occurred while calling {0}{1}{2}".
--> 336                 format(target_id, ".", name))
    337     else:
    338         type = answer[1]

Py4JError: An error occurred while calling o270.createDirectStreamWithoutMessageHandler
  

Пожалуйста, кто-нибудь, помогите мне решить эту проблему…

Ответ №1:

Есть несколько вещей, которые я вижу из самого кода:

  • ваш артефакт jar предназначен для spark 3.0, и вы используете spark версии 2.4.6. (подсказка: последние 3 цифры в имени файла — это версия spark)
  • вы добавили свой jar-файл в свой параметр конфигурации. Я бы посоветовал сначала проверить, является ли используемый вами файл jar тем, который вам нужен, используя его в spark-submit command as --jar <jar-file-path> .
  • попробуйте сначала распечатать свой прямой поток, а не выполнять с ним всевозможные преобразования. Вы можете сделать:
 kStream = KafkaUtils.createDirectStream(ssc, [topic], {
                        'bootstrap.servers':'localhost:9092',
                        'group.id':'test-group',
                        'auto.offset.reset':'latest'})
kStream.pprint()
ssc.start()
# stream will run for 50 sec
ssc.awaitTerminationOrTimeout(50)
ssc.stop()
sc.stop()
  
  • Как только вы убедитесь, что получаете свои данные, вы можете использовать foreachRDD, transform или другие API для работы с вашими данными