#python #pandas #numpy #dask #dask-dataframe
#python #панды #numpy #dask #dask-фрейм данных
Вопрос:
Я работаю над преобразованием кода Pandas / Numpy в Dask для обработки больших наборов данных. Кажется, я не могу воссоздать следующий код Pandas / Numpy:
df['days_to_complete'] = np.busday_count(begindates=df['time_order_date'].values.astype('datetime64[D]'),enddates=df['time_complete_date'],weekmask='1111111',holidays=hols_list)
Это возвращает целое число дней между time_order_date и time_complete_date при рассмотрении списка рабочей недели и праздников. Он создает и заполняет новый столбец в моем фрейме данных, никаких проблем.
В Dask я попробовал следующее:
-
map_partitions вызывает функцию numpy:
ddf['days_to_complete'] = ddf.time_order.map_partitions(func=np.busday_count,args= ddf['time_order_date'].values.astype('datetime64[D]'),ddf['time_complete_date']),meta=(None, 'i8'))
-
Также map_partitions с использованием lambda:
ddf['days_to_complete'] = ddf.map_partitions(lambda ddf: ddf.assign(result = np.busday_count(begindates=ddf['time_order_date'].values.astype('datetime64[D]'),enddates=ddf['time_complete_date'],weekmask='1111111',holidays=hols_list)),meta=(None,'i8'))
и получите следующую ошибку после запуска ddf.compute():
TypeError: busday_count() got multiple values for argument 'begindates'
Как лучше всего использовать эту функцию numpy для параллельной обработки / Dask?
Я не добился успеха, используя документы / примеры Dask или другие потоки SO.
Я бы хотел также использовать Pandas CustomBusinessHour rollfoward, как я работаю в basic pandas здесь:
bis_hour = CustomBusinessHour(n=1,weekmask='Mon Tue Wed Thu Fri Sat Sun',holidays=hols_list,start = bus_hours_start,end = bus_hours_end,offset=0)
df['time_order_bis'] = pd.to_datetime(df['time_order'])
df['time_order_bis'] = df['time_order_bis'].apply(lambda row: bis_hour.rollforward(row))
Это «переносит» время заказа в установленные рабочие часы клиента (субботний заказ теперь составляет 7 утра понедельника, рабочий день). Спасибо!
Редактировать: я пробовал писать и вызывать функцию:
def bdays(df):
return np.busday_count(df.time_order_date.values.astype('datetime64[D]'),df.time_complete_date,weekmask='1111111',holidays=hols_list)
ddf['days_to_complete'] = ddf.map_partitions(bdays,df=ddf,meta=('days_to_complete','i8')).compute()
Я получаю следующую ошибку: TypeError: bdays() got multiple values for argument 'df'
Ответ №1:
У меня это работает! Ключ должен был возвращать массив Dask и не вычислять вещи слишком рано, что нарушает типы. Я рекомендую выполнять множество проверок типа () и идти шаг за шагом, вам нужны объекты Dask на всем пути, по сути, объекты pandas / массивы numpy могут нарушать разделение / параллелизм.
Функция:
def bdays(df=ddf):
return da.from_array(np.busday_count(df.time_order_date,df.time_complete_date,weekmask='1111111',holidays=hols_list))
Используйте map_partitions. Пожалуйста, обратите внимание, что для первого параметра функции выше требуется фрейм данных / раздел -> мы не указываем это в разделах карты! Только дополнительные параметры.
ddf['days_to_complete'] = ddf.map_partitions(bdays,meta=('days_to_complete','i8'))
Вычисления (compute()) перед присвоением новому столбцу в моем фрейме данных вызвали ошибки.
TypeError: set_index() missing 1 required positional argument: 'other'
Предложение по отладке:
Проверьте свои входные данные и протестируйте функцию только с одним разделом. bdays — это функция сверху.
type(ddf.map_partitions(bdays,meta='i8'))
output: dask.dataframe.core.Series