googleapis / nodejs-pubsub

Node.js client for Google Cloud Pub/Sub: Ingest event streams from anywhere, at any scale, for simple, reliable, real-time stream analytics.
https://cloud.google.com/pubsub/
Apache License 2.0

calling setOptions on subscriber does not take effect #1879

Open dermasmid opened 5 months ago

dermasmid commented 5 months ago

If flowControl settings are changed after calling .on, they do not take effect:

const subscription = pubSubClient.subscription(subscriptionNameOrId);
const messageHandler = message => {
  message.ack();
};

subscription.on('message', messageHandler);

// does not have any effect on flow control
subscription.setOptions({
  flowControl: {
    maxMessages: 1,
    allowExcessMessages: true,
  },
});

swarmiakimmo commented 4 months ago

I was able to confirm the bug by testing the client with a local script.

feywind commented 1 month ago

Looks like it's not passing it down to the streaming pull subscriber stream. I'll verify what we want this to do, behaviour-wise.
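
For illustration, here is a toy model of how this kind of bug can arise. It is a sketch under assumptions, not the library's code: ToySubscriber, its lazy open() on the first 'message' listener, and the default of 1000 are all hypothetical stand-ins. The point is only that if the stream copies its settings once at open time, a later setOptions call changes the stored options but never reaches the open stream.

```javascript
// Toy model only: ToySubscriber is hypothetical and is not the library's code.
const {EventEmitter} = require('events');

class ToySubscriber extends EventEmitter {
  constructor(options = {}) {
    super();
    this.options = options;
    this.stream = null;
    // Open the stream lazily when the first 'message' listener is attached.
    this.on('newListener', event => {
      if (event === 'message' && !this.stream) this.open();
    });
  }

  setOptions(options) {
    // Updates the stored options but never touches an already open stream.
    this.options = options;
  }

  open() {
    // The stream copies the flow-control setting once, at open time.
    // 1000 is an assumed default used only for this sketch.
    const flowControl = this.options.flowControl || {};
    this.stream = {maxMessages: flowControl.maxMessages ?? 1000};
  }
}

// Options set after attaching the listener: the open stream keeps the default.
const late = new ToySubscriber();
late.on('message', () => {});
late.setOptions({flowControl: {maxMessages: 1}});

// Options set before attaching the listener: the stream sees them.
const early = new ToySubscriber();
early.setOptions({flowControl: {maxMessages: 1}});
early.on('message', () => {});

console.log(late.stream.maxMessages);  // 1000
console.log(early.stream.maxMessages); // 1
```

In this model, calling setOptions before attaching the listener avoids the problem. Whether the same ordering helps with the real client has not been verified here.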

theabhinavdas commented 1 week ago

I have the same issue: maxMessages doesn't work for me when using streaming pulls. Is there an alternative until this is fixed, or would we have to do unary pulls?

theabhinavdas commented 1 week ago

@feywind What's also quite peculiar is that when I use options with subscriptions, it seems to square the number of maxMessages specified. For me, if I set this to 1, it does in fact seem to pull only one message (as noticed in my handler). When I set maxMessages to 2, it seems to pull 4 messages. When I set maxMessages to 3, it seems to pull 9 messages!

const subscription = pubSubClient.subscription(subscriptionNameOrId, {
  flowControl: {
    maxMessages: 1,
  },
});

theabhinavdas commented 1 week ago

@dermasmid From this other issue, it seems that setting allowExcessMessages: false in your options fixes the problem, like so:

const subscription = pubSubClient.subscription(subscriptionNameOrId, {
    flowControl: {
      maxMessages: 1,
      allowExcessMessages: false
    }
});

It may additionally have something to do with you instantly ack-ing in your messageHandler where you'd expect StreamingPull to pull the next message?
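
To make the interaction between maxMessages, allowExcessMessages, and immediate acking concrete, here is a small simulation. It is a simplified sketch, not the library's implementation: ToyLeaseManager and deliverBatch are hypothetical names, and the model assumes that maxMessages caps the number of delivered but unacked messages, that allowExcessMessages: true lets a whole received batch through regardless of the cap, and that each ack frees one slot.

```javascript
// Toy model only: ToyLeaseManager is hypothetical, not the library's implementation.
class ToyLeaseManager {
  constructor({maxMessages, allowExcessMessages}) {
    this.maxMessages = maxMessages;
    this.allowExcessMessages = allowExcessMessages;
    this.outstanding = 0; // delivered to the handler, not yet acked
    this.pending = [];    // received from the stream but held back
    this.delivered = [];  // messages the handler has seen, in order
  }

  isFull() {
    return this.outstanding >= this.maxMessages;
  }

  // Called once for each message in a batch received from the stream.
  add(message) {
    if (this.allowExcessMessages || !this.isFull()) {
      this.outstanding++;
      this.delivered.push(message);
    } else {
      this.pending.push(message);
    }
  }

  // Acking frees a slot, so one held-back message can be delivered.
  ack() {
    this.outstanding--;
    if (this.pending.length > 0 && !this.isFull()) {
      this.add(this.pending.shift());
    }
  }
}

function deliverBatch(options, batch) {
  const leases = new ToyLeaseManager(options);
  batch.forEach(message => leases.add(message));
  return leases;
}

const batch = ['a', 'b', 'c'];
const excess = deliverBatch({maxMessages: 1, allowExcessMessages: true}, batch);
const strict = deliverBatch({maxMessages: 1, allowExcessMessages: false}, batch);

console.log(excess.delivered.length); // 3
console.log(strict.delivered.length); // 1

strict.ack(); // the handler acks 'a', which releases 'b'
console.log(strict.delivered.length); // 2
```

Under these assumptions, a handler that acks instantly sees the next message right away even with maxMessages: 1, so a steady flow of messages does not by itself show that the limit is being ignored.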