Как проверить dag воздушного потока в unittest?

Я пытаюсь протестировать даг с более чем одной задачей в тестовой среде. Мне удалось протестировать отдельную задачу, связанную с дагом, но я хочу создать несколько задач в даге и выполнить первую задачу. Для тестирования одной задачи в даге использую

task1.run()

который исполняется. Но то же самое не работает, когда у меня много задач одна за другой в даге.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args)

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)

t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)

t2.set_upstream(t1)

t1.run() # It is executing just first task.

Чтобы выполнить вторую задачу, мне нужно запустить t2.run (), что мне не нужно, поскольку я разрабатываю DAG. Как этого добиться?

Пытаюсь протестировать весь прогон дага. Если мы поместим приведенный выше код внутрь тестового оператора, он не сработает. Суть в том, что я готов протестировать весь прогон dag, который состоит из множества зависимых задач, с использованием pytest.   —  person mad_    schedule 25.04.2018

Возможный дубликат модульного теста Airflow Python?   —  person mad_    schedule 25.04.2018

Эта ссылка, указывающая на модульный тест Dag, включает только одну задачу. Я хочу протестировать весь даг, а не только одну задачу на даг. Надеюсь, это устранит путаницу.   —  person mad_    schedule 25.04.2018

что ты уже испробовал? Как выглядит тестовый код? А что насчет dag.run()?   —  person mad_    schedule 25.04.2018

Не знаю, почему не щелкнуло. простой dag.run () работает. Спасибо   —  person mad_    schedule 25.04.2018

См. также:  Ошибка при добавлении ролей реакции в discord.py

dag.run () работает в воздушном потоке 1.7, но, похоже, не работает в 1.9   —  person mad_    schedule 21.06.2018

Посмотрите здесь: blog.godatadriven.com/testing-and-debugging-apache -воздушный поток   —  person mad_    schedule 25.03.2020

Понравилась статья? Поделиться с друзьями:
IT Шеф
Комментарии: 1
  1. mad_

    Я еще не совсем уверен, что понимаю ваш вопрос, но попробую начать отвечать.

    Если ваша цель — просто запустить группу DAG или подмножества ее задач вручную, вы можете добиться этого с помощью интерфейса командной строки, например:

    • $ airflow run ... — запустить экземпляр задачи
    • $ airflow test ... — протестировать экземпляр задачи без проверки зависимостей и записи состояния в базу данных
    • $ airflow trigger_dag ... — запускать конкретный запуск DAG DAG

    Документы CLI — https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html.

    Я думаю, что команда запуска воздушного потока является наиболее подходящей для вашего варианта использования.

    Во время выполнения планирование задач в группах обеспечения доступности баз данных и выполнение подчиненных зависимостей после выполнения их требований выполняется исполнителем автоматически. Вам не нужно вызывать run () в любом месте вашего кода.

    Что касается самого метода запуска, код все еще там:

    Вопросов

    1. Когда вы говорите протестировать DAG в тестовой среде, что вы имеете в виду именно? Как в CI или в юнит-тестах?
    2. Это код для теста или код из одной из ваших реальных групп DAG?
    3. Связано ли это с вашим другим недавним вопросом Test Dag run for Airflow 1.9 в unittest?

    Спасибо, Тейлор, что нашли время и дали ответ. Мне нужно протестировать даг с зависимостями внутри модульного теста. Поэтому я не хочу хранить файл в файле / позволять отображать тег в графическом интерфейсе, поскольку это тестовый файл. Я хочу загрузить весь даг в модульном тесте и запустить оттуда. Раньше он работал в 1.7, но не совсем уверен, что мне не хватает, чтобы заставить его работать в 1.9. Да, это связано с моим другим вопросом, поскольку он связан с другой версией, поэтому оставил два вопроса отдельно, чтобы избежать путаницы. Проблема запуска дага, как в приведенном выше примере, заключается в том, что он не соблюдает зависимость person mad_; 03.07.2018

Добавить комментарий

;-) :| :x :twisted: :smile: :shock: :sad: :roll: :razz: :oops: :o :mrgreen: :lol: :idea: :grin: :evil: :cry: :cool: :arrow: :???: :?: :!: