При обработке моих данных в ParDo мне нужно использовать схему JSON, хранящуюся в Google Cloud Storage. Я думаю, это может быть неопубликованная загрузка? Я читал страницы, которые они называют документацией (https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.pvalue.html), и он содержит что-то о apache_beam.pvalue.AsSingleton
и apache_beam.pvalue.AsSideInput
, но нет результатов, если я использую их в Google, и я не могу найти ни одного пример для Python.
Как я могу прочитать файл из хранилища из ParDo? Или мне нужно загрузить неопубликованный файл в свой конвейер до ParDo, но как мне тогда использовать этот второй источник в ParDo?
[ИЗМЕНИТЬ]
Мои основные данные поступают от BQ: beam.io.Read(beam.io.BigQuerySource(...
Боковой ввод также поступает от BQ, используя тот же BigQuerySource
.
Когда я затем добавляю шаг после того, как основная сторона данных вводит другие данные, я получаю некоторые странные ошибки. Я замечаю, что когда я делаю beam.Map(lambda x: x)
в сторону ввода, он работает.
боковой ввод
schema_data = (p | "read schema data" >> beam.io.Read(beam.io.BigQuerySource(query=f"select * from `{schema_table}` limit 1", use_standard_sql=True, flatten_results=True))
| beam.Map(lambda x: x)
)
основные данные
source_data = (p | "read source data" >> beam.io.Read(beam.io.BigQuerySource(query=f"select {columns} from `{source_table}` limit 10", use_standard_sql=True, flatten_results=True)))
комбинирование
validated_records = source_data | 'record validation' >> beam.ParDo(Validate(), pvalue.AsList(schema_data))
Я нашел похожий вопрос здесь </ а>. Что касается комментариев к этому посту, если ваш файл схемы (в данном случае JSON) находится в известном месте в GCS, вы можете добавить
ParDo
в свой конвейер, который напрямую считывает его из GCS, используя реализациюstart_bundle()
.Вы можете использовать абстракцию Beam
FileSystem
, если вам нужно абстрагироваться от файловой системы, которую вы используете для хранения файла схемы (а не только для GCS).Кроме того, вы можете читать / загружать файлы из хранилища с помощью
API Google Cloud Storage.
Я также нашел здесь a блог, в котором рассказывается о различных моделях чтения источников при использовании Google Cloud Dataflow.
Надеюсь, это поможет.
Я бы использовал упомянутые вами документы в качестве справочника по библиотеке и просмотрел руководство по программированию Beam для получения более подробных пошаговых руководств: боковая секция ввода. Я постараюсь помочь с парой примеров, в которых мы загрузим схему BigQuery из общедоступной таблицы и загрузим ее в GCS:
Наши данные будут некоторыми строками csv без заголовков, поэтому мы должны использовать схему GCS:
Использование боковых входов
Читаем файл JSON в
schema
PCollection:а затем мы передаем его
ParDo
в качестве побочного ввода, чтобы он транслировался каждому рабочему, выполняющемуDoFn
. В этом случае мы можем использоватьAsSingleton
, поскольку мы хотим предоставить схему как одно значение:Теперь мы можем получить доступ к
schema
вprocess
методеEnrichElementsFn
:Обратите внимание, что было бы лучше выполнить обработку схемы (для построения
field_names
), прежде чем сохранять ее как синглтон, чтобы избежать дублирования работы, но это всего лишь иллюстративный пример.Использование начального пакета
В этом случае мы не передаем дополнительный ввод в
ParDo
:И теперь мы используем клиентскую библиотеку Python (нам нужно установить
google-cloud-storage
) для чтения схемы каждый раз, когда рабочий инициализирует пакет:Результат одинаков в обоих случаях:
Протестировано с SDK 2.16.0 и
DirectRunner
.Полный код обоих примеров здесь.
Выглядит отлично. Но вы используете DirectRunner, и он полностью отличается от DataflowRunner, у меня были десятки ситуаций, когда что-то выполнялось локально, но не удаленно. Это просто еще один пример, загрузка неопубликованных файлов отлично работает локально, но не с DataflowRunner. Ошибка — «недопустимое имя таблицы», но Dataflow возвращает в основном случайные сообщения об ошибках, поэтому я не уверен, в чем проблема именно в этой точке, по крайней мере, я уверен, что настоящая проблема не в имени таблицы. — person Thijs; 16.01.2020
Я постараюсь исправить это, обновлю свой вопрос, и если у меня заработает неопубликованная загрузка, я приму ваш ответ. Я беру схему из таблицы BQ, может это вызывает какие-то проблемы. — person Thijs; 16.01.2020
Работает ли тогда при добавлении
beam.Map(lambda x: x)
? Если это так, даже если кажется, что он ничего не делает, он может исправить тип, необходимый в качестве входных данных дляpvalue.AsList()
. Если ошибка не исчезнет, можете ли вы добавить полный пример, включая пример схемы, хранящейся в BigQuery, и трассировку стека ошибок? В противном случае, мне кажется, потребуется много догадок. — person Thijs; 18.01.2020Да, лямбда работает, поэтому проблема может быть в типе данных. «Материализуется» ли BigQuery.read nog при запуске? — person Thijs; 21.01.2020