kdcllc / CometD.NetCore.Salesforce

CometD Salesforce Implementation.
MIT License

Silently fails when replay id is too old. #20

Closed kvansaders closed 3 years ago

kvansaders commented 4 years ago

I created a listener that grabs the latest received replayId from a separate table and reconnects using that particular replayId for continuity. The listener stayed alive for 2 days with no errors, but must have had a stale connection of some sort, as new messages stopped coming in. After realizing that messages were being missed, I checked the PushTopic in Workbench and entered the latest replayId the listener was trying to use.

Workbench Error:

  1. Subscription Failure: 400::The replayId {82} you provided was invalid. Please provide a valid ID, -2 to replay all events, or -1 to replay only new events.

    "error": "400::The replayId {82} you provided was invalid. Please provide a valid ID, -2 to replay all events, or -1 to replay only new events.", "successful": false

The CometD client knew nothing of this error and just pretended that the connection was all fine and dandy. As a workaround, I am now restarting my listener every hour to make sure that the connection is fresh.

It would make sense to me that if this error were actually received and an exception thrown, I could handle it by opening a new connection asking for -2 and reconciling duplicates as they come in.
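A minimal sketch of what that duplicate reconciliation could look like after reconnecting with -2. `ReplayReconciler` and `TryMarkProcessed` are hypothetical names, not part of CometD.NetCore.Salesforce, and the sketch assumes the replay IDs that were already handled can be loaded from the separate table mentioned above:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of CometD.NetCore.Salesforce.
// It remembers which replay IDs were already handled so that events
// replayed after subscribing with -2 can be skipped.
public sealed class ReplayReconciler
{
    private readonly HashSet<long> _processed;

    // alreadyProcessed: replay IDs loaded from wherever they are persisted
    // (assumed here to be the separate table the listener already uses).
    public ReplayReconciler(IEnumerable<long> alreadyProcessed)
    {
        _processed = new HashSet<long>(alreadyProcessed);
    }

    // Returns true the first time a replay ID is seen and false for a duplicate.
    public bool TryMarkProcessed(long replayId) => _processed.Add(replayId);
}
```

The message handler would call `TryMarkProcessed` for each incoming event and drop the event when it returns false. A set is used rather than a "highest ID seen" comparison so that the sketch does not rely on any ordering of replay IDs.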

kvansaders commented 4 years ago

Package Version : 3.0.0

apaulro commented 3 years ago

hi

What I did to handle this is to catch this exception in ResilientStreamingClient.cs and reset the ReplayId to -1, something like the below. As soon as the client gets this error (basically, there have been no events in Salesforce for ~72h), it reconnects with -1 (register for new events only). When a new event appears, its ReplayId is persisted. I chose this approach over connecting with -2, which means dealing with duplicates downstream (which can be quite a lot, depending on the volume of events you are dealing with).

private void ErrorExtension_ConnectionError(
            object sender,
            string e)
        {
            // authentication failure
            if (string.Equals(e, "403::Handshake denied", StringComparison.OrdinalIgnoreCase)
                || string.Equals(e, "403:denied_by_security_policy:create_denied", StringComparison.OrdinalIgnoreCase)
                || string.Equals(e, "403::unknown client", StringComparison.OrdinalIgnoreCase))
            {
                _logger.LogWarning("Handled CometD Exception: {message}", e);

                // 1. Disconnect existing client.
                Disconnect();

                // 2. Invalidate the access token.
                _tokenResponse.Invalidate();

                _logger.LogDebug("Invalidate token for {name} ...", nameof(BayeuxClient));

                // 3. Recreate BayeuxClient and populate it with a new transport with new security headers.
                CreateBayeuxClient();

                // 4. Invoke the Reconnect Event
                Reconnect?.Invoke(this, true);
            }
            else if (e.StartsWith("400::The replayId ", StringComparison.OrdinalIgnoreCase)
                  && e.Contains("you provided was invalid.  Please provide a valid ID, -2 to replay all events, or -1 to replay only new events", StringComparison.OrdinalIgnoreCase))
            {
                _logger.LogWarning("Handled CometD Exception: {message}", e);
                /*
                logic to persist replayId which is being reset to -1 for this particular channel.
                that value is read when the channel is initialized.
                */

                // channelName is assumed to come from the persistence logic elided above.
                _logger.LogDebug("Reset {channelName} with ReplayId -1", channelName);
                _logger.LogDebug("Initiate reconnect process...");

                // 1. Disconnect existing client.
                Disconnect();

                // 2. Invalidate the access token.
                _tokenResponse.Invalidate();

                _logger.LogDebug("Invalidate token for {name} ...", nameof(BayeuxClient));

                // 3. Recreate BayeuxClient and populate it with a new transport with new security headers.
                CreateBayeuxClient();

                // 4. Invoke the Reconnect Event
                Reconnect?.Invoke(this, true);
            }
        }

kvansaders commented 3 years ago

Is this fix available in 3.0.1?

kvansaders commented 3 years ago

@apaulro, do I have to override the existing method? I'm using the NuGet package.

apaulro commented 3 years ago

Hi @kvansaders, apologies for the delayed response. The ErrorExtension_ConnectionError() method isn't virtual, so in this case I used a fork of the project with the custom code noted above. It is probably worth a PR to make this method virtual in ResilientStreamingClient.cs.

kdcllc commented 3 years ago

@kvansaders and @apaulro, please get the latest version and use the override to get the job done. In addition, I added DI extension methods to support inherited classes. Enjoy!
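For readers following along, here is a rough sketch of the override pattern discussed in this thread. It does not use the library's real types: `StreamingClientStandIn`, `OnConnectionError`, `TriggerReconnect`, and `SimulateError` are hypothetical stand-ins so the example is self-contained, and the actual class and method signatures in CometD.NetCore.Salesforce may differ.

```csharp
using System;

// Stand-in for the library's streaming client. The real class, its
// constructor, and its member names are NOT reproduced here.
public class StreamingClientStandIn
{
    public int ReconnectCount { get; private set; }

    // Plays the role of the now-virtual error handler.
    protected virtual void OnConnectionError(string error)
    {
        // Default handling (for example, 403 handshake failures) is elided.
    }

    // Stands in for the disconnect / recreate client / reconnect sequence.
    protected void TriggerReconnect() => ReconnectCount++;

    // Lets a caller simulate the server reporting an error.
    public void SimulateError(string error) => OnConnectionError(error);
}

public class ReplayAwareClient : StreamingClientStandIn
{
    // -1 asks Salesforce for new events only.
    public long ReplayId { get; private set; }

    public ReplayAwareClient(long replayId) => ReplayId = replayId;

    protected override void OnConnectionError(string error)
    {
        // Match the start and a short fragment of the message so that
        // spacing differences later in the text do not matter.
        if (error.StartsWith("400::The replayId ", StringComparison.OrdinalIgnoreCase)
            && error.Contains("you provided was invalid", StringComparison.OrdinalIgnoreCase))
        {
            ReplayId = -1;      // fall back to "new events only"
            TriggerReconnect(); // then reconnect with the reset value
            return;
        }

        // Everything else goes to the default handling.
        base.OnConnectionError(error);
    }
}
```

In a real project the derived class would inherit from the library's client and be registered through the DI extension methods mentioned above. The reset value would also need to be persisted so that the next subscription reads -1.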