Переменные воздушного потока по умолчанию — Настройка инкрементной нагрузки

Я пытаюсь реализовать инкрементную загрузку данных для извлечения данных из rds postgres в другой postgres rds

Я использую воздушный поток для реализации ETL. Итак, прочитав некоторое время о макросах воздушного потока, я решил, что настрою инкрементный поток с переменными воздушного потока по умолчанию.

Итак, алгоритм такой,

если моя предыдущая дата выполнения None или null или »: выберите данные с начала времени (в нашем случае это год назад), иначе выберите конец предыдущей даты выполнения, если

Примечание: следующий код предназначен сначала для понимания переменных по умолчанию, и это еще не реализовано для проблемы, о которой я упоминал выше.

Соответствующий код для этого показан ниже. Когда я запускаю даг в первый раз, я всегда заканчиваю тем, что печатаю «Нет» для переменной предыдущей успешной даты выполнения, а не историческую дату, как то, что я упомянул. Я не могу понять этого. Любые идеи по этому поводу будут большим подспорьем

from datetime import *
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

default_args={'owner':'airflow','start_date': days_ago(1),'depends_on_past':'False'}


dag = DAG('jinja_trial_10',default_args=default_args,schedule_interval=timedelta(minutes=5))



def printexecutiontimes(**kwargs):
    executiondate = kwargs.get('execution_date')
    previoussuccessfulexecutiondate =  kwargs.get('prev_execution_date_success')
    previousexecutiondate = kwargs.get('prev_ds_nodash')

    if (previoussuccessfulexecutiondate == 'None' or previoussuccessfulexecutiondate is None):
        previoussuccessfulexecutiondate = datetime.strftime(datetime.now() - timedelta(days = 365),'%Y-%m-%d')


    print('Execution Date : {0}'.format(executiondate))
    print('Previous successful execution date : {0}'.format(previoussuccessfulexecutiondate))
    print('Previous execution date : {0}'.format(previousexecutiondate))


    print('hello')




task_start  = DummyOperator(task_id = 'start',dag=dag)

jinja_task= PythonOperator(task_id = 'TryingoutJinjatemplates',
                           python_callable =printexecutiontimes,
                           provide_context = 'True',
                           dag=dag )

task_end = DummyOperator(task_id = 'end',dag=dag)


task_start >>jinja_task >> task_end

См. также:  Composer (Airflow) DAG RunID конфликт в GCP
Понравилась статья? Поделиться с друзьями:
IT Шеф
Комментарии: 3
  1. rathimittha

    Недавно у меня было что-то очень похожее, и следующий код — это то, что я закончил созданием пользовательской функции с использованием деталей DagRun.

    Обратитесь к этому ответу — если вы просто хотите получить последний запуск DAG (независимо от статуса).

    Для меня мне нужно было получить последнюю дату успешного запуска, поэтому я создал функцию ниже:

    def get_last_dag_run(dag_id):
        dag_runs = DagRun.find(dag_id=dag_id)
        dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
    
        for dag_run in dag_runs:
            #print all dag runs - debug only
            print(f"All ----- state: {dag_run.state} , run_id: {dag_run.run_id} , execution_date: {dag_run.execution_date}")
    
        print('Success runs ---------------------------------')
        dag_runs = list(filter(lambda x: x.state == 'success', dag_runs))
        for dag_run in dag_runs:
            #print successfull dag runs - debug only
            print(f"Success - state: {dag_run.state} , run_id: {dag_run.run_id} , execution_date: {dag_run.execution_date}")
        
        # return last execution run or default value (01-01-1970) 
        return dag_runs[0].execution_date if dag_runs else datetime(1970, 1, 1)
    

    Спасибо за ответ Шариф. Но пока я пробую ваше решение, вы можете помочь мне понять, почему мой код не работает? person rathimittha; 02.11.2020

    Понятия не имею, вот еще несколько ссылок SO, на которые вы можете сослаться, большинство из них закончилось написанием собственного кода. google.com/ person rathimittha; 02.11.2020

    Вот ответ: airflow.apache.org/docs/stable/ person rathimittha; 02.11.2020

    привет, Шариф, я использовал логику в ссылке на stackoverflow (предоставленной Чарли), которую вы предоставили. Однако, насколько я понимаю, даг запускается впервые, и, согласно логике Чарли, я должен получить «без предыдущих запусков». Но у меня никогда не получается, у меня всегда свидание. Я ожидаю, что я должен получить «без предыдущих запусков», чтобы я мог передать правильные даты для выбора дополнительных данных. person rathimittha; 03.11.2020

    Итак, что я вижу, когда я проверяю тип last_dag_run из решения Чарли и тип предыдущего успешного выполнения из моего кода, он говорит ‹type — proxy›. И проверка с ‘is None’, на нем не работает … person rathimittha; 03.11.2020

  2. rathimittha

    После нескольких экспериментов и большого количества чтения я придумал следующий код, и он сработал для меня.

    • Создайте переменную в пользовательском интерфейсе Airflow и присвойте ей значение 0
    • Используйте предопределенные переменные Airflow, чтобы определить, является ли это полной или дополнительной нагрузкой.
    • Псевдокод —
    If
        value of Variable created = 0
    then 
        set Variable = 1
        set the start data to point in time in the past(a date-time from the inception of a certain process)
        set the end date to the value of the "execution_date" (defined as a part of airflow's predefined variables)
    else
        set the start date to "prev_execution_date_success" (defined as a part of airflow's predefined variables)
        set the end date to "execution_date" (defined as a part of airflow's predefined variables)
    end
    

    Ниже приведен фрагмент кода для того же

    from datetime import *
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.utils.dates import days_ago
    from airflow.models import Variable
    
    default_args={'owner':'airflow','start_date': datetime(2020,11,3,12,5),'depends_on_past':'False'}
    
    
    dag = DAG('airflow_incremental_load_setup',default_args=default_args,schedule_interval=timedelta(minutes=5))
    
    def printexecutiontimes(**kwargs):
        # Variable to be created before the running of dag
        full_load_check = Variable.get('full_load_completion')
        print('full_load_check : {0}'.format(full_load_check))
        if full_load_check == '0':
            print('First execution')
            print('Execution date : {0}'.format(kwargs.get('execution_date')))
            print('Actual start date : {0}'.format(kwargs.get('ds')))
            print('Previous successful execution date : {0}'.format(kwargs.get('prev_execution_date_success')))
            print('Calculated field : {0}'.format(datetime.strftime(datetime.now() - timedelta(days=365), '%Y-%m-%d')))
            Variable.set('full_load_check', '1')
            start_date = datetime.strftime(datetime.now() - timedelta(days=365), '%Y-%m-%d')
            end_date = datetime.strftime(kwargs.get('execution_date'), '%Y-%m-%d')
        else:
            print('After the first execution ..')
            print('Execution date : {0}'.format(kwargs.get('execution_date')))
            print('Actual start date : {0}'.format(kwargs.get('ds')))
            print('Previous successful execution date : {0}'.format(kwargs.get('prev_execution_date_success')))
            print('Calculated field : {0}'.format(kwargs.get('prev_execution_date_success')))
            start_date = kwargs.get('prev_execution_date_success')
            start_date = parse(str(start_date))
            end_date = kwargs.get('execution_date')
            end_date = parse(str(end_date))
            print('Type of start_date_check : {0}'.format(type(start_date)))
            start_date = datetime.strftime(start_date, '%Y-%m-%d')
            end_date = datetime.strftime(end_date, '%Y-%m-%d')
    
    task_start  = DummyOperator(task_id = 'start',dag=dag)
    
    main_task= PythonOperator(task_id = 'IncrementalJobTask',
                                python_callable =printexecutiontimes,
                                provide_context = 'True',
                                dag=dag )
    
    task_end = DummyOperator(task_id = 'end',dag=dag)
    
    
    task_start >>main_task >> task_end
    
  3. rathimittha

    Мне помогло:

    if isinstance(context['prev_execution_date_success'], type(None)):
    
Добавить комментарий

;-) :| :x :twisted: :smile: :shock: :sad: :roll: :razz: :oops: :o :mrgreen: :lol: :idea: :grin: :evil: :cry: :cool: :arrow: :???: :?: :!: