Airflow ExternalTaskSensor с различным интервалом планировщика

В настоящее время у меня есть две группы DAG: DAG_A и DAG_B. Оба работают с schedule_interval=timedelta(days=1)

У DAG_A есть задача 1, выполнение которой обычно занимает 7 часов. А DAG_B занимает всего 3 часа.

DAG_B имеет ExternalTaskSensor(external_dag_id="DAG_A", external_task_id="Task1"), но также использует некоторую другую информацию X, которая генерируется ежечасно.

Как лучше всего увеличить частоту DAG_B, чтобы он запускался не менее 4 раз в день? Насколько мне известно, обе группы DAG должны иметь одинаковый schedule_interval. Однако я хочу как можно больше обновить X на DAG_B.


Одна из возможностей — создать еще одну группу обеспечения доступности баз данных, которая имеет ExternalTaskSensor для DAG_B. Но я не думаю, что это лучший способ.

См. также:  Как запланировать запуск двух групп DAG в двух разных schedule_interval, но второй запускается только после завершения первого
Понравилась статья? Поделиться с друзьями:
IT Шеф
Комментарии: 1
  1. Leonardo Farias

    Если я вас правильно понял, ваши условия таковы:

    • Продолжайте работать DAG_A ежедневно
    • Запускать DAG_B n раз в день
    • Каждый раз, когда запускается DAG_B, он будет ждать завершения DAG_A__Task_1

    Я думаю, вы можете легко адаптировать свой текущий дизайн, указав ExternalTaskSensor дождаться желаемой даты выполнения DAG_A.

    Из Определение оператора ExternalTaskSensor:

    Ожидает завершения другой группы обеспечения доступности баз данных или задачи в другой группе обеспечения доступности баз данных для определенной даты выполнения.

    Это execution_date можно определить с помощью параметра execution_date_fn:

    Execution_date_fn (Необязательно [Callable]) — функция, которая получает текущую дату выполнения в качестве первого позиционного аргумента и, возможно, любое количество аргументов ключевого слова, доступных в словаре контекста, и возвращает желаемые даты выполнения для запроса. В ExternalTaskSensor можно передать либо execution_delta, либо execution_date_fn, но не то и другое одновременно.

    Вы можете определить датчик следующим образом:

        wait_for_dag_a = ExternalTaskSensor(
            task_id='wait_for_dag_a',
            external_task_id="external_task_1",
            external_dag_id='dag_a_id',
            allowed_states=['success', 'failed'],
            execution_date_fn=_get_execution_date_of_dag_a,
            poke_interval=30
        )
    

    Где _get_execution_date_of_dag_a выполняет запрос к БД, используя get_last_dagrun, что позволяет получить последний execution_date из DAG_A.

    from airflow.utils.db import provide_session
    from airflow.models.dag import get_last_dagrun
    
    @provide_session
    def _get_execution_date_of_dag_a(exec_date, session=None,  **kwargs):
        dag_a_last_run = get_last_dagrun(
            'dag_a_id', session)
        return dag_a_last_run.execution_date
    

    Надеюсь, этот подход вам поможет. Вы можете найти рабочий пример в этом ответе.

Добавить комментарий

;-) :| :x :twisted: :smile: :shock: :sad: :roll: :razz: :oops: :o :mrgreen: :lol: :idea: :grin: :evil: :cry: :cool: :arrow: :???: :?: :!: