#pyspark #apache-kafka #spark-structured-streaming
Вопрос:
Я пытаюсь создать файл для кафки, а затем использовать его в потоковой передаче spark.
historical_data_level_1.json -> producer.py ->> consumer.py
Проблема в том, что, когда сообщения поступают в spark, они поступают странно, поэтому я понимаю, что проблема в producer.py
когда я проверяю df
kafka_df.selectExpr("CAST(value AS STRING)").show(30, False)
в результате получается:
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|value |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"{"localSymbol": "EUR.USD", "time": "2021-05-07 10:10:50.873031 00:00", "precio_actual": 1.2065, "bid": NaN, "ask": NaN, "high": 1.20905, "low": 1.2053, "close": 1.2065}n" |
|"{"localSymbol": "EUR.USD", "time": "2021-05-07 10:10:50.873720 00:00", "precio_actual": 1.208235, "bid": 1.20823, "ask": 1.20824, "high": 1.20905, "low": 1.2053, "close": 1.2065}n" |
|"{"localSymbol": "USD.JPY", "time": "2021-05-07 10:10:50.914310 00:00", "precio_actual": 109.09, "bid": NaN, "ask": NaN, "high": 109.2, "low": 108.935, "close": 109.09}n" |
|"{"localSymbol": "USD.JPY", "time": "2021-05-07 10:10:50.914867 00:00", "precio_actual": 109.10249999999999, "bid": 109.102, "ask": 109.103, "high": 109.2, "low": 108.935, "close": 109.09}n"|
|"{"localSymbol": "USD.JPY", "time": "2021-05-07 10:10:50.975038 00:00", "precio_actual": 109.102, "bid": 109.101, "ask": 109.103, "high": 109.2, "low": 108.935, "close": 109.09}n" |
|"{"localSymbol": "USD.JPY", "time": "2021-05-07 10:10:51.059851 00:00", "precio_actual": 109.1015, "bid": 109.101, "ask": 109.102, "high": 109.2, "low": 108.935, "close": 109.09}n" |
|"{"localSymbol": "EUR.USD", "time": "2021-05-07 10:10:51.101304 00:00", "precio_actual": 1.208235, "bid": 1.20823, "ask": 1.20824, "high": 1.20905, "low": 1.2053, "close": 1.2065}n" |
Если я копирую и вставляю данные в терминал (производитель), то он работает правильно, поэтому проблема в том, как я отправляю сообщения в spark
Не могли бы вы мне помочь?
У меня есть следующий файл historical_data_level_1.json с этими данными:
{"localSymbol": "EUR.USD", "time": "2021-05-07 10:10:50.873031 00:00", "precio_actual": 1.2065, "bid": NaN, "ask": NaN, "high": 1.20905, "low": 1.2053, "close": 1.2065}
{"localSymbol": "EUR.USD", "time": "2021-05-07 10:10:50.873720 00:00", "precio_actual": 1.208235, "bid": 1.20823, "ask": 1.20824, "high": 1.20905, "low": 1.2053, "close": 1.2065}
{"localSymbol": "USD.JPY", "time": "2021-05-07 10:10:50.914310 00:00", "precio_actual": 109.09, "bid": NaN, "ask": NaN, "high": 109.2, "low": 108.935, "close": 109.09}
{"localSymbol": "USD.JPY", "time": "2021-05-07 10:10:50.914867 00:00", "precio_actual": 109.10249999999999, "bid": 109.102, "ask": 109.103, "high": 109.2, "low": 108.935, "close": 109.09}
У меня есть следующий продюсер producer.py
import json
from kafka import KafkaProducer
import settings
def producer():
kafka_producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
with open(settings.HISTORICAL_DATA_FOLDER "historical_data_level_1.json") as historical_data_level_1:
for line in historical_data_level_1:
kafka_producer.send("test", json.dumps(line).encode('utf-8'))
if __name__ == '__main__':
producer()
и
У меня есть глупый потребитель consumer.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, expr
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, IntegerType, ArrayType
if __name__ == "__main__":
spark = SparkSession
.builder
.appName("File Streaming Demo3")
.master("local[3]")
.config("spark.streaming.stopGracefullyOnShutdown", "true")
.config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1')
.getOrCreate()
kafka_df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.option("startingOffsets", "earliest")
.load()
kafka_df.printSchema()
schema = StructType([
StructField('localSymbol', StringType()),
StructField('time', StringType()),
StructField('precio_actual', StringType()),
StructField('bid', StringType()),
StructField('ask', StringType()),
StructField('high', StringType()),
StructField('low', StringType()),
StructField('close', StringType()),
])
value_df = kafka_df.select(from_json(col("value").cast("string"), schema).alias("value"))
value_df.printSchema()
level_1_df_process = value_df
.select(
'value.*'
)
invoice_writer_query = level_1_df_process.writeStream
.format("json")
.queryName("Flattened Invoice Writer")
.outputMode("append")
.option("path", "output")
.option("checkpointLocation", "chk-point-dir")
.trigger(processingTime="1 minute")
.start()
invoice_writer_query.awaitTermination()
Комментарии:
1. структурированная потоковая передача?
2. да, структурированная потоковая передача
Ответ №1:
Это решается отправкой producer.py словарь вместо строки, оставив код следующим образом
import json
from kafka import KafkaProducer
import settings
def producer():
kafka_producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
with open(settings.HISTORICAL_DATA_FOLDER "historical_data_level_1.json") as historical_data_level_1:
for line in historical_data_level_1:
line_python_dict = json.loads(line)
kafka_producer.send("test", json.dumps(line_python_dict).encode('utf-8'))
if __name__ == '__main__':
producer()