pozil / pub-sub-api-node-client

A node client for the Salesforce Pub/Sub API
Creative Commons Zero v1.0 Universal

Infinite number of events #12

Closed nicopassaglia closed 9 months ago

nicopassaglia commented 1 year ago

I am trying to pass Number.POSITIVE_INFINITY as the number of requested events, and it throws the error:

The requested number of events in a fetch request must be greater than zero

How can we request an endless number of events? I never want the subscription to end unless I say so.

pozil commented 1 year ago

Hi @nicopassaglia, thanks for reaching out.

Here's a bit of background about why this is happening. The Pub Sub API is built with this limitation, it's not something specific to this client. From what the team told me, this is a rate-limit mechanism that helps prevent client overflow: you only request a certain number of events at a time so that you can process them before accepting other events.

If you want to consume an endless number of events you have to reconnect the client manually for now. I'll leave this feature request open as I may work on an auto-reconnect feature.

brandon-burciaga commented 1 year ago

I am curious how others are solving this challenge.

I was hoping I could permanently subscribe to events without needing a new subscription.

Since I can't, I was thinking I'd run this code on some sort of schedule, causing the client to subscribe and listen to, say, the next 500 events. A few hours later, it runs again, and the connection and subscription are again created with the next N events.

There doesn't seem to be a way to use the client to determine how many requested events have been processed, which would probably help in determining if it's time to re-subscribe.

I am trying to figure out if I can determine if I need to re-subscribe at a specific time, or simply run the re-subscription on a timer/schedule. I don't want to miss any events.

@pozil do you know the upper limit of the number of events we can request?

sartor-n commented 1 year ago

Also interested in this.

I was thinking of leveraging the lifecycle events to re-start the event emitter, or to throw an error that would re-start my pm2 instance.

eventsEmitter.on("end", () => {
    console.log("gRPC stream ended");
});

Any recommended workaround that would be cleaner than this?

antkl commented 1 year ago

After processing all the events you can call, for instance, subscription.write({ numRequested: 10 }); if you want 10 more events.
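To make the idea concrete without a Salesforce org, here is a minimal, self-contained sketch. FakeSubscription is a hypothetical stand-in for the gRPC stream (it is not part of this client or of gRPC); only the write({ numRequested }) call shape is taken from the comment above. The batch size and backlog size are arbitrary.

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical stand-in for the gRPC stream: it releases queued events only
// while the client still has requested events outstanding.
class FakeSubscription extends EventEmitter {
    constructor(queuedEvents) {
        super();
        this.queue = [...queuedEvents];
        this.pending = 0; // requested but not yet delivered
        this.delivering = false;
    }

    // Same call shape as subscription.write({ numRequested }) above.
    write({ numRequested }) {
        this.pending += numRequested;
        this.deliver();
    }

    deliver() {
        // A write() made from inside a 'data' handler is served by the loop below.
        if (this.delivering) return;
        this.delivering = true;
        while (this.pending > 0 && this.queue.length > 0) {
            this.pending -= 1;
            this.emit('data', this.queue.shift());
        }
        this.delivering = false;
    }
}

const BATCH_SIZE = 10;
const backlog = Array.from({ length: 25 }, (_, i) => `event-${i + 1}`);
const subscription = new FakeSubscription(backlog);

const received = [];
let receivedInBatch = 0;
subscription.on('data', (event) => {
    received.push(event);
    receivedInBatch += 1;
    if (receivedInBatch === BATCH_SIZE) {
        // Batch fully processed: ask for the next one on the same stream.
        receivedInBatch = 0;
        subscription.write({ numRequested: BATCH_SIZE });
    }
});

subscription.write({ numRequested: BATCH_SIZE }); // initial request
console.log(`received ${received.length}, still pending ${subscription.pending}`);
```

With a backlog of 25 and batches of 10, the consumer ends up with all 25 events and 5 requested events still outstanding, so the stream stays open for whatever arrives next.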

soileaud commented 1 year ago

> Here's a bit of background about why this is happening. The Pub Sub API is built with this limitation, it's not something specific to this client. From what the team told me, this is a rate-limit mechanism that helps prevent client overflow: you only request a certain number of events at a time so that you can process them before accepting other events.
>
> If you want to consume an endless number of events you have to reconnect the client manually for now. I'll leave this feature request open as I may work on an auto-reconnect feature.

This was something I ran into last year when trying pub-sub right after the GA release, all the examples showed how to get one event for one channel and not how to keep the stream alive (or subscribe to many channels).

From my (albeit limited) understanding of gRPC after research, trial, and error, the rate limit is designed with the ability to request more messages once the number of requested messages has been received - without having to restart the client subscription entirely. The client can request 10, get 10, and request 10 more as @antkl described.

In our app, for simplicity, we have set the numRequested = 1, and then we call subscription.write again after each data message to get more, thus keeping the client connected near infinitely (assuming no error, which is handled on those events).

Perhaps for this library, there could be an optional keepAlive boolean param on the subscribe method that would request more messages after the requested number have been received?

let receivedCount = 0;
subscription.on('data', (data) => {
    if (data.events) {
        const latestReplayId = decodeReplayId(data.latestReplayId);
        this.#logger.info(
            `Received ${data.events.length} events, latest replay ID: ${latestReplayId}`
        );
        data.events.forEach((event) => {
            const parsedEvent = parseEvent(schema, event);
            this.#logger.debug(parsedEvent);
            eventEmitter.emit('data', parsedEvent);
            receivedCount++;
            // Once the requested batch has arrived, request another one on the same stream
            if (keepAlive && receivedCount === subscribeRequest.numRequested) {
                receivedCount = 0;
                subscription.write(subscribeRequest);
            }
        });
    } else {
        // If there are no events then every 270 seconds the system will keep publishing the latestReplayId.
    }
});

pozil commented 1 year ago

I have released v3.0.0 of the library which lets you better track received/requested events and provides a new event that fires when we reach the last event. See the release notes and the updated example in the readme for more details.

BoKKeR commented 10 months ago

The event in the lastevent handler is not available:

this.subscription.on('lastevent', async (event) => {
    console.log({ event });
    console.log(
          `Reached last requested event on channel ${this.subscription.getTopicName()}. Resubscribing to ${NUMBER_OF_EVENTS} events`,
    );
});

The event comes back as undefined. Has anyone else experienced this? It worked before but now it just refuses to pass the event.

pozil commented 10 months ago

@BoKKeR the lastevent event does not hold any data so the event variable in the example above is always undefined. lastevent is just a signal that warns you that the stream has reached the number of requested events and will auto-close soon. Unlike the data event, it doesn't carry Pub Sub API events. I have removed the variable from the lastevent event handler example to avoid any confusion.
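The behaviour is easy to reproduce with a plain Node.js EventEmitter. In this sketch the emitter is only a stand-in for the object returned by subscribe, and the payload passed to data is made up for illustration:

```javascript
const { EventEmitter } = require('node:events');

// Stand-in for the emitter returned by subscribe(); not the real client.
const subscription = new EventEmitter();
const seen = [];

subscription.on('data', (event) => seen.push(['data', event]));
subscription.on('lastevent', (event) => seen.push(['lastevent', event]));

subscription.emit('data', { replayId: 42 }); // carries a payload
subscription.emit('lastevent'); // emitted with no arguments, so the handler gets undefined

console.log(seen);
```

The lastevent handler is still called; its parameter is simply undefined because nothing is passed to emit.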

BoKKeR commented 10 months ago

@pozil Thank you! That explains it, since I took it from the old readme. Another thing I ran into is subscription.write being undefined.

I can successfully log subscription, but write is not on the object

private SFClient = new PubSubApiClient(logger);

await this.SFClient.connect();
if (replayLock) {
  logger.info(`Continuing pubSub events from replayId:${replayLock}`);
  this.subscription = await this.SFClient.subscribeFromReplayId(
    constants.pubSubTopic,
    GLOBAL_STATE.NUMBER_OF_EVENTS,
    replayLock,
  );
} else {
  this.subscription = await this.SFClient.subscribe(constants.pubSubTopic, GLOBAL_STATE.NUMBER_OF_EVENTS);
}

// Handle last requested event
this.subscription.on('lastevent', async () => {
  console.log(
    `Reached last requested event on channel ${this.subscription.getTopicName()}. Resubscribing to ${
      GLOBAL_STATE.NUMBER_OF_EVENTS
    } events`,
  );
  try {
    console.log(this.subscription);

    // Re-Subscribe to more incoming events
    this.subscription.write({ numRequested: GLOBAL_STATE.NUMBER_OF_EVENTS });
  } catch (error) {
    console.log(error);
  }
});


Any idea what is going on here? I am so sure this used to work (unlike the lastevent payload).

pozil commented 10 months ago

@BoKKeR this is expected as the low-level gRPC write operation is not exposed publicly in this client. For reference, the return type of the subscribe operations is a PubSubEventEmitter.

All you need to do to request additional events is to re-subscribe like you did earlier:

this.subscription = await this.SFClient.subscribe(constants.pubSubTopic, GLOBAL_STATE.NUMBER_OF_EVENTS);

Since the event schema is already cached when you did the initial subscription, this is in fact the same as performing a gRPC write operation like you were attempting to do.
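As a rough sketch of that flow, the snippet below re-subscribes every time lastevent fires. StubClient is a hypothetical stand-in that hands out events from a fixed backlog; only the subscribe(topic, numRequested) shape and the lastevent signal come from this thread, and the drained event exists purely so the demo can terminate. It ignores replay IDs entirely.

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical stub standing in for the client, so the example runs offline.
class StubClient {
    constructor(backlog) {
        this.backlog = [...backlog];
        this.subscribeCalls = 0;
    }

    async subscribe(topicName, numRequested) {
        this.subscribeCalls += 1;
        const subscription = new EventEmitter();
        const batch = this.backlog.splice(0, numRequested);
        // Deliver on a later tick so the caller can attach listeners first.
        setImmediate(() => {
            for (const event of batch) subscription.emit('data', event);
            if (batch.length === numRequested) {
                subscription.emit('lastevent'); // requested count reached
            } else {
                subscription.emit('drained'); // stub-only signal that ends the demo
            }
        });
        return subscription;
    }
}

// Re-subscribe on every 'lastevent'; resolves once the stub runs dry.
function consumeAll(client, topicName, numRequested, onEvent) {
    return new Promise((resolve, reject) => {
        const subscribeOnce = async () => {
            try {
                const subscription = await client.subscribe(topicName, numRequested);
                subscription.on('data', onEvent);
                subscription.on('lastevent', subscribeOnce);
                subscription.on('drained', resolve);
            } catch (error) {
                reject(error);
            }
        };
        subscribeOnce();
    });
}

const received = [];
const client = new StubClient(['a', 'b', 'c', 'd', 'e']);
const finished = consumeAll(client, '/event/test__e', 2, (event) => received.push(event));
finished.then(() => console.log(received, client.subscribeCalls));
```

With five queued events and two requested per subscription, the stub is subscribed to three times and the events arrive in their original order.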

BoKKeR commented 10 months ago

Thank you, that worked perfectly!

if0s commented 9 months ago

@pozil I have some trouble with this approach. If several messages are already queued and you start the subscription with subscribeFromReplayId to get them all, you also need to track the replayId from the last data event (or latestReplayId from the keepalive event) and recreate the subscription using subscribeFromReplayId with the latest replayId.

This method creates a new subscription each time, but I think it is much better to keep using the same subscription and just request more records on it (as @antkl showed). If you modify this part, it should do the trick:

if (
    eventEmitter.getReceivedEventCount() ===
    eventEmitter.getRequestedEventCount()
) {
    if (infinitySubscription) { // infinitySubscription - parameter or option
        subscription.write({
            topicName: subscribeRequest.topicName,
            numRequested: subscribeRequest.numRequested
        }); // request new records using same subscription
        eventEmitter.setReceivedEventCount(0); // add setReceivedEventCount method to PubSubEventEmitter class
    } else {
        eventEmitter.emit('lastevent');
    }
}

This method also works with subscribeFromReplayId: you don't need to track and provide the replayId if you're requesting more records on the same subscription (you need it only for the first iteration).

P.S. Instead of comparing the received and requested event counts, we could probably track data.pendingNumRequested from the data event: if it's empty/undefined, this is the last requested event and we can request more.

pozil commented 9 months ago

Thanks @if0s. I realize that my approach doesn't cover this use case.

Unfortunately, your suggestion will not quite work either because you need to pass the last replay ID in the subscription.write call to resume the subscription from some point in the past and not jump directly to new events. I'll get back to the drawing board and work on this.

if0s commented 9 months ago

@pozil I did several tests with subscription.write, and it doesn't jump to new events. It keeps delivering them in batches (of the provided numRequested) in the right order, one by one, until the queue is exhausted, and then waits for new ones.

For example, say you have 10 records in the queue and start the subscription with the replayId of the first Salesforce event, fetching 3 records (numRequested = 3):

// ...
subscription.write({
    topicName: '/event/test__e',
    numRequested,
    replayPreset: 2, // custom replay preset: start from the replayId below
    replayId: encodeReplayId(replayId)
});
// ...

Then, in the data listener, add a request for more records on this subscription (without a replayId!):

// ...
subscription.on('data', (data) => {
    // ...
    if (data.events) {
        data.events.forEach((event) => {
            // ...
            eventEmitter.emit('data', parsedEvent);
            // ...
        });
        if (!data.pendingNumRequested) {
            // Nothing left outstanding: request the next batch on the same stream
            this.#logger.info(`requesting ${numRequested} more records`);
            subscription.write({
                topicName: '/event/test__e',
                numRequested
            });
        } else {
            this.#logger.info(`waiting for ${data.pendingNumRequested} more records`);
        }
    }
});

In this case the data event on eventEmitter fires 10 times (all records in the queue, in the right order from oldest to newest), and the subscription keeps waiting for 2 more records. As long as your code keeps running, you will be able to process the old records and an infinite number of new Salesforce events.

The main point: as soon as you receive the last requested event, you request the next batch without needing to store the replayId, because it's the same subscription. This approach also works well if you initialize the subscription without a replayId: you start receiving an infinite number of events from the time of execution.

Alternatively, you can save the replayId from the last event (or latestReplayId from the keepalive message) and close the connection (I think the first keepalive message can be used instead of lastevent to be sure there are no more records in the queue). On the next execution you can use the saved replayId to get all records that were created while you were offline. According to the documentation, Salesforce stores events for up to 3 days.
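A minimal sketch of that save-and-resume idea, assuming the replay ID is handled as a plain number as in the subscribeFromReplayId calls earlier in this thread. The checkpoint file location and the saveCheckpoint, loadCheckpoint, and planSubscription helpers are hypothetical names for illustration; planSubscription only decides which client method to call and with which arguments, it does not call the client.

```javascript
const fs = require('node:fs');
const os = require('node:os');
const path = require('node:path');

// Hypothetical checkpoint location; any durable store would do.
const checkpointFile = path.join(os.tmpdir(), `pubsub-replay-${process.pid}.json`);

// Call this from the data handler after an event has been processed.
function saveCheckpoint(replayId) {
    fs.writeFileSync(checkpointFile, JSON.stringify({ replayId }));
}

// Returns the last saved replay ID, or null on a first run.
function loadCheckpoint() {
    if (!fs.existsSync(checkpointFile)) return null;
    return JSON.parse(fs.readFileSync(checkpointFile, 'utf8')).replayId;
}

// Decide how to subscribe on startup: resume if a checkpoint exists.
function planSubscription(topicName, numRequested) {
    const replayId = loadCheckpoint();
    return replayId === null
        ? { method: 'subscribe', args: [topicName, numRequested] }
        : { method: 'subscribeFromReplayId', args: [topicName, numRequested, replayId] };
}

fs.rmSync(checkpointFile, { force: true }); // start the demo from a clean state
const firstRun = planSubscription('/event/test__e', 3);
saveCheckpoint(1042); // pretend the last processed event had this replay ID
const secondRun = planSubscription('/event/test__e', 3);
console.log(firstRun.method, secondRun.method, secondRun.args[2]);
fs.rmSync(checkpointFile, { force: true }); // clean up the demo file
```

Saving after each processed event, not before, means a crash replays at most the event that was in flight instead of skipping it.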

And of course lastevent would have to be revised for this case...

soileaud commented 9 months ago

I can confirm that in our org's implementation, calling subscription.write resumes the stream, even if you re-use the initial subscribeRequest that has the old replayId.

From my research, this seems to be the intention of gRPC, so that the client can control the rate of message flow, as long as you submit the request within 60s of the last message, before the stream ends.

> If the client requests more events before the server finishes the last requested amount, the server appends the new amount to the current batch of events it still needs to deliver.
>
> In this case, to keep the subscription stream open, the client must send a new FetchRequest within 60 seconds from when the last FetchResponse was sent. If the client fails to do so, the subscription stream closes and the client must call Subscribe again to open a new stream.

https://developer.salesforce.com/docs/platform/pub-sub-api/guide/flow-control.html
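The two rules quoted above can be modelled with a small bookkeeping class. This is only an illustrative sketch of the accounting, not code from the client or from Salesforce: FlowControlTracker and its method names are made up, and timestamps are passed in explicitly (in milliseconds) to keep it deterministic.

```javascript
// 60-second window taken from the flow-control documentation quoted above.
const KEEP_OPEN_WINDOW_MS = 60 * 1000;

class FlowControlTracker {
    constructor() {
        this.pending = 0; // events requested but not yet delivered
        this.drainedAt = null; // time at which the last requested event arrived
    }

    // A new request is added on top of whatever is still outstanding.
    request(numRequested) {
        this.pending += numRequested;
        this.drainedAt = null;
    }

    // Record a response carrying `eventCount` events at time `now` (ms).
    onResponse(eventCount, now) {
        this.pending = Math.max(0, this.pending - eventCount);
        if (this.pending === 0) this.drainedAt = now;
    }

    // True once the window for requesting more events has passed.
    isStreamExpired(now) {
        return this.drainedAt !== null && now - this.drainedAt > KEEP_OPEN_WINDOW_MS;
    }
}

const tracker = new FlowControlTracker();
tracker.request(10);
tracker.onResponse(4, 0); // 6 still pending
tracker.request(10); // appended to the outstanding amount: 16 pending
const pendingAfterSecondRequest = tracker.pending;
tracker.onResponse(16, 1000); // everything delivered at t = 1s
console.log(
    pendingAfterSecondRequest,
    tracker.isStreamExpired(30000),
    tracker.isStreamExpired(62000)
);
```

In this run the second request raises the outstanding count to 16, and the stream is treated as expired only once more than 60 seconds have passed since the final response without a new request.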

pozil commented 9 months ago

@if0s, @soileaud thanks for the conversation and for the link to the updated documentation (this didn't exist when I started working on the client).

I've taken your feedback and released a new version of the client (v3.4.0) with a simpler flow for infinite events while maintaining the current API signatures.

I've also updated the docs to illustrate infinite event support as the default setup and flow control for high volumes of events.