#apache-spark #pyspark #apache-kafka #apache-spark-sql #spark-structured-streaming
#apache-искра #пайспарк #апачи-кафка #apache-spark-sql #spark-structured-streaming
Вопрос:
У меня есть kafka_2.13-2.7.0 в Ubuntu 20.04. Я запускаю kafka server и zookeeper, затем создаю тему и отправляю в ней текстовый файл через nc -lk 9999
. Тема полна данных. Кроме того, в моей системе есть spark-3.0.1-bin-hadoop2.7. На самом деле, я хочу использовать тему kafka в качестве источника для структурированной потоковой передачи Spark с помощью python. Мой код выглядит так:
spark = SparkSession
.builder
.appName("APP")
.getOrCreate()
df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "sparktest")
.option("startingOffsets", "earliest")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.printSchema()
Я запускаю приведенный выше код через spark-submit с помощью этой команды:
./spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 /home/spark/PycharmProjects/testSparkStream/KafkaToSpark.py
Код выполняется без каких-либо исключений, и я получаю этот вывод в том виде, в каком он есть на сайте Spark:
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
Мой вопрос заключается в том, что тема Кафки полна данных; но в результате выполнения кода в выходных данных нет никаких данных. Не могли бы вы, пожалуйста, объяснить мне, что здесь не так?
Ответ №1:
Код как есть не будет распечатывать какие-либо данные, а предоставит вам схему только один раз.
Вы можете следовать инструкциям, приведенным в общем руководстве по структурированной потоковой передаче и руководстве по интеграции Structured Streaming Kafka, чтобы узнать, как распечатать данные на консоль. Помните, что чтение данных в Spark — это отложенная операция, и ничто не выполняется без действия (обычно writeStream
операции).
Если вы дополните код, как показано ниже, вы увидите, что выбранные данные (ключ и значение) выводятся на консоль:
spark = SparkSession
.builder
.appName("APP")
.getOrCreate()
df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "sparktest")
.option("startingOffsets", "earliest")
.load()
query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("console")
.option("checkpointLocation", "path/to/HDFS/dir")
.start()
query.awaitTermination()