#python #airflow
#python #airflow
Вопрос:
При запуске DAG в Apache Airflow я получаю ошибку `'NoneType' object is not iterable` (объект NoneType не является итерируемым). В этом DAG я использую PubSubPullSensor. Код DAG приведён ниже:
from airflow import DAG
import datetime
from datetime import timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.providers.google.cloud.sensors.pubsub import PubSubPullSensor
# Defaults applied to every task in the DAG: ownership, start date and
# e-mail alerting. Retries are disabled, so retry_delay is never used.
DEFAULT_DAG_ARGS = dict(
    owner='linkaja_dataeng',
    depends_on_past=False,
    start_date=datetime.datetime(2020, 9, 10, 0, 0),
    email=['msaipulr@xxx.com'],
    email_on_failure=True,
    email_on_retry=True,
    retries=0,
    retry_delay=timedelta(minutes=2),
)

# Pub/Sub location and the Airflow connection the sensor authenticates with.
project_id = 'project-data-platform-de'
gcp_conn_id = 'airflow-jobs-connection'
subscription = 'DataInfra-subscription1'

# Operators created inside the `with` block are attached to `dag`
# automatically, so no explicit `dag=dag` argument is needed.
with DAG(
    'pubsub_pullsensor',
    schedule_interval='@daily',
    default_args=DEFAULT_DAG_ARGS,
) as dag:
    task_start = BashOperator(task_id="start_task", bash_command='echo start')

    # Pulls from the subscription and acknowledges the received messages.
    # NOTE(review): the TypeError in the logs is raised inside the
    # google-cloud-pubsub client (`tuple(metadata)` receives the None that
    # Airflow's PubSubHook.pull passes as `metadata`), not by this DAG.
    # Presumably a version mismatch between the Airflow Google provider and
    # google-cloud-pubsub (the client path looks like 2.x); confirm the
    # installed versions before changing this code.
    pull_message = PubSubPullSensor(
        task_id="pull_message_pubsub",
        ack_messages=True,
        project_id=project_id,
        gcp_conn_id=gcp_conn_id,
        subscription=subscription,
    )

    task_end = BashOperator(task_id="end_task", bash_command='echo end')

    # start -> wait for Pub/Sub messages -> end
    task_start >> pull_message >> task_end
Я уже создал тему и подписку в Google Cloud Pub/Sub.
Я также опубликовал в эту тему от 5 до 10 сообщений, после чего запустил этот DAG, но получаю ту же ошибку.
Ниже приведён журнал задачи с сообщением об ошибке:
*** Reading local file: /usr/local/airflow/logs/pubsub_pullsensor/pull_message_pubsub/2020-09-28T06:04:52.185745+00:00/1.log
Production 2020-09-28 06:05:17,127 {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: pubsub_pullsensor.pull_message_pubsub 2020-09-28T06:04:52.185745+00:00 [queued]>
Production 2020-09-28 06:05:17,221 {taskinstance.py:655} INFO - Dependencies all met for <TaskInstance: pubsub_pullsensor.pull_message_pubsub 2020-09-28T06:04:52.185745+00:00 [queued]>
Production 2020-09-28 06:05:17,221 {taskinstance.py:866} INFO -
--------------------------------------------------------------------------------
Production 2020-09-28 06:05:17,222 {taskinstance.py:867} INFO - Starting attempt 1 of 1
Production 2020-09-28 06:05:17,222 {taskinstance.py:868} INFO -
--------------------------------------------------------------------------------
Production 2020-09-28 06:05:17,260 {taskinstance.py:887} INFO - Executing <Task(PubSubPullSensor): pull_message_pubsub> on 2020-09-28T06:04:52.185745+00:00
Production 2020-09-28 06:05:17,275 {standard_task_runner.py:53} INFO - Started process 14240 to run task
Production 2020-09-28 06:05:17,494 {logging_mixin.py:112} INFO - Running %s on host %s <TaskInstance: pubsub_pullsensor.pull_message_pubsub 2020-09-28T06:04:52.185745+00:00 [running]> 243145dffd6e
Production 2020-09-28 06:05:17,586 {logging_mixin.py:112} INFO - [2020-09-28 06:05:17,586] {{pubsub.py:488}} INFO - Pulling max 5 messages from subscription (path) projects/project-data-platform-de/subscriptions/DataInfra-subscription1
Production 2020-09-28 06:05:17,587 {taskinstance.py:1128} ERROR - 'NoneType' object is not iterable
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.7/site-packages/airflow/providers/google/cloud/sensors/pubsub.py", line 143, in execute
super().execute(context)
File "/usr/local/lib/python3.7/site-packages/airflow/sensors/base_sensor_operator.py", line 107, in execute
while not self.poke(context):
File "/usr/local/lib/python3.7/site-packages/airflow/providers/google/cloud/sensors/pubsub.py", line 156, in poke
return_immediately=self.return_immediately,
File "/usr/local/lib/python3.7/site-packages/airflow/providers/google/common/hooks/base_google.py", line 356, in inner_wrapper
return func(self, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/providers/google/cloud/hooks/pubsub.py", line 497, in pull
metadata=metadata,
File "/usr/local/lib/python3.7/site-packages/google/cloud/pubsub_v1/_gapic.py", line 40, in <lambda>
fx = lambda self, *a, **kw: wrapped_fx(self.api, *a, **kw) # noqa
File "/usr/local/lib/python3.7/site-packages/google/pubsub_v1/services/subscriber/client.py", line 1005, in pull
metadata = tuple(metadata) + (
TypeError: 'NoneType' object is not iterable
Production 2020-09-28 06:05:17,591 {taskinstance.py:1185} INFO - Marking task as FAILED.dag_id=pubsub_pullsensor, task_id=pull_message_pubsub, execution_date=20200928T060452, start_date=20200928T060517, end_date=20200928T060517
Production 2020-09-28 06:05:17,993 {logging_mixin.py:112} INFO - [2020-09-28 06:05:17,992] {{email.py:131}} INFO - Sent an alert email to ['msaipul@xxx.com']
Production 2020-09-28 06:05:26,966 {logging_mixin.py:112} INFO - [2020-09-28 06:05:26,966] {{local_task_job.py:103}} INFO - Task exited with return code 1
Скриншот темы и подписки в Pub/Sub (GCP)
Скриншот темы и подписки
Пожалуйста, помогите решить эту проблему. Спасибо!
Комментарии:
1. Можете ли вы опубликовать полное сообщение об исключении? Можете ли вы также подтвердить правильность названия темы и подписки?
2. Не могли бы вы проверить, какая строка вызывает эту ошибку, а также привести полное сообщение об ошибке?
3. Я уже добавил журнал, а также скриншоты темы и подписки. Спасибо.