Добавление столбца во время получения данных с помощью ThreadPoolExecutor в Python

#python #multithreading #scrape

#python #многопоточность #очистить

Вопрос:

Я хочу прочитать разные страницы по ссылке ниже с разными номерами, используя ThreadPoolExecutor, и сохранить связанные номера в dataframe в качестве нового столбца.

https://booking.snav.it/api/v1/rates/1030/2019-02-25/1042/2019-02-25?lang=1

Числа меняются, как показано ниже:

 from concurrent.futures import ThreadPoolExecutor, as_completed
from pandas import json_normalize
import pandas as pd
import requests


def download_file(url):
    url_info = requests.get(url, stream=True)
    jdata = url_info.json()
    return jdata


nums = [1030,1031,1040,1050,1020,1021,1010,1023]
urls= [f"https://booking.snav.it/api/v1/rates/{i}/2019-02-25/1042/2019-02-25?lang=1" for i in nums]
with ThreadPoolExecutor(max_workers=14) as executor:
     for url in urls:
         sleep(0.1)
         processes.append(executor.submit(download_file, url))

for index, task in enumerate(as_completed(processes)):
    jdata = task.result()
    tmp = json_normalize(jdata)
    tmp["num"] = nums[index]
df = df.append(tmp)
print(df.head())
 

В приведенном выше коде я попытался прочитать данные, используя многопоточность, и соответствующий номер для каждого ответа json в качестве нового столбца df dataframe. Но этот код не работает, из-за использования многопоточности порядок nums чисел не совпадает с обработанными ответами json. Что мне делать?

Комментарии:

1. вы можете enumerate() использовать URL-адреса (index, url) , отправлять и возвращать (index, jdata) их, чтобы позже index использовать для сортировки результатов в правильном порядке.

Ответ №1:

Попробуйте это:

 from concurrent.futures import ThreadPoolExecutor

...

with ThreadPoolExecutor(max_workers=14) as executor:
     rv = executor.map(download_file, urls)

for index, jdata in enumerate(rv):
    tmp = json_normalize(jdata)
    tmp["num"] = nums[index]
    df.append(tmp)

print(df.head())
 

Комментарии:

1. Что я должен импортировать, чтобы использовать mp ?

2. Я допустил ошибку, и теперь, когда вы отредактировали свой вопрос, все выглядит нормально. map сохраняет в результатах порядок итераций, переданных для аргументов. Проверьте starmap наличие другого полезного интерфейса.

3. Я пишу это, но когда я хочу запустить for index, task in enumerate(as_completed(rv)): , я сталкиваюсь с ошибкой: Exception has occurred: TypeError x unhashable type: 'list' для следующего for цикла

4. Вы должны устранить as_completed вызов. Прочитайте map документацию: возвращаемое значение содержит результаты вызовов функций.

5. О, черт возьми… Я продолжаю оставлять небольшие ошибки в коде. rv является эквивалентом rv= [download_file(f) for f in urls] , но с использованием пула потоков.