В настоящее время у меня есть две группы DAG: DAG_A и DAG_B. Оба работают с schedule_interval=timedelta(days=1)
У DAG_A есть задача 1, выполнение которой обычно занимает 7 часов. А DAG_B занимает всего 3 часа.
DAG_B имеет ExternalTaskSensor(external_dag_id="DAG_A", external_task_id="Task1")
, но также использует некоторую другую информацию X, которая генерируется ежечасно.
Как лучше всего увеличить частоту DAG_B, чтобы он запускался не менее 4 раз в день? Насколько мне известно, обе группы DAG должны иметь одинаковый schedule_interval. Однако я хочу как можно больше обновить X на DAG_B.
Одна из возможностей — создать еще одну группу обеспечения доступности баз данных, которая имеет ExternalTaskSensor для DAG_B. Но я не думаю, что это лучший способ.
Если я вас правильно понял, ваши условия таковы:
Я думаю, вы можете легко адаптировать свой текущий дизайн, указав
ExternalTaskSensor
дождаться желаемой даты выполнения DAG_A.Из Определение оператора ExternalTaskSensor:
Это
execution_date
можно определить с помощью параметраexecution_date_fn
:Вы можете определить датчик следующим образом:
Где
_get_execution_date_of_dag_a
выполняет запрос к БД, используяget_last_dagrun
, что позволяет получить последнийexecution_date
из DAG_A.Надеюсь, этот подход вам поможет. Вы можете найти рабочий пример в этом ответе.