Параллельные фьючерсы Python для фрейма данных Panda

#python #pandas #api #python-requests #concurrent.futures

#python #pandas #API #python-запросы #concurrent.futures

Вопрос:

У меня есть один фрейм данных из пары тысяч строк

input_df

 case_id api_param   stat
1        data1      1
2        data2      0
1        data3      0
4        data4      0
1        data5      1
 

Я делаю а groupBy(case_id) и получаю:

   case_id    1      2       3  
      1     data1  data3  data5          
      2     data2  nan    nan   
      4     data4  nan    nan
 

Теперь предположим, что для каждого case_id из них я хотел бы изменить значение даты в столбце api_param для всех case_id, где stat == 0. => изменить data2, data3, data4.
Для этого я решаю выбрать новые данные в пределах k точек данных из предыдущих данных и вызвать API, чтобы проверить, что данные действительны;

ie: url = https: // example..com/over/there?name=api_param [i] с помощью api_param == data2 k данных pnt, например, для case_id ==2 выше. если ответ API равен 200, я могу перезаписать старое значение в input_df .

Теперь у меня могут быть тысячи таких случаев в моем файле, и в каждом случае есть много точек данных для изменения. Допустим, у меня есть 300 случаев, каждый из которых имеет 100 дат для изменения

И, следовательно, использование API запросов Python будет очень медленным. Я хотел бы использовать concurrent.futures; Как я мог бы это сделать?

Ответ №1:

Вы могли бы использовать multiprocessing.pool.ThreadPool .

 from multiprocessing.pool import ThreadPool
from datetime import timedelta

# get dates with stat=0
dates = input_df[input_df['stat']==0]['api_param']
# get urls, add 7 days to date (assuming date is already datetime.datetime)
urls = dates.apply(lambda date_obj: 'https:// example..com/over/there?name=%s' % str(date_obj timedelta(days=7))).tolist()

with ThreadPool(10) as pool:
    results = pool.map(request.get, urls)

# add request status to input_df
input_df['request_status'] = 0
input_df.loc[input_df['stat']==0, 'request_status'] = [x.status_code for x in results]

# update dates
input_df.loc[(input_df['stat']==0) amp; (input_df['request_status']==200), 'api_params'] = input_df.loc[(input_df['stat']==0) amp; (input_df['request_status']==200), 'api_params'].apply(lambda date_obj: date_obj timdelta(days=7))
 

Ответ №2:

Пожалуйста, попробуйте использовать эти функции

 def check_api_call(count, dates):
    length = dates.values.__len__()
    executor = futures.ThreadPoolExecutor()
    for i in range(length):
        date = dates.values[i]
        pool = executor.submit(task_api, date)
        response = pool.result()
        while not response:
            count = count   1
            day_value = count * 7
            td = pd.to_timedelta(day_value, unit='d')
            delta_date = datetime.strptime(date, "%Y-%m-%d")   td
            new_date = delta_date.strftime("%Y-%m-%d")
            pool = executor.submit(task_api, new_date)
            response = pool.result()
            if not response:
                continue
            dates.values[i] = new_date
    return True, dates

def task_api(date):
    url = "https:// example..com/over/there?name="   date
    response = requests.get(url)
    if response.status_code == 404:
        return False
    else:
        return True