Как использовать фрейм данных Dask с Numpy Busday_count?

#python #pandas #numpy #dask #dask-dataframe

#python #панды #numpy #dask #dask-фрейм данных

Вопрос:

Я работаю над преобразованием кода Pandas / Numpy в Dask для обработки больших наборов данных. Кажется, я не могу воссоздать следующий код Pandas / Numpy:

 df['days_to_complete'] = np.busday_count(begindates=df['time_order_date'].values.astype('datetime64[D]'),enddates=df['time_complete_date'],weekmask='1111111',holidays=hols_list)
  

Это возвращает целое число дней между time_order_date и time_complete_date при рассмотрении списка рабочей недели и праздников. Он создает и заполняет новый столбец в моем фрейме данных, никаких проблем.

В Dask я попробовал следующее:

  1. map_partitions вызывает функцию numpy:

    ddf['days_to_complete'] = ddf.time_order.map_partitions(func=np.busday_count,args= ddf['time_order_date'].values.astype('datetime64[D]'),ddf['time_complete_date']),meta=(None, 'i8'))

  2. Также map_partitions с использованием lambda:

    ddf['days_to_complete'] = ddf.map_partitions(lambda ddf: ddf.assign(result = np.busday_count(begindates=ddf['time_order_date'].values.astype('datetime64[D]'),enddates=ddf['time_complete_date'],weekmask='1111111',holidays=hols_list)),meta=(None,'i8'))

и получите следующую ошибку после запуска ddf.compute():

 TypeError: busday_count() got multiple values for argument 'begindates'
  

Как лучше всего использовать эту функцию numpy для параллельной обработки / Dask?
Я не добился успеха, используя документы / примеры Dask или другие потоки SO.
Я бы хотел также использовать Pandas CustomBusinessHour rollfoward, как я работаю в basic pandas здесь:

 bis_hour = CustomBusinessHour(n=1,weekmask='Mon Tue Wed Thu Fri Sat Sun',holidays=hols_list,start = bus_hours_start,end = bus_hours_end,offset=0)
df['time_order_bis'] = pd.to_datetime(df['time_order'])
df['time_order_bis'] = df['time_order_bis'].apply(lambda row: bis_hour.rollforward(row))
  

Это «переносит» время заказа в установленные рабочие часы клиента (субботний заказ теперь составляет 7 утра понедельника, рабочий день). Спасибо!

Редактировать: я пробовал писать и вызывать функцию:

 def bdays(df):
  return np.busday_count(df.time_order_date.values.astype('datetime64[D]'),df.time_complete_date,weekmask='1111111',holidays=hols_list)
ddf['days_to_complete'] = ddf.map_partitions(bdays,df=ddf,meta=('days_to_complete','i8')).compute()
  

Я получаю следующую ошибку: TypeError: bdays() got multiple values for argument 'df'

Ответ №1:

У меня это работает! Ключ должен был возвращать массив Dask и не вычислять вещи слишком рано, что нарушает типы. Я рекомендую выполнять множество проверок типа () и идти шаг за шагом, вам нужны объекты Dask на всем пути, по сути, объекты pandas / массивы numpy могут нарушать разделение / параллелизм.

Функция:

 def bdays(df=ddf):


return da.from_array(np.busday_count(df.time_order_date,df.time_complete_date,weekmask='1111111',holidays=hols_list))
  

Используйте map_partitions. Пожалуйста, обратите внимание, что для первого параметра функции выше требуется фрейм данных / раздел -> мы не указываем это в разделах карты! Только дополнительные параметры.

 ddf['days_to_complete'] = ddf.map_partitions(bdays,meta=('days_to_complete','i8'))
  

Вычисления (compute()) перед присвоением новому столбцу в моем фрейме данных вызвали ошибки.

 TypeError: set_index() missing 1 required positional argument: 'other'
  

Предложение по отладке:
Проверьте свои входные данные и протестируйте функцию только с одним разделом. bdays — это функция сверху.

 type(ddf.map_partitions(bdays,meta='i8'))
  

output: dask.dataframe.core.Series