Запуск мультипроцессоров python из динамического списка sql

#python #sql #loops #impala #multiprocess

Вопрос:

Привет, я пытаюсь сделать свой код немного более динамичным и умным, с этой целью я хочу вызывать функции, которые я хочу запускать в динамическом списке, вместо того, чтобы жестко их кодировать. Это очистит код и поможет с автоматическим повторным запуском неудачных сценариев.

Ниже приведен фрагмент кода, над которым я работал. обычно я получаю эту ошибку при запуске

Ошибка типа: объект ‘str’ не может быть вызван

  cursor = conn.cursor()
    cursor.execute(
        f'''select distinct caller from {db}.log a where a.log_text like 'Failed:%' and a.log_time > DATE_TRUNC('DAY', NOW()) and caller not in (select caller from {db}.log a where a.log_text like 'Done' and a.log_time > DATE_TRUNC('DAY', NOW()))''')
    df = as_pandas(cursor)
    print('The following scripts will be rerun')
    print(df)

    c = df['caller']

    processes = []
    # Loop over failed scripts/modules
    for mod in (c):  
        print(f'Rerun of {c}')
        p = multiprocessing.Process(target=mod, args=(db,))
        time.sleep(10)
        p.start()
        processes.append(p)

    for process in processes:
        process.join()
 

Ошибка полного отслеживания

Обратная связь (последний последний вызов): Файл «/home/xxx/anaconda3/lib/python3.6/multiprocessing/process.py», строка 258, в файле _bootstrap self.run () «/home/xxx/anaconda3/lib/python3.6/multiprocessing/process.py», строка 93, в запуске self._target(*self._args, **self._kwargs) Ошибка типа: объект ‘str’ не может быть вызван процессом Процесс-2: Обратная трассировка (последний последний вызов): Файл «/home/xxx/anaconda3/lib/python3.6/multiprocessing/process.py», строка 258, в файле _bootstrap self.run () «/home/xxx/anaconda3/lib/python3.6/multiprocessing/process.py», строка 93, в run self._target(*self._args, **self._kwargs) Ошибка типа: объект ‘str’ не может быть вызван, процесс завершен с кодом выхода 0

Комментарии:

1. Новый target подпроцесс должен быть объектом функции, но вы указываете только имя функции.

2. Могу ли я преобразовать имя в объект функции с тем же именем ?

3. Было бы лучше найти функцию, обозначенную именем, или создать диктант для сопоставления имен функций с объектами. Где функции, которые должен вызывать подпроцесс, тот же модуль, что и показанный код, другой модуль, разбросанный по нескольким модулям?

4. Они разбросаны по множеству различных сценариев/модулей. Обычно я просто называю их как » для мода в (foo_1,foo_2…..):», Но для этого кода для каждого запуска будет отличаться, какие сценарии или функции я хочу вызвать, в зависимости от того, что могло не сработать во время первого запуска.

5. Тогда словарь с именами функций в качестве ключей и объектами функций в качестве значений-лучший способ решить эту проблему, я думаю.

Ответ №1:

Поэтому я нашел решение этой проблемы и разместил его здесь на случай, если кто-нибудь сможет использовать его в будущем.

В основном я использую функции getattr и importlib.import_module. Затем я запускаю их через цикл for.

Поэтому я использую sql, чтобы получить имя всех отказавших модулей, а затем загрузить эти модули в список df, который затем повторяется через цикл for, который запустит отказавший модуль.

 cursor.execute(
            f'''select distinct caller from {db}.sch_log_python a where a.log_text like 'Failed:%' and lower(caller) in ('st_%','ctrl_%','ar%') and a.log_time > DATE_TRUNC('DAY', NOW()) and caller not in (select caller from {db}.sch_log_python a where a.log_text like 'Done' and a.log_time > DATE_TRUNC('DAY', NOW()))''')
        df = as_pandas(cursor)
        print('The following scripts will be rerun')
        print(df)

        sch_log_func(caller, 'Rerun of failed scripts', db)

        c = df
        # Loop over failed scripts/modules
        for i in (c):
            cstr = c.to_string(index=False, header=False)
            print(f'Initialise '   cstr)
            cstr = cstr.lower()
            print(cstr   ' Module to import')
            print('Trying to run '   cstr)
            cls = getattr(importlib.import_module(cstr), cstr)
            cls(db)
            print(f'Started {cstr}')