Проблема обработки данных файла в кафку, а это вызывает структурированную потоковую передачу

#pyspark #apache-kafka #spark-structured-streaming

Вопрос:

Я пытаюсь создать файл для кафки, а затем использовать его в потоковой передаче spark.

historical_data_level_1.json -> producer.py ->> consumer.py

Проблема в том, что, когда сообщения поступают в spark, они поступают странно, поэтому я понимаю, что проблема в producer.py

когда я проверяю df

 kafka_df.selectExpr("CAST(value AS STRING)").show(30, False)
 

в результате получается:

  ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 
|value                                                                                                                                                                                                              |
 ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 
|"{"localSymbol": "EUR.USD", "time": "2021-05-07 10:10:50.873031 00:00", "precio_actual": 1.2065, "bid": NaN, "ask": NaN, "high": 1.20905, "low": 1.2053, "close": 1.2065}n"                   |
|"{"localSymbol": "EUR.USD", "time": "2021-05-07 10:10:50.873720 00:00", "precio_actual": 1.208235, "bid": 1.20823, "ask": 1.20824, "high": 1.20905, "low": 1.2053, "close": 1.2065}n"         |
|"{"localSymbol": "USD.JPY", "time": "2021-05-07 10:10:50.914310 00:00", "precio_actual": 109.09, "bid": NaN, "ask": NaN, "high": 109.2, "low": 108.935, "close": 109.09}n"                    |
|"{"localSymbol": "USD.JPY", "time": "2021-05-07 10:10:50.914867 00:00", "precio_actual": 109.10249999999999, "bid": 109.102, "ask": 109.103, "high": 109.2, "low": 108.935, "close": 109.09}n"|
|"{"localSymbol": "USD.JPY", "time": "2021-05-07 10:10:50.975038 00:00", "precio_actual": 109.102, "bid": 109.101, "ask": 109.103, "high": 109.2, "low": 108.935, "close": 109.09}n"           |
|"{"localSymbol": "USD.JPY", "time": "2021-05-07 10:10:51.059851 00:00", "precio_actual": 109.1015, "bid": 109.101, "ask": 109.102, "high": 109.2, "low": 108.935, "close": 109.09}n"          |
|"{"localSymbol": "EUR.USD", "time": "2021-05-07 10:10:51.101304 00:00", "precio_actual": 1.208235, "bid": 1.20823, "ask": 1.20824, "high": 1.20905, "low": 1.2053, "close": 1.2065}n"         |
 

Если я копирую и вставляю данные в терминал (производитель), то он работает правильно, поэтому проблема в том, как я отправляю сообщения в spark

Не могли бы вы мне помочь?

У меня есть следующий файл historical_data_level_1.json с этими данными:

 {"localSymbol": "EUR.USD", "time": "2021-05-07 10:10:50.873031 00:00", "precio_actual": 1.2065, "bid": NaN, "ask": NaN, "high": 1.20905, "low": 1.2053, "close": 1.2065}
{"localSymbol": "EUR.USD", "time": "2021-05-07 10:10:50.873720 00:00", "precio_actual": 1.208235, "bid": 1.20823, "ask": 1.20824, "high": 1.20905, "low": 1.2053, "close": 1.2065}
{"localSymbol": "USD.JPY", "time": "2021-05-07 10:10:50.914310 00:00", "precio_actual": 109.09, "bid": NaN, "ask": NaN, "high": 109.2, "low": 108.935, "close": 109.09}
{"localSymbol": "USD.JPY", "time": "2021-05-07 10:10:50.914867 00:00", "precio_actual": 109.10249999999999, "bid": 109.102, "ask": 109.103, "high": 109.2, "low": 108.935, "close": 109.09}
 

У меня есть следующий продюсер producer.py

 import json
from kafka import KafkaProducer
import settings

def producer():
    kafka_producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    with open(settings.HISTORICAL_DATA_FOLDER "historical_data_level_1.json") as historical_data_level_1:
        for line in historical_data_level_1:
            kafka_producer.send("test", json.dumps(line).encode('utf-8'))

if __name__ == '__main__':
    producer()
 

и

У меня есть глупый потребитель consumer.py

 from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, expr
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, IntegerType, ArrayType

if __name__ == "__main__":
    spark = SparkSession 
        .builder 
        .appName("File Streaming Demo3") 
        .master("local[3]") 
        .config("spark.streaming.stopGracefullyOnShutdown", "true") 
        .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1') 
        .getOrCreate()

    kafka_df = spark.readStream 
        .format("kafka") 
        .option("kafka.bootstrap.servers", "localhost:9092") 
        .option("subscribe", "test") 
        .option("startingOffsets", "earliest") 
        .load()

    kafka_df.printSchema()

    schema = StructType([
        StructField('localSymbol', StringType()),
        StructField('time', StringType()),
        StructField('precio_actual', StringType()),
        StructField('bid', StringType()),
        StructField('ask', StringType()),
        StructField('high', StringType()),
        StructField('low', StringType()),
        StructField('close', StringType()),
    ])

    value_df = kafka_df.select(from_json(col("value").cast("string"), schema).alias("value"))

    value_df.printSchema()

    level_1_df_process = value_df 
        .select(
            'value.*'
        )

    invoice_writer_query = level_1_df_process.writeStream 
        .format("json") 
        .queryName("Flattened Invoice Writer") 
        .outputMode("append") 
        .option("path", "output") 
        .option("checkpointLocation", "chk-point-dir") 
        .trigger(processingTime="1 minute") 
        .start()


    invoice_writer_query.awaitTermination()
 

Комментарии:

1. структурированная потоковая передача?

2. да, структурированная потоковая передача

Ответ №1:

Это решается отправкой producer.py словарь вместо строки, оставив код следующим образом

 import json
from kafka import KafkaProducer
import settings

def producer():
    kafka_producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    with open(settings.HISTORICAL_DATA_FOLDER "historical_data_level_1.json") as historical_data_level_1:
        for line in historical_data_level_1:
            line_python_dict = json.loads(line)
            kafka_producer.send("test", json.dumps(line_python_dict).encode('utf-8'))


if __name__ == '__main__':
    producer()