GCP Pub / sub: использование горутин для запуска нескольких подписчиков в одном приложении

Я обнаружил странное поведение при получении сообщения от GCP Pub / Sub. Следующие коды показывают, как я регистрирую подписки с помощью клиента pubsub </ a. >

gcp.go

package gcp

import (
    "context"
    "path"
    "runtime"

    "google.golang.org/api/option"

    "cloud.google.com/go/pubsub"
)

// PubsubClient is the GCP pubsub service client.
var PubsubClient *pubsub.Client

// Initialize initializes GCP client service using the environment.
func Initialize(env, projectName string) error {
    var err error
    ctx := context.Background()
    credentialOpt := option.WithCredentialsFile(getFilePathByEnv(env))

    PubsubClient, err = pubsub.NewClient(ctx, projectName, credentialOpt)
    return err
}

// GetTopic returns the specified topic in GCP pub/sub service and create it if it not exist.
func GetTopic(topicName string) (*pubsub.Topic, error) {
    topic := PubsubClient.Topic(topicName)
    ctx := context.Background()
    isTopicExist, err := topic.Exists(ctx)

    if err != nil {
        return topic, err
    }

    if !isTopicExist {
        ctx = context.Background()
        topic, err = PubsubClient.CreateTopic(ctx, topicName)
    }

    return topic, err
}

// GetSubscription returns the specified subscription in GCP pub/sub service and creates it if it not exist.
func GetSubscription(subName string, topic *pubsub.Topic) (*pubsub.Subscription, error) {
    sub := PubsubClient.Subscription(subName)
    ctx := context.Background()
    isSubExist, err := sub.Exists(ctx)

    if err != nil {
        return sub, err
    }

    if !isSubExist {
        ctx = context.Background()
        sub, err = PubsubClient.CreateSubscription(ctx, subName, pubsub.SubscriptionConfig{Topic: topic})
    }

    return sub, err
}

func getFilePathByEnv(env string) string {
    _, filename, _, _ := runtime.Caller(1)

    switch env {
    case "local":
        return path.Join(path.Dir(filename), "local.json")
    case "development":
        return path.Join(path.Dir(filename), "development.json")
    case "staging":
        return path.Join(path.Dir(filename), "staging.json")
    case "production":
        return path.Join(path.Dir(filename), "production.json")
    default:
        return path.Join(path.Dir(filename), "local.json")
    }
}

main.go

package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "runtime"
    "runtime/debug"
    "runtime/pprof"
    "time"

    "rpriambudi/pubsub-receiver/gcp"

    "cloud.google.com/go/pubsub"
    "github.com/go-chi/chi"
)

func main() {
    log.Fatal(http.ListenAndServe(":4001", Route()))
}

func Route() *chi.Mux {
    InitializeSubscription()
    chiRoute := chi.NewRouter()

    chiRoute.Route("/api", func(r chi.Router) {
        r.Get("/_count", func(w http.ResponseWriter, r *http.Request) {
            fmt.Fprintf(w, "Number of goroutines: %v", runtime.NumGoroutine())
        })

        r.Get("/_stack", getStackTraceHandler)
    })

    return chiRoute
}

func InitializeSubscription() {
    gcp.Initialize("local", "fifth-bonbon-277102")

    go pubsubHandler("test-topic-1", "test-topic-1-subs")
    go pubsubHandler("test-topic-2", "test-topic-2-subs")
    go pubsubHandler("test-topic-3", "test-topic-3-subs")
    // ....

    return
}

func getStackTraceHandler(w http.ResponseWriter, r *http.Request) {
    stack := debug.Stack()
    w.Write(stack)

    pprof.Lookup("goroutine").WriteTo(w, 2)
}

func pubsubHandler(topicID string, subscriptionID string) {
    topic, err := gcp.GetTopic(topicID)
    fmt.Println("topic: ", topic)
    if err != nil {
        fmt.Println("Failed get topic: ", err)
        return
    }

    sub, err := gcp.GetSubscription(subscriptionID, topic)
    fmt.Println("subscription: ", sub)
    if err != nil {
        fmt.Println("Get subscription err: ", err)
        return
    }

    err = sub.Receive(context.Background(), func(ctx context.Context, msg *pubsub.Message) {
        messageHandler(subscriptionID, ctx, msg)
    })
    if err != nil {
        fmt.Println("receive error: ", err)
    }
}

func messageHandler(subscriptionID string, ctx context.Context, msg *pubsub.Message) {
    defer func() {
        if r := recover(); r != nil {
            fmt.Println("recovered from panic.")
            msg.Ack()
        }
    }()

    fmt.Println("message of subscription: ", subscriptionID)
    fmt.Println("Message ID: ", string(msg.ID))
    fmt.Println("Message received: ", string(msg.Data))

    msg.Ack()
    time.Sleep(10 * time.Second)
}

Он отлично работает, когда у меня всего несколько pubsubHandler внутри InitializeSubscription. Но когда я добавил еще pubsubHandler в функцию инициализации (примерно 10 или более обработчиков), все стало интересно. Подтверждение никогда не достигает сервера pubsub, поэтому сообщение просто не подтверждается (я проверил AcknowledgeRequest в обозревателе метрик, и никаких запросов подтверждения не поступало). Таким образом, сообщение продолжает возвращаться к подписчику. Кроме того, когда я перезапускаю приложение, иногда оно не получает никаких сообщений, ни новых, ни неподтвержденных.

См. также:  Google Cloud Platform: внешний IP-адрес недоступен извне

Кажется, я нашел обходной путь, установив NumGoroutines на 1 для каждого объекта подписки в функции pubsubHandler.

func pubsubHandler(topicID string, subscriptionID string) {
    ....

    sub, err := gcp.GetSubscription(subscriptionID, topic)
    
    ....

    sub.ReceiverSettings.NumGoroutines = 1
    err = sub.Receive(context.Background(), func(ctx context.Context, msg *pubsub.Message) {
        messageHandler(subscriptionID, ctx, msg)
    })

    ....
}

Мой вопрос: это запланированное поведение? Какая основная причина может привести к такому неожиданному поведению? Или мои реализации просто неверны, чтобы достичь желаемых результатов? (множественная подписка внутри одного приложения). Или есть какие-то передовые методы, которым следует следовать при создании обработчика подписки?

Насколько я понимаю, функция Receive из pubsub.Subscription изначально является блокирующим кодом. Следовательно, когда я пытался запустить его внутри горутин, это могло привести к неожиданным побочным эффектам, особенно если мы не ограничиваем количество горутин, которые могут обрабатывать сообщения. Верны ли мои рассуждения?

Спасибо за ответы и хорошего дня!

Изменить 1: обновление примера до полного кода, поскольку клиент pubsub ранее не импортировался напрямую в main.go.

Понравилась статья? Поделиться с друзьями:
IT Шеф
Комментарии: 1
  1. kurosawa93

    Я считаю, что проблема может заключаться в скорости обработки сообщений (в настоящее время 10 секунд на сообщение). Если вы получаете слишком много сообщений одновременно, ваш клиент может быть перегружен, что приведет к накоплению накопившихся сообщений.

    Я рекомендую поиграть с настройками управления потоком и увеличивая ReceiveSettings.NumGoroutines до значения выше значения по умолчанию 10. Если у вас высокая скорость публикации, вы также можете увеличить MaxOutstandingMessages или полностью отключить ограничение, установив его на -1. Это говорит клиенту, что нужно удерживать больше сообщений одновременно, предел, который разделяется на Receive вызов.

    Спасибо за ваш ответ. Поскольку я отправил подтверждение до time.Sleep, разве скорость обработки одного сообщения не должна быть меньше 10 секунд? А также, если я увеличу ReceiveSettings.NumGoroutines, это означает, что количество горутин, которые будут обрабатывать каждое сообщение, увеличится, или это было максимальное количество порожденных горутин, которые будут обрабатывать любые входящие сообщение? person kurosawa93; 30.06.2020

    Обычно вы хотите выполнить основную часть обработки до acking. Поскольку у вас есть time.Sleep в вашей функции дескриптора, сообщение все равно будет удерживаться до тех пор, пока функция не вернется. Увеличение NumGoroutines увеличивает количество горутин, порождаемых для извлечения сообщений. Если вы хотите, чтобы больше горутин обрабатывали сообщения, увеличьте MaxOutstandingMessages. person kurosawa93; 30.06.2020

Добавить комментарий

;-) :| :x :twisted: :smile: :shock: :sad: :roll: :razz: :oops: :o :mrgreen: :lol: :idea: :grin: :evil: :cry: :cool: :arrow: :???: :?: :!: