Ошибка потока данных при предоставлении темы pubsub в качестве документа

У меня проблема, когда я создаю шаблон потока данных с помощью Python, и этот шаблон должен принимать 3 аргумента, определяемых пользователем, при запуске нового задания потока данных.

Проблема возникает с beam.io.gcp.pubsub.WriteToPubSub (), где я пытаюсь указать имя темы из ValueProvider, которое, согласно документации Google, требуется при создании шаблона:

https://cloud.google.com/dataflow/docs/guides/templates/creating-templates

Источник beam.io.ReadFromPubSub () успешно принимает поставщика значений для значения подписки, как и преобразование beam.io.gcp.bigquery.WriteToBigQuery ().

Очевидно, что совместное использование моего кода поможет :)

Сначала обычный импорт:

from __future__ import absolute_import

import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.value_provider import StaticValueProvider
import json
import time
from datetime import datetime
import dateutil.parser
import sys

Затем мой определенный класс для входных аргументов, предоставленных шаблону:

class userOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--subscription',  
            default='projects/MYPROJECT/subscrpiptions/subscription', 
            help='PubSub subscription to listen on')
        parser.add_value_provider_argument(
            '--bqtable', 
            default='dataset.table', 
            help='Big Query Table Name in the format project:dataset.table') 
        parser.add_value_provider_argument(
            '--topic',  
            default='projects/MYPROJECT/subscrpiptions/subscription', 
            help='PubSub topic to write failed messages to')

И сам конвейер определяется как (обратите внимание, что я пропустил функции карты)

def run():

    user_options = PipelineOptions().view_as(userOptions)

    pipeline_options = PipelineOptions()
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming = True

    with beam.Pipeline(options=pipeline_options) as p:

        records = ( 
        p  | 'Read from PubSub' 
            >> beam.io.ReadFromPubSub(
                subscription=str(user_options.subscription),
                id_label='Message_ID',
                with_attributes=True)
        | 'Format Message' >> 
            beam.Map(format_message_element)
        | 'Transform null records to empty list' >>
            beam.Map(transform_null_records)
        | 'Transform Dates' >>
            beam.Map(format_dates)
        | 'Write to Big Query' >>
            beam.io.gcp.bigquery.WriteToBigQuery(
                table=user_options.bqtable,
                create_disposition='CREATE_IF_NEEDED',
                write_disposition='WRITE_APPEND',
                insert_retry_strategy='RETRY_NEVER'
            )
        | 'Write Failures to Pub Sub' >>
            beam.io.gcp.pubsub.WriteToPubSub(user_options.topic)
        ) 

Теперь, когда я пытаюсь сгенерировать шаблон с помощью команды powershell:

python profiles-pipeline.py --project xxxx-xxxxxx-xxxx `
--subscription projects/xxxx-xxxxxx-xxxx/subscriptions/sub-xxxx-xxxxxx-xxxx-dataflow `
--bqtable xxxx-xxxxxx-xxxx:dataset.table `
--topic projects/xxxx-xxxxxx-xxxx/topics/top-xxxx-xxxxxx-xxxx-failures `
--runner DataflowRunner `
--temp_location gs://xxxx-xxxxxx-xxxx/temp/ `
--staging_location gs://xxxx-xxxxxx-xxxx/staging/ `
--template_location gs://xxxx-xxxxxx-xxxx/template

Я получаю такую ​​ошибку:

File "pipeline.py", line 193, in <module>
    run()
  File "pipeline.py", line 183, in run
    beam.io.gcp.pubsub.WriteToPubSub(user_options.topic)
  File "C:\github\pipeline-dataflow-jobs\dataflow\lib\site-packages\apache_beam\io\gcp\pubsub.py", line 292, in __init__
    topic, id_label, with_attributes, timestamp_attribute)
  File "C:\github\pipeline-dataflow-jobs\dataflow\lib\site-packages\apache_beam\io\gcp\pubsub.py", line 430, in __init__
    self.project, self.topic_name = parse_topic(topic)
  File "C:\github\pipeline-dataflow-jobs\dataflow\lib\site-packages\apache_beam\io\gcp\pubsub.py", line 325, in parse_topic
    match = re.match(TOPIC_REGEXP, full_topic)
  File "c:\program files\python37\lib\re.py", line 173, in match
    return _compile(pattern, flags).match(string)
TypeError: expected string or bytes-like object

Я сталкивался с этой ошибкой раньше при попытке использовать beam.io.WriteToBigQuery (), но как только я перешел на beam.io.gcp.bigquery.WriteToBigQuery (), ошибка была устранена, поскольку это принимает ValueProvider в качестве имени таблицы. Однако для pubsub я не могу найти альтернативу для записи, которая работает.

См. также:  Укажите connection_factory для create_engine () SQLAlchemy

Любая помощь будет очень признательна.

Понравилась статья? Поделиться с друзьями:
IT Шеф
Комментарии: 2
  1. TylrRssl1

    Я частично решил эту проблему, так как мой конвейер неправильно пытался опубликовать неудачные вставки в Big Query, однако у меня все еще есть проблема, заключающаяся в том, что я не могу передать имя темы pubsub в качестве входного параметра. Однако это работает, если название темы жестко запрограммировано.

    #################################################################
    # Import the libraries required by the pipeline                 #
    #################################################################
    from __future__ import absolute_import
    
    import argparse
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import SetupOptions
    from apache_beam.options.pipeline_options import StandardOptions
    from apache_beam.options.value_provider import RuntimeValueProvider
    import json
    import time
    from datetime import datetime
    import dateutil.parser
    import sys
    import logging
    
    #################################################################
    # Create a class for the user defined settings provided at job 
    # creation
    #################################################################
    
    class userOptions(PipelineOptions):
        @classmethod
        def _add_argparse_args(cls, parser):
            parser.add_value_provider_argument(
                '--subscription',  
                default='projects/MYPROJECT/subscrpiptions/subscription', 
                help='PubSub subscription to listen on')
            parser.add_value_provider_argument(
                '--bqtable', 
                default='dataset.table', 
                help='Big Query Table Name in the format project:dataset.table') 
            parser.add_value_provider_argument(
                '--topic', 
                default='projects/MYPROJECT/topics/subscription', 
                help='Pubsub topic to write failures to') 
    
    ##############################################################################
    # Format failure message
    ##############################################################################
    def format_failed_message(data):
        try:
            message=json.dumps(data)
        except:
            print("customError in function format_failed_message occured.", sys.exc_info(), "Message contents: ", data)
        return message
    
    #################################################################
    # create a function called run                                  #
    #################################################################
    def run():
    
        ##############################################################
        # Setup the pipeline options with both passed in arguments 
        # and streaming options
        ##############################################################
        user_options = PipelineOptions().view_as(userOptions)
    
        pipeline_options = PipelineOptions()
        pipeline_options.view_as(SetupOptions).save_main_session = True
        pipeline_options.view_as(StandardOptions).streaming = True
    
        ##############################################################
        # Define the pipeline
        ##############################################################
        with beam.Pipeline(options=pipeline_options) as p:
    
            # First we create a PCollection which will contain the messages read from Pubsub
            records = ( 
            p  | 'Read from PubSub' 
                >> beam.io.ReadFromPubSub(
                    subscription=str(user_options.subscription),
                    id_label='Message_ID',
                    with_attributes=True)
            # Transform the message and its attributes to a dict.
            | 'Format Message' >> 
                beam.Map(format_message_element)
            # Transform the empty arrays defined as element:null to element:[].
            | 'Transform null records to empty list' >>
                beam.Map(transform_null_records)
            # Transform the dateCreated and DateModified to a big query compatible timestamp format.
            | 'Transform Dates' >>
                beam.Map(format_dates)
            # Attempt to write the rows to BQ
            | 'Write to Big Query' >>
                beam.io.gcp.bigquery.WriteToBigQuery(
                    table=user_options.bqtable,
                    create_disposition='CREATE_IF_NEEDED',
                    write_disposition='WRITE_APPEND',
                    insert_retry_strategy='RETRY_NEVER'
                )
            )
    
            #For any rows that failed to write to BQ
            failed_data = (records[beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS]
                            #Format the dictionary to a string
                            | 'Format the dictionary as a string for publishing' >>
                                beam.Map(format_failed_message)
                            #Encode the string to utf8 bytes
                            | 'Encode the message' >>
                                beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes)
                            )
            #Published the failed rows to pubsub
            failed_data | beam.io.gcp.pubsub.WriteToPubSub(topic='projects/xxxx-xxxxx-xxxxxx/topics/top-xxxxx-failures')
            #failed_data | beam.io.gcp.pubsub.WriteToPubSub(topic=user_options.topic)
    
        # As this is a streaming pipeline it will run continuosly till either we 
        # stop the pipeline or it fails.
        result = p.run()
        result.wait_until_finish()
    
    #At the main entry point call the run function
    if __name__ == '__main__':
        #logging.getLogger().setLevel(logging.INFO)
        run()
    
  2. TylrRssl1

    | ‘Encode bytestring’ ›› beam.Map (encode_byte_string) # Я думаю, что эту часть вы уже реализовали | «Написать в pusub» ›› beam.io.WriteToPubSub (output_topic) — У меня работает.

    Пожалуйста, отделите свои ответы от уточняющих вопросов. person TylrRssl1; 26.09.2020

Добавить комментарий

;-) :| :x :twisted: :smile: :shock: :sad: :roll: :razz: :oops: :o :mrgreen: :lol: :idea: :grin: :evil: :cry: :cool: :arrow: :???: :?: :!: