У меня есть два разных DAG, которые должны работать на разных частотах. Один, т.е. dag1
, должен работать еженедельно, а dag2
— ежедневно. Теперь dag2
должен запускаться только после завершения dag1, при каждом запуске dag1
.
Я определил две группы DAG в двух разных модулях Python.
dag1.py
PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))
with DAG('dag1',
default_args={
'owner': 'airflow',
'start_date': dt.datetime(2019, 8, 19, 9, 30, 00),
'concurrency': 1,
'retries': 0
}
schedule_interval='00 10 * * 1',
catchup=True
) as dag:
CRAWL_PARAMS = BashOperator(
task_id='crawl_params',
bash_command='cd {}/scraper && scrapy crawl crawl_params'.format(PROJECT_PATH)
)
dag2.py
PROJECT_PATH = path.abspath(path.join(path.dirname(__file__), '../..'))
with DAG('dag2',
default_args = {
'owner': 'airflow',
'start_date': dt.datetime(2019, 8, 25, 9, 30, 00),
'concurrency': 1,
'retries': 0
}
schedule_interval='5 10 * * *',
catchup=True
) as dag:
CRAWL_DATASET = BashOperator(
task_id='crawl_dataset',
bash_command='''
cd {}/scraper && scrapy crawl crawl_dataset
'''.format(PROJECT_PATH)
)
В настоящее время я вручную установил 5-минутный промежуток между двумя дагами. Эта настройка в настоящее время не работает, а также отсутствует функция, позволяющая сделать dag2
зависимым от dag1
по мере необходимости.
Я проверил ответы здесь и здесь, но не смог понять.
ПРИМЕЧАНИЕ: schedule_intervals
являются ориентировочными. Намерение состоит в том, чтобы запускать dag1
каждый понедельник в фиксированное время и dag2
ежедневно в фиксированное время, а в понедельник он должен запускаться только после dag1
завершения. Здесь у каждого дага тоже несколько задач.
ExternalTaskSensor
< / a>, который ожидает завершения последней задачи вашего 1-го DAGschedule_interval
на 2-й даг (поскольку он будет «принудительно» активирован 1-м дагом)Спасибо @ y2k-shubham. Я пробовал использовать TriggerDagRunOperator, но, как ясно, ежедневный запуск
dag2
не поможет. Я все же попробую что-нибудь придумать с помощью ExternalTaskSensor. — person curioswati; 27.08.2019Вы можете написать две задачи в одной группе обеспечения доступности баз данных и иметь последующий поток для установки зависимости задачи.
Что касается различных зависимостей расписания задач, создайте группу DAG с ежедневным расписанием. Для задачи с недельным расписанием напишите shortCircuitOperator, чтобы активировать еженедельный триггер:
Затем сделайте свою еженедельную задачу зависимой от этого еженедельного триггера.
спасибо @Saurav, это похоже на решение. но каким-то образом, если я использую ShortCircuitOperator и задача пропускается (потому что условие не выполнено), зависимые задачи также пропускаются. чего я не ожидал. так как задача 1 будет выполняться только еженедельно, а задача 2 должна будет выполняться ежедневно (например, когда задача 1 пропущена). какая-нибудь подсказка по этому поводу? — person curioswati; 27.08.2019
Вы можете использовать условную задачу. Итак, согласно моей первой (не самой лучшей) идее, напишите ежедневное задание дважды. Один раз он запускается при сбое еженедельной задачи и один раз при выполнении. stackoverflow.com/ questions / 43678408 / — person curioswati; 27.08.2019
Вместо этого я использовал триггер none_failed, вдохновленный здесь, но он не работает. даже в этом случае триггер не работает. Я также пробовал all_done, который должен позволять ему работать независимо от результата предыдущей задачи, но не работал. — person curioswati; 27.08.2019
спасибо @saurav за то, что указал мне правильное направление. хотя ShortCircuitOperator у меня не работал из-за той же проблемы, о которой я упоминал выше, относительно пропуска задачи, но, используя ту же логику, я решил решение с BranchPythonOperator. размещение решения ниже. большое спасибо за! — person curioswati; 28.08.2019
После долгой борьбы с пониманием потока я, наконец, сам пришел к ответу (не уверен, насколько он оптимален, но сейчас работает для меня). Благодаря этому ответу и ветвление документов. Вот мое решение с использованием BranchPythonOperator.
dag1.py
Здесь BranchPythonOperator использует функцию branch_tasks, чтобы выбрать, какие задачи запускать в зависимости от того, какой сегодня день недели.
Другой уловкой здесь является то, что когда
crawl_params
действительно выполняется, когда условие для него истинно, нижестоящие потоки также будут выполняться, но когда он пропускается, его последующие потоки также будут пропущены. Чтобы этого избежать, нам нужно передатьtrigger_rule='none_failed'
оператору задачи. Это означает, что задача должна выполняться, если ни одна из задач восходящего потока не завершилась неудачно (они либо были выполнены успешно, либо были пропущены).