У нас есть одна облачная функция, основанная на облачном хранилище. Эта облачная функция сработает после загрузки файла в корзину. Когда файл загружен, функция вызывает / запускает DAG воздушного потока. Этот DAG обработает файл.
Проблема в том, что когда несколько файлов помещаются одновременно с за секунду, вызов функции завершается с ошибкой ниже,
b ‘{error: Run id manual__2020-07-31T17: 48: 15 + 00: 00 уже существует для dag id pl_imaoc_trigger_dag} \ n’
Чтобы решить эту проблему, мы передаем run_id как run_id: IMAOC_31072020201842766625, дату в миллисекундах.
Код:
dag_name = environ_vars['imaoc_meta_dag']
webserver_url = (
webserver_id
+ '/api/experimental/dags/'
+ dag_name
+ '/dag_runs'
)
print('webserver_url: {}'.format(webserver_url))
data['run_id'] = _datetime.datetime.now().strftime(**"IMAOC_%d%m%Y%H%M%S%f"**)
resp = map_iap_request(webserver_url,client_id,method = 'POST',json = data)
print('response text:{}'.format(resp))
Но все еще не решена, и AIRFLOW_CTX_DAG_RUN_ID поступает в формате manual__2020-07-31T20: 18: 43 + 00: 00 ….
Не знаю, что делать, чтобы удалить этот конфликт и запустить DAG, если файл приходит в ту же секунду.
Есть ли возможность установить переменную AIRFLOW_CTX_DAG_RUN_ID перед запуском DAG ..? — person Chimmu schedule 03.08.2020
пожалуйста, используйте приведенный ниже код, он работает
у меня это не работает. Облачная функция вообще не запускается. — person Chimmu; 18.12.2020