ThreeDotsLabs / watermill

Building event-driven applications the easy way in Go.
https://watermill.io
MIT License
6.87k stars 365 forks source link

MaxOutstandingMessages doesn't seem to be working #412

Open ATahhan opened 6 months ago

ATahhan commented 6 months ago

Hi, I'm trying to add flow control on my subscription with MaxOutstandingMessages setting. I'm setting it to 1, with streaming pull request, but it seems this is not working. I'm always getting all messages delivered at the same time regardless of the number of messages.

image

Here is the configuration I'm using when creating a subscriber:

import (
    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud"
    "github.com/ThreeDotsLabs/watermill/message"
    "cloud.google.com/go/pubsub"
         ...
)

func CreateSubscriber(l watermill.LoggerAdapter, cfg config.PubSubConfig, opts SubscriberOptions) (message.Subscriber, error) {
    return googlecloud.NewSubscriber(
        googlecloud.SubscriberConfig{
            GenerateSubscriptionName: opts.SubscriptionNameFn,
            SubscriptionConfig: pubsub.SubscriptionConfig{
                RetainAckedMessages: false,
            },
            ReceiveSettings: pubsub.ReceiveSettings{
                MaxOutstandingMessages: 1,
            },
            ProjectID:                 cfg.ProjectID,
            TopicProjectID:            cfg.TopicProjectID,
            ClientOptions:             setCredentials(cfg),
            DoNotCreateTopicIfMissing: true,
        },
        l,
    )
}