Spark: загрузка CSV с разным количеством столбцов

#scala #apache-spark

#scala #apache-spark

Вопрос:

Я загружаю CSV-файл с помощью CSV-загрузчика Spark и преобразую его в определенный, Dataset предоставляя case class схему и используя .as[T] .

 spark.read
  .option("header", "false")
  .option("dateFormat", "yyyy-MM-dd HH:mm:ss.SSS")
  .schema(schemaOf[T])
  .csv(filePath)
  .as[T]
  

Мой вопрос здесь в том, что у меня есть несколько систем, отправляющих один и тот же файл и
скажем, если одна система отправляет файл, содержащий меньше двух столбцов из моего определенного schema
тогда я хотел бы просто поместить null для этих двух столбцов и загрузить все остальные столбцы.

И для всех других систем загружайте все поля при отправке, соответствующие schema .

Как мне сделать это эффективным способом? Я не хочу создавать case class для каждой системы.

Комментарии:

1. Вы можете. данные laod, если столбец null в схеме определяет nullable

2. Вы хотели сказать, что сам входной csv-файл не будет иметь значения для некоторого столбца или в нем будут пустые данные для соответствующего столбца?

3. В нем даже не было бы некоторых столбцов, например, если фактический case class or schema состоит из 25 столбцов, тогда он может содержать 23 столбца (22 запятых).

Ответ №1:

Вы можете Dataframe сначала обработать свои csv-данные перед преобразованием в Dataset . Таким образом, вы можете легко добавлять / удалять столбцы в соответствии с вашим классом case с помощью служебных функций, таких как:

 implicit class DataFrameOps(df: DataFrame) {
  def withColumnIfNotExists(colName: String, col: Column): DataFrame = {
    if(df.columns.contains(colName)) df
    else df.withColumn(colName, col)
  }
}

// then use it like this
???.csv(filePath).withColumnIfNotExists("missing_col", lit(null).cast("string"))