Задание потока данных застряло при чтении из паба / Суб

#python-3.x #google-cloud-dataflow #apache-beam #google-cloud-pubsub

# #python-3.x #google-cloud-поток данных #apache-beam #google-cloud-pubsub

Вопрос:

Наша версия SDK Apache Beam Python 3.7 SDK 2.25.0

Существует конвейер, который считывает данные из Pub / Sub, преобразует их и сохраняет результаты в GCS. Обычно это работает нормально в течение 1-2 недель. После этого он застревает.

 "Operation ongoing in step s01 for at least 05m00s without outputting or completing in state process
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
  at org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.maybeWait(RemoteGrpcPortWriteOperation.java:175)
  at org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.process(RemoteGrpcPortWriteOperation.java:196)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
  at org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1400)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:156)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1101)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
 

Шаг 01 — это просто "Read PubSub Messages" >> beam.io.ReadFromPubSub(subscription=subscription)

После этого поток данных увеличивает количество рабочих и прекращает обработку любых новых данных. Задание все еще находится в RUNNNING состоянии.

Нам просто нужно перезапустить задание, чтобы решить его. Но это происходит каждые ~ 2 недели.

Как мы можем это исправить?

Комментарии:

1. Трудно понять. Это похоже на тупик. Поддержка может помочь..

2. Я думаю, нам нужно намного больше информации для отладки этого. Можете ли вы подать заявку в службу поддержки? А если нет, можете ли вы предоставить дополнительную информацию о своем конвейере?

3. @Artyom Tokachev, вы можете сообщить об этой ошибке в системе отслеживания проблем, поделившись подробностями конвейера.

4. @Artyom Tokachev вам удалось решить вашу проблему? Есть какие-либо предложения для людей с подобной ситуацией?

Ответ №1:

Это похоже на проблему с устаревшим «Java Runner Harness». Я бы посоветовал запустить ваш конвейер с помощью Dataflow Runner v2, чтобы избежать подобных проблем. Вы также можете подождать, пока оно не станет значением по умолчанию (в настоящее время оно выполняется).