#sql #apache-spark #pyspark #apache-spark-sql
Вопрос:
У меня есть сценарий ,в котором мне нужно отфильтровать столбец даты по условию даты, например, мне нужно делать это в течение всего месяца . Проблема в том, что при циклировании для каждой даты требуется время . Я хотел сделать весь месяц за один раз. Ниже приведен код.
target_date = [1,2,3...30] for i in target_date: df = spark.sql(f'select * from table where x_date lt;={i} and y_date gt;={i}') df = df.withColumn('load_date',f.lit(i)) df.write.partition('load_date').mode('append').parquet(output_path)
Любые подходы, чтобы сделать это быстрее
Комментарии:
1. Сколько записей в этой таблице?
2. @LuizViola : около 20 М.
Ответ №1:
Может быть, вы сможете переместить запись за пределы цикла. Что-то вроде
target_date = [1,2,3...30] df_final = [] for i in target_date: df = spark.sql(f'select * from table where x_date lt;={i} and y_date gt;={i}') df = df.withColumn('load_date',f.lit(i)) df_final = df_final.union(df) df_final.write.partition('load_date').parquet(output_path)
Ответ №2:
Я верю, что вы могли бы решить эту проблему с помощью такого перекрестного соединения, как это:
load_dates = spark.createDataFrame([[i,] for i in range(1,31)], ['load_date']) load_dates.show() --------- |load_date| --------- | 1| | 2| | 3| | ...| | 30| --------- df = spark.sql(f'select * from table') df.join( load_dates, on=(F.col('x_date') lt;= F.col('load_date')) amp; (F.col('y_date') gt;= F.col('load_date')), how='inner', ) df.write.partitionBy('load_date').parquet(output_path)
Ответ №3:
Вы должны быть в состоянии сделать это с помощью
- Создание массива load_dates в каждой строке
- Развертывание массива таким образом, чтобы у вас была уникальная дата load_date для каждой исходной строки
- Фильтрация, чтобы получить только те данные загрузки, которые вы хотите
Например
target_dates = [1,2,3...30] df = spark.sql(f'select * from table') # create an array of all load_dates in each row df = df.withColumn("load_date", F.array([F.lit(i) for i in target_dates])) # Explode the load_dates so that you get a new row for each load_date df = df.withColumn("load_date", F.explode("load_date")) # Filter only the load_dates you want to keep df = df.filter("x_date lt;= load_date and y_date gt;=load_date") df.write.partition('load_date').mode('append').parquet(output_path)