Как включить разделенный столбец в метод чтения фрейма данных pyspark

#apache-spark #pyspark #apache-spark-sql #pyspark-dataframes

#apache-spark #apache-spark-sql #pyspark

Вопрос:

Я пишу Avro на основе файла parquet. Я прочитал файл, как показано ниже:

Чтение данных

 dfParquet = spark.read.format("parquet").option("mode", "FAILFAST")
    .load("/Users/rashmik/flight-time.parquet")
  

Запись данных

Я записал файл в формате Avro, как показано ниже:

 dfParquetRePartitioned.write 
    .format("avro") 
    .mode("overwrite") 
    .option("path", "datasink/avro") 
    .partitionBy("OP_CARRIER") 
    .option("maxRecordsPerFile", 100000) 
    .save()
  

Как и ожидалось, я получил данные, разделенные на OP_CARRIER .

Чтение разделенных Avro данных из определенного раздела

В другом задании мне нужно прочитать данные из выходных данных вышеупомянутого задания, то есть из datasink/avro каталога. Я использую приведенный ниже код для чтения из datasink/avro

 dfAvro = spark.read.format("avro") 
    .option("mode","FAILFAST") 
    .load("datasink/avro/OP_CARRIER=AA")
  

Данные считываются успешно, но, как и ожидалось, OP_CARRIER столбец недоступен в dfAvro фрейме данных, поскольку это столбец раздела первого задания. Теперь мое требование — включить OP_CARRIER поле также во 2-й фрейм данных, т.е. в dfAvro . Может ли кто-нибудь помочь мне с этим?

Я ссылаюсь на документацию из документа spark, но я не могу найти соответствующую информацию. Любой указатель будет очень полезен.

Комментарии:

1. .load("datasink/avro")

Ответ №1:

Вы реплицируете то же значение столбца с другим псевдонимом.

 dfParquetRePartitioned.withColumn("OP_CARRIER_1", lit(df.OP_CARRIER)) 
.write 
.format("avro") 
.mode("overwrite") 
.option("path", "datasink/avro") 
.partitionBy("OP_CARRIER") 
.option("maxRecordsPerFile", 100000) 
.save()
  

Это дало бы вам то, что вы хотели. Но с другим псевдонимом.
Или вы также можете сделать это во время чтения. Если местоположение является динамическим, вы можете легко добавить столбец.

 path = "datasink/avro/OP_CARRIER=AA"
newcol = path.split("/")[-1].split("=") 
dfAvro = spark.read.format("avro") 
.option("mode","FAILFAST") 
.load(path).withColumn(newcol[0], lit(newcol[1]))
  

Если значение является статическим, его проще добавить во время чтения данных.