Как запланировать запуск двух групп DAG в двух разных schedule_interval, но второй запускается только после завершения первого

У меня есть два разных DAG, которые должны работать на разных частотах. Один, т.е. dag1, должен работать еженедельно, а dag2 — ежедневно. Теперь dag2 должен запускаться только после завершения dag1, при каждом запуске dag1.

Я определил две группы DAG в двух разных модулях Python.

dag1.py

PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))

with DAG('dag1',
     default_args={
         'owner': 'airflow',
         'start_date': dt.datetime(2019, 8, 19, 9, 30, 00),
         'concurrency': 1,
         'retries': 0
     }
     schedule_interval='00 10 * * 1',
     catchup=True
    ) as dag:

CRAWL_PARAMS = BashOperator(
    task_id='crawl_params',
    bash_command='cd {}/scraper && scrapy crawl crawl_params'.format(PROJECT_PATH)
)

dag2.py

PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))

with DAG('dag2',
     default_args = {
         'owner': 'airflow',
         'start_date': dt.datetime(2019, 8, 25, 9, 30, 00),
         'concurrency': 1,
         'retries': 0
     }
     schedule_interval='5 10 * * *',
     catchup=True
    ) as dag:

CRAWL_DATASET = BashOperator(
    task_id='crawl_dataset',
    bash_command='''
        cd {}/scraper && scrapy crawl crawl_dataset
    '''.format(PROJECT_PATH)
)

В настоящее время я вручную установил 5-минутный промежуток между двумя дагами. Эта настройка в настоящее время не работает, а также отсутствует функция, позволяющая сделать dag2 зависимым от dag1 по мере необходимости.

Я проверил ответы здесь и здесь, но не смог понять.

ПРИМЕЧАНИЕ: schedule_intervals являются ориентировочными. Намерение состоит в том, чтобы запускать dag1 каждый понедельник в фиксированное время и dag2 ежедневно в фиксированное время, а в понедельник он должен запускаться только после dag1 завершения. Здесь у каждого дага тоже несколько задач.

См. также:  Документ наследования ODM с ошибкой создания схемы типа SINGLE_COLLECTION
Понравилась статья? Поделиться с друзьями:
IT Шеф
Комментарии: 3
  1. curioswati
    1. Самым простым решением было бы начать вторую группу DAG с ExternalTaskSensor < / a>, который ожидает завершения последней задачи вашего 1-го DAG
    2. В качестве альтернативы вы также можете активировать второй тег в конце 1-го, используя _ 2_. В этом случае, однако, вы не сможете назначить schedule_interval на 2-й даг (поскольку он будет «принудительно» активирован 1-м дагом)

    Спасибо @ y2k-shubham. Я пробовал использовать TriggerDagRunOperator, но, как ясно, ежедневный запуск dag2 не поможет. Я все же попробую что-нибудь придумать с помощью ExternalTaskSensor. person curioswati; 27.08.2019

  2. curioswati

    Вы можете написать две задачи в одной группе обеспечения доступности баз данных и иметь последующий поток для установки зависимости задачи.

    task1.set_downstream(task2)
    

    Что касается различных зависимостей расписания задач, создайте группу DAG с ежедневным расписанием. Для задачи с недельным расписанием напишите shortCircuitOperator, чтобы активировать еженедельный триггер:

    # Set trigger for first day of the week
    def check_trigger_week(execution_date, **kwargs):
        return execution_date.weekday() == 0
    
    # Task should check for the trigger to see if its first day of the week
    check_trigger_weekly = ShortCircuitOperator(
      task_id='check_trigger_weekly',
      python_callable=check_trigger_week,
      provide_context=True,
      dag=dag
    )
    

    Затем сделайте свою еженедельную задачу зависимой от этого еженедельного триггера.

    check_trigger_weekly.set_downstream(task)
    

    спасибо @Saurav, это похоже на решение. но каким-то образом, если я использую ShortCircuitOperator и задача пропускается (потому что условие не выполнено), зависимые задачи также пропускаются. чего я не ожидал. так как задача 1 будет выполняться только еженедельно, а задача 2 должна будет выполняться ежедневно (например, когда задача 1 пропущена). какая-нибудь подсказка по этому поводу? person curioswati; 27.08.2019

    Вы можете использовать условную задачу. Итак, согласно моей первой (не самой лучшей) идее, напишите ежедневное задание дважды. Один раз он запускается при сбое еженедельной задачи и один раз при выполнении. stackoverflow.com/ questions / 43678408 / person curioswati; 27.08.2019

    Вместо этого я использовал триггер none_failed, вдохновленный здесь, но он не работает. даже в этом случае триггер не работает. Я также пробовал all_done, который должен позволять ему работать независимо от результата предыдущей задачи, но не работал. person curioswati; 27.08.2019

    спасибо @saurav за то, что указал мне правильное направление. хотя ShortCircuitOperator у меня не работал из-за той же проблемы, о которой я упоминал выше, относительно пропуска задачи, но, используя ту же логику, я решил решение с BranchPythonOperator. размещение решения ниже. большое спасибо за! person curioswati; 28.08.2019

  3. curioswati

    После долгой борьбы с пониманием потока я, наконец, сам пришел к ответу (не уверен, насколько он оптимален, но сейчас работает для меня). Благодаря этому ответу и ветвление документов. Вот мое решение с использованием BranchPythonOperator.

    dag1.py

    import datetime as dt
    from os import path
    
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.python_operator import BranchPythonOperator
    
    PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))
    
    DEFAULT_ARGS = {
        'owner': 'airflow',
        'start_date': dt.datetime(2019, 8, 20),
        'concurrency': 1,
        'retries': 0
    }
    
    def branch_tasks(execution_date, **kwargs):
        '''
        Branch the tasks based on weekday.
        '''
        # check if the execution day is 'Saturday'
        if execution_date.weekday() == 5:
            return ['crawl_params', 'crawl_dataset']
    
        return 'crawl_dataset'
    
    with DAG('dag1',
             default_args=DEFAULT_ARGS,
             schedule_interval='00 10 * * *',
             catchup=False
            ) as dag:
    
        CRAWL_PARAMS = BashOperator(
            task_id='crawl_params',
            bash_command='cd {}/scraper && scrapy crawl crawl_params'.format(PROJECT_PATH)
        )
    
        CRAWL_DATASET = BashOperator(
            task_id='crawl_dataset',
            bash_command='cd {}/scraper && scrapy crawl crawl_dataset'.format(PROJECT_PATH),
            trigger_rule='none_failed'
        )
    
    BRANCH_OP = BranchPythonOperator(
        task_id='branch_tasks',
        provide_context=True,
        python_callable=branch_tasks,
        dag=dag
    )
    
    BRANCH_OP.set_downstream([CRAWL_PARAMS, CRAWL_DATASET])
    CRAWL_PARAMS.set_downstream(CRAWL_DATASET)
    

    Здесь BranchPythonOperator использует функцию branch_tasks, чтобы выбрать, какие задачи запускать в зависимости от того, какой сегодня день недели.
    Другой уловкой здесь является то, что когда crawl_params действительно выполняется, когда условие для него истинно, нижестоящие потоки также будут выполняться, но когда он пропускается, его последующие потоки также будут пропущены. Чтобы этого избежать, нам нужно передать trigger_rule='none_failed' оператору задачи. Это означает, что задача должна выполняться, если ни одна из задач восходящего потока не завершилась неудачно (они либо были выполнены успешно, либо были пропущены).

Добавить комментарий

;-) :| :x :twisted: :smile: :shock: :sad: :roll: :razz: :oops: :o :mrgreen: :lol: :idea: :grin: :evil: :cry: :cool: :arrow: :???: :?: :!: