У меня есть простой пример, показывающий DAG с двумя уровнями. При запуске этот даг не работает из-за искусственной ошибки, есть одна failed
задача и одна upstream_fail
.
bug = True
def process1(param):
print("process 1 running {}".format(param))
if bug and (param == 2):
raise Exception("failure!!")
def process2(param):
print("process 2 running {}".format(param))
with dag:
for i in range(10):
task1 = PythonOperator(
task_id="process_1_{}".format(i),
python_callable=process1,
op_kwargs={'param': i}
)
task2 = PythonOperator(
task_id="process_2_{}".format(i),
python_callable=process2,
op_kwargs={'param': i},
trigger_rule=TriggerRule.ALL_SUCCESS,
retries=2
)
task1 >> task2
Теперь предположим, что я исправил ошибку (bug = False
) и попытался очистить все неудачные задачи:
airflow clear -s 2001 -e 2019 --only_failed test_resubmit
Эта команда очищает задачу test_resubmit.process_1_2
, и она будет успешно выполнена, однако ее нисходящий поток (т.е. test_resubmit.process_2_2
) все еще находится в состоянии upstream_failed
. Как заставить все задачи upstream_failed «повторить попытку» после того, как состояние их родителей изменилось на успешное?
Состояние
upstream_failed
— это конечное состояние, поэтому оно не будет повторять попытку, даже если его зависимости теперь удовлетворены (в отличие отup_for_retry
). Вы захотите пройти--downstream
, чтобы задачи, стоящие ниже по потоку, также были очищены.См. Все параметры в https://airflow.readthedocs.io/en/stable/cli.html#clear.
Пробовал —downstream, не работает. Это не очищает нижележащую задачу от неудачной. Мне пришлось вручную выбрать все последующие задачи и очистить их. — person motam79; 28.12.2018