#apache-spark #pyspark #apache-spark-sql #pyspark-dataframes
#apache-spark #apache-spark-sql #pyspark
Вопрос:
Я пишу Avro на основе файла parquet. Я прочитал файл, как показано ниже:
Чтение данных
dfParquet = spark.read.format("parquet").option("mode", "FAILFAST")
.load("/Users/rashmik/flight-time.parquet")
Запись данных
Я записал файл в формате Avro, как показано ниже:
dfParquetRePartitioned.write
.format("avro")
.mode("overwrite")
.option("path", "datasink/avro")
.partitionBy("OP_CARRIER")
.option("maxRecordsPerFile", 100000)
.save()
Как и ожидалось, я получил данные, разделенные на OP_CARRIER
.
Чтение разделенных Avro данных из определенного раздела
В другом задании мне нужно прочитать данные из выходных данных вышеупомянутого задания, то есть из datasink/avro
каталога. Я использую приведенный ниже код для чтения из datasink/avro
dfAvro = spark.read.format("avro")
.option("mode","FAILFAST")
.load("datasink/avro/OP_CARRIER=AA")
Данные считываются успешно, но, как и ожидалось, OP_CARRIER
столбец недоступен в dfAvro
фрейме данных, поскольку это столбец раздела первого задания. Теперь мое требование — включить OP_CARRIER
поле также во 2-й фрейм данных, т.е. в dfAvro
. Может ли кто-нибудь помочь мне с этим?
Я ссылаюсь на документацию из документа spark, но я не могу найти соответствующую информацию. Любой указатель будет очень полезен.
Комментарии:
1.
.load("datasink/avro")
Ответ №1:
Вы реплицируете то же значение столбца с другим псевдонимом.
dfParquetRePartitioned.withColumn("OP_CARRIER_1", lit(df.OP_CARRIER))
.write
.format("avro")
.mode("overwrite")
.option("path", "datasink/avro")
.partitionBy("OP_CARRIER")
.option("maxRecordsPerFile", 100000)
.save()
Это дало бы вам то, что вы хотели. Но с другим псевдонимом.
Или вы также можете сделать это во время чтения. Если местоположение является динамическим, вы можете легко добавить столбец.
path = "datasink/avro/OP_CARRIER=AA"
newcol = path.split("/")[-1].split("=")
dfAvro = spark.read.format("avro")
.option("mode","FAILFAST")
.load(path).withColumn(newcol[0], lit(newcol[1]))
Если значение является статическим, его проще добавить во время чтения данных.