Вопросы

Как запланировать запуск двух групп DAG в двух разных schedule_interval, но второй запускается только после завершения первого

У меня есть два разных DAG, которые должны работать на разных частотах. Один, т.е. dag1, должен работать еженедельно, а dag2 — ежедневно. Теперь dag2 должен запускаться только после завершения dag1, при каждом запуске dag1.

Я определил две группы DAG в двух разных модулях Python.

dag1.py

PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))

with DAG('dag1',
     default_args={
         'owner': 'airflow',
         'start_date': dt.datetime(2019, 8, 19, 9, 30, 00),
         'concurrency': 1,
         'retries': 0
     }
     schedule_interval='00 10 * * 1',
     catchup=True
    ) as dag:

CRAWL_PARAMS = BashOperator(
    task_id='crawl_params',
    bash_command='cd {}/scraper && scrapy crawl crawl_params'.format(PROJECT_PATH)
)

dag2.py

PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))

with DAG('dag2',
     default_args = {
         'owner': 'airflow',
         'start_date': dt.datetime(2019, 8, 25, 9, 30, 00),
         'concurrency': 1,
         'retries': 0
     }
     schedule_interval='5 10 * * *',
     catchup=True
    ) as dag:

CRAWL_DATASET = BashOperator(
    task_id='crawl_dataset',
    bash_command='''
        cd {}/scraper && scrapy crawl crawl_dataset
    '''.format(PROJECT_PATH)
)

В настоящее время я вручную установил 5-минутный промежуток между двумя дагами. Эта настройка в настоящее время не работает, а также отсутствует функция, позволяющая сделать dag2 зависимым от dag1 по мере необходимости.

Читать:
Как передать столбцы в двух фреймах данных в функцию гаверсина?

Я проверил ответы здесь и здесь, но не смог понять.

ПРИМЕЧАНИЕ: schedule_intervals являются ориентировочными. Намерение состоит в том, чтобы запускать dag1 каждый понедельник в фиксированное время и dag2 ежедневно в фиксированное время, а в понедельник он должен запускаться только после dag1 завершения. Здесь у каждого дага тоже несколько задач.

Похожие записи

Действия Github для NodeJS — «Ошибка: не удается найти модуль» для локального файла

admin

Родительский вид не обновляется после закрытия дочернего представления

admin

Изменение столбцов для получения уникальных значений, транспонирование другого столбца и добавление процента этих уникальных значений.

admin

целевая командная строка makefile

admin

Как использовать указатель функции для возврата указателя функции?

admin

Джулия: Сделайте Dataframe из вывода итератора

admin