#python #dataframe #apache-spark #pyspark
#python #фрейм данных #apache-spark #pyspark
Вопрос:
Я нашел различные похожие вопросы, но ничего, что отвечало бы на мою конкретную проблему.
Мне нужно заполнить недостающие строки дат во фрейме данных pyspark последними значениями строк на основе столбца даты.
Мое текущее решение состоит в том, чтобы вычислить список пропущенных дат до сегодняшней даты, объединить с исходным df и заполнить все столбцы один за другим последним допустимым значением:
# get the maximum date from the df
max_date = df.select(F.max('date')).first()['max(date)')
today = datetime.date.today()
delta = today - max_date
dates_list = [(today - datetime.timedelta(days=x),) for x in range(delta.days)]
# if there are missing rows
if dates_list:
# create df with one column 'date'
dates_df = spark.createDataFrame(dates_list, schema=date_schema)
# join with original df
df = df.join(F.broadcast(dates_df), ['date'], 'outer')
w = Window.orderBy('date').rangeBetween(Window.unboundedPreceding, 0)
# fill all columns with latest non null col value
for c in df.columns:
if c != 'date':
df = df.withColumn(c, F.last(c, ignorenulls=True).over(w))
Проблема с этим кодом заключается в том, что исходный df содержит большое количество столбцов, и для каждого из них spark вычисляет окно для получения последнего ненулевого значения, и этот подход довольно неэффективен, что приводит к огромному логическому плану.
Я хотел бы реализовать это таким образом, чтобы просто получить содержимое строки с максимальной датой (поскольку оно не содержит нулевых значений) и изменить дату на список вычисленных дат до сегодняшнего дня.
Есть предложения о том, как реализовать этот подход?
Пример ввода:
date | col_one | col_two | col_three | .. | col_n
-------------------------------------------------------
2020-08-15 | 0.1 | 6.5 | 9.8 | .. | 0.7
2020-08-14 | 0.2 | 5.5 | 1.8 | .. | 3.7
2020-08-13 | 0.4 | 7.5 | 1.3 | .. | 0.5
2020-08-12 | 3.1 | 8.5 | 9.8 | .. | 1.7
2020-08-11 | 0.15 | 6.9 | 9.7 | .. | 0.2
Пример вывода:
date | col_one | col_two | col_three | .. | col_n
-------------------------------------------------------
2020-08-18 | 0.1 | 6.5 | 9.8 | .. | 0.7
2020-08-17 | 0.1 | 6.5 | 9.8 | .. | 0.7
2020-08-16 | 0.1 | 6.5 | 9.8 | .. | 0.7
2020-08-15 | 0.1 | 6.5 | 9.8 | .. | 0.7
2020-08-14 | 0.2 | 5.5 | 1.8 | .. | 3.7
2020-08-13 | 0.4 | 7.5 | 1.3 | .. | 0.5
2020-08-12 | 3.1 | 8.5 | 9.8 | .. | 1.7
2020-08-11 | 0.15 | 6.9 | 9.7 | .. | 0.2
Комментарии:
1. Не могли бы вы, пожалуйста, опубликовать несколько примеров ввода-вывода и o / p?
2. исходный df — это просто фрейм данных со столбцом даты и, как и 100 столбцов с плавающей запятой, выводимый пример просто содержит все даты до сегодняшнего дня с последним значением строки.
3. Пожалуйста, добавьте пример ввода и вывода к вашему вопросу, это очень помогает, и ваш комментарий тоже не помог.
4. Я привел два примера, надеюсь, теперь все более понятно
Ответ №1:
Возможным решением является фильтрация исходного фрейма данных с максимальной датой, заполнение столбца даты первой пропущенной датой и объединение содержимого той же строки с пропущенной датой для каждой оставшейся пропущенной даты. Затем, в конце, соедините обратно с исходным df:
# compute the list of all dates from maximum date available till today
max_date = df.select(F.max('date')).first()['max(date)']
today = datetime.date.today()
dates_list = [today - datetime.timedelta(days=x) for x in range((today - max_date).days)]
# take the row with latest date values
max_date_values = df.filter(F.col('date') == max_date)
missing_dates_values = None
# duplicate latest values for the dates we are missing till today
for missing_date in dates_list:
if missing_dates_values is None:
missing_dates_values = max_date_values.withColumn('date', F.lit(missing_date))
else:
missing_dates_values = missing_dates_values.unionByName(max_date_values.withColumn('date',
F.lit(missing_date)))
# union with the original df
if dates_list:
df = df.unionByName(missing_dates_values)
В моем случае план запроса spark намного проще по сравнению с before, что приводит к гораздо большей производительности.