#python #pandas #api #python-requests #concurrent.futures
#python #pandas #API #python-запросы #concurrent.futures
Вопрос:
У меня есть один фрейм данных из пары тысяч строк
input_df
case_id api_param stat
1 data1 1
2 data2 0
1 data3 0
4 data4 0
1 data5 1
Я делаю а groupBy(case_id)
и получаю:
case_id 1 2 3
1 data1 data3 data5
2 data2 nan nan
4 data4 nan nan
Теперь предположим, что для каждого case_id
из них я хотел бы изменить значение даты в столбце api_param для всех case_id, где stat == 0. => изменить data2, data3, data4.
Для этого я решаю выбрать новые данные в пределах k точек данных из предыдущих данных и вызвать API, чтобы проверить, что данные действительны;
ie: url = https: // example..com/over/there?name=api_param [i] с помощью api_param == data2 k данных pnt, например, для case_id ==2 выше. если ответ API равен 200, я могу перезаписать старое значение в input_df
.
Теперь у меня могут быть тысячи таких случаев в моем файле, и в каждом случае есть много точек данных для изменения. Допустим, у меня есть 300 случаев, каждый из которых имеет 100 дат для изменения
И, следовательно, использование API запросов Python будет очень медленным. Я хотел бы использовать concurrent.futures; Как я мог бы это сделать?
Ответ №1:
Вы могли бы использовать multiprocessing.pool.ThreadPool
.
from multiprocessing.pool import ThreadPool
from datetime import timedelta
# get dates with stat=0
dates = input_df[input_df['stat']==0]['api_param']
# get urls, add 7 days to date (assuming date is already datetime.datetime)
urls = dates.apply(lambda date_obj: 'https:// example..com/over/there?name=%s' % str(date_obj timedelta(days=7))).tolist()
with ThreadPool(10) as pool:
results = pool.map(request.get, urls)
# add request status to input_df
input_df['request_status'] = 0
input_df.loc[input_df['stat']==0, 'request_status'] = [x.status_code for x in results]
# update dates
input_df.loc[(input_df['stat']==0) amp; (input_df['request_status']==200), 'api_params'] = input_df.loc[(input_df['stat']==0) amp; (input_df['request_status']==200), 'api_params'].apply(lambda date_obj: date_obj timdelta(days=7))
Ответ №2:
Пожалуйста, попробуйте использовать эти функции
def check_api_call(count, dates):
length = dates.values.__len__()
executor = futures.ThreadPoolExecutor()
for i in range(length):
date = dates.values[i]
pool = executor.submit(task_api, date)
response = pool.result()
while not response:
count = count 1
day_value = count * 7
td = pd.to_timedelta(day_value, unit='d')
delta_date = datetime.strptime(date, "%Y-%m-%d") td
new_date = delta_date.strftime("%Y-%m-%d")
pool = executor.submit(task_api, new_date)
response = pool.result()
if not response:
continue
dates.values[i] = new_date
return True, dates
def task_api(date):
url = "https:// example..com/over/there?name=" date
response = requests.get(url)
if response.status_code == 404:
return False
else:
return True