Считывание данных из Kafka и вывод на консоль с помощью Spark Structured Sreaming на Python

#apache-spark #pyspark #apache-kafka #apache-spark-sql #spark-structured-streaming

#apache-искра #пайспарк #апачи-кафка #apache-spark-sql #spark-structured-streaming

Вопрос:

У меня есть kafka_2.13-2.7.0 в Ubuntu 20.04. Я запускаю kafka server и zookeeper, затем создаю тему и отправляю в ней текстовый файл через nc -lk 9999 . Тема полна данных. Кроме того, в моей системе есть spark-3.0.1-bin-hadoop2.7. На самом деле, я хочу использовать тему kafka в качестве источника для структурированной потоковой передачи Spark с помощью python. Мой код выглядит так:

 spark = SparkSession 
    .builder 
    .appName("APP") 
    .getOrCreate()

df = spark 
    .readStream 
    .format("kafka") 
    .option("kafka.bootstrap.servers", "localhost:9092") 
    .option("subscribe", "sparktest") 
    .option("startingOffsets", "earliest") 
    .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.printSchema()
 

Я запускаю приведенный выше код через spark-submit с помощью этой команды:

 ./spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 /home/spark/PycharmProjects/testSparkStream/KafkaToSpark.py 
 

Код выполняется без каких-либо исключений, и я получаю этот вывод в том виде, в каком он есть на сайте Spark:

    root
    |-- key: binary (nullable = true)
    |-- value: binary (nullable = true)
    |-- topic: string (nullable = true)
    |-- partition: integer (nullable = true)
    |-- offset: long (nullable = true)
    |-- timestamp: timestamp (nullable = true)
    |-- timestampType: integer (nullable = true)
 

Мой вопрос заключается в том, что тема Кафки полна данных; но в результате выполнения кода в выходных данных нет никаких данных. Не могли бы вы, пожалуйста, объяснить мне, что здесь не так?

Ответ №1:

Код как есть не будет распечатывать какие-либо данные, а предоставит вам схему только один раз.

Вы можете следовать инструкциям, приведенным в общем руководстве по структурированной потоковой передаче и руководстве по интеграции Structured Streaming Kafka, чтобы узнать, как распечатать данные на консоль. Помните, что чтение данных в Spark — это отложенная операция, и ничто не выполняется без действия (обычно writeStream операции).

Если вы дополните код, как показано ниже, вы увидите, что выбранные данные (ключ и значение) выводятся на консоль:

 spark = SparkSession 
          .builder 
          .appName("APP") 
          .getOrCreate()

df = spark
      .readStream 
      .format("kafka") 
      .option("kafka.bootstrap.servers", "localhost:9092") 
      .option("subscribe", "sparktest") 
      .option("startingOffsets", "earliest") 
      .load()
      

query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 
    .writeStream 
    .format("console") 
    .option("checkpointLocation", "path/to/HDFS/dir") 
    .start()

query.awaitTermination()