#python #dataframe #apache-spark #pyspark
#python #фрейм данных #apache-spark #pyspark
Вопрос:
У меня есть список исторических значений для настройки устройства и фрейма данных с временными метками.
Мне нужно создать новый столбец в моем фрейме данных на основе сравнения столбца временных меток в фрейме данных и временной метки значения параметра в моем списке.
settings_history = [[1, '2021-01-01'], [2, '2021-01-12']]
dataframe = df.withColumn(
'setting_col', when(col('device_timestamp') <= settings_history[0][1], settings_history[0][0])
.when(col('device_timestamp') <= settings_history[1][1], settings_history[1][0])
)
Количество записей в settings_history
массиве является динамическим, и мне нужно найти способ реализовать что-то вроде описанного выше, но я получаю синтаксическую ошибку. Кроме того, я пытался использовать цикл for в моей функции withColumn, но это тоже не сработало.
Мой необработанный фрейм данных имеет такие значения, как:
device_timestamp
2020-05-21
2020-12-19
2021-01-03
2021-01-11
Моя цель — иметь что-то вроде:
device_timestamp setting_col
2020-05-21 1
2020-12-19 1
2021-01-03 2
2021-01-11 2
Я использую Databricks в Azure для своей работы.
Ответ №1:
Вы можете использовать reduce
для объединения when
условий вместе:
from functools import reduce
settings_history = [[1, '2021-01-01'], [2, '2021-01-12']]
new_col = reduce(
lambda c, history: c.when(col('device_timestamp') <= history[1], history[0]),
settings_history[1:],
when(col('device_timestamp') <= settings_history[0][1], settings_history[0][0])
)
dataframe = df.withColumn('setting_col', new_col)
Комментарии:
1. Как насчет добавления
.otherwise()
условия? Я понял, что все, что после2021-01-12
значения, все равно должно быть установлено на значение2
. В настоящее время я добился этого, используя другоеwhen()
сравнение с обратным равенством, но это немного странно при чтении кода и, возможно, немного сложнее для понимания. Я попытался добавить простое.otherwise()
после окончанияwhen
, но я получил синтаксическую ошибку.2. @RaduGheorghiu вы должны добавить
.otherwise()
послеreduce(...)
, напримерreduce(...).otherwise(...)
.
Ответ №2:
when_expression
В этом случае будет полезно что-то вроде приведенной ниже созданной функции. где when
условие создается на основе любой информации, которую вы предоставляете в списке settings_array
.
import pandas as pd
from pyspark.sql import functions as F
def when_expression(settings_array):
when_condition = None
for a, b in settings_array:
if when_condition is None:
when_condition = F.when(F.col('device_timestamp') <= a, F.lit(b))
else:
when_condition = when_condition.when(F.col('device_timestamp') <= a, F.lit(b))
return when_condition
settings_array = [
[2, 3], # if <= 2 make it 3
[5, 7], # if <= 5 make it 7
[10, 100], # if <= 10 make it 100
]
df = pd.DataFrame({'device_timestamp': range(10)})
df = spark.createDataFrame(df)
df.show()
when_condition = when_expression(settings_array)
print(when_condition)
df = df.withColumn('setting_col', when_condition)
df.show()
Вывод:
----------------
|device_timestamp|
----------------
| 0|
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
----------------
Column<b'CASE WHEN (device_timestamp <= 2) THEN 3 WHEN (device_timestamp <= 5) THEN 7 WHEN (device_timestamp <= 10) THEN 100 END'>
---------------- -----------
|device_timestamp|setting_col|
---------------- -----------
| 0| 3|
| 1| 3|
| 2| 3|
| 3| 7|
| 4| 7|
| 5| 7|
| 6| 100|
| 7| 100|
| 8| 100|
| 9| 100|
---------------- -----------