параллельная многопоточность.фьючерсы с 2 списками в качестве переменных

#python #python-3.x #multithreading #concurrent.futures

Вопрос:

Поэтому я хотел бы многопоточно выполнить следующий рабочий фрагмент кода с параллельными фьючерсами, но ничего из того, что я пробовал до сих пор, похоже, не работает.

 def download(song_filename_list, song_link_list):

    with requests.Session() as s:
    
        login_request = s.post(login_url, data= payload, headers= headers)

        for x in range(len(song_filename_list)):

            download_request = s.get(song_link_list[x], headers= download_headers, stream=True)

            if download_request.status_code == 200:
                print(f"Downloading {x 1} out of {len(song_filename_list)}!n")
                pass
            else:
                print(f"nStatus Code: {download_request.status_code}!n")
                sys.exit()

            
            with open (song_filename_list[x], "wb") as file:
                file.write(download_request.content)
 

2 основными переменными являются the song_filename_list и the song_link_list .

В первом списке указаны имена каждого файла, а во втором — все соответствующие ссылки для загрузки.
Таким образом, имя и ссылка каждого файла расположены в одной и той же позиции.
Например: name_of_file1 = song_filename_list[0] и link_of_file1 = song_link_list[0]


Это самая последняя попытка многопоточности:

 def download(song_filename_list, song_link_list):

    with requests.Session() as s:
    
        login_request = s.post(login_url, data= payload, headers= headers)

        x = []
        for i in range(len(song_filename_list)):
            x.append(i)


        with concurrent.futures.ThreadPoolExecutor() as executor:
            executor.submit(get_file, x)


def get_file(x):
    
    download_request = s.get(song_link_list[x], headers= download_headers, stream=True)

    if download_request.status_code == 200:
        print(f"Downloading {x 1} out of {len(song_filename_list)}!n")
        pass
    else:
        print(f"nStatus Code: {download_request.status_code}!n")
        sys.exit()

        
    with open (song_filename_list[x], "wb") as file:
        file.write(download_request.content)

 

Может ли кто-нибудь объяснить мне, что я делаю не так?
Потому что после get_file вызова функции ничего не происходит.
Он пропускает весь код и завершает работу без каких-либо ошибок, так в чем же моя логика неверна?


ПРАВКА 1:

After adding prints to:

 print(song_filename_list, song_link_list)
        with concurrent.futures.ThreadPoolExecutor() as executor:
            print("Before executor.map")
            executor.map(get_file, zip(song_filename_list, song_link_list))
            print("After executor.map")
            print(song_filename_list, song_link_list)
 

И к началу, и к концу get_file , и к своему file.write .

Результат выглядит следующим образом:

 
Succesfully logged in!

["songs names"] ["songs links"]    <- These are correct.
Before executor.map
After executor.map
["songs names"] ["songs links"]    <- These are correct.

Exiting.
 

Другими словами, значения верны, но он пропускает get_file в executor.map .


ПРАВКА 2:

Вот используемые значения.

  • song_filename_list = ['100049 Himeringo - Yotsuya-san ni Yoroshiku.osz', '1001507 ZUTOMAYO - Kan Saete Kuyashiiwa.osz']
  • song_link_list = ['https://osu.ppy.sh/beatmapsets/100049/download', 'https://osu.ppy.sh/beatmapsets/1001507/download']

ПРАВКА 3:

После некоторых размышлений может показаться, что это работает.

 for i in range(len(song_filename_list)):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        executor.submit(get_file, song_filename_list, song_link_list, i, s)
 
 def get_file(song_filename_list, song_link_list, i, s):
    
    download_request = s.get(song_link_list[i], headers= download_headers, stream=True)

    if download_request.status_code == 200:
        print("Downloading...")
        pass
    else:
        print(f"nStatus Code: {download_request.status_code}!n")
        sys.exit()
    
    with open (song_filename_list[i], "wb") as file:
        file.write(download_request.content)
 

Комментарии:

1. Функция, которую вы пытаетесь выполнить одновременную запись на диск, вы проверили, что она действительно записала файлы?

2. Поскольку вы пытаетесь распараллелить только одну функцию над несколькими входными данными, хранящимися в итерационном режиме, вы можете .map() вместо этого использовать метод исполнителя.

Ответ №1:

В своей download() функции вы отправляете весь массив, в то время как вы должны отправлять каждый элемент:

 def download(song_filename_list, song_link_list):
    with requests.Session() as s:
        login_request = s.post(login_url, 
            data=payload, 
            headers=headers)

        for i in range(len(song_filename_list)):
            with concurrent.futures.ThreadPoolExecutor() as executor:
                executor.submit(get_file, i)
 

Вы можете упростить это с .map() помощью метода исполнителя:

 def download(song_filename_list, song_link_list):
  with requests.Session() as session:
    session.post(login_url, 
        data=payload, 
        headers=headers)

  with concurrent.futures.ThreadPoolExecutor() as executor:
    executor.map(get_file, song_filename_list, song_link_list)
 

Где get_file функция:

 def get_file(song_name, song_link):
  with requests.Session() as session:
    download_request = session.get(song_link, 
        headers=download_headers, 
        stream=True)

  if download_request.status_code == 200:
    print(f"Downloaded {song_name}")
  else:
    print(f"nStatus Code: {download_request.status_code}!n")
  
  with open(song_name, "wb") as file:
    file.write(download_request.content)
 

Это позволяет избежать совместного использования состояния между потоками, что позволяет избежать потенциальных гонок данных.

Если вам нужно отслеживать, сколько песен было загружено, вы можете использовать tqdm, в котором есть thread_map оболочка итератора, которая делает именно это.

Комментарии:

1. Поэтому я попробовал оба приведенных вами примера, но ни один из них не сработал. И чтобы ответить на ваш вопрос выше, все методы параллелизма, которые я пробовал (2, которые вы предоставили, и другие, которые я написал сам), ничего не записывают на диск.

2. Параллельный код должен быть правильным, можете ли вы добавить отладочные отпечатки везде, чтобы увидеть, в чем проблема (при отправке работы, начале get_file, при записи файла и т. Д.)? Кроме того, удалите sys.exit() , чтобы увидеть больше ошибок.

3. Отредактировал мой вопрос с выводами, а также удалил sys.exit() .

4. Спасибо, вы можете добавить печать в файл get_file (до/после запросов get, в с открытым (…) и т.д.).

5. Ну, я повозился и кое-что нашел (см. правку 3), но мне не удалось попробовать ваше предложение, потому что я получаю 429, и ему нужно около часа, чтобы остыть. Я попробую это завтра, хотя и посмотрю, что работает лучше всего, после того, как я немного рассчитаю время. Большое спасибо.