Composer (Airflow) DAG RunID конфликт в GCP

У нас есть одна облачная функция, основанная на облачном хранилище. Эта облачная функция сработает после загрузки файла в корзину. Когда файл загружен, функция вызывает / запускает DAG воздушного потока. Этот DAG обработает файл.

Проблема в том, что когда несколько файлов помещаются одновременно с за секунду, вызов функции завершается с ошибкой ниже,

b ‘{error: Run id manual__2020-07-31T17: 48: 15 + 00: 00 уже существует для dag id pl_imaoc_trigger_dag} \ n’

Чтобы решить эту проблему, мы передаем run_id как run_id: IMAOC_31072020201842766625, дату в миллисекундах.

Код:

dag_name = environ_vars['imaoc_meta_dag']
    webserver_url = (
        webserver_id
        + '/api/experimental/dags/'
        + dag_name
        + '/dag_runs'
    )

    print('webserver_url: {}'.format(webserver_url))
    data['run_id'] = _datetime.datetime.now().strftime(**"IMAOC_%d%m%Y%H%M%S%f"**)
    resp = map_iap_request(webserver_url,client_id,method = 'POST',json = data)
    print('response text:{}'.format(resp))

Но все еще не решена, и AIRFLOW_CTX_DAG_RUN_ID поступает в формате manual__2020-07-31T20: 18: 43 + 00: 00 ….

Не знаю, что делать, чтобы удалить этот конфликт и запустить DAG, если файл приходит в ту же секунду.

Есть ли возможность установить переменную AIRFLOW_CTX_DAG_RUN_ID перед запуском DAG ..?   —  person Chimmu    schedule 03.08.2020

См. также:  Почему динамические цвета из моего каталога цветов не работают на моей панели навигации?
Понравилась статья? Поделиться с друзьями:
IT Шеф
Комментарии: 1
  1. Chimmu

    пожалуйста, используйте приведенный ниже код, он работает

    client_id = os.getenv("CLIENT_ID")
    # This should be part of your webserver's URL:
    # {tenant-project-id}.appspot.com
    webserver_id = os.getenv("TENANT_PROJECT")
    # The name of the DAG you wish to trigger
    dag_name = os.getenv("DAG_NAME")
    webserver_url = (
        'https://'
        + webserver_id
        + '.appspot.com/api/experimental/dags/'
        + dag_name
        + '/dag_runs'
    )
    # Make a POST request to IAP which then Triggers the DAG
    run_id = datetime.utcnow().strftime('alpaca_%Y-%m-%dT%H:%M:%S.%f')
    
    conf = {"conf": data}
    print(f"JSON body = {conf}")
    
    make_iap_request(
        webserver_url, client_id, method='POST', json={"conf": data, "run_id": run_id, "replace_microseconds": False})
    

    у меня это не работает. Облачная функция вообще не запускается. person Chimmu; 18.12.2020

Добавить комментарий

;-) :| :x :twisted: :smile: :shock: :sad: :roll: :razz: :oops: :o :mrgreen: :lol: :idea: :grin: :evil: :cry: :cool: :arrow: :???: :?: :!: