Неопубликованные статические данные

При обработке моих данных в ParDo мне нужно использовать схему JSON, хранящуюся в Google Cloud Storage. Я думаю, это может быть неопубликованная загрузка? Я читал страницы, которые они называют документацией (https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.pvalue.html), и он содержит что-то о apache_beam.pvalue.AsSingleton и apache_beam.pvalue.AsSideInput, но нет результатов, если я использую их в Google, и я не могу найти ни одного пример для Python.

Как я могу прочитать файл из хранилища из ParDo? Или мне нужно загрузить неопубликованный файл в свой конвейер до ParDo, но как мне тогда использовать этот второй источник в ParDo?

[ИЗМЕНИТЬ]

Мои основные данные поступают от BQ: beam.io.Read(beam.io.BigQuerySource(...
Боковой ввод также поступает от BQ, используя тот же BigQuerySource.

Когда я затем добавляю шаг после того, как основная сторона данных вводит другие данные, я получаю некоторые странные ошибки. Я замечаю, что когда я делаю beam.Map(lambda x: x) в сторону ввода, он работает.

боковой ввод

schema_data = (p | "read schema data" >> beam.io.Read(beam.io.BigQuerySource(query=f"select * from `{schema_table}` limit 1", use_standard_sql=True, flatten_results=True))
                         | beam.Map(lambda x: x)
                       )

основные данные

    source_data = (p | "read source data" >> beam.io.Read(beam.io.BigQuerySource(query=f"select {columns} from `{source_table}` limit 10", use_standard_sql=True, flatten_results=True)))  

комбинирование

validated_records = source_data | 'record validation' >> beam.ParDo(Validate(), pvalue.AsList(schema_data))

См. также:  Связанный список: как реализовать деструктор, конструктор копирования и оператор присваивания копирования?
Понравилась статья? Поделиться с друзьями:
IT Шеф
Комментарии: 2
  1. Thijs

    Я бы использовал упомянутые вами документы в качестве справочника по библиотеке и просмотрел руководство по программированию Beam для получения более подробных пошаговых руководств: боковая секция ввода. Я постараюсь помочь с парой примеров, в которых мы загрузим схему BigQuery из общедоступной таблицы и загрузим ее в GCS:

    bq show --schema bigquery-public-data:usa_names.usa_1910_current > schema.json
    gsutil cp schema.json gs://$BUCKET
    

    Наши данные будут некоторыми строками csv без заголовков, поэтому мы должны использовать схему GCS:

    data = [('NC', 'F', 2020, 'Hello', 3200),
            ('NC', 'F', 2020, 'World', 3180)]
    

    Использование боковых входов

    Читаем файл JSON в schema PCollection:

    schema = (p 
      | 'Read Schema from GCS' >> ReadFromText('gs://{}/schema.json'.format(BUCKET)))
    

    а затем мы передаем его ParDo в качестве побочного ввода, чтобы он транслировался каждому рабочему, выполняющему DoFn. В этом случае мы можем использовать AsSingleton, поскольку мы хотим предоставить схему как одно значение:

    (p
      | 'Create Events' >> beam.Create(data) \
      | 'Enrich with side input' >> beam.ParDo(EnrichElementsFn(), pvalue.AsSingleton(schema)) \
      | 'Log elements' >> beam.ParDo(LogElementsFn()))
    

    Теперь мы можем получить доступ к schema в process методе EnrichElementsFn:

    class EnrichElementsFn(beam.DoFn):
      """Zips data with schema stored in GCS"""
      def process(self, element, schema):
        field_names = [x['name'] for x in json.loads(schema)]
        yield zip(field_names, element)
    

    Обратите внимание, что было бы лучше выполнить обработку схемы (для построения field_names), прежде чем сохранять ее как синглтон, чтобы избежать дублирования работы, но это всего лишь иллюстративный пример.


    Использование начального пакета

    В этом случае мы не передаем дополнительный ввод в ParDo:

    (p
      | 'Create Events' >> beam.Create(data) \
      | 'Enrich with start bundle' >> beam.ParDo(EnrichElementsFn()) \
      | 'Log elements' >> beam.ParDo(LogElementsFn()))
    

    И теперь мы используем клиентскую библиотеку Python (нам нужно установить google-cloud-storage) для чтения схемы каждый раз, когда рабочий инициализирует пакет:

    class EnrichElementsFn(beam.DoFn):
      """Zips data with schema stored in GCS"""
      def start_bundle(self):
        from google.cloud import storage
    
        client = storage.Client()
        blob = client.get_bucket(BUCKET).get_blob('schema.json')
        self.schema = blob.download_as_string()
    
      def process(self, element):
        field_names = [x['name'] for x in json.loads(self.schema)]
        yield zip(field_names, element)
    

    Результат одинаков в обоих случаях:

    INFO:root:[(u'state', 'NC'), (u'gender', 'F'), (u'year', 2020), (u'name', 'Hello'), (u'number', 3200)]
    INFO:root:[(u'state', 'NC'), (u'gender', 'F'), (u'year', 2020), (u'name', 'World'), (u'number', 3180)]
    

    Протестировано с SDK 2.16.0 и DirectRunner.

    Полный код обоих примеров здесь.

    Выглядит отлично. Но вы используете DirectRunner, и он полностью отличается от DataflowRunner, у меня были десятки ситуаций, когда что-то выполнялось локально, но не удаленно. Это просто еще один пример, загрузка неопубликованных файлов отлично работает локально, но не с DataflowRunner. Ошибка — «недопустимое имя таблицы», но Dataflow возвращает в основном случайные сообщения об ошибках, поэтому я не уверен, в чем проблема именно в этой точке, по крайней мере, я уверен, что настоящая проблема не в имени таблицы. person Thijs; 16.01.2020

    Я постараюсь исправить это, обновлю свой вопрос, и если у меня заработает неопубликованная загрузка, я приму ваш ответ. Я беру схему из таблицы BQ, может это вызывает какие-то проблемы. person Thijs; 16.01.2020

    Работает ли тогда при добавлении beam.Map(lambda x: x)? Если это так, даже если кажется, что он ничего не делает, он может исправить тип, необходимый в качестве входных данных для pvalue.AsList(). Если ошибка не исчезнет, ​​можете ли вы добавить полный пример, включая пример схемы, хранящейся в BigQuery, и трассировку стека ошибок? В противном случае, мне кажется, потребуется много догадок. person Thijs; 18.01.2020

    Да, лямбда работает, поэтому проблема может быть в типе данных. «Материализуется» ли BigQuery.read nog при запуске? person Thijs; 21.01.2020

Добавить комментарий

;-) :| :x :twisted: :smile: :shock: :sad: :roll: :razz: :oops: :o :mrgreen: :lol: :idea: :grin: :evil: :cry: :cool: :arrow: :???: :?: :!: