Как определить таксы поддага в Airflow из другого файла dag.py?

Я хочу создать родительский DAG с несколькими дочерними DAG, которые вызываются через SubDagOperator.

  • Я могу найти только примеры динамического создания субдагов в задаче SubDagOperator.
  • Однако в этом случае мне нужны автономные дочерние группы DAG, которые уже определены в файле DAG.py, и сшить их вместе в родительском теге.

Если я установил задачу SubDAGOperator только с именем Dag дочернего тега:

task_1 = SubDagOperator(
    task_id="task_1",
    subdag=child_dag_name,
    dag=parent_dag
)

Я получаю следующую ошибку:

NameError: name 'child_dag_name' is not defined

Отвечает ли это на ваш вопрос? Связывание групп DAG верхнего уровня вместе   —  person Tim M. Schendzielorz    schedule 10.05.2020

Я уже нашел этот вопрос. Согласно вашему полному ответу здесь нет быстрого и простого решения, поэтому я мог бы также попробовать вариант TriggerDagRunOperator. В любом случае это кажется более масштабируемым. Я хочу активировать свой первый даг один раз, а затем второй даг нужно активировать один раз, если все задачи предыдущего будут успешными ровно через 24 часа. Возможно ли это с помощью TriggerDagRunOperator?   —  person Tim M. Schendzielorz    schedule 11.05.2020

См. также:  Перенаправление портов отладки для удаленного использования записной книжки Jupyter
Понравилась статья? Поделиться с друзьями:
IT Шеф
Комментарии: 1
  1. Tim M. Schendzielorz

    Этот ответ в равной степени зависит от знания Python и от ноу-хау в Airflow

    Напомним, что

    • python: importing модуля означает, что все элементы верхнего уровня (нулевой отступ) выполняются немедленно (во время процесса импорта)
    • airflow: scheduler / webserver выбираются только те DAG объекты, которые находятся на верхнем уровне (нулевой отступ) файла определения dag

    Имея в виду 2 вышеупомянутых момента, вот что вы можете сделать

    • создайте вспомогательную / служебную функцию в вашем child_dag.py файле, чтобы вызвать и вернуть объект DAG для child-dag
    • используйте эту вспомогательную функцию для создания экземпляра дочернего DAG верхнего уровня, а также для создания SubDagOperator задачи

    dag_object_builder.py

    from typing import Dict, Any
    
    from airflow.models import DAG
    
    
    def create_dag_object(dag_id: str, dag_params: Dict[str, Any]) -> DAG:
        dag: DAG = DAG(dag_id=dag_id, **dag_params)
        return dag
    

    child_dag.py

    from datetime import datetime
    from typing import Dict, Any
    
    from airflow.models import DAG
    
    from src.main.subdag_example import dag_object_builder
    
    default_args: Dict[str, Any] = {
        "owner": "my_owner",
        "email": ["[email protected]_domain.com"],
        "weight_rule": "downstream",
        "retries": 1
    }
    
    ...
    
    
    def create_child_dag_object(dag_id: str) -> DAG:
        my_dag: DAG = dag_object_builder.create_dag_object(
            dag_id=dag_id,
            dag_params={
                "start_date": datetime(year=2019, month=7, day=10, hour=21, minute=30),
                "schedule_interval": None,
                "max_active_runs": 1,
                "default_view": "graph",
                "catchup": False,
                "default_args": default_args
            }
        )
        return my_dag
    
    
    my_child_dag: DAG = create_child_dag_object(dag_id="my_child_dag")
    

    parent_dag.py

    from datetime import datetime
    from typing import Dict, Any
    
    from airflow.models import DAG
    from airflow.operators.subdag_operator import SubDagOperator
    
    from src.main.subdag_example import child_dag
    from src.main.subdag_example import dag_object_builder
    
    default_args: Dict[str, Any] = {
        "owner": "my_owner",
        "email": ["[email protected]_domain.com"],
        "weight_rule": "downstream",
        "retries": 1
    }
    
    my_parent_dag: DAG = dag_object_builder.create_dag_object(
        dag_id="my_parent_dag",
        dag_params={
            "start_date": datetime(year=2019, month=7, day=10, hour=21, minute=30),
            "schedule_interval": None,
            "max_active_runs": 1,
            "default_view": "graph",
            "catchup": False,
            "default_args": default_args
        }
    )
    
    ...
    
    my_subdag_task: SubDagOperator = SubDagOperator(
        task_id="my_subdag_task",
        dag=my_parent_dag,
        subdag=child_dag.create_child_dag_object(dag_id="my_subdag")
    )
    

Добавить комментарий

;-) :| :x :twisted: :smile: :shock: :sad: :roll: :razz: :oops: :o :mrgreen: :lol: :idea: :grin: :evil: :cry: :cool: :arrow: :???: :?: :!: