Не удается использовать оператор BigQuery в Apache Airflow

# #google-bigquery #airflow

Вопрос:

Я создаю конвейер данных, в котором я получаю данные из BigQuery либо через оператора Bigquery, либо через облачную библиотеку Google. Но я всегда получаю ошибку. Ниже приведен dag для оператора больших запросов:

 from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator

from read_val_send1 import read,validating_hit,track_google_analytics_event,add_gcp_connection

default_args = {
    "owner" : "Airflow",
    "depends_on_past": False,
    "start_date" : datetime(2021,5,9),
    "email": ["airflow@airflow.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": timedelta(seconds = 5)
}

dag = DAG("Automp", default_args = default_args, schedule_interval = "@daily", catchup = False)

activateGCP = PythonOperator(
        task_id='add_gcp_connection_python',
        python_callable=add_gcp_connection,
        provide_context=True, dag = dag
    )
BQ_CONN_ID = "my_gcp_conn"
BQ_PROJECT = 'pii-test'
BQ_DATASET = 'some_Dataset'

t1 = BigQueryCheckOperator(
        task_id='bq_check',
        sql='''
        #standardSQL
        Select * from table''',
        use_legacy_sql=False,
        bigquery_conn_id=BQ_CONN_ID,
        dag=dag
    )
activateGCP >> t1
 

Ошибка

Я прикрепил изображение ошибки

Сломанный DAG: [/usr/local/airflow/dags/Automp.py] Нет модуля с именем «httplib2»

Я также не могу устанавливать пакеты python в airflow с required.txt файл. Ниже приведен файл создания:

 version: '2.1'
services:
    redis:
        image: 'redis:5.0.5'
        # command: redis-server --requirepass redispass

    postgres:
        image: postgres:9.6
        environment:
            - POSTGRES_USER=airflow
            - POSTGRES_PASSWORD=airflow
            - POSTGRES_DB=airflow
        # Uncomment these lines to persist data on the local filesystem.
        #     - PGDATA=/var/lib/postgresql/data/pgdata
        # volumes:
        #     - ./pgdata:/var/lib/postgresql/data/pgdata

    webserver:
        image: puckel/docker-airflow:1.10.9
        restart: always
        depends_on:
            - postgres
            - redis
        environment:
            - LOAD_EX=n
            - FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
            - EXECUTOR=Celery
            # - POSTGRES_USER=airflow
            # - POSTGRES_PASSWORD=airflow
            # - POSTGRES_DB=airflow
            # - REDIS_PASSWORD=redispass
        volumes:
            - ./dags:/usr/local/airflow/dags
            # Uncomment to include custom plugins
            # - ./plugins:/usr/local/airflow/plugins
        ports:
            - "8080:8080"
        command: webserver
        healthcheck:
            test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
            interval: 30s
            timeout: 30s
            retries: 3

    flower:
        image: puckel/docker-airflow:1.10.9
        restart: always
        depends_on:
            - redis
        environment:
            - EXECUTOR=Celery
            # - REDIS_PASSWORD=redispass
        ports:
            - "5555:5555"
        command: flower

    scheduler:
        image: puckel/docker-airflow:1.10.9
        restart: always
        depends_on:
            - webserver
        volumes:
            - ./dags:/usr/local/airflow/dags
            - ./requirements.txt:/requirements.txt
            # Uncomment to include custom plugins
            # - ./plugins:/usr/local/airflow/plugins
        environment:
            - LOAD_EX=n
            - FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
            - EXECUTOR=Celery
            # - POSTGRES_USER=airflow
            # - POSTGRES_PASSWORD=airflow
            # - POSTGRES_DB=airflow
            # - REDIS_PASSWORD=redispass
        command: scheduler

    worker:
        image: puckel/docker-airflow:1.10.9
        restart: always
        depends_on:
            - scheduler
        volumes:
            - ./dags:/usr/local/airflow/dags
            - ./requirements.txt:/requirements.txt
            # Uncomment to include custom plugins
            # - ./plugins:/usr/local/airflow/plugins
        environment:
            - FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
            - EXECUTOR=Celery
            # - POSTGRES_USER=airflow
            # - POSTGRES_PASSWORD=airflow
            # - POSTGRES_DB=airflow
            # - REDIS_PASSWORD=redispass
        command: worker
 

Моя структура папок выглядит следующим образом:

Структура Папок

Комментарии:

1. Вы пробовали установить отсутствующий пакет? pypi.org/project/google-api-python-client должно быть достаточно

Ответ №1:

Изображение, которое вы используете, не включает httplib2 пакет, который, возможно, используется при импорте, поступающем из read_val_send1 каталога.

Что вы можете сделать, так это добавить следующую строку на свой ./requirements.txt .

 httplib2==0.19.1
 

Установка докера-воздушного потока пакеля имеет entrypoint.sh это поддерживает pip install -r requirements.txt . Так что этого должно быть достаточно.

В случае, если что-то пойдет не так, вы всегда можете использовать журналы Docker или Docker interactive execute bash, чтобы узнать, что происходит не так.

Я также рекомендую использовать последнюю версию docker-compose для воздушного потока, чтобы обеспечить более плавный рабочий процесс.