многопроцессорная обработка.Pool().apply_async(), похоже, не запускает мою функцию

#python #multiprocessing

#python #многопроцессорная обработка

Вопрос:

Я пытаюсь запустить 2 функции над двумя разными процессами с помощью многопроцессорной обработки.Pool().apply_async() , но, похоже, он не запускает мою функцию. Сообщений об ошибках нет. Но когда я пробую свою функцию без многопроцессорной обработки, она работает отлично. Вот как выглядит мой код (краткая версия) :

 import multiprocessing, twitch_integration
p = multiprocessing.Pool()
p.apply_async(twitch_integration.get_user_followers, args=(userid1, "", conn,))
p.apply_async(twitch_integration.get_user_followers, args=(userid2, "", conn,))
p.close()
p.join()
  

Я не знаю, важно ли это отметить, но функция get_user_followers является рекурсивной и запрашивает модуль.
Я поставил печать в самом начале get_user_followers , но она ничего не печатает.

Я искал последние 4 часа, я не преувеличиваю. Если бы кто-нибудь мог мне помочь, я был бы очень благодарен. Спасибо.

После запроса результатов, как упоминал @Steve, возникла новая ошибка, и вот обратная трассировка :

 Traceback (most recent call last):
  File "main.py", line 89, in <module>
    main(conn, cursor);
  File "main.py", line 68, in main
    r = result.get(timeout=1)
  File "C:Python38libmultiprocessingpool.py", line 771, in get
    raise self._value
  File "C:Python38libmultiprocessingpool.py", line 537, in _handle_tasks
    put(task)
  File "C:Python38libmultiprocessingconnection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "C:Python38libmultiprocessingreduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "C:Python38libsocket.py", line 272, in __getstate__
    raise TypeError(f"cannot pickle {self.__class__.__name__!r} object")
TypeError: cannot pickle 'SSLSocket' object
  

Вот функция get_user_followers для получения более подробной информации (я знаю, что это некрасиво, но я начинаю с Python и программирования):

 import requests, json, sys, time
from mysql.connector import Error
def get_user_followers(user_id, pagination, conn):
    global FOLLOWER_COUNT
    global ERROR_COUNT
    global OAUTH_TOKEN
    global TOTAL_FOLLOWER
    global ERROR_IN_A_ROW
    print("YES")
    after = "amp;after={0}".format(pagination)
    first = "amp;first=100"
    query = 'users/follows?to_id={0}{1}{2}'.format(user_id, first, pagination if pagination == "" else after)
    try:
        response = get_response(query)
    except requests.exceptions.RequestException as e:
        print("pause 1 seconde. . . Retrying same request")
        time.sleep(1)
        print("Request error: ", e)
        get_user_followers(user_id, pagination, conn)
        return
    finally:
        pass
    if response.status_code == 200:
        ERROR_IN_A_ROW = 0
    elif response.status_code == 401:
        print(response.json())
        print("HTTP Error 401")
        ERROR_IN_A_ROW  = 1
        if ERROR_IN_A_ROW == 3:
            print("AFTER 3 HTTP ERROR IN A ROW - EXITING PROGRAM")
            print("pagination: ", pagination, " user_id", user_id)
            return
        OAUTH_TOKEN = requests.post(POST_URL, data=POST_PARAMS).json()['access_token']
        get_user_followers(user_id, pagination, conn)
        return
    elif response.status_code == 429:
        print(response.json())
        print("HTTP Error 429")
        time.sleep(int(response.json()['Ratelimti-Reset']))
        get_user_followers(user_id, pagination, conn)
        return
    else:
        print(response.json())
        print("HTTP Error {0}".format(response.status_code))
        ERROR_IN_A_ROW  = 1
        if ERROR_IN_A_ROW == 3:
            print("AFTER 3 HTTP ERROR IN A ROW - EXITING PROGRAM")
            print("pagination: ", pagination, " user_id", user_id)
            return
        get_user_followers(user_id, pagination, conn)
        return
    response = response.json()
    TOTAL_FOLLOWER = response['total']
    length = len(response['data'])
    if length == 0:
        try:
            print("{0}/{1}".format(FOLLOWER_COUNT, TOTAL_FOLLOWER)   " ({0}%)".format(format((FOLLOWER_COUNT * 100 / TOTAL_FOLLOWER), ".2f"))   "r", end="")
        except:
            print("{0}/{1}".format(FOLLOWER_COUNT, TOTAL_FOLLOWER))
        FOLLOWER_COUNT = 0
        ERROR_COUNT = 0
        TOTAL_FOLLOWER = 0
        return 1
    pagination = response['pagination']['cursor']
    i = 0
    while i < length:
        try:
            conn.cursor().execute("""INSERT INTO user (id, username) VALUES ({0}, '{1}')""".format(response['data'][i]['from_id'], str(response['data'][i]['from_name'])))
        except Error as e:
            print("An error has occured : ", e)
            ERROR_COUNT  = 1
        finally:
            pass
        i  = 1
        FOLLOWER_COUNT  = 1
        try:
            print("{0}/{1}".format(FOLLOWER_COUNT, TOTAL_FOLLOWER)   " ({0}%)".format(format((FOLLOWER_COUNT * 100 / TOTAL_FOLLOWER), ".2f"))   " {0}".format(user_id))
        except:
            print("{0}/{1}".format(FOLLOWER_COUNT, TOTAL_FOLLOWER))
    get_user_followers(user_id, pagination, conn)
  

Ответ №1:

Чтобы запустить ваши задания / функции, вы должны запросить их результаты. Это должно запускать ваши задания:

 import multiprocessing, twitch_integration
p = multiprocessing.Pool()
result = p.apply_async(twitch_integration.get_user_followers, args=(userid1, "", conn,))
r = result.get(timeout=1)
result = p.apply_async(twitch_integration.get_user_followers, args=(userid2, "", conn,))
r = result.get(timeout=1)
p.close()
p.join()
  

Комментарии:

1. У меня новая ошибка: TypeError: не удается выделить объект ‘SSLSocket’. Я сейчас ищу, как ее решить, чтобы я мог увидеть, сработало ли ваше решение 🙂

2. На самом деле для результата не должно иметь значения, получаете ли вы .get() результат. Но это .get() всегда должно выполняться в любом случае, потому что, как вы только что обнаружили, иногда это единственный способ увидеть исключения, возникающие асинхронно, Если возможно, попробуйте переписать код так, чтобы вместо этого вызывать именованную функцию уровня модуля. Попытка вызвать метод сложного объекта создает множество возможных режимов сбоев, которые трудно отследить, потому что они происходят не в основной программе, а в недрах механизма mp.

3. Вы знаете, что такое травление? Ваша программа пытается превратить объект в статическое представление, которое может быть отправлено из одного процесса в другой. Какой это объект, я не могу знать, потому что вы не предоставили достаточно информации. Проблема в том, что этот объект, который он пытается сериализовать, содержит сокет и поэтому не может быть сериализован. Это имеет смысл. Живое соединение с сокетом не может быть полностью представлено в виде нескольких байтов, поскольку состояние сокета зависит от состояния систем по обе стороны сокета.

4. @TimPeters, я пытаюсь понять, что ты говоришь. Если в print моих функциях задания есть операторы, и то, что эти операторы печати выводят на консоль, не отображается, пока я не вызову .get() , не означает ли это, что они не выполнялись? Или многопроцессорный модуль действительно захватывает все stdout выходные данные и затем отправляет их на консоль только тогда, когда я запрашиваю результат? Для меня это звучит немного надуманно. Я предположил, что отсутствие какого-либо вывода на мою консоль означает. что мой код не выполнялся. Пожалуйста, объясните, что происходит, если вы считаете, что я ошибаюсь.

5. @TimPeters Можно я тебя поцелую? Это работает <3 Я просто переместил функцию conn creator в тот же файл, что и get_user_followers, и теперь она работает. Кроме того, ему больше не нужен результат = amp; r = result.get(timeout= 1),