Я пытаюсь протестировать даг с более чем одной задачей в тестовой среде. Мне удалось протестировать отдельную задачу, связанную с дагом, но я хочу создать несколько задач в даге и выполнить первую задачу. Для тестирования одной задачи в даге использую
task1.run()
который исполняется. Но то же самое не работает, когда у меня много задач одна за другой в даге.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('tutorial', default_args=default_args)
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
t2.set_upstream(t1)
t1.run() # It is executing just first task.
Чтобы выполнить вторую задачу, мне нужно запустить t2.run (), что мне не нужно, поскольку я разрабатываю DAG. Как этого добиться?
Пытаюсь протестировать весь прогон дага. Если мы поместим приведенный выше код внутрь тестового оператора, он не сработает. Суть в том, что я готов протестировать весь прогон dag, который состоит из множества зависимых задач, с использованием pytest. — person mad_ schedule 25.04.2018
Возможный дубликат модульного теста Airflow Python? — person mad_ schedule 25.04.2018
Эта ссылка, указывающая на модульный тест Dag, включает только одну задачу. Я хочу протестировать весь даг, а не только одну задачу на даг. Надеюсь, это устранит путаницу. — person mad_ schedule 25.04.2018
что ты уже испробовал? Как выглядит тестовый код? А что насчет dag.run()
? — person mad_ schedule 25.04.2018
Не знаю, почему не щелкнуло. простой dag.run () работает. Спасибо — person mad_ schedule 25.04.2018
dag.run () работает в воздушном потоке 1.7, но, похоже, не работает в 1.9 — person mad_ schedule 21.06.2018
Посмотрите здесь: blog.godatadriven.com/testing-and-debugging-apache -воздушный поток — person mad_ schedule 25.03.2020
Я еще не совсем уверен, что понимаю ваш вопрос, но попробую начать отвечать.
Если ваша цель — просто запустить группу DAG или подмножества ее задач вручную, вы можете добиться этого с помощью интерфейса командной строки, например:
$ airflow run ...
— запустить экземпляр задачи$ airflow test ...
— протестировать экземпляр задачи без проверки зависимостей и записи состояния в базу данных$ airflow trigger_dag ...
— запускать конкретный запуск DAG DAGДокументы CLI — https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html.
Я думаю, что команда запуска воздушного потока является наиболее подходящей для вашего варианта использования.
Во время выполнения планирование задач в группах обеспечения доступности баз данных и выполнение подчиненных зависимостей после выполнения их требований выполняется исполнителем автоматически. Вам не нужно вызывать run () в любом месте вашего кода.
Что касается самого метода запуска, код все еще там:
Вопросов
Спасибо, Тейлор, что нашли время и дали ответ. Мне нужно протестировать даг с зависимостями внутри модульного теста. Поэтому я не хочу хранить файл в файле / позволять отображать тег в графическом интерфейсе, поскольку это тестовый файл. Я хочу загрузить весь даг в модульном тесте и запустить оттуда. Раньше он работал в 1.7, но не совсем уверен, что мне не хватает, чтобы заставить его работать в 1.9. Да, это связано с моим другим вопросом, поскольку он связан с другой версией, поэтому оставил два вопроса отдельно, чтобы избежать путаницы. Проблема запуска дага, как в приведенном выше примере, заключается в том, что он не соблюдает зависимость — person mad_; 03.07.2018