kdcllc / CometD.NetCore.Salesforce

CometD Salesforce Implementation.
MIT License

System.NullReferenceException: Object reference not set to an instance of an object in CometD.NetCore.Salesforce.ResilientStreamingClient.ErrorExtension_ConnectionError function. #41

Open KwizaKaneria opened 2 years ago

KwizaKaneria commented 2 years ago

We use this library to listen to push topics so that we are notified whenever a record changes in Salesforce. Our setup is as follows.

Code for registering an event and configuring the Salesforce Streaming Client in Program.cs is:

public static void Main(string[] args)
{
    var builder = WebApplication.CreateBuilder(args);
    builder.Host.ConfigureServices((hostContext, services) =>
    {
        var salesforceConfiguration = services.BuildServiceProvider().GetRequiredService<IOptions>().Value;
        services.AddResilientStreamingClient("", "", (conf) =>
        {
            conf.ClientId = salesforceConfiguration.ClientId;
            conf.ClientSecret = salesforceConfiguration.ClientSecret;
            conf.RefreshToken = salesforceConfiguration.RefreshToken;
            conf.LoginUrl = salesforceConfiguration.BaseUrl;
            conf.OAuthUri = Constants.OAuthUriConfig;
            conf.EventOrTopicUri = Constants.EventOrTopicConfig;
            conf.CometDUri = salesforceConfiguration.CometdUrl;
        });
        services.AddSingleton<IEventBus, EventBus>();
        services.AddHostedService<SalesforceEventBusHostedService>();
        services.AddTransient<IMessageListener, UpdatedListener>();
        services.AddTransient<IMessageListener, DeletedListener>();
    });

    var app = builder.Build();
    app.Run();
}

Code for subscribing to events is:

public class SalesforceEventBusHostedService : IHostedService
{
private readonly ILogger _logger;
private readonly IEventBus _eventBus;
private readonly ICacheService<ReplayIdDto> _cacheService;

// Push topics that we created in Salesforce: (topic name, initial replay ID, listener type).
private readonly List<Tuple<string, int, Type>> _eventsMapping = new()
{
    new Tuple<string, int, Type>("topic/Updated", -1, typeof(UpdatedListener)),
    new Tuple<string, int, Type>("topic/Deleted", -1, typeof(DeletedListener))
};

public SalesforceEventBusHostedService(ILogger<SalesforceEventBusHostedService> logger,IEventBus eventBus, ICacheService<ReplayIdDto> cacheService)
{
    _logger = logger;
    _eventBus = eventBus;
    _cacheService = cacheService;
    _cacheService.CacheRegion = $"{Constants.MessageListenerCacheRegion}";
}

private static object[] GetPlatformEventObject(string eventName,int replayId,Type eventType)
{
    var platformEvent = Activator.CreateInstance(typeof(PlatformEvent<>).MakeGenericType(eventType));
    platformEvent?.GetType().GetProperty("Name")?.SetValue(platformEvent, eventName);
    platformEvent?.GetType().GetProperty("ReplayId")?.SetValue(platformEvent, replayId);
    return new[] {platformEvent};
}

private async Task SubscribeToEvent(string eventName,int replayId,Type eventType)
{
    var platformEvent = GetPlatformEventObject(eventName, replayId, eventType);
    var subscribeMethod = typeof(IEventBus).GetMethod("Subscribe")?.MakeGenericMethod(eventType);
    await ((Task) subscribeMethod?.Invoke(_eventBus, platformEvent))!;
}

private async Task UnSubscribeToEvent(string eventName,int replayId,Type eventType)
{
    var platformEvent = GetPlatformEventObject(eventName, replayId, eventType);
    var unsubscribeMethod = typeof(IEventBus).GetMethod("Unsubscribe")?.MakeGenericMethod(eventType);
    await ((Task) unsubscribeMethod?.Invoke(_eventBus, platformEvent))!;
}

public async Task StartAsync(CancellationToken cancellationToken)
{
    _logger.LogInformation($"{nameof(SalesforceEventBusHostedService)} starting.");
    for (var i = 0; i < _eventsMapping.Count; i++)
    {
        var (originalEventName, originalReplayId, originalEventType) = _eventsMapping[i];
        var replayIdFromCache = (await _cacheService.GetItemAsync(StreamingApiHelper.RemoveEventOrTopicPrefix(originalEventName)))?.ReplayId??originalReplayId;
        var (eventName, replayId, eventType) = _eventsMapping[i] = new Tuple<string, int, Type>(originalEventName, replayIdFromCache, originalEventType);
        await SubscribeToEvent(eventName, replayId, eventType);
    }
}

public async Task StopAsync(CancellationToken cancellationToken)
{
    _logger.LogInformation($"{nameof(SalesforceEventBusHostedService)} stopped.");
    foreach (var (eventName,replayId, eventType) in _eventsMapping)
    {
        await UnSubscribeToEvent(eventName, replayId, eventType);
    }
}

}

However, we get the following exception:

System.NullReferenceException: Object reference not set to an instance of an object.
   at CometD.NetCore.Salesforce.ResilientStreamingClient.ErrorExtension_ConnectionError(Object sender, String e) in C:\projects\cometd-netcore-salesforce\src\CometD.NetCore.Salesforce\ResilientStreamingClient.cs:line 210
   at CometD.NetCore.Client.Extension.ErrorExtension.ReceiveMeta(IClientSession session, IMutableMessage message) in C:\projects\cometd-netcore\src\CometD.NetCore\Client\Extension\ErrorExtension.cs:line 74
   at CometD.NetCore.Common.AbstractClientSession.ExtendReceive(IMutableMessage message) in C:\projects\cometd-netcore\src\CometD.NetCore\Common\AbstractClientSession.cs:line 188
   at CometD.NetCore.Common.AbstractClientSession.Receive(IMutableMessage message) in C:\projects\cometd-netcore\src\CometD.NetCore\Common\AbstractClientSession.cs:line 155
   at CometD.NetCore.Client.BayeuxClient.PublishTransportListener.OnMessages(IList`1 messages) in C:\projects\cometd-netcore\src\CometD.NetCore\Client\BayeuxClient.cs:line 769
   at CometD.NetCore.Client.Transport.LongPollingTransport.GetResponseCallback(IAsyncResult asynchronousResult) in C:\projects\cometd-netcore\src\CometD.NetCore\Client\Transport\LongPollingTransport.cs:line 310

Can anyone explain why this error occurs and whether there is something we should change on our end?
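
For context, the trace places the exception inside the library's ErrorExtension_ConnectionError handler, which is invoked from the long-polling transport callback rather than from the application code above. The trace alone does not say which reference is null at that point. As a general illustration only, the sketch below shows how a connection-error handler with the same signature can guard a dependency that may not have been initialized. GuardedErrorHandler, its reconnect delegate, and IgnoredErrors are hypothetical names and are not the library's actual code.

```csharp
using System;

// Hypothetical stand-in for a streaming client. This is NOT the code of
// CometD.NetCore.Salesforce; it only illustrates a null guard in a handler.
public class GuardedErrorHandler
{
    // May be null if client setup did not complete before an error arrived.
    private readonly Action<string> _reconnect;

    // Counts errors that were skipped because no reconnect action was available.
    public int IgnoredErrors { get; private set; }

    public GuardedErrorHandler(Action<string> reconnect)
    {
        _reconnect = reconnect;
    }

    // Same shape as the handler named in the stack trace: (object sender, string e).
    public void OnConnectionError(object sender, string e)
    {
        if (_reconnect == null)
        {
            // Without this check, calling _reconnect(e) would throw a
            // NullReferenceException on the transport's callback thread.
            IgnoredErrors++;
            return;
        }

        _reconnect(e);
    }
}
```

Because the handler runs on a callback thread, a try/catch around the Subscribe call in the hosted service would not observe an exception thrown there.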

mtech-ahokkonen commented 9 months ago

We're encountering similar issues with our Salesforce subscriber implementation.

Specifically, we have three active subscribers for push topics that track replay IDs, and about once a week one subscriber (always the same one) encounters an error during reconnection. Our temporary fix has been to reset the replay ID to -2, but the issue tends to resurface after a few days.
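
To make the workaround concrete: in the Salesforce Streaming API, a replay ID of -1 requests only events published after the subscription starts, while -2 requests all events still inside the retention window. The following is a minimal sketch of a fallback policy built on those two values. ReplayIdPolicy and Resolve are hypothetical names, not part of the library, and treating a failed subscribe as a sign of a stale cached replay ID is an assumption rather than a confirmed cause of the error.

```csharp
using System;

// Hypothetical helper; not part of CometD.NetCore.Salesforce.
public static class ReplayIdPolicy
{
    // Salesforce Streaming API replay options.
    public const int NewEventsOnly = -1;      // only events published after subscribing
    public const int AllRetainedEvents = -2;  // every event still inside the retention window

    // Picks the replay ID for the next subscribe attempt.
    // cachedReplayId: last ID stored by the listener, or null if nothing was stored.
    // previousAttemptFailed: true when the last subscribe with the cached ID failed.
    public static int Resolve(int? cachedReplayId, bool previousAttemptFailed)
    {
        if (previousAttemptFailed)
        {
            // Assumption: the cached ID is stale, so fall back to replaying
            // everything the server still retains.
            return AllRetainedEvents;
        }

        // No cached value means this is a first subscription: start from new events.
        return cachedReplayId ?? NewEventsOnly;
    }
}
```

Falling back to -2 can redeliver events that were already processed, so listeners would need to tolerate duplicates.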