проблемы с потоковой передачей pyspark и импортом утилит

#python #pyspark #apache-kafka

Вопрос:

Я пытаюсь запустить приведенный ниже код

 import findspark
findspark.init('/opt/spark')
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'

import sys
import time
from pyspark.context import SparkContext
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

n_secs = 1
topic = "video-stream-event"
conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[*]")
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
                        'bootstrap.servers':'127.0.0.1  :9092', 
                        'group.id':'video-group', 
                        'fetch.message.max.bytes':'15728640',
                        'auto.offset.reset':'largest'})
#lines = kafkaStream.map(lambda x: x[1])
print(kafkastream)
 

Я получил следующую ошибку

из pyspark.streaming.kafka импортируйте модуль KafkaUtils ModuleNotFoundError: нет модуля с именем ‘pyspark.streaming.kafka’ log4j: ПРЕДУПРЕЖДЕНИЕ Для logger (org.apache.spark.util не удалось найти никаких добавлений.ShutdownHookManager). log4j: ПРЕДУПРЕЖДЕНИЕ Пожалуйста, правильно инициализируйте систему log4j.

Используемые python == 3.7 и pyspark == 3.1.2 изменены на pyspark 2.4.5 и 2.4.6 и выполнили тот же код, получили приведенную ниже ошибку

 > 21/10/18 14:05:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind
> to another address WARNING: An illegal reflective access operation has
> occurred WARNING: Illegal reflective access by
> org.apache.spark.unsafe.Platform
> (file:/opt/spark/jars/spark-unsafe_2.11-2.4.5.jar) to method
> java.nio.Bits.unaligned() WARNING: Please consider reporting this to
> the maintainers of org.apache.spark.unsafe.Platform WARNING: Use
> --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be
> denied in a future release 21/10/18 14:05:47 WARN NativeCodeLoader:
> Unable to load native-hadoop library for your platform... using
> builtin-java classes where applicable Traceback (most recent call
> last):   File "/home/deepika/Desktop/kafka/kafka_pyspark.py", line 12,
> in <module>
>     from pyspark.context import SparkContext   File "/home/deepika/Downloads/code_dump/spark/python/pyspark/__init__.py",
> line 51, in <module>
>     from pyspark.context import SparkContext   File "/home/deepika/Downloads/code_dump/spark/python/pyspark/context.py",
> line 31, in <module>
>     from pyspark import accumulators   File "/home/deepika/Downloads/code_dump/spark/python/pyspark/accumulators.py",
> line 97, in <module>
>     from pyspark.serializers import read_int, PickleSerializer   File "/home/deepika/Downloads/code_dump/spark/python/pyspark/serializers.py",
> line 72, in <module>
>     from pyspark import cloudpickle   File "/home/deepika/Downloads/code_dump/spark/python/pyspark/cloudpickle.py",
> line 145, in <module>
>     _cell_set_template_code = _make_cell_set_template_code()   File "/home/deepika/Downloads/code_dump/spark/python/pyspark/cloudpickle.py",
> line 126, in _make_cell_set_template_code
>     return types.CodeType( TypeError: an integer is required (got type bytes) log4j:WARN No appenders could be found for logger
> (org.apache.spark.util.ShutdownHookManager). log4j:WARN Please
> initialize the log4j system properly. log4j:WARN See
> http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
 

Есть идеи, что теперь делать?Я хочу запустить приведенный выше код.Пробовал версии python 3.7 и 3.8 и pysprk, заканчивающиеся этими 2 ошибками
Установлен pyspark по этой ссылке :

Ссылка для установки pyspark

Ответ №1:

  1. Вы должны использовать spark-sql-kafka-0-10
  2. Вам нужно перемещаться findspark.init() после os.environ строки. Кроме того, вам на самом деле не нужна эта строка, поскольку вы можете предоставить пакеты через findspark.
 SPARK_VERSION = '3.1.2'
SCALA_VERSION = '2.12'

import findspark
findspark.add_packages(['org.apache.spark:spark-sql-kafka-0-10_'   SCALA_VERSION   ':'   SPARK_VERSION ])

findspark.init()
 
from pyspark import SparkContext, SparkConf
 

Кроме того, если вы только начинаете работать со Spark, используйте последнюю версию

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

Что касается ошибки log4j, вам необходимо создать log4j.properties файл в $SPARK_HOME/conf