Я обнаружил странное поведение при получении сообщения от GCP Pub / Sub. Следующие коды показывают, как я регистрирую подписки с помощью клиента pubsub </ a. >
gcp.go
package gcp
import (
"context"
"path"
"runtime"
"google.golang.org/api/option"
"cloud.google.com/go/pubsub"
)
// PubsubClient is the GCP pubsub service client.
var PubsubClient *pubsub.Client
// Initialize initializes GCP client service using the environment.
func Initialize(env, projectName string) error {
var err error
ctx := context.Background()
credentialOpt := option.WithCredentialsFile(getFilePathByEnv(env))
PubsubClient, err = pubsub.NewClient(ctx, projectName, credentialOpt)
return err
}
// GetTopic returns the specified topic in GCP pub/sub service and create it if it not exist.
func GetTopic(topicName string) (*pubsub.Topic, error) {
topic := PubsubClient.Topic(topicName)
ctx := context.Background()
isTopicExist, err := topic.Exists(ctx)
if err != nil {
return topic, err
}
if !isTopicExist {
ctx = context.Background()
topic, err = PubsubClient.CreateTopic(ctx, topicName)
}
return topic, err
}
// GetSubscription returns the specified subscription in GCP pub/sub service and creates it if it not exist.
func GetSubscription(subName string, topic *pubsub.Topic) (*pubsub.Subscription, error) {
sub := PubsubClient.Subscription(subName)
ctx := context.Background()
isSubExist, err := sub.Exists(ctx)
if err != nil {
return sub, err
}
if !isSubExist {
ctx = context.Background()
sub, err = PubsubClient.CreateSubscription(ctx, subName, pubsub.SubscriptionConfig{Topic: topic})
}
return sub, err
}
func getFilePathByEnv(env string) string {
_, filename, _, _ := runtime.Caller(1)
switch env {
case "local":
return path.Join(path.Dir(filename), "local.json")
case "development":
return path.Join(path.Dir(filename), "development.json")
case "staging":
return path.Join(path.Dir(filename), "staging.json")
case "production":
return path.Join(path.Dir(filename), "production.json")
default:
return path.Join(path.Dir(filename), "local.json")
}
}
main.go
package main
import (
"context"
"fmt"
"log"
"net/http"
"runtime"
"runtime/debug"
"runtime/pprof"
"time"
"rpriambudi/pubsub-receiver/gcp"
"cloud.google.com/go/pubsub"
"github.com/go-chi/chi"
)
func main() {
log.Fatal(http.ListenAndServe(":4001", Route()))
}
func Route() *chi.Mux {
InitializeSubscription()
chiRoute := chi.NewRouter()
chiRoute.Route("/api", func(r chi.Router) {
r.Get("/_count", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Number of goroutines: %v", runtime.NumGoroutine())
})
r.Get("/_stack", getStackTraceHandler)
})
return chiRoute
}
func InitializeSubscription() {
gcp.Initialize("local", "fifth-bonbon-277102")
go pubsubHandler("test-topic-1", "test-topic-1-subs")
go pubsubHandler("test-topic-2", "test-topic-2-subs")
go pubsubHandler("test-topic-3", "test-topic-3-subs")
// ....
return
}
func getStackTraceHandler(w http.ResponseWriter, r *http.Request) {
stack := debug.Stack()
w.Write(stack)
pprof.Lookup("goroutine").WriteTo(w, 2)
}
func pubsubHandler(topicID string, subscriptionID string) {
topic, err := gcp.GetTopic(topicID)
fmt.Println("topic: ", topic)
if err != nil {
fmt.Println("Failed get topic: ", err)
return
}
sub, err := gcp.GetSubscription(subscriptionID, topic)
fmt.Println("subscription: ", sub)
if err != nil {
fmt.Println("Get subscription err: ", err)
return
}
err = sub.Receive(context.Background(), func(ctx context.Context, msg *pubsub.Message) {
messageHandler(subscriptionID, ctx, msg)
})
if err != nil {
fmt.Println("receive error: ", err)
}
}
func messageHandler(subscriptionID string, ctx context.Context, msg *pubsub.Message) {
defer func() {
if r := recover(); r != nil {
fmt.Println("recovered from panic.")
msg.Ack()
}
}()
fmt.Println("message of subscription: ", subscriptionID)
fmt.Println("Message ID: ", string(msg.ID))
fmt.Println("Message received: ", string(msg.Data))
msg.Ack()
time.Sleep(10 * time.Second)
}
Он отлично работает, когда у меня всего несколько pubsubHandler
внутри InitializeSubscription
. Но когда я добавил еще pubsubHandler
в функцию инициализации (примерно 10 или более обработчиков), все стало интересно. Подтверждение никогда не достигает сервера pubsub, поэтому сообщение просто не подтверждается (я проверил AcknowledgeRequest
в обозревателе метрик, и никаких запросов подтверждения не поступало). Таким образом, сообщение продолжает возвращаться к подписчику. Кроме того, когда я перезапускаю приложение, иногда оно не получает никаких сообщений, ни новых, ни неподтвержденных.
Кажется, я нашел обходной путь, установив NumGoroutines
на 1
для каждого объекта подписки в функции pubsubHandler
.
func pubsubHandler(topicID string, subscriptionID string) {
....
sub, err := gcp.GetSubscription(subscriptionID, topic)
....
sub.ReceiverSettings.NumGoroutines = 1
err = sub.Receive(context.Background(), func(ctx context.Context, msg *pubsub.Message) {
messageHandler(subscriptionID, ctx, msg)
})
....
}
Мой вопрос: это запланированное поведение? Какая основная причина может привести к такому неожиданному поведению? Или мои реализации просто неверны, чтобы достичь желаемых результатов? (множественная подписка внутри одного приложения). Или есть какие-то передовые методы, которым следует следовать при создании обработчика подписки?
Насколько я понимаю, функция Receive
из pubsub.Subscription
изначально является блокирующим кодом. Следовательно, когда я пытался запустить его внутри горутин, это могло привести к неожиданным побочным эффектам, особенно если мы не ограничиваем количество горутин, которые могут обрабатывать сообщения. Верны ли мои рассуждения?
Спасибо за ответы и хорошего дня!
Изменить 1: обновление примера до полного кода, поскольку клиент pubsub ранее не импортировался напрямую в main.go.
Я считаю, что проблема может заключаться в скорости обработки сообщений (в настоящее время 10 секунд на сообщение). Если вы получаете слишком много сообщений одновременно, ваш клиент может быть перегружен, что приведет к накоплению накопившихся сообщений.
Я рекомендую поиграть с настройками управления потоком и увеличивая
ReceiveSettings.NumGoroutines
до значения выше значения по умолчанию 10. Если у вас высокая скорость публикации, вы также можете увеличить MaxOutstandingMessages или полностью отключить ограничение, установив его на -1. Это говорит клиенту, что нужно удерживать больше сообщений одновременно, предел, который разделяется наReceive
вызов.Спасибо за ваш ответ. Поскольку я отправил подтверждение до
time.Sleep
, разве скорость обработки одного сообщения не должна быть меньше 10 секунд? А также, если я увеличуReceiveSettings.NumGoroutines
, это означает, что количество горутин, которые будут обрабатывать каждое сообщение, увеличится, или это было максимальное количество порожденных горутин, которые будут обрабатывать любые входящие сообщение? — person kurosawa93; 30.06.2020Обычно вы хотите выполнить основную часть обработки до
ack
ing. Поскольку у вас естьtime.Sleep
в вашей функции дескриптора, сообщение все равно будет удерживаться до тех пор, пока функция не вернется. УвеличениеNumGoroutines
увеличивает количество горутин, порождаемых для извлечения сообщений. Если вы хотите, чтобы больше горутин обрабатывали сообщения, увеличьтеMaxOutstandingMessages
. — person kurosawa93; 30.06.2020