Popen: перенаправить stderr и стандартный вывод в один поток

#python #stream #spark-submit

#python #поток #запустить-отправить

Вопрос:

Я создал оболочку вокруг команды Spark-Submit, чтобы иметь возможность генерировать события в реальном времени путем анализа журналов. Цель состоит в том, чтобы создать интерфейс реального времени, показывающий подробный ход выполнения задания Spark.

Таким образом, оболочка будет выглядеть следующим образом:

   submitter = SparkSubmitter()
  submitter.submit('/path/to/spark-code.py')
  for log_event in submitter:
    if log_event:
      print('Event:', log_event)
  

И результат будет выглядеть следующим образом:

   Event: StartSparkContextEvent()
  Event: StartWorkEvent()
  Event: FinishWorkEvent()
  Event: StopSparkContextEvent()
  

Внутренне класс SparkSubmitter запускает команду spark-submit в качестве подпроцесса.Запускает процесс, а затем выполняет итерации по потоку stdout и возвращает события путем анализа журналов, сгенерированных процессом, вот так:

   class SparkSubmitter():
    def submit(self, path):
        command = self.build_spark_submit_command(path)
      self.process = Popen(command, stdout=PIPE, stderr=PIPE)

    def __iter__(self):
        return self

    def __next__(self):
        # note: this is a IO-Blocking command
        log = self.process.stdout.readline().decode('utf-8') 
      return self.parse_log_and_return_event(log)

  

Эта реализация хорошо работает с автономным кластером Spark. Но у меня возникла проблема при запуске в кластере Yarn.

В кластере Yarn «Журналы, связанные с Spark», поступают в stderr , вместо stdout . Итак, мой класс не может анализировать журналы, сгенерированные spark, потому что он только пытается прочитать stdout .

Вопрос 1: Возможно ли прочитать стандартный вывод Popen и stderr как единый поток?

Вопрос 2: Поскольку stdout и stderr оба являются потоками, возможно ли объединить оба потока и прочитать их как один?

Вопрос 3: Возможно ли перенаправить все журналы только в стандартный вывод?

Ответ №1:

Ответ на все 3 ваших вопроса — да, вы можете использовать stderr=subprocess.STDOUT в качестве аргумента для Popen перенаправления вывода с stderr на stdout :

 self.process = Popen(command, stdout=PIPE, stderr=subprocess.STDOUT)
  

Комментарии:

1. Что я все еще хочу знать, из какого канала пришла каждая отдельная строка: stderr или stdout?