googleapis / nodejs-pubsub

Node.js client for Google Cloud Pub/Sub: Ingest event streams from anywhere, at any scale, for simple, reliable, real-time stream analytics.
https://cloud.google.com/pubsub/
Apache License 2.0

Messages not emitted in time for processing, resulting in failed ack deadline extensions and redeliveries #1848

Open vikmovcan opened 10 months ago

vikmovcan commented 10 months ago

Description

Hello, I've been experiencing message redeliveries when using a subscription with exactly-once delivery enabled, on a topic where messages are published in batches. It is not clear whether this is a client library or a product issue, so I first wanted to make sure there is no misunderstanding or misconfiguration on my side.

Environment details

Steps to reproduce

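  1. Create a pull subscription with exactly-once delivery enabled and an ack deadline of 60 seconds, e.g.:
   // Imports used by all of the snippets below.
   import { Duration, Message, PubSub, PublishOptions } from "@google-cloud/pubsub";

   // Run inside an async function (or an ES module with top-level await).
   const projectId = "some-project-id";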
   const topicNameOrId = "test-topic";
   const pubsub = new PubSub({ projectId });
   const topic = pubsub.topic(topicNameOrId);
   const subscriptionName = "eod-with-batching";

   await topic.createSubscription(subscriptionName, {
     enableExactlyOnceDelivery: true,
     ackDeadlineSeconds: 60,
   });
  1. Set up a subscription with the following configuration (I want to extend the lease manually):
  const maxMessages = 300; // a couple of hundred should suffice
  const subscription = topic.subscription(subscriptionName, {
    flowControl: {
      allowExcessMessages: false,
      maxMessages: maxMessages,
      maxExtensionMinutes: 0, // no library-managed lease extension; leases are extended manually
    },
    maxAckDeadline: Duration.from({
      seconds: 90,
    }),
    minAckDeadline: Duration.from({
      seconds: 90,
    }),
    streamingOptions: {
      maxStreams: 2, // issue more prominent with more than one stream
    },
  });
  1. Add a message handler that, for each received message, starts an interval that extends the message's ack deadline by 120 seconds every 60 seconds, and then awaits the completion of a long-running async action.
  const waitFor = (ms: number) =>
    new Promise<void>((resolve) => {
      setTimeout(resolve, ms);
    });

  const messageMap: Record<string, Message> = {};

  subscription.on("message", async (message: Message) => {
    if (messageMap[message.id]) {
      console.log(`message ${message.id} unexpectedly redelivered. delivery attempt: ${message.deliveryAttempt}`);
    }

    messageMap[message.id] = message;

    // Manual lease management: every 60 s, reset the ack deadline to 120 s from now.
    const interval = setInterval(async () => {
      try {
        await message.modAckWithResponse(120);
      } catch (e) {
        console.log("error", e);
      }
    }, 1000 * 60);

    try {
      // Simulate a long-running async action (1200 minutes).
      await waitFor(1000 * 60 * 1200);
    } finally {
      // Stop extending the lease once processing has finished or failed.
      clearInterval(interval);
    }
  });
  1. Create and publish messages (note: redeliveries occur with or without batching)
  const messagesToPublish: string[] = [];

  // ensure some messages are in the backlog
  const numberOfBatchedMessages = maxMessages * 3;

  for (let i = 0; i < numberOfBatchedMessages; i++) {
    messagesToPublish.push(
      `Test message ${i + 1} at ${new Date().toISOString()}`
    );
  }

  const publishOptions: PublishOptions = {
    batching: {
      maxMessages: 5,
      maxMilliseconds: 100,
    },
  };

  const batchPublisher = pubsub.topic(topicNameOrId, publishOptions);

  const promises = messagesToPublish.map((m) => {
    return batchPublisher.publishMessage({
      data: Buffer.from(m),
    });
  });

  const publishedMessages = await Promise.all(promises);
  console.log(`published ${publishedMessages.length} messages`);
  1. Wait: redeliveries start to happen within minutes of running the example above. Note that the issue is intermittent.

As I understand the exactly-once delivery (EOD) documentation (https://cloud.google.com/pubsub/docs/exactly-once-delivery), a message should not be redelivered as long as 1) it is still being processed, 2) its ack deadline keeps being extended, and 3) it has not been negatively acknowledged. However, it is being redelivered.

Is there something I am missing or doing incorrectly, or is this expected behaviour?

vikmovcan commented 9 months ago

After investigating this further, I narrowed the issue down (I will update the title to match): some messages received internally by the library are not emitted for processing immediately, i.e. the "message" event callback is not triggered right away. I identified this by comparing the received timestamps on the messages against the time they reached the "message" event callback.
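Why such a delay alone could lead to redeliveries can be sketched with a small timing model. This is a simplified sketch under stated assumptions, not the library's logic: the deadline granted at receipt is the 90-second minAckDeadline from the config above, nothing extends the lease while the message waits in the client, and once the "message" event fires the handler extends it as in the repro (every 60 s, by 120 s). LeaseScenario and firstLapse are hypothetical names.

```typescript
// Simplified timing model (assumption, not @google-cloud/pubsub internals).
// All times are in seconds, measured from when the client received the message.
interface LeaseScenario {
  heldS: number;            // time between receipt and the "message" event
  initialDeadlineS: number; // deadline granted at receipt (e.g. minAckDeadline)
  extendEveryS: number;     // interval of the handler's setInterval
  extendByS: number;        // seconds passed to each modAck call
  processingS: number;      // how long the handler works on the message
}

// Returns the time at which the lease first lapses, or null if the message
// stays leased until processing finishes.
function firstLapse(s: LeaseScenario): number | null {
  let deadline = s.initialDeadlineS;
  const end = s.heldS + s.processingS;
  // setInterval fires for the first time one interval after the handler starts.
  for (let t = s.heldS + s.extendEveryS; t < end; t += s.extendEveryS) {
    if (t > deadline) return deadline; // the deadline passed before this extension
    deadline = t + s.extendByS;        // extension: deadline becomes "now + extendByS"
  }
  // No further extensions: check whether processing outlives the last deadline.
  return end > deadline ? deadline : null;
}
```

Under these assumptions, a message held for 30 s stays leased throughout a 10-minute job, while one held for 100 s has already lapsed at the 90-second mark, before the handler's first extension, which would make it eligible for redelivery.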

The issue can be reproduced given the following:

a) a topic with a few hundred messages;
b) a subscription using streaming pull, e.g.:

  const subscription = topic.subscription(subscriptionName, {
    flowControl: {
      allowExcessMessages: false,
      maxMessages: tripleDigitMaxMessages,
      maxExtensionMinutes: 0,
    },
    maxAckDeadline: Duration.from({
      seconds: 90,
    }),
    minAckDeadline: Duration.from({
      seconds: 90,
    }),
    streamingOptions: {
      maxStreams: 2, // any value greater than 1
    },
  });

c) message processing can take minutes

While this makes sense given the maxMessages configuration value, it is not clear why:
a) the maxMessages value gets exceeded (is this value per stream?): not immediately, but over time, the number of messages being processed reaches the maxMessages value times the number of streams;
b) the "excess" messages are emitted only after a (sometimes minute-long) delay;
c) the library internally obtains and holds onto messages whose ack deadlines may expire soon after they are emitted.

If I want to process at most, for example, 500 messages at a time with the above subscription config, and manually extend each message's ack deadline upon receipt without running into the problem described above, what options are there? One option I see is restricting the number of streams to 1, which eliminates the possibility of "excess" messages not being emitted or not having their ack deadlines extended.

hongalex commented 5 months ago

Apologies for the slow response on this issue. First, are you still encountering this issue?

If so, regarding manual lease management, is there a reason you prefer to do that versus the library's lease management?

vikmovcan commented 5 months ago

> Apologies for the slow response on this issue. First, are you still encountering this issue?
>
> If so, regarding manual lease management, is there a reason you prefer to do that versus the library's lease management?

@hongalex, thanks for responding. Yes, the issue persists. The reason for this approach is a need to avoid hitting API limitations imposed by a third party, e.g. a limited number of requests per minute and no support for concurrent requests. Additionally, to avoid overwhelming each consumer, it made sense to "buffer" a limited number of messages at a time and keep pushing back their ack deadlines until they are processed (each message is processed later in a separate queue). Unfortunately, Cloud Tasks was not a good fit for our use case as an alternative to Pub/Sub.
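The buffering approach described above can be reasoned about with a small scheduling sketch. Assuming the third-party API allows no concurrent requests and at most maxPerMinute requests per minute (a hypothetical parameter), scheduleStarts (a hypothetical helper, not part of @google-cloud/pubsub) computes when each buffered message can start processing; that start time is roughly how long its ack deadline has to keep being extended.

```typescript
// Hypothetical helper: given the expected duration (ms) of each buffered task,
// compute when each one can start if requests must run one at a time and at
// most `maxPerMinute` may start per minute.
function scheduleStarts(durationsMs: number[], maxPerMinute: number): number[] {
  const minGapMs = 60000 / maxPerMinute; // spacing imposed by the rate limit
  const starts: number[] = [];
  let prevStart = -Infinity;
  let prevEnd = 0;
  for (const d of durationsMs) {
    // Wait for the previous task to finish (no concurrency) and for the
    // rate-limit gap since the previous start, whichever comes later.
    const start = Math.max(prevEnd, prevStart + minGapMs);
    starts.push(start);
    prevStart = start;
    prevEnd = start + d;
  }
  return starts;
}
```

For example, with 500 buffered messages, a limit of 6 requests per minute, and each request finishing within 10 s, the last message would start after 499 × 10 s, about 83 minutes, so its lease would need extending for at least that long.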

hongalex commented 4 months ago

Also, apologies for missing a few of your questions last time:

> While this makes sense if you consider the maxMessages configuration value, it is not clear why a) the maxMessages value gets exceeded (is this value per stream?), (not immediately, but over a period of time, the number of messages being processed reaches maxMessages value times the number of streams) b) the "excess" messages are sent only after a (sometimes minute-long) delay c) the library (internally) obtains and holds onto the messages which may potentially exceed their ack deadline soon after being emitted

This value is indeed applied per stream. This is something we're interested in fixing: we want the server-side flow control amount allowed per stream to be maxMessages / numStreams instead. However, this is technically a breaking change, so it might need to wait until we do a major version bump. This is also why we still have client-side flow control enabled, and it explains the delay between a message being received and it becoming ready for processing.
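A rough model of this explanation, using the numbers from the repro (maxMessages = 300, maxStreams = 2): under the current per-stream behaviour the server may have up to 600 messages outstanding while the handler is given at most 300, so up to 300 can sit in the client; under the proposed split nothing needs to be held. This is an illustrative sketch with hypothetical function names, not the library's flow-control code.

```typescript
// Illustrative model (assumption, not the library's implementation).
// "current":  every stream asks the server for up to maxMessages outstanding messages.
// "proposed": maxMessages is divided across the streams.
// Client-side flow control then hands at most maxMessages to the handler at once.
type Mode = "current" | "proposed";

function perStreamLimits(maxMessages: number, maxStreams: number, mode: Mode): number[] {
  if (mode === "current") return new Array<number>(maxStreams).fill(maxMessages);
  const base = Math.floor(maxMessages / maxStreams);
  const remainder = maxMessages % maxStreams;
  // Spread the remainder so the per-stream limits sum exactly to maxMessages.
  return Array.from({ length: maxStreams }, (_, i) => base + (i < remainder ? 1 : 0));
}

// Upper bound on messages delivered by the server but held back in the client.
function maxHeldInClient(maxMessages: number, maxStreams: number, mode: Mode): number {
  const serverTotal = perStreamLimits(maxMessages, maxStreams, mode).reduce((a, b) => a + b, 0);
  return Math.max(0, serverTotal - maxMessages);
}
```

With maxStreams set to 1, both modes give the same single limit and nothing is held, which matches the single-stream workaround mentioned earlier in the thread.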

> If I want to process, for example, 500 messages at a time max with the above subscription config and manually extend each message’s ack deadline upon receipt without bumping into the problem described above, what options are there ? One option I see is restricting the number of streams to 1, thus eliminating the possibility of "excess" messages not being emitted or their ack deadlines not extended.

> need to avoid hitting API limitations

This can be done with the "minAckDeadline" setting passed in as SubscriberOptions, which I see you're already doing. To get the behavior of extending each message's ack deadline by 120s, I recommend setting both minAckDeadline and maxAckDeadline to 120 seconds and not disabling the total extension timeout (i.e. not setting maxExtensionMinutes to 0). This should actually decrease the number of API calls you make, since the library can batch modack calls.
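To illustrate why batching should reduce API calls, here is a back-of-the-envelope sketch. It assumes all leased messages are extended in one round per interval, with one request per message when modAck is called per message, versus one request per batchSize ack IDs when batched; batchSize is a hypothetical parameter, not the library's actual default. In SubscriberOptions terms, the recommendation above would correspond to minAckDeadline and maxAckDeadline both set to Duration.from({ seconds: 120 }), with maxExtensionMinutes not set to 0.

```typescript
// Rough estimate of modack requests per minute (illustrative assumptions only).
// leased:             number of messages whose leases are being extended
// extendEverySeconds: how often each lease is extended
// batchSize:          ack IDs per batched request; omit for one request per message
function modAckRequestsPerMinute(
  leased: number,
  extendEverySeconds: number,
  batchSize?: number,
): number {
  const roundsPerMinute = 60 / extendEverySeconds;
  const requestsPerRound = batchSize === undefined ? leased : Math.ceil(leased / batchSize);
  return roundsPerMinute * requestsPerRound;
}
```

With 500 messages extended once a minute, per-message calls come to 500 requests per minute, while a hypothetical batch size of 250 would reduce that to 2.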

hongalex commented 3 months ago

I wanted to clarify that message.modifyAckDeadline is not a public method and should not be called directly: it is a private method meant to be called only within the library. Please refer to my previous comment about how to minimize API calls with the client library.