#scala #apache-spark #google-bigquery #airflow #parquet
#scala #apache-spark #google-bigquery #воздушный поток #паркет #parquet
Вопрос:
У меня есть приложение Spark, которое загружает файлы CSV, преобразует их в файл Parquet, сохраняет файл Parquet в моем хранилище Data Lake, а затем загружает данные в таблицу BigQuery.
Проблема в том, что когда в CSV слишком много старого значения метки времени, преобразование происходит, но столбец метки времени не может быть показан в таблице BigQuery.
Когда я установил для конфигурации spark.sql.parquet.outputTimestampType
значение TIMESTAMP_MICROS
, я получил эту ошибку в BigQuery:
Cannot return an invalid timestamp value of -62135607600000000 microseconds relative to the Unix epoch. The range of valid timestamp values is [0001-01-1 00:00:00, 9999-12-31 23:59:59.999999]; error in writing field reference_date
Когда я установил для конфигурации spark.sql.parquet.outputTimestampType
значение TIMESTAMP_MILLIS
, я получил эту ошибку в Airflow:
Error while reading data, error message: Invalid timestamp value -62135607600000 for field 'reference_date' of type 'INT64' (logical type 'TIMESTAMP_MILLIS'): generic::out_of_range: Invalid timestamp value: -62135607600000
- Файл CSV:
id,reference_date
"6829baef-bcd9-412a-a2f3-abdfed02jsd","0001-01-02 21:00:00"
- Прочитайте файл CSV (и преобразуйте
reference_date
в столбец временной метки):
def castDFColumn(
df: DataFrame,
column: String,
dataType: DataType
): DataFrame = df.withColumn(column, df(column).cast(dataType))
...
var df = spark
.read
.format("csv")
.option("header", true)
.load("myfile.csv")
df = castDFColumn(df, "reference_date", TimestampType)
- Преобразовать в файл Parquet file:
df
.write
.mode("overwrite")
.parquet("path/to/save")
- Конфигурации среды выполнения приложения Spark:
val conf = new SparkConf().setAppName("Load CSV")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS/TIMESTAMP_MICROS")
conf.set("spark.sql.session.timeZone", "UTC")
Похоже, что временная метка изменяется на 0000-12-31 21:00:00
или что-то в этом роде, выходя за пределы допустимого диапазона INT64
временной метки.
Кто-нибудь проходил через это?
Комментарии:
1. Значения временных меток в CSV недопустимы, поскольку я вижу, что год должен быть
0001
. Действительная эпоха должна начинаться с года1970
. Я предлагаю изучить, как создается файл CSV, разрешить ситуацию, а затем на более раннем этапе, прежде чем записывать его в формате Parquet, исправить дату.