#scala #apache-spark #pyspark
#scala #apache-spark #pyspark
Вопрос:
У меня есть несколько CSV-файлов с заголовками, но я обнаружил, что некоторые файлы имеют разный порядок столбцов. Есть ли способ справиться с этим с помощью Spark, где я могу определить порядок выбора для каждого файла, чтобы в главном DF не было несоответствия, при котором col x мог бы иметь значения из col y?
Мое текущее чтение —
val masterDF = spark.read.option("header", "true").csv(allFiles:_*)
Ответ №1:
Извлеките all file names
и сохраните в переменной списка.
-
Затем определите
schema
of со всеми столбцами в нем. -
iterate
для каждого файла используется заголовок true, поэтому мы читаем каждый файл отдельно. -
unionA
заполните новый фрейм данных существующим фреймом данных.
Example:
file_lst=['<path1>','<path2>']
from pyspark.sql.functions import *
from pyspark.sql.types import *
#define schema for the required columns
schema = StructType([StructField("column1",StringType(),True),StructField("column2",StringType(),True)])
#create an empty dataframe
df=spark.createDataFrame([],schema)
for i in file_lst:
tmp_df=spark.read.option("header","true").csv(i).select("column1","column2")
df=df.unionAll(tmp_df)
#display results
df.show()
Комментарии:
1. Не слишком ли это дорого? Создание df и последующее объединение примерно для 100K файлов? Есть ли лучший способ сделать это?
2. одна из проблем, на которую мы хотим указать здесь, заключается в том, что unionAll будет суммировать количество разделов. и, наконец, когда вы отправляете вывод в нисходящий поток, у вас в конечном итоге будут файлы небольшого размера, которые снижают производительность в нисходящих приложениях … обязательно перераспределите и передайте данные в нисходящий поток.
3. @Привет. Мир, я не мог придумать никакой другой альтернативы, если имена столбцов расположены в другом порядке с последней версии spark.