Я пытаюсь реализовать инкрементную загрузку данных для извлечения данных из rds postgres в другой postgres rds
Я использую воздушный поток для реализации ETL. Итак, прочитав некоторое время о макросах воздушного потока, я решил, что настрою инкрементный поток с переменными воздушного потока по умолчанию.
Итак, алгоритм такой,
если моя предыдущая дата выполнения None или null или »: выберите данные с начала времени (в нашем случае это год назад), иначе выберите конец предыдущей даты выполнения, если
Примечание: следующий код предназначен сначала для понимания переменных по умолчанию, и это еще не реализовано для проблемы, о которой я упоминал выше.
Соответствующий код для этого показан ниже. Когда я запускаю даг в первый раз, я всегда заканчиваю тем, что печатаю «Нет» для переменной предыдущей успешной даты выполнения, а не историческую дату, как то, что я упомянул. Я не могу понять этого. Любые идеи по этому поводу будут большим подспорьем
from datetime import *
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
default_args={'owner':'airflow','start_date': days_ago(1),'depends_on_past':'False'}
dag = DAG('jinja_trial_10',default_args=default_args,schedule_interval=timedelta(minutes=5))
def printexecutiontimes(**kwargs):
executiondate = kwargs.get('execution_date')
previoussuccessfulexecutiondate = kwargs.get('prev_execution_date_success')
previousexecutiondate = kwargs.get('prev_ds_nodash')
if (previoussuccessfulexecutiondate == 'None' or previoussuccessfulexecutiondate is None):
previoussuccessfulexecutiondate = datetime.strftime(datetime.now() - timedelta(days = 365),'%Y-%m-%d')
print('Execution Date : {0}'.format(executiondate))
print('Previous successful execution date : {0}'.format(previoussuccessfulexecutiondate))
print('Previous execution date : {0}'.format(previousexecutiondate))
print('hello')
task_start = DummyOperator(task_id = 'start',dag=dag)
jinja_task= PythonOperator(task_id = 'TryingoutJinjatemplates',
python_callable =printexecutiontimes,
provide_context = 'True',
dag=dag )
task_end = DummyOperator(task_id = 'end',dag=dag)
task_start >>jinja_task >> task_end
Недавно у меня было что-то очень похожее, и следующий код — это то, что я закончил созданием пользовательской функции с использованием деталей DagRun.
Обратитесь к этому ответу — если вы просто хотите получить последний запуск DAG (независимо от статуса).
Для меня мне нужно было получить последнюю дату успешного запуска, поэтому я создал функцию ниже:
Спасибо за ответ Шариф. Но пока я пробую ваше решение, вы можете помочь мне понять, почему мой код не работает? — person rathimittha; 02.11.2020
Понятия не имею, вот еще несколько ссылок SO, на которые вы можете сослаться, большинство из них закончилось написанием собственного кода. google.com/ — person rathimittha; 02.11.2020
Вот ответ: airflow.apache.org/docs/stable/ — person rathimittha; 02.11.2020
привет, Шариф, я использовал логику в ссылке на stackoverflow (предоставленной Чарли), которую вы предоставили. Однако, насколько я понимаю, даг запускается впервые, и, согласно логике Чарли, я должен получить «без предыдущих запусков». Но у меня никогда не получается, у меня всегда свидание. Я ожидаю, что я должен получить «без предыдущих запусков», чтобы я мог передать правильные даты для выбора дополнительных данных. — person rathimittha; 03.11.2020
Итак, что я вижу, когда я проверяю тип last_dag_run из решения Чарли и тип предыдущего успешного выполнения из моего кода, он говорит ‹type — proxy›. И проверка с ‘is None’, на нем не работает … — person rathimittha; 03.11.2020
После нескольких экспериментов и большого количества чтения я придумал следующий код, и он сработал для меня.
Ниже приведен фрагмент кода для того же
Мне помогло: