kdcllc / CometD.NetCore.Salesforce

CometD Salesforce Implementation.
MIT License

ReplayId - getting older messages after a period of time where channel was subscribed using the last successful replayId #22

Open tschroedertn opened 3 years ago

tschroedertn commented 3 years ago

We are having an issue with setting the replayId at the time a channel subscription is re-subscribed.

Here is what we do to set the replayId when subscribing to a channel. We add the Replay Extension at setup time (excerpt):

var bayeuxClient = new BayeuxClient(endpoint, new[] {transport});
bayeuxClient.AddExtension(new ReplayExtension());

Later, when subscribing to the channel, the replayId is set so that we receive new messages:

public void Subscribe(BayeuxClient bayeuxClient)
{
    // Pull the last successful replayId for this channel from the database.
    long replayId = GetLastSuccessfulReplayId();
    Channel = bayeuxClient.GetChannel(ChannelName, replayId);
    Channel.Subscribe(this);
}

If we are re-subscribing to the channel because of a connection issue, a handshake message is received and the subscription with the updated replayId above has no effect: the subscription restarts from the replayId used the first time it subscribed.

We need a way to SET the replayId in the case of a re-subscription, but the ReplayId is read-only.

In summary, we start up the Bayeux client, add listeners to all of the meta channels, and perform a Handshake on Bayeux client. On the handshake meta channel listener, we detect a successful handshake and in turn loop through a collection of channel listeners and have them self-subscribe to the channel based on the last successfully processed replayid (stored in database per channel).

The general idea we are following is that when subscribing, we use the last successful replayId stored in the database, and when processing an event we store its replayId for the channel in the database. It seems that the handshake event happens several times during the day, perhaps based on what the client detects as an issue with the connection. Is that normal? It seems that after one of these self-initiated handshakes, the replayId set on subscription is ignored; it is only used when subscribing for the first time. If anyone has any ideas or experiences, it would be appreciated!
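To make the store-and-resume idea concrete, here is a minimal sketch of the per-channel bookkeeping. It is an in-memory stand-in for the database table, and `ReplayIdStore`, `MarkProcessed`, and the channel name are hypothetical names, not part of the library. The -1 default relies on the Salesforce replay convention that -1 means "new events only".

```csharp
using System.Collections.Concurrent;

// Hypothetical in-memory stand-in for the per-channel replayId table.
public sealed class ReplayIdStore
{
    // Salesforce replay option: -1 requests only events sent after subscribing.
    public const long NewEventsOnly = -1;

    private readonly ConcurrentDictionary<string, long> _lastProcessed =
        new ConcurrentDictionary<string, long>();

    // Call this only after an event has been processed successfully.
    public void MarkProcessed(string channel, long replayId)
    {
        _lastProcessed[channel] = replayId;
    }

    // Used at subscription time; falls back to "new events only"
    // when nothing has been processed for the channel yet.
    public long GetLastSuccessfulReplayId(string channel)
    {
        return _lastProcessed.TryGetValue(channel, out var id) ? id : NewEventsOnly;
    }
}
```

A real implementation would persist the value, as described above, so that it survives a process restart.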

brettkc commented 3 years ago

@tschroedertn did you come up with anything on this? Also in the process of considering a clean way to replay events in the case of a connection disruption using some sort of state tracking.

nssidhu commented 3 years ago

I am also looking for the solution to this

ancientyouthman commented 3 years ago

Also looking for a solution on this - if anyone has any update it would be greatly appreciated!

ancientyouthman commented 3 years ago

The workaround we found to work was to simply disconnect the BayeuxClient from the Salesforce stream, and then reconnect and re-subscribe to the channels whenever we receive a "403::Unknown client" error in the /meta/connect channel listener. I would still be interested to know if there's a solution that doesn't involve having to do this, though.

YuriiMocherniuk commented 3 years ago

I've also experienced the same kind of issue. After a while the same message gets picked up over and over again.

YuriiMocherniuk commented 3 years ago

@ancientyouthman, BTW, in which field of the IMessage input parameter do you look for the "403::Unknown client" error? As far as I can see, only the Successful boolean property is available.

YuriiMocherniuk commented 3 years ago

After a short while I've found out the answer. The error is part of the JSON:

if (!message.Successful && message.Json.Contains("403::Unknown client"))
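One caveat with a plain `Contains` is that it is case-sensitive, while the error text appears in this thread with both "Unknown client" and "unknown client" casing. A small sketch of a case-insensitive variant follows; `MetaConnectErrors` and `IsUnknownClient` are hypothetical helper names, and the JSON in the usage note is only an illustrative failed /meta/connect reply, not captured output.

```csharp
using System;

// Hypothetical helper; takes the two values read from the message
// (Successful and Json) so it can be tested without the library.
public static class MetaConnectErrors
{
    public static bool IsUnknownClient(bool successful, string json)
    {
        if (successful || string.IsNullOrEmpty(json))
        {
            return false;
        }

        // Case-insensitive substring match on the raw JSON text.
        return json.IndexOf("403::Unknown client", StringComparison.OrdinalIgnoreCase) >= 0;
    }
}
```

In a listener this would be called as `MetaConnectErrors.IsUnknownClient(message.Successful, message.Json)`.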

tschroedertn commented 3 years ago

We came up with the same solution that @ancientyouthman suggested earlier. The workaround we implemented is to use the ErrorExtension to listen for connection errors. When a 403 is detected, we reset subscriptions, disconnect the client, and restart the whole process of establishing a client and subscribing to the channels. Since we cannot update the ReplayId, we shut down the client and its subscriptions and start over. At subscription time you can set the replayId, so it picks up where it left off. Salesforce periodically shuts down the client (at some point I found an article that mentioned Salesforce only allows the connection for a set period of time), so it seems you have to respond to that error event and re-establish your client and subscriptions:

`private void ErrorExtension_ConnectionError(object sender, string e)
{
    // authentication failure
    if (string.Equals(e, "403::Handshake denied", StringComparison.OrdinalIgnoreCase)
        || string.Equals(e, "403:denied_by_security_policy:create_denied", StringComparison.OrdinalIgnoreCase)
        || string.Equals(e, "403::unknown client", StringComparison.OrdinalIgnoreCase))
    {
        _logger.LogWarning("In {className}-{methodName} - handled CometD Exception: {message}", nameof(SalesforceEventPollingService), nameof(ErrorExtension_ConnectionError), e);

        RecreateClientAndSubscribeChannels();
    }
    else
    {
        _logger.LogError("In {className}-{methodName} - failed with the following message: {message}", nameof(SalesforceEventPollingService), nameof(ErrorExtension_ConnectionError), e);
    }
}`
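If the list of errors that should trigger a full reconnect grows, the chain of `string.Equals` calls can be replaced by a set lookup. The sketch below uses only the three error strings quoted in this thread; `RecoverableErrors` and `RequiresFullReconnect` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: the three error strings are the ones handled
// in the handler above; nothing else is assumed about the server.
public static class RecoverableErrors
{
    private static readonly HashSet<string> Known =
        new HashSet<string>(StringComparer.OrdinalIgnoreCase)
        {
            "403::Handshake denied",
            "403:denied_by_security_policy:create_denied",
            "403::unknown client",
        };

    // True when the whole error string equals one of the known
    // errors, ignoring case (same semantics as the string.Equals chain).
    public static bool RequiresFullReconnect(string error)
    {
        return error != null && Known.Contains(error);
    }
}
```

The handler condition then becomes `if (RecoverableErrors.RequiresFullReconnect(e))`.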

"RecreateClientAndSubscribeChannels()" first calls my Disconnect method, then steps through all of the same code it did at startup to re-create the client and subscriptions:

`public void Disconnect(int timeout)
{
    if (_isDisposed)
    {
        throw new ObjectDisposedException("Cannot disconnect when disposed");
    }

    BayeuxClient?.ResetSubscriptions();

    _logger.LogInformation("In {className}-{methodName}: Disconnecting bayeux client...", nameof(SalesforceEventPollingService), nameof(Disconnect));
    BayeuxClient?.Disconnect();
    BayeuxClient?.WaitFor(timeout, new[] { BayeuxClient.State.DISCONNECTED });
    _logger.LogInformation("In {className}-{methodName}: bayeux client disconnected - Client Statuses=connected:{bayeuxClientConnected}, disconnected:{bayeuxClientDisconnected}, handshook:{bayeuxClientHandshook}", nameof(SalesforceEventPollingService), nameof(Disconnect), BayeuxClient?.Connected, BayeuxClient?.Disconnected, BayeuxClient?.Handshook);

    // Unhook the error handlers so the old extension cannot trigger another reconnect.
    if (_errorExtension != null)
    {
        _errorExtension.ConnectionError -= ErrorExtension_ConnectionError;
        _errorExtension.ConnectionException -= ErrorExtension_ConnectionException;
        _errorExtension.ConnectionMessage -= ErrorExtension_ConnectionMessage;
    }

    _logger.LogInformation("In {className}-{methodName}:Disconnected...", nameof(SalesforceEventPollingService), nameof(Disconnect));
}`

It may take some refactoring your startup code so that you can call it again later, but that seems to have solved our issue. I hope this helps someone else. It would be nice to be able to just update the replayId, but this workaround has been stable for several months.
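One thing that may be worth guarding against once the startup code can be called again: if several error notifications arrive close together, the recreate routine could be entered more than once at the same time. This is an assumption on my part rather than something reported in this thread. A minimal sketch of a guard, with the hypothetical name `ReconnectGate`:

```csharp
using System;
using System.Threading;

// Hypothetical guard that lets only one reconnect run at a time.
public sealed class ReconnectGate
{
    private int _running; // 0 = idle, 1 = reconnect in progress

    // Runs the action unless another reconnect is already in progress.
    // Returns true if the action was run, false if it was skipped.
    public bool TryRun(Action recreateClientAndSubscribe)
    {
        if (Interlocked.CompareExchange(ref _running, 1, 0) != 0)
        {
            return false;
        }

        try
        {
            recreateClientAndSubscribe();
            return true;
        }
        finally
        {
            // Always release the gate, even if the reconnect throws.
            Interlocked.Exchange(ref _running, 0);
        }
    }
}
```

The error handler would then call something like `_gate.TryRun(RecreateClientAndSubscribeChannels)` instead of calling the method directly.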

YuriiMocherniuk commented 3 years ago

@tschroedertn, thanks. Yes, we're using a similar approach with complete reconnection + subscription that is done from "meta/connect" channel listener. So far it works fine and resolved our remaining issues with SFDC Event Bus and a proper replayId support.

davidfilkins commented 2 years ago

This thread, while brief, is great... Thank you all for the insight into wiring up replayId and the ErrorExtension setup. I've been banging my head against the wall half the day on how to do both.

BetimShala commented 9 months ago

@YuriiMocherniuk, can you please provide code snippets, if possible, for your workaround/solution to this problem?

Thanks in advance

YuriiMocherniuk commented 9 months ago

@BetimShala, unfortunately, we're dealing with proprietary code that cannot be disclosed. Overall, it comes down to calling the GetChannel method on the Bayeux client with "/meta/connect" as the argument, and then calling AddListener with an object that implements the IMessageListener interface. Here's an independent example:

bayeuxClient.GetChannel("/meta/connect").AddListener(new MyMessageListener());

BetimShala commented 9 months ago

Thanks for your reply,

I did something like this and it looks like it's working. Posting it here in case it is useful for others:

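public class MetaConnectListener : IMessageListener
{
    private readonly SFStreamingClient _client;

    public MetaConnectListener(SFStreamingClient client)
    {
        _client = client;
    }

    // Called for every message received on the /meta/connect channel.
    public void OnMessage(IClientSessionChannel channel, IMessage message)
    {
        // A failed connect carrying "403::Unknown client" is the signal
        // to rebuild the client and re-subscribe all channels.
        if (!message.Successful && message.Json.Contains("403::Unknown client"))
        {
            _client.RecreateClientAndSubscribeChannels();
        }
    }
}

Usage: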

private void CreateBayeuxClient()
{
    if (_isDisposed)
    {
        throw new ObjectDisposedException("Cannot create connection when disposed");
    }

    _logger.LogDebug("Creating {name} ...", nameof(BayeuxClient));

    var accessToken = _tokenResponse.Value().Result;

    var serverUri = new Uri(accessToken.InstanceUrl);
    var endpoint = $"{serverUri.Scheme}://{serverUri.Host}{_options.CometDUri}";

    var headers = new NameValueCollection { { nameof(HttpRequestHeader.Authorization), $"{accessToken.TokenType} {accessToken.AccessToken}" } };

    // Salesforce socket timeout during connection(CometD session) = 110 seconds
    var options = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase)
    {
        { ClientTransport.TIMEOUT_OPTION, _readTimeOut },
        { ClientTransport.MAX_NETWORK_DELAY_OPTION, _readTimeOut }
    };

    _clientTransport = new LongPollingTransport(options, headers);

    _bayeuxClient = new BayeuxClient(endpoint, _clientTransport);

    _errorExtension = new ErrorExtension();
    _errorExtension.ConnectionError += ErrorExtension_ConnectionError;
    _errorExtension.ConnectionException += ErrorExtension_ConnectionException;
    _errorExtension.ConnectionMessage += ErrorExtension_ConnectionMessage;
    _bayeuxClient.AddExtension(_errorExtension);

    _replayIdExtension = new ReplayExtension();
    _bayeuxClient.AddExtension(_replayIdExtension);

    var metaConnectChannel = _bayeuxClient.GetChannel(ChannelFields.META_CONNECT);
    metaConnectChannel.AddListener(new MetaConnectListener(this));
}

public void RecreateClientAndSubscribeChannels()
{
    Disconnect();
    _tokenResponse.Invalidate();
    CreateBayeuxClient();
    Reconnect?.Invoke(this, true);
    ResubscribeAllChannels();
}

private void ResubscribeAllChannels()
{
    // Each entry maps a topic name to a (replayId, listeners) pair.
    foreach (var subscription in _subscriptions)
    {
        var topicName = subscription.Key;
        var replayId = subscription.Value.Item1;
        var listeners = subscription.Value.Item2;
        foreach (var listener in listeners)
        {
            SubscribeTopic(topicName, listener, replayId);
        }
    }
}

Thanks

YuriiMocherniuk commented 9 months ago

You're welcome!

jordanmcdonald-rh commented 5 months ago

Hi all, what is _subscriptions and where does it get populated?
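The snippet earlier in the thread does not show it. Judging only from how it is used there (`Key`, `Value.Item1`, `Value.Item2`), one plausible shape is a dictionary from topic name to a (replayId, listeners) tuple that is filled whenever a topic is subscribed. The sketch below is a guess under that assumption; `SubscriptionRegistry`, `Add`, `UpdateReplayId`, and `Snapshot` are hypothetical names, and `TListener` stands in for the library's listener type.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical registry, inferred from usage; not the original author's code.
public sealed class SubscriptionRegistry<TListener>
{
    private readonly Dictionary<string, Tuple<long, List<TListener>>> _subscriptions =
        new Dictionary<string, Tuple<long, List<TListener>>>();

    // Would be called from the subscribe method each time a topic is subscribed.
    public void Add(string topicName, TListener listener, long replayId)
    {
        if (_subscriptions.TryGetValue(topicName, out var existing))
        {
            existing.Item2.Add(listener);
            _subscriptions[topicName] = Tuple.Create(replayId, existing.Item2);
        }
        else
        {
            _subscriptions[topicName] =
                Tuple.Create(replayId, new List<TListener> { listener });
        }
    }

    // Keeps the stored replayId current so that a later re-subscribe
    // resumes from the last processed event, not the initial one.
    public void UpdateReplayId(string topicName, long replayId)
    {
        if (_subscriptions.TryGetValue(topicName, out var existing))
        {
            _subscriptions[topicName] = Tuple.Create(replayId, existing.Item2);
        }
    }

    // Copy of the entries, safe to iterate while re-subscribing.
    public List<KeyValuePair<string, Tuple<long, List<TListener>>>> Snapshot()
    {
        return new List<KeyValuePair<string, Tuple<long, List<TListener>>>>(_subscriptions);
    }
}
```

Whatever the actual structure is, the stored replayId needs to be refreshed as events are processed; otherwise a re-subscribe would replay from the value captured at the first subscription.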