Вопросы

Неопубликованные статические данные

При обработке моих данных в ParDo мне нужно использовать схему JSON, хранящуюся в Google Cloud Storage. Я думаю, это может быть неопубликованная загрузка? Я читал страницы, которые они называют документацией (https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.pvalue.html), и он содержит что-то о apache_beam.pvalue.AsSingleton и apache_beam.pvalue.AsSideInput, но нет результатов, если я использую их в Google, и я не могу найти ни одного пример для Python.

Как я могу прочитать файл из хранилища из ParDo? Или мне нужно загрузить неопубликованный файл в свой конвейер до ParDo, но как мне тогда использовать этот второй источник в ParDo?

[ИЗМЕНИТЬ]

Мои основные данные поступают от BQ: beam.io.Read(beam.io.BigQuerySource(...
Боковой ввод также поступает от BQ, используя тот же BigQuerySource.

Когда я затем добавляю шаг после того, как основная сторона данных вводит другие данные, я получаю некоторые странные ошибки. Я замечаю, что когда я делаю beam.Map(lambda x: x) в сторону ввода, он работает.

Читать:
Snapchat Login Kit не может выполнить перенаправление в мое приложение

боковой ввод

schema_data = (p | "read schema data" >> beam.io.Read(beam.io.BigQuerySource(query=f"select * from `{schema_table}` limit 1", use_standard_sql=True, flatten_results=True))
                         | beam.Map(lambda x: x)
                       )

основные данные

    source_data = (p | "read source data" >> beam.io.Read(beam.io.BigQuerySource(query=f"select {columns} from `{source_table}` limit 10", use_standard_sql=True, flatten_results=True)))  

комбинирование

validated_records = source_data | 'record validation' >> beam.ParDo(Validate(), pvalue.AsList(schema_data))

Похожие записи

API-шлюз не возвращает ответ

admin

Как загрузить файл .xlsx с помощью FTPWebRequest через C # ASP.NET

admin

Использование сигмоидального вывода для кросс-энтропийной потери на Pytorch

admin

Установка определенной версии python для действия github

admin

Почему протоколы MPPlayableContentManager (MPPlayableContentDataSource и MPPlayableContentDelegate) в iOS 13.2 не вызываются?

admin

Как скомпилировать программу C в GCC, чтобы включить отладку в WinDbg?

admin