Конвейер потока данных застрял при чтении из Pub / Sub

#python #google-cloud-dataflow #google-cloud-pubsub #google-cloud-console

#python #google-облако-поток данных #google-облако-pubsub #google-cloud-console

Вопрос:

После одного дня работы в полном порядке, потоковой передачи данных из Pub / Sub, выравнивания данных и записи строк в BigQuery; конвейер потока данных начал сообщать об ошибках, подобных этому:

 
Processing stuck in step s01 for at least 05m00s without outputting or completing in state process
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
  at org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.maybeWait(RemoteGrpcPortWriteOperation.java:170)
  at org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.process(RemoteGrpcPortWriteOperation.java:191)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
  at org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1269)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:146)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1008)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
  

Эти ошибки увеличивают время и достигают 25m00s с той же трассировкой ошибок.

Через Stackdriver мне не повезло, потому что эти ошибки не отображаются.

Вот мой конвейер:

 from __future__ import absolute_import

import logging
import argparse
import apache_beam as beam
import apache_beam.transforms.window as window


class parse_pubsub(beam.DoFn):
    def process(self, element):
        # Flatten data ...
        for row in final_rows:
            yield row


def run(argv=None):
    """Build and run the pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input_topic', required=True,
        help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
    parser.add_argument(
        '--output_table', required=True,
        help=('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
       'or DATASET.TABLE.'))
    known_args, pipeline_args = parser.parse_known_args(argv)

    # table_schema = '-------'

    with beam.Pipeline(argv=pipeline_args) as p:
        lines = ( p | 'Read from PubSub' >> beam.io.ReadFromPubSub(known_args.input_topic)
                    | 'Parse data' >> beam.ParDo(parse_pubsub())
                    | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                        known_args.output_table,
                        schema=table_schema,
                        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                    )
                )


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
  

Может ли это быть рабочей проблемой? Должен ли я начать работу с большим количеством работников? Есть ли что-то, что можно предотвратить в коде?

Ответ №1:

К сожалению, задания потоковой передачи данных Python все еще находятся в бета-версии. Одним из ограничений бета-версии является то, что несколько соединителей ввода-вывода работают на серверной части потока данных, и журналы недоступны для пользователей.

Существует по крайней мере одна проблема, для которой я видел похожие трассировки стека, BEAM-5791, которая была исправлена в 2.9.0. Если вы еще этого не сделали, попробуйте обновить Beam до последней версии.

Другой распространенной причиной являются проблемы с разрешениями. Убедитесь, что учетная запись службы потока данных по-прежнему имеет доступ к вашей теме pubsub.

Если после этого у вас все еще возникают проблемы, вам необходимо подать заявку в службу поддержки Google Cloud. Они могут просмотреть внутренние журналы для вашей работы и помочь вам найти причину проблемы.

Комментарии:

1. Спасибо за отзыв! Задание, которое вызвало эти ошибки, выполнялось на 2.11.0 SDK. Мы изменили разрешения, и проблема все еще была там. С тех пор я начал новую работу без каких-либо ошибок на данный момент. Если ошибки появятся снова, я обязательно открою заявку в службе поддержки Google Cloud. Еще раз спасибо за ваш отзыв.

2. Выглядит как issues.apache.org/jira/browse/BEAM-6451 для меня.