В настоящее время у меня есть две группы DAG: DAG_A и DAG_B. Оба работают с schedule_interval=timedelta(days=1)
У DAG_A есть задача 1, выполнение которой обычно занимает 7 часов. А DAG_B занимает всего 3 часа.
DAG_B имеет ExternalTaskSensor(external_dag_id="DAG_A", external_task_id="Task1"), но также использует некоторую другую информацию X, которая генерируется ежечасно.
Как лучше всего увеличить частоту DAG_B, чтобы он запускался не менее 4 раз в день? Насколько мне известно, обе группы DAG должны иметь одинаковый schedule_interval. Однако я хочу как можно больше обновить X на DAG_B.
Одна из возможностей — создать еще одну группу обеспечения доступности баз данных, которая имеет ExternalTaskSensor для DAG_B. Но я не думаю, что это лучший способ.
Если я вас правильно понял, ваши условия таковы:
Я думаю, вы можете легко адаптировать свой текущий дизайн, указав
ExternalTaskSensorдождаться желаемой даты выполнения DAG_A.Из Определение оператора ExternalTaskSensor:
Это
execution_dateможно определить с помощью параметраexecution_date_fn:Вы можете определить датчик следующим образом:
Где
_get_execution_date_of_dag_aвыполняет запрос к БД, используяget_last_dagrun, что позволяет получить последнийexecution_dateиз DAG_A.Надеюсь, этот подход вам поможет. Вы можете найти рабочий пример в этом ответе.