У меня есть два разных DAG, которые должны работать на разных частотах. Один, т.е. dag1, должен работать еженедельно, а dag2 — ежедневно. Теперь dag2 должен запускаться только после завершения dag1, при каждом запуске dag1.
Я определил две группы DAG в двух разных модулях Python.
dag1.py
PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))
with DAG('dag1',
default_args={
'owner': 'airflow',
'start_date': dt.datetime(2019, 8, 19, 9, 30, 00),
'concurrency': 1,
'retries': 0
}
schedule_interval='00 10 * * 1',
catchup=True
) as dag:
CRAWL_PARAMS = BashOperator(
task_id='crawl_params',
bash_command='cd {}/scraper && scrapy crawl crawl_params'.format(PROJECT_PATH)
)
dag2.py
PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))
with DAG('dag2',
default_args = {
'owner': 'airflow',
'start_date': dt.datetime(2019, 8, 25, 9, 30, 00),
'concurrency': 1,
'retries': 0
}
schedule_interval='5 10 * * *',
catchup=True
) as dag:
CRAWL_DATASET = BashOperator(
task_id='crawl_dataset',
bash_command='''
cd {}/scraper && scrapy crawl crawl_dataset
'''.format(PROJECT_PATH)
)
В настоящее время я вручную установил 5-минутный промежуток между двумя дагами. Эта настройка в настоящее время не работает, а также отсутствует функция, позволяющая сделать dag2 зависимым от dag1 по мере необходимости.
Я проверил ответы здесь и здесь, но не смог понять.
ПРИМЕЧАНИЕ: schedule_intervals являются ориентировочными. Намерение состоит в том, чтобы запускать dag1 каждый понедельник в фиксированное время и dag2 ежедневно в фиксированное время, а в понедельник он должен запускаться только после dag1 завершения. Здесь у каждого дага тоже несколько задач.
