Ошибка отображения объекта ‘NoneType’ не может быть повторена при запуске Apache Airflow с использованием базы данных pubsubpullsensor

#python #airflow

#python #воздушный поток

Вопрос:

Я получил ошибку ‘Объект NoneType’ не повторяется при запуске базы данных Apache airflow. Я использую PubSubPullSensor в этой базе данных. Мой код базы данных ниже :

 from airflow import DAG
import datetime
from datetime import timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.providers.google.cloud.sensors.pubsub import PubSubPullSensor

DEFAULT_DAG_ARGS = {
    'owner': 'linkaja_dataeng',
    'depends_on_past': False,
    'start_date': datetime.datetime(2020, 9, 10, 0, 0),
    'email': ['msaipulr@xxx.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 0,
    'retry_delay': timedelta(minutes=2)
}

dag = DAG(
    'pubsub_pullsensor',
    schedule_interval='@daily',
    default_args=DEFAULT_DAG_ARGS)

subscription = 'DataInfra-subscription1'
gcp_conn_id = 'airflow-jobs-connection'
project_id = 'project-data-platform-de'


#define task

task_start = BashOperator(
    task_id="start_task",
    bash_command='echo start',
    dag=dag
)

pull_message = PubSubPullSensor(
    task_id="pull_message_pubsub",
    ack_messages=True,
    project_id=project_id,
    gcp_conn_id=gcp_conn_id,
    subscription=subscription,
    dag=dag
)

task_end = BashOperator(
    task_id="end_task",
    bash_command='echo end',
    dag=dag
)

task_start >> pull_message >> task_end
  

Я уже создаю тему и подписку pubsub в pubsub google cloud.
Я также публикую сообщения от 5 до 10 в теме pubsub, затем запускаю этот код базы данных, но отображаю ту же ошибку.

Ниже приведены сообщения об ошибках журналов :

 *** Reading local file: /usr/local/airflow/logs/pubsub_pullsensor/pull_message_pubsub/2020-09-28T06:04:52.185745 00:00/1.log
Production 2020-09-28 06:05:17,127 {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: pubsub_pullsensor.pull_message_pubsub 2020-09-28T06:04:52.185745 00:00 [queued]>
Production 2020-09-28 06:05:17,221 {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: pubsub_pullsensor.pull_message_pubsub 2020-09-28T06:04:52.185745 00:00 [queued]>
Production 2020-09-28 06:05:17,221 {taskinstance.py:866} INFO - 
--------------------------------------------------------------------------------
Production 2020-09-28 06:05:17,222 {taskinstance.py:867} INFO - Starting attempt 1 of 1
Production 2020-09-28 06:05:17,222 {taskinstance.py:868} INFO - 
--------------------------------------------------------------------------------
Production 2020-09-28 06:05:17,260 {taskinstance.py:887} INFO - Executing <Task(PubSubPullSensor): pull_message_pubsub> on 2020-09-28T06:04:52.185745 00:00
Production 2020-09-28 06:05:17,275 {standard_task_runner.py:53} INFO - Started process 14240 to run task
Production 2020-09-28 06:05:17,494 {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: pubsub_pullsensor.pull_message_pubsub 2020-09-28T06:04:52.185745 00:00 [running]> 243145dffd6e
Production 2020-09-28 06:05:17,586 {logging_mixin.py:112} INFO - [2020-09-28 06:05:17,586] {{pubsub.py:488}} INFO - Pulling max 5 messages from subscription (path) projects/project-data-platform-de/subscriptions/DataInfra-subscription1
Production 2020-09-28 06:05:17,587 {taskinstance.py:1128} ERROR - 'NoneType' object is not iterable
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/providers/google/cloud/sensors/pubsub.py", line 143, in execute
    super().execute(context)
  File "/usr/local/lib/python3.7/site-packages/airflow/sensors/base_sensor_operator.py", line 107, in execute
    while not self.poke(context):
  File "/usr/local/lib/python3.7/site-packages/airflow/providers/google/cloud/sensors/pubsub.py", line 156, in poke
    return_immediately=self.return_immediately,
  File "/usr/local/lib/python3.7/site-packages/airflow/providers/google/common/hooks/base_google.py", line 356, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/providers/google/cloud/hooks/pubsub.py", line 497, in pull
    metadata=metadata,
  File "/usr/local/lib/python3.7/site-packages/google/cloud/pubsub_v1/_gapic.py", line 40, in <lambda>
    fx = lambda self, *a, **kw: wrapped_fx(self.api, *a, **kw)  # noqa
  File "/usr/local/lib/python3.7/site-packages/google/pubsub_v1/services/subscriber/client.py", line 1005, in pull
    metadata = tuple(metadata)   (
TypeError: 'NoneType' object is not iterable
Production 2020-09-28 06:05:17,591 {taskinstance.py:1185} INFO - Marking task as FAILED.dag_id=pubsub_pullsensor, task_id=pull_message_pubsub, execution_date=20200928T060452, start_date=20200928T060517, end_date=20200928T060517
Production 2020-09-28 06:05:17,993 {logging_mixin.py:112} INFO - [2020-09-28 06:05:17,992] {{email.py:131}} INFO - Sent an alert email to ['msaipul@xxx.com']
Production 2020-09-28 06:05:26,966 {logging_mixin.py:112} INFO - [2020-09-28 06:05:26,966] {{local_task_job.py:103}} INFO - Task exited with return code 1
  

темы скриншотов и подписки в pubsub GCP
темы скриншотов и подписки

Пожалуйста, помогите мне решить эту проблему. Спасибо

Комментарии:

1. Можете ли вы опубликовать полное сообщение об исключении? Можете ли вы также подтвердить правильность названия темы и подписки?

2. Не могли бы вы проверить, какая строка выдает эту ошибку, а также поделиться полным сообщением об ошибке

3. Я уже добавляю файл журналов и тему скриншота и подписку. Спасибо