googleapis / nodejs-pubsub

Node.js client for Google Cloud Pub/Sub: Ingest event streams from anywhere, at any scale, for simple, reliable, real-time stream analytics.
https://cloud.google.com/pubsub/
Apache License 2.0

Wait for previous message ack with same orderingKey before delivering next message #1796

Closed georeith closed 1 year ago

georeith commented 1 year ago

Hi, I'm trying to perform asynchronous operations on a pull subscription with ordered messages.

The issue I'm running into is that ordering only applies to delivery: I can receive 10 messages in order, but my asynchronous operations on those 10 messages are not guaranteed to finish in order.

Ideally, I would like to receive only one message per ordering key at a time, with the next message for a key delivered only after the previous message for that key has been acked. That is how I initially assumed this worked when reading the documentation.

Is what I'm trying to do possible currently?

Thanks for your help

georeith commented 1 year ago

I've managed to work around this somewhat in application code, although it would be much neater if I could configure this at the subscription level, because the workaround only works when a single subscriber is reading this data.

If multiple machines are subscribed, they may still perform the work out of order.

```typescript
import { Message } from '@google-cloud/pubsub';
// `config`, `withSubscription` and `_setMediaStatus` are helpers defined elsewhere in this project.

// GCP ordered messages are only guaranteed to arrive in order; delivery does not wait for the
// previous message to be acked. However, we want the previous status update on a document to
// complete before starting the next one, so we store the last pending promise for each
// ordering key (document).
//
// This also ensures that if a previous status update fails, all the status updates queued
// after it will nack too and go back to the pool.
const statusUpdatePromises = new Map<string, Promise<void>>();

async function subscribeToStatusUpdates() {
  const topicName: string = config.get('statusTopic');
  const subscriptionName: string = config.get('statusSubscriptionName');
  const subscription = await withSubscription(topicName, subscriptionName, {
    ackDeadlineSeconds: 180,
    enableMessageOrdering: true,
  });

  subscription.on('message', (message: Message) => {
    // Promise for the previous message with the same ordering key, if one is still pending.
    const previousPromise = message.orderingKey
      ? statusUpdatePromises.get(message.orderingKey)
      : undefined;
    let promise: Promise<void>;
    const handleMessage = async () => {
      try {
        // Wait for the previous message for this document; if it failed, this rejects
        // and this message is nacked as well.
        await previousPromise;
        const data = JSON.parse(message.data.toString());
        const { status, mediaInfo, transactionId, suppliedAuthHeader } = data;
        await _setMediaStatus(
          status,
          mediaInfo,
          suppliedAuthHeader,
          transactionId,
        );
        message.ack();
      } catch (err) {
        message.nack();
        throw err;
      } finally {
        // If this is the last message for this document, remove the document from the map.
        if (
          message.orderingKey &&
          statusUpdatePromises.get(message.orderingKey) === promise
        ) {
          statusUpdatePromises.delete(message.orderingKey);
        }
      }
    };
    promise = handleMessage();
    if (message.orderingKey) {
      statusUpdatePromises.set(message.orderingKey, promise);
    }
    // Handle the rejection here so the last failed message in a chain does not become an
    // unhandled promise rejection (which terminates the process by default since Node 15).
    // Later messages that await `promise` still observe the failure and nack.
    promise.catch((err) => console.error('Status update failed:', err));
  });
}
```

feywind commented 1 year ago

@georeith This is unfortunately "working as intended": the library's guarantee is that you'll receive all of the messages in order within one subscriber RPC callback (meaning the code will not return to the event loop until it has called the user callback for each message, in order). Node's asynchrony makes that guarantee somewhat less useful, though.
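A minimal, self-contained sketch of that behavior, using a simplified stand-in rather than the real @google-cloud/pubsub API: the hypothetical `deliverInOrder` calls an async handler synchronously for each message, in order, but does not wait for the returned promises before calling the next one. Handlers whose work takes different amounts of time therefore finish out of order even though they started in order.

```typescript
// Hypothetical stand-in for a batch of ordered messages from one RPC response.
interface FakeMessage {
  id: number;
  orderingKey: string;
  workMs: number; // simulated duration of the async work for this message
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Calls `handler` synchronously and in order for every message, but only
// collects the returned promises instead of awaiting each one before the next.
function deliverInOrder(
  messages: FakeMessage[],
  handler: (m: FakeMessage) => Promise<void>,
): Promise<void[]> {
  const pending: Promise<void>[] = [];
  for (const m of messages) {
    pending.push(handler(m));
  }
  return Promise.all(pending);
}

// Records the order in which the handlers finish their async work.
async function demo(): Promise<number[]> {
  const finished: number[] = [];
  const batch: FakeMessage[] = [
    { id: 1, orderingKey: 'doc-a', workMs: 30 },
    { id: 2, orderingKey: 'doc-a', workMs: 10 },
    { id: 3, orderingKey: 'doc-a', workMs: 20 },
  ];
  await deliverInOrder(batch, async (m) => {
    await sleep(m.workMs);
    finished.push(m.id);
  });
  return finished;
}
```

Here demo() resolves to [2, 3, 1]: all three handlers start in delivery order, but message 2 finishes first because its simulated work is the shortest.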

In similar cases, we've been recommending a queue on the user side: push all of the messages for a given key into the queue, then execute their callbacks one at a time. That looks roughly like what you've got there. It has the added bonus of giving you finer control over how you gather responses, wait for an ack() call before proceeding, and so on.
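One way such a user-side queue could look, as a sketch under stated assumptions: `KeyedSerialQueue` is a hypothetical helper, not part of @google-cloud/pubsub. It chains each task onto the previous task for the same key, so tasks for one key run one at a time in enqueue order while different keys proceed concurrently. As in the workaround earlier in the thread, failures propagate: tasks already queued behind a failed task for the same key are skipped and reject with the same error, so their messages can be nacked.

```typescript
// Hypothetical helper (not part of @google-cloud/pubsub): runs async tasks one at a
// time per key, in enqueue order. Tasks for different keys run concurrently.
class KeyedSerialQueue {
  // Most recently enqueued, still-pending task for each key.
  private readonly tails = new Map<string, Promise<unknown>>();

  enqueue<T>(key: string, task: () => Promise<T>): Promise<T> {
    const previous: Promise<unknown> = this.tails.get(key) ?? Promise.resolve();
    // Start `task` only after the previous task for this key has succeeded. If the
    // previous task failed, `task` is skipped and this promise rejects with the same
    // error, so later messages for the key can be nacked as well.
    const result = previous.then(() => task());
    this.tails.set(key, result);
    // Forget the key once its last task settles. Handling both outcomes avoids
    // creating a derived promise that could reject unhandled.
    const cleanup = (): void => {
      if (this.tails.get(key) === result) {
        this.tails.delete(key);
      }
    };
    result.then(cleanup, cleanup);
    return result;
  }
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Two tasks for "doc-a" and one for "doc-b": doc-b is not blocked by doc-a,
// and a2 waits for a1 even though a2's own work is shorter.
async function demoQueue(): Promise<string[]> {
  const queue = new KeyedSerialQueue();
  const log: string[] = [];
  await Promise.all([
    queue.enqueue('doc-a', async () => { await sleep(30); log.push('a1'); }),
    queue.enqueue('doc-a', async () => { await sleep(10); log.push('a2'); }),
    queue.enqueue('doc-b', async () => { await sleep(5); log.push('b1'); }),
  ]);
  return log;
}
```

Here demoQueue() resolves to ['b1', 'a1', 'a2']. In a subscriber, the 'message' listener might call queue.enqueue(message.orderingKey, () => handle(message)).then(() => message.ack(), () => message.nack()), where handle is your own (hypothetical) processing function; messages without an ordering key could be keyed by message.id instead. Like the original workaround, this only serializes work within a single subscriber process.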

I'd like to add some utility classes like this, because it seems like a semi-common use case if you're doing async work in subscriber callbacks (which is basically... like... always), but it's not currently on the roadmap.

feywind commented 1 year ago

Closing this out; feel free to add more comments, and we'll take a look at them if that utility library happens later.