#sql #postgresql #airflow
Вопрос:
Я новичок в airflow
этом деле . Я настроил все в соответствии с официальной документацией. Я использую pet example DAG
, тем не менее, когда я заглядываю в DAG
журнал, он показывает мне следующие ошибки:
Первая ошибка, исходящая от populate_pet_table
psycopg2.errors.InvalidTextRepresentation: invalid input syntax for type integer: "Maxy"
LINE 2: INSERT INTO pet VALUES ('Maxy', 'Dog', '2018-07-...
Вторая ошибка, исходящая от get_birth_date
BETWEEN SYMMETRIC 2020-01-01 AND 2020-12-31;
HINT: No operator matches the given name and argument types. You might need to add explicit type casts.
Что там не так? Это действительно официальный пример, так что для меня он должен работать нормально. Это код dag:
import datetime
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
# create_pet_table, populate_pet_table, get_all_pets, and get_birth_date are examples of tasks created by
# instantiating the Postgres Operator
with DAG(
dag_id="postgres_operator_dag",
start_date=datetime.datetime(2020, 2, 2),
schedule_interval="@once",
catchup=False,
) as dag:
create_pet_table = PostgresOperator(
task_id="create_pet_table",
postgres_conn_id="postgres_default",
sql="""
CREATE TABLE IF NOT EXISTS pet (
pet_id SERIAL PRIMARY KEY,
name VARCHAR NOT NULL,
pet_type VARCHAR NOT NULL,
birth_date DATE NOT NULL,
OWNER VARCHAR NOT NULL);
""",
)
populate_pet_table = PostgresOperator(
task_id="populate_pet_table",
postgres_conn_id="postgres_default",
sql="""
INSERT INTO pet VALUES ( 'Max', 'Dog', '2018-07-05', 'Jane');
INSERT INTO pet VALUES ( 'Susie', 'Cat', '2019-05-01', 'Phil');
INSERT INTO pet VALUES ( 'Lester', 'Hamster', '2020-06-23', 'Lily');
INSERT INTO pet VALUES ( 'Quincy', 'Parrot', '2013-08-11', 'Anne');
""",
)
get_all_pets = PostgresOperator(
task_id="get_all_pets", postgres_conn_id="postgres_default", sql="SELECT * FROM pet;"
)
get_birth_date = PostgresOperator(
task_id="get_birth_date",
postgres_conn_id="postgres_default",
sql="""
SELECT * FROM pet
WHERE birth_date
BETWEEN SYMMETRIC {{ params.begin_date }} AND {{ params.end_date }};
""",
params={'begin_date': '2020-01-01', 'end_date': '2020-12-31'},
)
create_pet_table >> populate_pet_table >> get_all_pets >> get_birth_date
Ответ №1:
Похоже, что в самом SQL произошла ошибка.
Изменить:
INSERT INTO pet VALUES ( 'Max', 'Dog', '2018-07-05', 'Jane');
INSERT INTO pet VALUES ( 'Susie', 'Cat', '2019-05-01', 'Phil');
INSERT INTO pet VALUES ( 'Lester', 'Hamster', '2020-06-23', 'Lily');
INSERT INTO pet VALUES ( 'Quincy', 'Parrot', '2013-08-11', 'Anne');
Для:
INSERT INTO pet (name, pet_type, birth_date, OWNER) VALUES ( 'Max', 'Dog', '2018-07-05', 'Jane');
INSERT INTO pet (name, pet_type, birth_date, OWNER) VALUES ( 'Susie', 'Cat', '2019-05-01', 'Phil');
INSERT INTO pet (name, pet_type, birth_date, OWNER) VALUES ( 'Lester', 'Hamster', '2020-06-23', 'Lily');
INSERT INTO pet (name, pet_type, birth_date, OWNER) VALUES ( 'Quincy', 'Parrot', '2013-08-11', 'Anne');
Для второго вопроса вам нужно изменить
BETWEEN SYMMETRIC {{ params.begin_date }} AND {{ params.end_date }};
Для:
BETWEEN SYMMETRIC DATE '{{ params.begin_date }}' AND DATE '{{ params.end_date }}';
и это будет работать нормально.
Пример из sqlfiddle
Я поднял вопрос о пиаре, чтобы решить эту проблему.
Комментарии:
1. похоже, это исправило первую ошибку, а как насчет второй?
2. Да, пожалуйста, сообщите origin о проблеме здесь, так как теперь все работает. P. S Можете ли вы сказать мне, если я не хочу запускать одну из этих задач, как я могу это сделать? Я попытался удалить (get_birth_date), так что теперь у меня есть: create_pet_table >>> populate_pet_table > > > > > > get_all_pets, но журнал dag показывает, что он все еще работает :O