googleapis / nodejs-pubsub

Node.js client for Google Cloud Pub/Sub: Ingest event streams from anywhere, at any scale, for simple, reliable, real-time stream analytics.
https://cloud.google.com/pubsub/
Apache License 2.0

Pubsub emits more messages than allowed by maxMessages and allowExcessMessages after reaching maxExtension period #1213

Open · QuestionAndAnswer opened 3 years ago

QuestionAndAnswer commented 3 years ago

Environment details

Looking into the code and the issue details, this should also be relevant for 2.9.0.

Steps to reproduce

You will need one topic and one subscription to that topic. Below are the subscription parameters.

[screenshot: subscription parameters]
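
If you need to create the topic and subscription first, something along these lines should work (a minimal sketch using the client library; the exact settings from the screenshot are not reproduced here, so defaults are assumed):

const pubsub = require("@google-cloud/pubsub");

const ps = new pubsub.PubSub();

async function setup() {
    // Both the topic and the subscription use the default name
    // "libs-test-pubsub" from the scripts below.
    const [topic] = await ps.createTopic("libs-test-pubsub");
    const [sub] = await topic.createSubscription("libs-test-pubsub");
    console.log(`created ${topic.name} and ${sub.name}`);
}

setup().catch(console.error);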

  1. Create a server with the following content and run it.

const pubsub = require("@google-cloud/pubsub");

const config = {
    subName: process.env["SUB_NAME"] || "libs-test-pubsub",
};

console.log(config);

const ps = new pubsub.PubSub();
const subscription = ps.subscription(config.subName, {
    flowControl: {
        maxMessages: 2,             // dispense at most 2 messages to the callback at a time
        allowExcessMessages: false, // do not dispense messages beyond maxMessages
        maxExtension: 5,            // stop extending message leases after this period
    },
    streamingOptions: {
        maxStreams: 1,
    },
});

let rcvCounter = 0;
let prcCounter = 0;

function printCounters() {
    // Overwrite the same console line with the updated counters.
    process.stdout.clearLine(0);
    process.stdout.write(`rcv: ${rcvCounter}\tprc: ${prcCounter}\r`);
}

const onError = err => {
    console.error("Subscription error:", err);
    process.exit(-1);
};

const onMessage = msg => {
    rcvCounter++;
    printCounters();

    // Simulate 2 seconds of processing before acking.
    setTimeout(() => {
        msg.ack();
        prcCounter++;
        printCounters();
    }, 2000);
};

subscription.on("error", onError);
subscription.on("message", onMessage);
  2. Now push 100 messages into the topic in a batch, so the pubsub subscription queue acquires roughly 100 messages in a short amount of time.

You can use this script for that.

const pubsub = require("@google-cloud/pubsub");

const config = {
    topicName: process.env["TOPIC_NAME"] || "libs-test-pubsub",
};

console.log(config);

const ps = new pubsub.PubSub();
const topic = ps.topic(config.topicName);

const messages = [];

for (let i = 0; i < 100; i++) {
    // Publish an empty payload with a single attribute.
    const p = topic.publish(Buffer.from(""), { attr1: "123" })
        .catch(err => {
            console.log(err);
        });

    messages.push(p);
}

Promise.all(messages).then(() => console.log("done publishing"));
  3. Now you should see something like the following (see the video below as well).

At first, rcv is 2 and prc is 0. This is expected, as flow control allows only 2 messages to be processed (passed into the message callback) at a time. Then prc becomes 2 (the 2-second timeout fired and we acked the messages) and rcv becomes 4. This stepped processing should repeat every 2 seconds. But after some time, somewhere around the maxExtension deadline (when pubsub starts either extending message leases or removing them from its internal queue), the rcv counter jumps up to 100 while prc is still only 4, 6, or 8, depending on when it happens; the number is always small, below 20 or even 15.
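
Concretely, the printed counter line progresses roughly like this (illustrative, not captured output):

rcv: 2    prc: 0      <- flow control dispenses only 2 messages
rcv: 4    prc: 2      <- after ~2s both are acked, 2 more are dispensed
...
rcv: 100  prc: 8      <- around the maxExtension deadline, rcv jumps to 100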

https://user-images.githubusercontent.com/6842709/108252911-3b485400-716a-11eb-80ba-a0f19a82d901.mp4

This breaks the assumption that, for example, a k8s pod processing the messages under CPU or memory resource limits is protected by flow control: the pod will try to process all the messages at once and may hit its resource limits (getting throttled on CPU or killed for exceeding memory).

Now, where this is happening. I've added logs to the library, so I can see for certain that it goes through _extendDeadlines in lease-manager.ts.

[screenshot: _extendDeadlines in lease-manager.ts]

It takes the else path in the for loop and calls this.remove(message);

[screenshot: remove() in lease-manager.ts]

And in remove it keeps hitting the this.pending > 0 condition and dispensing the next pending message (which has actually already expired and should be redelivered by Pub/Sub). It seems clear that such a message should not be dispensed to the client code.
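
To illustrate, here is a simplified sketch of the flow described above (paraphrased, not the actual lease-manager.ts source; details may differ):

_extendDeadlines() {
    for (const message of this._messages) {
        const lifespan = (Date.now() - message.received) / (60 * 1000);
        if (lifespan < this._options.maxExtension) {
            message.modAck(this._ackDeadline); // still within maxExtension: extend the lease
        } else {
            this.remove(message); // lease expired: drop the message
        }
    }
}

remove(message) {
    this._messages.delete(message);
    if (this.pending > 0) {
        // Removing an expired message frees a flow-control slot, so the next
        // *pending* message is dispensed to the user callback, even though it
        // has expired too and Pub/Sub will redeliver it anyway.
        this._dispense(this._pending.shift());
    }
}

With 100 messages queued and maxMessages: 2, the expiry of the first leases cascades: each remove() dispenses another expired pending message, which is how rcv jumps to 100 in one burst.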

I'm not sure how to fix this in the pubsub library code, and I don't think there is a proper workaround in client code either (ours relies on flow control being respected).

feywind commented 3 years ago

@QuestionAndAnswer Thanks for the super detailed report. I'll have to go over this in more detail, but I'll mark it as a bug for now.

I like your editor's comment on the method. 😂 "You must be kidding"

QuestionAndAnswer commented 3 years ago

@feywind so, how is it going? Did you have a chance to look into this issue?

procedurallygenerated commented 3 years ago

@feywind we are running into the exact same issue in production.

Node 14 @google-cloud/pubsub version: 2.10.0

Running it in a similar env, docker container inside k8s.

Any progress here?

feywind commented 2 years ago

Quick (sadly content-less) update - no changes here yet, but I've been on a bit of a feature development rotation and hope to get back to these issues.

feywind commented 2 years ago

I stared at this for a while today, and I'm not as familiar with this bit of the library, but I don't know why we'd want to dispense messages from the pending queue just because another message was removed. It feels like there was a paradigm shift here that wasn't taken into account, but I'm going to ask some others.

feywind commented 1 year ago

Linked to the meta-issue about transport problems: b/242894947

michal-zap commented 1 year ago

Is there any way to work around the problem? Our cluster is running out of memory and the flowControl options don't seem to work. Is there maybe a way to stop subscribers from pulling an enormous number of messages?

jcgomezcorrea commented 1 year ago

We have the same problem with @google-cloud/pubsub version 3.4.1. Any updates or workarounds for this issue?

alexby commented 8 months ago

For those who were interested in a workaround:

subscription.setOptions({ flowControl: { maxMessages: 1, allowExcessMessages: false } })

(It might work without allowExcessMessages, but I didn't test it without this option.)

The point is that you need to set it on every connection to the broker. The setting looks to be ignored when you pass it to subscription.create(). So whether you create the topic on initialization and then listen to the queue, or you subscribe to a topic that already exists, set the options on that specific subscription/connection.
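
For example (a minimal sketch; the handler body is illustrative):

const { PubSub } = require("@google-cloud/pubsub");

const ps = new PubSub();

// Apply the flow control options on the Subscription object you actually
// listen on, not only at subscription creation time.
const subscription = ps.subscription("libs-test-pubsub");
subscription.setOptions({
    flowControl: { maxMessages: 1, allowExcessMessages: false },
});

subscription.on("message", msg => {
    // ...process the message...
    msg.ack();
});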

I've tested it on node:16.13-alpine + @google-cloud/pubsub@3.4.1 and on the latest @google-cloud/pubsub@4.0.6