Временная метка изменяется при записи в parquet

#scala #apache-spark #google-bigquery #airflow #parquet

#scala #apache-spark #google-bigquery #воздушный поток #паркет #parquet

Вопрос:

У меня есть приложение Spark, которое загружает файлы CSV, преобразует их в файл Parquet, сохраняет файл Parquet в моем хранилище Data Lake, а затем загружает данные в таблицу BigQuery.

Проблема в том, что когда в CSV слишком много старого значения метки времени, преобразование происходит, но столбец метки времени не может быть показан в таблице BigQuery.

Когда я установил для конфигурации spark.sql.parquet.outputTimestampType значение TIMESTAMP_MICROS , я получил эту ошибку в BigQuery:

 Cannot return an invalid timestamp value of -62135607600000000 microseconds relative to the Unix epoch. The range of valid timestamp values is [0001-01-1 00:00:00, 9999-12-31 23:59:59.999999]; error in writing field reference_date
  

Когда я установил для конфигурации spark.sql.parquet.outputTimestampType значение TIMESTAMP_MILLIS , я получил эту ошибку в Airflow:

 Error while reading data, error message: Invalid timestamp value -62135607600000 for field 'reference_date' of type 'INT64' (logical type 'TIMESTAMP_MILLIS'): generic::out_of_range: Invalid timestamp value: -62135607600000
  
  • Файл CSV:
 id,reference_date
"6829baef-bcd9-412a-a2f3-abdfed02jsd","0001-01-02 21:00:00"
  
  • Прочитайте файл CSV (и преобразуйте reference_date в столбец временной метки):
 def castDFColumn(
  df: DataFrame,
  column: String,
  dataType: DataType
): DataFrame = df.withColumn(column, df(column).cast(dataType))

...
var df = spark
  .read
  .format("csv")
  .option("header", true)
  .load("myfile.csv")

df = castDFColumn(df, "reference_date", TimestampType)
  
  • Преобразовать в файл Parquet file:
 df
  .write
  .mode("overwrite")
  .parquet("path/to/save")
  
  • Конфигурации среды выполнения приложения Spark:
 val conf = new SparkConf().setAppName("Load CSV")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS/TIMESTAMP_MICROS")
conf.set("spark.sql.session.timeZone", "UTC")
  

Похоже, что временная метка изменяется на 0000-12-31 21:00:00 или что-то в этом роде, выходя за пределы допустимого диапазона INT64 временной метки.

Кто-нибудь проходил через это?

Комментарии:

1. Значения временных меток в CSV недопустимы, поскольку я вижу, что год должен быть 0001 . Действительная эпоха должна начинаться с года 1970 . Я предлагаю изучить, как создается файл CSV, разрешить ситуацию, а затем на более раннем этапе, прежде чем записывать его в формате Parquet, исправить дату.