#scala #apache-spark
#scala #apache-spark
Вопрос:
Я загружаю CSV-файл с помощью CSV-загрузчика Spark и преобразую его в определенный, Dataset
предоставляя case class
схему и используя .as[T]
.
spark.read
.option("header", "false")
.option("dateFormat", "yyyy-MM-dd HH:mm:ss.SSS")
.schema(schemaOf[T])
.csv(filePath)
.as[T]
Мой вопрос здесь в том, что у меня есть несколько систем, отправляющих один и тот же файл и
скажем, если одна система отправляет файл, содержащий меньше двух столбцов из моего определенного schema
тогда я хотел бы просто поместить null
для этих двух столбцов и загрузить все остальные столбцы.
И для всех других систем загружайте все поля при отправке, соответствующие schema
.
Как мне сделать это эффективным способом? Я не хочу создавать case class
для каждой системы.
Комментарии:
1. Вы можете. данные laod, если столбец null в схеме определяет nullable
2. Вы хотели сказать, что сам входной csv-файл не будет иметь значения для некоторого столбца или в нем будут пустые данные для соответствующего столбца?
3. В нем даже не было бы некоторых столбцов, например, если фактический
case class
orschema
состоит из 25 столбцов, тогда он может содержать 23 столбца (22 запятых).
Ответ №1:
Вы можете Dataframe
сначала обработать свои csv-данные перед преобразованием в Dataset
. Таким образом, вы можете легко добавлять / удалять столбцы в соответствии с вашим классом case с помощью служебных функций, таких как:
implicit class DataFrameOps(df: DataFrame) {
def withColumnIfNotExists(colName: String, col: Column): DataFrame = {
if(df.columns.contains(colName)) df
else df.withColumn(colName, col)
}
}
// then use it like this
???.csv(filePath).withColumnIfNotExists("missing_col", lit(null).cast("string"))