Преобразование RDD-паркета из StreamingContext и textFileStream

#python #apache-spark #pyspark #spark-streaming #parquet

#python #apache-spark #pyspark #искровая потоковая передача #паркет

Вопрос:

У меня есть потоковое задание PySpark, которое считывает JSON, разделенный новой строкой, и выполняет некоторые агрегации, а затем сохраняет их в некоторые папки.

Я хочу изменить файлы, которые передаются из JSON в parquet.

Вот как я создаю контекст:

 conf = pyspark.SparkConf().set("spark.driver.host", "127.0.0.1")
conf.set("spark.sql.execution.arrow.enabled", "true")
conf.set("spark.sql.parquet.compression.codec", "snappy")
sc = pyspark.SparkContext(
    master="local[5]", appName="parquet_job", conf=conf
)
DATA_PATH = "the_data_path"
BATCH_TIMEOUT = 10

ssc = StreamingContext(sc, BATCH_TIMEOUT)
sqlC = SQLContext(sc)

dstream = ssc.textFileStream(DATA_PATH)
  

далее, каждый раз, когда искра считывает новый файл, я делаю

 dstream.foreachRDD(parse_data)

ssc.start()
ssc.awaitTermination()
while not stopped:
    pass
ssc.stop()
sc.stop()

  

и parse_data это функция, которая выполняет несколько агрегаций с различными другими функциями.

Чтение файлов JSON прошло нормально. Когда я начал читать файлы parquet, ничего не работало.

Я пытался

         schema = StructType([
            StructField('a', StringType(), True),
            StructField('b', StringType(), True),
            StructField('c', StringType(), True),
            ...
        ])
        # rdd is the dstream from foreachRDD
        lines = sqlC.createDataFrame(rdd, schema)

  

Но я всегда получаю следующую ошибку:

AttributeError: 'RDD' object has no attribute '_get_object_id'

Или

TypeError: StructType can not accept object 'PAR1x15x04x15x1cx15 Lx15x02x15x04x12x00x00x0e4' in type <class 'str'>

Комментарии:

1. Вы нашли какое-либо решение своей проблемы?

2. @Divuneh мы изменили подход и перешли на пакетное чтение с помощью spark.read.parquet