Python — доступ к sys.stdin из класса не работает

#python #multiprocessing #stdin

#python #многопроцессорная обработка #stdin

Вопрос:

Итак, я пытаюсь прочитать конвейерный ввод из sys.stdin внутри класса. Проблема в том, что я не получаю никаких входных данных от stdin, находясь внутри класса, но я могу получить все данные извне класса в main().

Есть ли способ получить доступ к sys.stdin из многопроцессорного класса?

Вот мой код:

 class BufferReader(Process):

    def __init__(self, queue, lock):
        super(BufferReader, self).__init__()
        self.queue = queue
        # number of lines to store in buffer before sending to processes
        self.buffer_size = 200000
        self.lines_buffer = []
        self.lock = lock

    def run(self):
        count = 0
        try:
            # for each line in the stdin
            for line in sys.stdin:
                # strip the line from any whitespace
                stripped = line.strip()

                # if end of line, break
                if not stripped:
                    break

                # add the line to the buffer
                self.lines_buffer.append(stripped)

                # if the buffer is full, process the data, and empty the buffer
                if count == self.buffer_size:
                    self.lock.acquire()
                    self.queue.put(self.lines_buffer)
                    self.lock.release()
                    del self.lines_buffer[:]
                    count = 0

                # increase the line counter
                count  = 1
        except KeyboardInterrupt:
            sys.stdout.flush()
            pass

def parse(index, data_queue, lock):

    while not data_queue.empty():
        lock.acquire()
        if data_queue.empty():
            lock.release()
            sys.exit(0)
        result = data_queue.get()
        lock.release()

        with codecs.open("proc-%d" % index, 'w', 'utf-8') as fp:
            for line in result:
                fp.write(line)
            fp.close()
        sys.exit(0)

def main():
    data_queue = Queue()
    lock = Lock()
    br = BufferReader(data_queue, lock)
    br.start()

    # spawn the processes
    procs = [Process(target=parse, args=(i, data_queue, lock))
             for i in range(5)]

    for p in procs:
        p.start()

    br.join()

    for p in procs:
        p.join()

if __name__ == '__main__':
    main()
 

Комментарии:

1. Почему вы запускаете 5 процессов, а затем сразу же блокируете 4 из них?

2. br.start() находится прямо под br = BufferReader(data_queue, lock). Также я блокирую их, потому что мне нужно читать из очереди один за другим. Если я не заблокирую, это может вызвать проблемы из-за многопроцессорности.

3. Что вы получаете, делая это параллельно?

4. Поток BufferReader будет считывать тонну данных и будет буферизовать stdin и помещать данные в очередь для обработки процессами, в то время как мастер (тот, который породил все) будет собирать результаты от каждого процесса, а затем сохранять их в файл, в то время как другие процессы выполняютих вещь.

5. Я не уверен, в чем sys.stdin проблема, связанная с вашим, но я не вижу смысла использовать более одного дополнительного процесса, если они просто будут ждать, пока не получат доступ к блокировке. Похоже, что только 1 когда-либо будет делать что-то полезное.

Ответ №1:

При multiprocessing этом вы создаете рабочих в отдельных процессах с их собственными идентификаторами процессов и тому подобное, включая их собственные устройства ввода и вывода. Это означает, что sys.stdin/stdout экземпляр, который вы получаете внутри созданного процесса, не будет таким же, как экземпляр главного процесса, хотя вы все равно можете их читать и записывать.

Существует по крайней мере два варианта того, как это сделать:

  • Передайте sys.stdin/stdout.fileno() файловый дескриптор главного процесса порожденным процессам. Вы должны иметь возможность открывать его внутри созданных процессов с помощью os.fdopen(fileno) .
  • Используйте threading вместо этого, поскольку потоки одного и того же процесса совместно используют устройства ввода и вывода.

Кроме того, как указано в комментариях ниже, одновременное чтение одного входного потока из нескольких процессов может быть сложным, если вы точно не знаете, что делаете. Было бы разумно назначить только один процесс для чтения входных данных и отправки данных другим работникам. Или внедрите какую-то циклическую систему, которая гарантирует, что только один из процессов одновременно будет получать входные данные. multiprocessing.Pool Для этого может пригодиться использование пула процессов.

И я бы рекомендовал использовать fileinput module, чтобы упростить обработку стандартного ввода.

Комментарии:

1. Передача sys.stdin в дочерний процесс не будет работать; дескриптор файла будет закрыт в дочернем процессе.

2. @dano Если вы передадите sys.stdin.fileno() файловый дескриптор главного процесса, вы сможете открыть его, используя os.fdopen(fileno) в порожденном процессе.

3. Да, но на самом деле это не то, что вы сказали в ответе. Вы должны отредактировать это.

4. Также стоит отметить, что этот метод работает, только если вы не читаете из stdin в родительском процессе до передачи файлового дескриптора дочернему процессу (по крайней мере, в моем тестировании).

5. @dano согласился, добавил и улучшил.