#python #python-3.x #airflow
Вопрос:
Я новичок airflow
и пытаюсь создать dag для обработки текста. У меня есть линия передачи данных, состоящая из задач обработки текста — чтение документа, очистка текста и загрузка данных в файл JSON. Для обработки текста для каждой задачи преобразования используются пользовательские операторы, которые хранятся в text_processing_plugin
папке. Полная структура папок plugin
папки является:-
├── airflow.cfg
├── airflow.db
├── airflow-webserver.pid
├── dags
│ ├── d0.py
├── plugins
│ └── text_processing_plugin
│ ├── __init__.py
│ ├── operators
│ │ ├── dependency_parsing.py
│ │ ├── entity_detection.py
│ │ ├── __init__.py
│ │ ├── lemmatize.py
│ │ ├── pos_tagging.py
│ │ ├── remove_stop_words.py
│ │ └── tokenize_doc.pyfolder structure of plugin folder is:-
├── requirements.txt
├── unittests.cfg
где text_processing_plugin/__init__.py
есть следующий код:-
from airflow.plugins_manager import AirflowPlugin
from text_processing_plugin.operators.dependency_parsing import DependencyParsingOperator
from text_processing_plugin.operators.entity_detection import DetectEntityOperator
from text_processing_plugin.operators.lemmatize import LemmatizeOperator
from text_processing_plugin.operators.pos_tagging import POSTagOperator
from text_processing_plugin.operators.remove_stop_words import RemoveStopWordsOperator
from text_processing_plugin.operators.tokenize_doc import DocTokenizerOperator
class TextProcessingPlugin(AirflowPlugin):
name = "text_processing_plugin"
operators = [DependencyParsingOperator, DetectEntityOperator, LemmatizeOperator, POSTagOperator,
RemoveStopWordsOperator, DocTokenizerOperator]
sensors = []
hooks = []
executors = []
macros = []
admin_views = []
flask_blueprints = []
menu_links = []
appbuilder_views = []
appbuilder_menu_items = []
global_operator_extra_links = []
operator_extra_links = []
Для создания DAG, airflow 1.x
как в парадигме, используется, как показано ниже:-
import os
import json
import spacy
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from airflow.operators.text_processing_plugin import DependencyParsingOperator, DetectEntityOperator, LemmatizeOperator, POSTagOperator, RemoveStopWordsOperator, DocTokenizerOperator
sp = spacy.load('en_core_web_sm')
default_args = {
'owner': 'episource',
'depends_on_past': True,
'start_date': datetime.datetime(2021, 3, 30),
'retries': 0,
'schedule_interval':'@once',
}
dag = DAG(
'text_processing_dag',
description='Text Processing Dag',
default_args=default_args,
catchup=False,
tags=['text_processing'])
def read_doc(**kwargs):
file_path = os.path.join(os.getcwd(), '/data/1.txt')
doc = open(file_path).read()
return doc
def write_to_json(**kwargs):
ti = kwargs['ti']
with open(os.path.join(os.getcwd, 'output', '1.json'), 'a ') as file:
result_1 = ti.xcom_pull(task_ids = 'tokenize_doc')
result_2 = ti.xcom_pull(task_ids = 'detect_entity')
print('result 1 is ', result_1)
print('result 2 is ', result_2)
file.write(json.dumps(result_1))
file.write(json.dumps(result_2))
extract = PythonOperator(
task_id = 'extract',
python_callable = read_doc,
dag = dag)
t11_tokenize_doc = DocTokenizerOperator(
sp = sp,
task_id = "transform_tokenize_doc",
dag = dag,
name = "Sentence Tokenizing",
pool='t1',
task_concurrency=2)
t12_detect_entities = DetectEntityOperator(
sp = sp,
task_id = "transform_detect_entity",
dag = dag,
name = "Entity Detection",
pool='t1',
task_concurrency=2)
load = PythonOperator(
task_id = 'load',
python_callable = write_to_json,
dag = dag)
extract >> [t11_tokenize_doc, t12_detect_entities] >> load
Когда я пытаюсь запустить код, я получаю :-
Traceback (most recent call last):
File "dags/d0.py", line 8, in <module>
from airflow.operators.text_processing_plugin import DependencyParsingOperator, DetectEntityOperator, LemmatizeOperator, POSTagOperator, RemoveStopWordsOperator, DocTokenizerOperator
ModuleNotFoundError: No module named 'airflow.operators.text_processing_plugin'
Я сослался на некоторые существующие ответы на Stackoverflow, но не смог обойти ошибку. Был бы признателен за какой-нибудь намек на это.
Ответ №1:
Изменено в версии 2.0: Импорт операторов, датчиков, крючков, добавленных в плагины через airflow.{операторы,датчики,крючки}.<имя подключаемого модуля><имя подключаемого модуля> больше не поддерживается, и эти расширения следует импортировать как обычные модули python. Дополнительные сведения см. в разделе: Управление модулями и создание пользовательского оператора