Я пытался использовать макросы Airflow, ссылающиеся на переменные по умолчанию и BigQuery. метки для регистрации метаданных запросов, отправленных через Airflow BigQueryOperator. Вот определение оператора:
BigQuery_Labels_Test_Task = BigQueryOperator(
task_id="BigQuery_Labels_Test_Task",
bql="SELECT 1",
use_legacy_sql=False,
bigquery_conn_id="gcp_bq_connection",
destination_dataset_table=f"test_dataset.test_table",
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_TRUNCATE",
labels={
"dag_id": "{{ dag.dag_id }}",
"task_id": "{{ task.task_id }}",
"run_id": "{{ run_id }}",
},
dag=dag,
)
Но когда он выполняется, он выдает следующую ошибку:
[2020-06-01 08:15:13,495] {{taskinstance.py:887}} INFO - Executing <Task(BigQueryOperator): BigQuery_Labels_Test_Task> on 2020-06-01T07:51:40.752935+00:00
[2020-06-01 08:15:13,499] {{standard_task_runner.py:53}} INFO - Started process 16317 to run task
[2020-06-01 08:15:13,567] {{logging_mixin.py:112}} INFO - Running %s on host %s <TaskInstance: BigQuery_Labels_Test_DAG.BigQuery_Labels_Test_Task 2020-06-01T07:51:40.752935+00:00 [running]> b438a71d2d52
[2020-06-01 08:15:13,592] {{bigquery_operator.py:255}} INFO - Executing: SELECT 1
[2020-06-01 08:15:14,161] {{taskinstance.py:1128}} ERROR - <HttpError 400 when requesting https://bigquery.googleapis.com/bigquery/v2/projects/test-project/jobs?alt=json returned "Label value "manual__2020-06-01T07:51:40.752935+00:00" has invalid characters.">
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.7/site-packages/airflow/contrib/operators/bigquery_operator.py", line 282, in execute
encryption_configuration=self.encryption_configuration
File "/usr/local/lib/python3.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 910, in run_query
return self.run_with_configuration(configuration)
File "/usr/local/lib/python3.7/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 1318, in run_with_configuration
.execute(num_retries=self.num_retries)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/googleapiclient/_helpers.py", line 134, in positional_wrapper
return wrapped(*args, **kwargs)
File "/usr/local/airflow/.local/lib/python3.7/site-packages/googleapiclient/http.py", line 907, in execute
raise HttpError(resp, content, uri=self.uri)
googleapiclient.errors.HttpError: <HttpError 400 when requesting https://bigquery.googleapis.com/bigquery/v2/projects/test-project/jobs?alt=json returned "Label value "manual__2020-06-01T07:51:40.752935+00:00" has invalid characters.">
[2020-06-01 08:15:14,165] {{taskinstance.py:1151}} INFO - Marking task as UP_FOR_RETRY
Кто-нибудь сталкивался с чем-нибудь подобным? Есть ли у поля метки в bq какие-либо ограничения по символам?
PS: Когда я жестко кодирую что-то вроде следующего для значений меток, это работает:
labels={
"dag_id": "dag_id",
"task_id": "task_id",
},
Только строчные значения работают с жестким кодированием.
Ответ сработал для вас? Если да, примите его, чтобы он был более заметным и полезным для людей, столкнувшихся с подобной проблемой. — person Aaditya Ganapathy schedule 02.06.2020
Эта проблема не связана с воздушным потоком, а связана с BigQuery. Для определения меток run_id не соответствует нижеприведенным требованиям (run_id имеет +, :):
Дополнительную информацию о ярлыках BigQuery см. В этом.