Оптимизация цикла Pyspark -Sql

#sql #apache-spark #pyspark #apache-spark-sql

Вопрос:

У меня есть сценарий ,в котором мне нужно отфильтровать столбец даты по условию даты, например, мне нужно делать это в течение всего месяца . Проблема в том, что при циклировании для каждой даты требуется время . Я хотел сделать весь месяц за один раз. Ниже приведен код.

 target_date = [1,2,3...30]  for i in target_date:   df = spark.sql(f'select * from table where x_date lt;={i} and y_date gt;={i}')  df = df.withColumn('load_date',f.lit(i))  df.write.partition('load_date').mode('append').parquet(output_path)  

Любые подходы, чтобы сделать это быстрее

Комментарии:

1. Сколько записей в этой таблице?

2. @LuizViola : около 20 М.

Ответ №1:

Может быть, вы сможете переместить запись за пределы цикла. Что-то вроде

 target_date = [1,2,3...30] df_final = []  for i in target_date:   df = spark.sql(f'select * from table where x_date lt;={i} and y_date gt;={i}')  df = df.withColumn('load_date',f.lit(i))  df_final = df_final.union(df)  df_final.write.partition('load_date').parquet(output_path)  

Ответ №2:

Я верю, что вы могли бы решить эту проблему с помощью такого перекрестного соединения, как это:

 load_dates = spark.createDataFrame([[i,] for i in range(1,31)], ['load_date']) load_dates.show()  ---------  |load_date|  ---------  | 1| | 2| | 3| | ...| | 30|  ---------   df = spark.sql(f'select * from table')  df.join(  load_dates,  on=(F.col('x_date') lt;= F.col('load_date')) amp; (F.col('y_date') gt;= F.col('load_date')),  how='inner', )  df.write.partitionBy('load_date').parquet(output_path)  

Ответ №3:

Вы должны быть в состоянии сделать это с помощью

  1. Создание массива load_dates в каждой строке
  2. Развертывание массива таким образом, чтобы у вас была уникальная дата load_date для каждой исходной строки
  3. Фильтрация, чтобы получить только те данные загрузки, которые вы хотите

Например

 target_dates = [1,2,3...30]  df = spark.sql(f'select * from table') # create an array of all load_dates in each row df = df.withColumn("load_date", F.array([F.lit(i) for i in target_dates])) # Explode the load_dates so that you get a new row for each load_date df = df.withColumn("load_date", F.explode("load_date")) # Filter only the load_dates you want to keep  df = df.filter("x_date lt;= load_date and y_date gt;=load_date") df.write.partition('load_date').mode('append').parquet(output_path)