Hello :)
After a few months of use in a web service (an Azure WebJob), one of my instances of ResilientStreamingClient stopped (I use 4 clients for 4 separate listeners; 3 kept working and 1 stopped).
The issue is that I cannot find any cause for why it suddenly stopped.
I have made some changes to the original source code:
/// <summary>
/// Public only for testing purposes
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
public virtual void ErrorExtension_ConnectionError(
    object? sender,
    string e)
{
    replayIdToUseOnReconnect = KnownInvalidReplayId;
    _logger.LogError("{name} failed with the following message: {message}", nameof(ResilientStreamingClient), e);
    // authentication failure
    if (string.Equals(e, "403::Handshake denied", StringComparison.OrdinalIgnoreCase)
        || string.Equals(e, "403:denied_by_security_policy:create_denied", StringComparison.OrdinalIgnoreCase)
        || string.Equals(e, "403::unknown client", StringComparison.OrdinalIgnoreCase)
        || string.Equals(e, "401::Authentication invalid", StringComparison.OrdinalIgnoreCase))
    {
        _logger.LogWarning("Handled CometD Exception: {message}", e);
        ReconnectAfterFailure();
    }
    else if (e.Contains("you provided was invalid"))
    {
        _logger.LogError("{name} failed with the following message: {message}", nameof(ResilientStreamingClient), e);
        var start = e.IndexOf('{');
        var end = e.IndexOf('}');
        var replayIdString = e.Substring(start + 1, end - (start + 1));
        if (int.TryParse(replayIdString, out var replayId))
        {
            InvalidReplayIdStrategy(replayId);
            ReconnectAfterFailure();
        }
        else
        {
            _logger.LogCritical("Unable to parse invalid replayId. Unrecoverable without manual intervention");
        }
    }
    else
    {
        _logger.LogError("{name} failed with the following message: {message}", nameof(ResilientStreamingClient), e);
    }
}
private void ReconnectAfterFailure()
{
    // 1. Disconnect existing client.
    Disconnect();
    // 2. Invalidate the access token.
    lock (_tokenLock)
    {
        _tokenResponse.AccessToken = "";
        _tokenResponse.ExpiresAt = DateTimeOffset.MinValue;
    }
    _logger.LogDebug("Invalidate token for {name} ...", nameof(BayeuxClient));
    // 3. Recreate BayeuxClient and populate it with a new transport with new security headers.
    CreateBayeuxClient();
    // 4. Invoke the Reconnect event.
    Reconnect?.Invoke(this, true);
}
protected virtual void ErrorExtension_ConnectionException(
    object? sender,
    Exception ex)
{
    // ongoing time-out issue, not to be considered an error in the log.
    if (ex?.Message == "The operation has timed out.")
    {
        _logger.LogDebug(ex.Message);
    }
    else if (ex != null)
    {
        _logger.LogError(ex.ToString());
    }
}
protected virtual void ErrorExtension_ConnectionMessage(
    object? sender,
    string message)
{
    _logger.LogDebug(message);
}
private void CreateBayeuxClient()
{
    if (_isDisposed)
    {
        throw new ObjectDisposedException("Cannot create connection when disposed");
    }
    _logger.LogDebug("Creating {name} ...", nameof(BayeuxClient));
    lock (_tokenLock)
    {
        if (_tokenResponse.ExpiresAt < DateTimeOffset.Now || string.IsNullOrWhiteSpace(_tokenResponse.AccessToken))
        {
            _tokenResponse = _authenticator.AsyncAuthRequest().GetAwaiter().GetResult();
        }
        // only need the scheme and host, strip out the rest
        var serverUri = new Uri(_tokenResponse.InstanceUrl);
        var endpoint = $"{serverUri.Scheme}://{serverUri.Host}/cometd/45.0";
        var headers = new NameValueCollection { { nameof(HttpRequestHeader.Authorization), $"OAuth {_tokenResponse.AccessToken}" } };
        // Salesforce socket timeout during connection (CometD session) = 110 seconds
        var options = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase)
        {
            { ClientTransport.TIMEOUT_OPTION, _readTimeOutMs },
            { ClientTransport.MAX_NETWORK_DELAY_OPTION, _readTimeOutMs }
        };
        _clientTransport = new LongPollingTransport(options, headers);
        _bayeuxClient = new BayeuxClient(endpoint, _clientTransport);
        // adds logging and also raises an event to process reconnection to the server.
        _errorExtension = new ErrorExtension();
        _errorExtension.ConnectionError += ErrorExtension_ConnectionError;
        _errorExtension.ConnectionException += ErrorExtension_ConnectionException;
        _errorExtension.ConnectionMessage += ErrorExtension_ConnectionMessage;
        _bayeuxClient.AddExtension(_errorExtension);
        _replayIdExtension = new ReplayExtension();
        _bayeuxClient.AddExtension(_replayIdExtension);
        _bayeuxClient.Handshake();
        foreach (var listener in _listeners)
        {
            int realReplayId = 0;
            if (replayIdToUseOnReconnect == KnownInvalidReplayId)
            {
                realReplayId = listener.ReplayId;
            }
            else
            {
                realReplayId = replayIdToUseOnReconnect;
                _logger.LogWarning($"ReplayId was previously invalid. Setting it to {realReplayId} for PushTopic: {listener.TopicName}");
            }
            SubscribeTopic(listener.TopicName, listener, realReplayId);
        }
        _logger.LogDebug("{name} was created...", nameof(BayeuxClient));
    }
}
The logical cause would be hitting one of the "dead-end" else cases, ConnectionMessage, or ConnectionException, but I cannot locate those logs in Application Insights.
So I am asking here if anyone else has experienced this, or has any pointers to what might have gone wrong.
P.S. I have since removed all the dead-end else cases and instead re-create the client in those cases (see the sketch below); since then I have not seen any of the clients stop.
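Roughly, the change looks something like this. This is only a minimal sketch of the idea, not the exact diff: the former dead-end branches now call the existing ReconnectAfterFailure() after logging, instead of only logging, while the plain "operation has timed out" case is still treated as noise.

// sketch only: the final else of ErrorExtension_ConnectionError now recovers instead of only logging
else
{
    _logger.LogError("{name} failed with the following message: {message}", nameof(ResilientStreamingClient), e);
    ReconnectAfterFailure();
}

protected virtual void ErrorExtension_ConnectionException(
    object? sender,
    Exception ex)
{
    // ongoing time-out is still only logged at debug level
    if (ex?.Message == "The operation has timed out.")
    {
        _logger.LogDebug(ex.Message);
    }
    else if (ex != null)
    {
        _logger.LogError(ex.ToString());
        // previously a dead end; now tear the client down and rebuild it
        ReconnectAfterFailure();
    }
}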