Проблемы с очередью ведения журнала и многопроцессорной обработкой

#python #logging #process

#python #ведение журнала #процесс

Вопрос:

В моей программе я хотел бы запустить несколько процессов и собрать сообщения журнала от каждого процесса в единую централизованную очередь сообщений. Я создал специальный класс LogMaster, роль которого заключается в сборе журналов из процессов и их печати через обработчик потоков и обработчик файлов. Затем я попытался запустить процессы, но произошла неправильная обработка моих журналов, поскольку они не печатали / не собирали сообщения журнала.

Ниже приведен мой код и выводимый результат при запуске скрипта. Я попытался определить, откуда возникла проблема, вставив отладочные сообщения путем печати и записи в файл журнала. «1 — start_process: условие печати работает!» но «2 — process_func: условие печати работает!» никогда не отображается, и я не понимаю почему. Я умоляю вас о помощи 🙂

Вывод

 [user@mydev dev]$ python main.py
2019-03-06 13:05:12,483 INFO    MyGraph started
1 - start_process: Print term is working!
2019-03-06 13:05:12,483 INFO    1 - start_process: LogMaster.info is working too!
2019-03-06 13:05:12,483 INFO    Start process 'graph.GraphGenerator'
2019-03-06 13:05:12,483 INFO    MyGraph ended
  

main.py

 #!/usr/bin/python2
from logger import set_logging, LogMaster
from graph import GraphGenerator

SERVICE_NAME = 'MyGraph'
set_logging(SERVICE_NAME)
LogMaster.start_logging()
LogMaster.info("{} started".format(SERVICE_NAME ))
service = GraphGenerator()
service.start_process()
LogMaster.info("{} ended".format(SERVICE_NAME))
LogMaster.stop_logging()
  

graph.py реализует класс GraphGenerator(), который будет отвечать за запуск нескольких процессов

 from multiprocessing import Process, Queue, Value, Lock
from logger import LogMaster, set_logging
import traceback

class GraphGenerator():

    def __init__(self):
        pass

    def start_process(self):
        LogMaster.print_term("1 - start_process: Print term is working!")
        LogMaster.info("1 - start_process: LogMaster.info is working too!")
        p = Process(target=self.process_func, args=(LogMaster.logging_queue,))
        LogMaster.info("Start process '{}'".format(self.__class__))
        p.start()

    def process_func(self, logging_queue):
        LogMaster.print_term("2 - process_func: Print term is working!")
        LogMaster.info("2 - process_func: LogMaster.info is working too!")
        try:
            LogMaster.set_logging_queue(logging_queue)
            LogMaster.print_term("process_func: Print term is working!")
            LogMaster.info("process_func: LogMaster.info is working too!")

        except Exception as e:
            print("Ex=================")
            LogMaster.info(traceback.format_exc())
  

logger.py внедрите класс LogMaster(), отвечающий за ведение журнала

 from __future__ import print_function
import os
import sys
import logging
from multiprocessing import Queue
from threading import Thread

class LogMaster(object):
    logging_queue = None
    logging_thread = None

    @classmethod
    def start_logging(self):
        # create pipe to centralise messages
        self.logging_queue = Queue()
        self.logging_thread = Thread(target=self.logging_func)
        self.logging_thread.daemon = True
        self.logging_thread.start()

    @classmethod
    def set_logging_queue(self, q):
        self.logging_queue = q

    @classmethod
    def stop_logging(self):
        self.logging_queue.put(None)
        self.logging_thread.join()

    @classmethod
    def print_term(self, msg, end='n'):
        self.logging_queue.put(("print", msg, end))

    @classmethod
    def log(self, loglevel, msg):
        #print(loglevel, msg)
        self.logging_queue.put(("log", loglevel, msg))

    @classmethod
    def info(self, msg):
        self.log(logging.INFO, msg)

    @classmethod
    def logging_func(self):
        while True:
            item = self.logging_queue.get()
            if item == None:
                break
            elif item[0] == "print":
                print(item[1], end=item[2])
                sys.stdout.flush()
                pass
            elif item[0] == "log":
                logging.log(item[1], item[2])

def set_logging(logfile_name, verbose=False):

    # == General log ==
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # File handler
    fh = logging.FileHandler(logfile_name)
    fh.setLevel(logging.INFO)

    # Stream handler
    sh = logging.StreamHandler()
    sh.setLevel(logging.INFO)
    sh.createLock()

    #Formatter
    formatter = logging.Formatter('%(asctime)st%(levelname)st%(message)s')
    fh.setFormatter(formatter)
    sh.setFormatter(formatter)

    # add the handlers to the logger
    logger.addHandler(fh)
    logger.addHandler(sh)
  

Комментарии:

1. В коде, которым вы делитесь здесь, много чего не так. Проблемы с отступами, нерелевантный код, вызовы функций / методов, которые не существуют. Я хотел бы помочь, но необходимость исправлять этот код, чтобы иметь возможность разобраться в проблеме, о которой ваш пост, очень раздражает. Пожалуйста, исправьте ваш MCVE.

2. Прошу прощения за первую версию. Я хотел очистить код в первой опубликованной версии, однако допустил некоторую ошибку. Эта новая версия протестирована и работает. Очень, очень сожалею об ошибке.

Ответ №1:

В вашем примере проблема с синхронизацией. service.start_process() возвращается почти сразу; то же самое происходит LogMaster.info("{} ended".format(SERVICE_NAME)) . LogMaster.stop_logging() вызывается до завершения настройки вашего дочернего процесса.
Разветвление дочерних процессов требует времени; вы удаляете свои logging_thread до того, как дочерний процесс начнет выполняться. Он все еще выполняется, но в этот момент в очереди больше ничего не прослушивается; сообщения теряются.

Вам придется придумать план, где / когда синхронизировать ваши процессы. Быстрое решение — просто добавить p.join() after p.start() в GraphGenerator.start_process . Однако не уверен, хотите ли вы, чтобы ваш основной процесс блокировал выполнение этого метода.
Другой вариант показан в примере кода ниже.

В операционных системах, совместимых с POSIX, нет необходимости передавать очередь в process_func . В этих операционных системах дочерние элементы создаются путем базового копирования родительского объекта, объект класса LogMaster в дочернем процессе является копией объекта в основном процессе, у него уже есть очередь.

Хотя вы могли бы немного реорганизовать свой LogMaster . Сам класс может быть подклассом threading.Thread , а инструкции в set_logging могут быть добавлены к LogMaster и выполнены как часть инициализации.
При этом создание экземпляра класса могло бы настроить ведение журнала и запустить прослушиватель для очереди, в то время как доступ к статическим / классовым методам в самом классе просто использовался бы для отправки сообщений.

 from __future__ import print_function
import logging
import logging.handlers
import multiprocessing
import sys   
from threading import Thread  

class LogMaster(Thread):

    QUEUE = None

    def __init__(self, file_name):
        Thread.__init__(self, name="LoggerThread")
        LogMaster.QUEUE = multiprocessing.Queue()
        self._file_name = file_name
        self._init_logging()
        self.start()

    def _init_logging(self):
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.INFO)
        fh = logging.FileHandler(self._file_name)
        fh.setLevel(logging.INFO)
        sh = logging.StreamHandler()
        sh.setLevel(logging.INFO)
        formatter = logging.Formatter('%(asctime)st%(levelname)st%(message)s')
        fh.setFormatter(formatter)
        sh.setFormatter(formatter)
        self.logger.addHandler(fh)
        self.logger.addHandler(sh)

    def _handle_message(self, message):
        if message[0] == "print":
            print(message[1], end=message[2])
            sys.stdout.flush()
        elif message[0] == "log":
            self.logger.log(message[1], message[2])

    def run(self):
        while True:
            message = LogMaster.QUEUE.get()
            if message is None:
                break
            else:
                self._handle_message(message)

    def shutdown(self):
        LogMaster.QUEUE.put(None)
        try:
            self.join(2)
        except RuntimeError:
            pass

    @classmethod
    def _log(cls, loglevel, msg):
        cls.QUEUE.put(("log", loglevel, msg))

    @classmethod
    def print_term(cls, msg, end='n'):
        cls.QUEUE.put(("print", msg, end))

    @staticmethod
    def info(msg):
        LogMaster._log(logging.INFO, msg)
  

graph.py

 import traceback
from multiprocessing import Process
from logger import LogMaster

class GraphGenerator:

    def start_process(self):
        LogMaster.print_term("1 - start_process: Print term is working!")
        LogMaster.info("1 - start_process: LogMaster.info is working too!")
        p = Process(target=self.process_func)
        LogMaster.info("starting '{}'".format(self.__class__))
        p.start()
        # maybe you want to start several processes quickly and rather block
        # your main process to wait for them there.
        return p

    def process_func(self):
        LogMaster.print_term("2 - process_func: Print term is working!")
        LogMaster.info("2 - process_func: LogMaster.info is working too!")
        try:
            raise ValueError("checking the exception handling")
        except ValueError:
            LogMaster.info(traceback.format_exc())
  

main.py

 from graph import GraphGenerator
from logger import LogMaster

if __name__ == '__main__':  
    SERVICE_NAME = 'MyGraph'
    lm = LogMaster(SERVICE_NAME)
    LogMaster.info("{} started".format(SERVICE_NAME))
    service = GraphGenerator()
    child = service.start_process()
    # start more child processes if needed
    # once all are started, wait for them to complete in the main process
    child.join()
    LogMaster.info("{} ended".format(SERVICE_NAME))
    lm.shutdown()
  

Вывод

 [shmee@massive test]$ python --version
Python 2.7.5
# ======================================
[shmee@massive test]$ python main.py
2019-03-07 12:28:23,425 INFO    MyGraph started
1 - start_process: Print term is working!
2019-03-07 12:28:23,425 INFO    1 - start_process: LogMaster.info is working too!
2019-03-07 12:28:23,425 INFO    starting 'graph.GraphGenerator'
2 - process_func: Print term is working!
2019-03-07 12:28:23,426 INFO    2 - process_func: LogMaster.info is working too!
2019-03-07 12:28:23,426 INFO    Traceback (most recent call last):
  File "/home/shmee/test/graph.py", line 21, in process_func
    raise ValueError("checking the exception handling")
ValueError: checking the exception handling

2019-03-07 12:28:23,428 INFO    MyGraph ended
# ======================================    
[shmee@massive test]$ cat MyGraph
2019-03-07 12:28:23,425 INFO    MyGraph started
2019-03-07 12:28:23,425 INFO    1 - start_process: LogMaster.info is working too!
2019-03-07 12:28:23,425 INFO    starting 'graph.GraphGenerator'
2019-03-07 12:28:23,426 INFO    2 - process_func: LogMaster.info is working too!
2019-03-07 12:28:23,426 INFO    Traceback (most recent call last):
  File "/home/shmee/test/graph.py", line 21, in process_func
    raise ValueError("checking the exception handling")
ValueError: checking the exception handling

2019-03-07 12:28:23,428 INFO    MyGraph ended
# ======================================    
# this works as well
[shmee@massive test]$ ../source/python-3.6.4.el7.x86_64/bin/python3 main.py
  

Комментарии:

1. Большое вам спасибо за подробный ответ, очень четкий и полный