#python #apache-spark #pyspark #spark-streaming #parquet
#python #apache-spark #pyspark #искровая потоковая передача #паркет
Вопрос:
У меня есть потоковое задание PySpark, которое считывает JSON, разделенный новой строкой, и выполняет некоторые агрегации, а затем сохраняет их в некоторые папки.
Я хочу изменить файлы, которые передаются из JSON в parquet.
Вот как я создаю контекст:
conf = pyspark.SparkConf().set("spark.driver.host", "127.0.0.1")
conf.set("spark.sql.execution.arrow.enabled", "true")
conf.set("spark.sql.parquet.compression.codec", "snappy")
sc = pyspark.SparkContext(
master="local[5]", appName="parquet_job", conf=conf
)
DATA_PATH = "the_data_path"
BATCH_TIMEOUT = 10
ssc = StreamingContext(sc, BATCH_TIMEOUT)
sqlC = SQLContext(sc)
dstream = ssc.textFileStream(DATA_PATH)
далее, каждый раз, когда искра считывает новый файл, я делаю
dstream.foreachRDD(parse_data)
ssc.start()
ssc.awaitTermination()
while not stopped:
pass
ssc.stop()
sc.stop()
и parse_data
это функция, которая выполняет несколько агрегаций с различными другими функциями.
Чтение файлов JSON прошло нормально. Когда я начал читать файлы parquet, ничего не работало.
Я пытался
schema = StructType([
StructField('a', StringType(), True),
StructField('b', StringType(), True),
StructField('c', StringType(), True),
...
])
# rdd is the dstream from foreachRDD
lines = sqlC.createDataFrame(rdd, schema)
Но я всегда получаю следующую ошибку:
AttributeError: 'RDD' object has no attribute '_get_object_id'
Или
TypeError: StructType can not accept object 'PAR1x15x04x15x1cx15 Lx15x02x15x04x12x00x00x0e4' in type <class 'str'>
Комментарии:
1. Вы нашли какое-либо решение своей проблемы?
2. @Divuneh мы изменили подход и перешли на пакетное чтение с помощью spark.read.parquet