#python #pyspark #apache-kafka
Вопрос:
Я пытаюсь запустить приведенный ниже код
import findspark
findspark.init('/opt/spark')
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 pyspark-shell'
import sys
import time
from pyspark.context import SparkContext
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
n_secs = 1
topic = "video-stream-event"
conf = SparkConf().setAppName("KafkaStreamProcessor").setMaster("local[*]")
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, n_secs)
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {
'bootstrap.servers':'127.0.0.1 :9092',
'group.id':'video-group',
'fetch.message.max.bytes':'15728640',
'auto.offset.reset':'largest'})
#lines = kafkaStream.map(lambda x: x[1])
print(kafkastream)
Я получил следующую ошибку
из pyspark.streaming.kafka импортируйте модуль KafkaUtils ModuleNotFoundError: нет модуля с именем ‘pyspark.streaming.kafka’ log4j: ПРЕДУПРЕЖДЕНИЕ Для logger (org.apache.spark.util не удалось найти никаких добавлений.ShutdownHookManager). log4j: ПРЕДУПРЕЖДЕНИЕ Пожалуйста, правильно инициализируйте систему log4j.
Используемые python == 3.7 и pyspark == 3.1.2 изменены на pyspark 2.4.5 и 2.4.6 и выполнили тот же код, получили приведенную ниже ошибку
> 21/10/18 14:05:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind
> to another address WARNING: An illegal reflective access operation has
> occurred WARNING: Illegal reflective access by
> org.apache.spark.unsafe.Platform
> (file:/opt/spark/jars/spark-unsafe_2.11-2.4.5.jar) to method
> java.nio.Bits.unaligned() WARNING: Please consider reporting this to
> the maintainers of org.apache.spark.unsafe.Platform WARNING: Use
> --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be
> denied in a future release 21/10/18 14:05:47 WARN NativeCodeLoader:
> Unable to load native-hadoop library for your platform... using
> builtin-java classes where applicable Traceback (most recent call
> last): File "/home/deepika/Desktop/kafka/kafka_pyspark.py", line 12,
> in <module>
> from pyspark.context import SparkContext File "/home/deepika/Downloads/code_dump/spark/python/pyspark/__init__.py",
> line 51, in <module>
> from pyspark.context import SparkContext File "/home/deepika/Downloads/code_dump/spark/python/pyspark/context.py",
> line 31, in <module>
> from pyspark import accumulators File "/home/deepika/Downloads/code_dump/spark/python/pyspark/accumulators.py",
> line 97, in <module>
> from pyspark.serializers import read_int, PickleSerializer File "/home/deepika/Downloads/code_dump/spark/python/pyspark/serializers.py",
> line 72, in <module>
> from pyspark import cloudpickle File "/home/deepika/Downloads/code_dump/spark/python/pyspark/cloudpickle.py",
> line 145, in <module>
> _cell_set_template_code = _make_cell_set_template_code() File "/home/deepika/Downloads/code_dump/spark/python/pyspark/cloudpickle.py",
> line 126, in _make_cell_set_template_code
> return types.CodeType( TypeError: an integer is required (got type bytes) log4j:WARN No appenders could be found for logger
> (org.apache.spark.util.ShutdownHookManager). log4j:WARN Please
> initialize the log4j system properly. log4j:WARN See
> http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Есть идеи, что теперь делать?Я хочу запустить приведенный выше код.Пробовал версии python 3.7 и 3.8 и pysprk, заканчивающиеся этими 2 ошибками
Установлен pyspark по этой ссылке :
Ответ №1:
- Вы должны использовать
spark-sql-kafka-0-10
- Вам нужно перемещаться
findspark.init()
послеos.environ
строки. Кроме того, вам на самом деле не нужна эта строка, поскольку вы можете предоставить пакеты через findspark.
SPARK_VERSION = '3.1.2'
SCALA_VERSION = '2.12'
import findspark
findspark.add_packages(['org.apache.spark:spark-sql-kafka-0-10_' SCALA_VERSION ':' SPARK_VERSION ])
findspark.init()
from pyspark import SparkContext, SparkConf
Кроме того, если вы только начинаете работать со Spark, используйте последнюю версию
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
Что касается ошибки log4j, вам необходимо создать log4j.properties
файл в $SPARK_HOME/conf