#python #multiprocessing
#python #многопроцессорная обработка
Вопрос:
Я пытаюсь запустить 2 функции над двумя разными процессами с помощью многопроцессорной обработки.Pool().apply_async() , но, похоже, он не запускает мою функцию. Сообщений об ошибках нет. Но когда я пробую свою функцию без многопроцессорной обработки, она работает отлично. Вот как выглядит мой код (краткая версия) :
import multiprocessing, twitch_integration
p = multiprocessing.Pool()
p.apply_async(twitch_integration.get_user_followers, args=(userid1, "", conn,))
p.apply_async(twitch_integration.get_user_followers, args=(userid2, "", conn,))
p.close()
p.join()
Я не знаю, важно ли это отметить, но функция get_user_followers является рекурсивной и запрашивает модуль.
Я поставил печать в самом начале get_user_followers , но она ничего не печатает.
Я искал последние 4 часа, я не преувеличиваю. Если бы кто-нибудь мог мне помочь, я был бы очень благодарен. Спасибо.
После запроса результатов, как упоминал @Steve, возникла новая ошибка, и вот обратная трассировка :
Traceback (most recent call last):
File "main.py", line 89, in <module>
main(conn, cursor);
File "main.py", line 68, in main
r = result.get(timeout=1)
File "C:Python38libmultiprocessingpool.py", line 771, in get
raise self._value
File "C:Python38libmultiprocessingpool.py", line 537, in _handle_tasks
put(task)
File "C:Python38libmultiprocessingconnection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "C:Python38libmultiprocessingreduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
File "C:Python38libsocket.py", line 272, in __getstate__
raise TypeError(f"cannot pickle {self.__class__.__name__!r} object")
TypeError: cannot pickle 'SSLSocket' object
Вот функция get_user_followers для получения более подробной информации (я знаю, что это некрасиво, но я начинаю с Python и программирования):
import requests, json, sys, time
from mysql.connector import Error
def get_user_followers(user_id, pagination, conn):
global FOLLOWER_COUNT
global ERROR_COUNT
global OAUTH_TOKEN
global TOTAL_FOLLOWER
global ERROR_IN_A_ROW
print("YES")
after = "amp;after={0}".format(pagination)
first = "amp;first=100"
query = 'users/follows?to_id={0}{1}{2}'.format(user_id, first, pagination if pagination == "" else after)
try:
response = get_response(query)
except requests.exceptions.RequestException as e:
print("pause 1 seconde. . . Retrying same request")
time.sleep(1)
print("Request error: ", e)
get_user_followers(user_id, pagination, conn)
return
finally:
pass
if response.status_code == 200:
ERROR_IN_A_ROW = 0
elif response.status_code == 401:
print(response.json())
print("HTTP Error 401")
ERROR_IN_A_ROW = 1
if ERROR_IN_A_ROW == 3:
print("AFTER 3 HTTP ERROR IN A ROW - EXITING PROGRAM")
print("pagination: ", pagination, " user_id", user_id)
return
OAUTH_TOKEN = requests.post(POST_URL, data=POST_PARAMS).json()['access_token']
get_user_followers(user_id, pagination, conn)
return
elif response.status_code == 429:
print(response.json())
print("HTTP Error 429")
time.sleep(int(response.json()['Ratelimti-Reset']))
get_user_followers(user_id, pagination, conn)
return
else:
print(response.json())
print("HTTP Error {0}".format(response.status_code))
ERROR_IN_A_ROW = 1
if ERROR_IN_A_ROW == 3:
print("AFTER 3 HTTP ERROR IN A ROW - EXITING PROGRAM")
print("pagination: ", pagination, " user_id", user_id)
return
get_user_followers(user_id, pagination, conn)
return
response = response.json()
TOTAL_FOLLOWER = response['total']
length = len(response['data'])
if length == 0:
try:
print("{0}/{1}".format(FOLLOWER_COUNT, TOTAL_FOLLOWER) " ({0}%)".format(format((FOLLOWER_COUNT * 100 / TOTAL_FOLLOWER), ".2f")) "r", end="")
except:
print("{0}/{1}".format(FOLLOWER_COUNT, TOTAL_FOLLOWER))
FOLLOWER_COUNT = 0
ERROR_COUNT = 0
TOTAL_FOLLOWER = 0
return 1
pagination = response['pagination']['cursor']
i = 0
while i < length:
try:
conn.cursor().execute("""INSERT INTO user (id, username) VALUES ({0}, '{1}')""".format(response['data'][i]['from_id'], str(response['data'][i]['from_name'])))
except Error as e:
print("An error has occured : ", e)
ERROR_COUNT = 1
finally:
pass
i = 1
FOLLOWER_COUNT = 1
try:
print("{0}/{1}".format(FOLLOWER_COUNT, TOTAL_FOLLOWER) " ({0}%)".format(format((FOLLOWER_COUNT * 100 / TOTAL_FOLLOWER), ".2f")) " {0}".format(user_id))
except:
print("{0}/{1}".format(FOLLOWER_COUNT, TOTAL_FOLLOWER))
get_user_followers(user_id, pagination, conn)
Ответ №1:
Чтобы запустить ваши задания / функции, вы должны запросить их результаты. Это должно запускать ваши задания:
import multiprocessing, twitch_integration
p = multiprocessing.Pool()
result = p.apply_async(twitch_integration.get_user_followers, args=(userid1, "", conn,))
r = result.get(timeout=1)
result = p.apply_async(twitch_integration.get_user_followers, args=(userid2, "", conn,))
r = result.get(timeout=1)
p.close()
p.join()
Комментарии:
1. У меня новая ошибка: TypeError: не удается выделить объект ‘SSLSocket’. Я сейчас ищу, как ее решить, чтобы я мог увидеть, сработало ли ваше решение 🙂
2. На самом деле для результата не должно иметь значения, получаете ли вы
.get()
результат. Но это.get()
всегда должно выполняться в любом случае, потому что, как вы только что обнаружили, иногда это единственный способ увидеть исключения, возникающие асинхронно, Если возможно, попробуйте переписать код так, чтобы вместо этого вызывать именованную функцию уровня модуля. Попытка вызвать метод сложного объекта создает множество возможных режимов сбоев, которые трудно отследить, потому что они происходят не в основной программе, а в недрах механизма mp.3. Вы знаете, что такое травление? Ваша программа пытается превратить объект в статическое представление, которое может быть отправлено из одного процесса в другой. Какой это объект, я не могу знать, потому что вы не предоставили достаточно информации. Проблема в том, что этот объект, который он пытается сериализовать, содержит сокет и поэтому не может быть сериализован. Это имеет смысл. Живое соединение с сокетом не может быть полностью представлено в виде нескольких байтов, поскольку состояние сокета зависит от состояния систем по обе стороны сокета.
4. @TimPeters, я пытаюсь понять, что ты говоришь. Если в
.get()
, не означает ли это, что они не выполнялись? Или многопроцессорный модуль действительно захватывает всеstdout
выходные данные и затем отправляет их на консоль только тогда, когда я запрашиваю результат? Для меня это звучит немного надуманно. Я предположил, что отсутствие какого-либо вывода на мою консоль означает. что мой код не выполнялся. Пожалуйста, объясните, что происходит, если вы считаете, что я ошибаюсь.5. @TimPeters Можно я тебя поцелую? Это работает <3 Я просто переместил функцию conn creator в тот же файл, что и get_user_followers, и теперь она работает. Кроме того, ему больше не нужен результат = amp; r = result.get(timeout= 1),