kdcllc / CometD.NetCore

CometD for use with Salesforce Platform Events
MIT License
29 stars 16 forks source link

Handshake issue when getting replayid from async call to database #36

Open kathariyasunny16 opened 1 year ago

kathariyasunny16 commented 1 year ago

I am trying to get the replayid from the cosmos database before creating a Bayeux client object. I am not able to handshake when after bayeuxclient.Handshake(). The same code is working fine when I am not using the cosmos db call. Please find below code snippet.

var authResponse = await _authenticateConnectedApp.Authenticate().ConfigureAwait(false);
            if (authResponse.AccessToken is not null)
            {
                var (channel, apiVersion) = Config.GetSaleForceAccountChannelDetails();
                //DB call
                var lastUsedReplayId = await _operatingParametersRepository.GetSaleForceLastUsedReplayId(channel).ConfigureAwait(false);

                var endpointUrl = $"{authResponse.InstanceUrl}/cometd/{apiVersion}";
                var readTimeOut = 120 * 1000;

                var transportOptions = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase)
                {
                    {ClientTransport.TIMEOUT_OPTION, readTimeOut},
                    {ClientTransport.MAX_NETWORK_DELAY_OPTION, readTimeOut}
                };
                var headers = new NameValueCollection
                    {{HttpRequestHeader.Authorization.ToString(), $"Bearer {authResponse.AccessToken}"}};

                var transport = new LongPollingTransport(transportOptions, headers);
                var transports = new ClientTransport[]
                {
                    transport
                };

                // Create a CometD client instance
                var bayeuxClient = new BayeuxClient(endpointUrl, transports);
                bayeuxClient.Handshake();
                bayeuxClient.WaitFor(100000, new List<BayeuxClient.State> {BayeuxClient.State.CONNECTED});
                log.LogInformation("Handshake Status : {Status}", bayeuxClient.Handshook);
                bayeuxClient.GetChannel(channel, lastUsedReplayId).Subscribe(new SalesForceMessageListener());
                log.LogInformation("Connection Status : {Status}. Now, Waiting for the event", bayeuxClient.Connected);
            }

// CosmosHelperRepository Code Called from above function

public async Task<long> GetSaleForceLastUsedReplayId(string channel)
    {
        try
        {
            var dbExistingRecordList = await _cosmosHelper.GetQuery(x =>
                x.Pk == nameof(SalesForceReplayIdRunDetail)
                && x.ChannelName == channel);
            var lastUsedReplayId = dbExistingRecordList.FirstOrDefault()?.LastUsedReplayId ?? -1;
            return lastUsedReplayId;
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
            throw;
        }

    }

// Implementation of Cosmos helper GetQuery method called in above repository function

public async Task<IList<T>> GetQuery(Expression<Func<T, bool>> predicate)
    {
        var container = GetContainer();
        var query = container.GetItemLinqQueryable<T>()
            .Where(predicate);
        var list = new List<T>();
        using var feedIterator = _feedIteratorProvider.GetFeedIterator(query);
        while (feedIterator.HasMoreResults) list.AddRange(await feedIterator.ReadNextAsync());

        return list;
    }

Just an FYI, I am able to get the replayId from cosmos db using the above code but the handshake is not happening.

kathariyasunny16 commented 1 year ago

@kdcllc please have a look. I am trying to resolve this issue for the last 2 days. Any help will be much appreciated.

Thank you so much for your response @kdcllc . I have updated the code in the issue and sharing here as well for your reference.


var authResponse = await _authenticateConnectedApp.Authenticate().ConfigureAwait(false);
if (authResponse.AccessToken is not null)
{
    var (channel, apiVersion) = Config.GetSaleForceAccountChannelDetails();

    //DB call
    var lastUsedReplayId = await _operatingParametersRepository.GetSaleForceLastUsedReplayId(channel).ConfigureAwait(false);

    var endpointUrl = $"{authResponse.InstanceUrl}/cometd/{apiVersion}";
    var readTimeOut = 120 * 1000;

    var transportOptions = new Dictionary<string,object>(StringComparer.OrdinalIgnoreCase)
    {
        {ClientTransport.TIMEOUT_OPTION, readTimeOut},
        {ClientTransport.MAX_NETWORK_DELAY_OPTION, readTimeOut}
    };

    var headers = new NameValueCollection{
        {HttpRequestHeader.Authorization.ToString(),
        $"Bearer {authResponse.AccessToken}"}};

    var transport = new LongPollingTransport(transportOptions, headers);
    var transports = new ClientTransport[]
    {
        transport
    };

    // Create a CometD client instance
    var bayeuxClient = new BayeuxClient(endpointUrl, transports);
    bayeuxClient.Handshake();
    bayeuxClient.WaitFor(100000, new List<BayeuxClient.State> {BayeuxClient.State.CONNECTED});
    log.LogInformation("Handshake Status : {Status}", bayeuxClient.Handshook);

    bayeuxClient.GetChannel(channel,lastUsedReplayId).Subscribe(new SalesForceMessageListener());
    log.LogInformation("Connection Status : {Status}. Now, Waiting for the event", bayeuxClient.Connected);
}
// CosmosHelperRepository Code Called from above function
public async Task<long> GetSaleForceLastUsedReplayId(string channel)
{
    try
    {
        var dbExistingRecordList = await _cosmosHelper.GetQuery(x=>  x.Pk == nameof(SalesForceReplayIdRunDetail) && x.ChannelName == channel);
        var lastUsedReplayId = dbExistingRecordList.FirstOrDefault()?.LastUsedReplayId ?? -1;
        return lastUsedReplayId;
    }
    catch (Exception e)
    {
        Console.WriteLine(e);
        throw;
    }

}
// Implementation of Cosmos helper GetQuery method called in above
// repository function
public async Task<IList<T>> GetQuery(Expression<Func<T, bool>> predicate)
{
    var container = GetContainer();
    var query = container.GetItemLinqQueryable<T>()
        .Where(predicate);
    var list = new List<T>();
    using var feedIterator = _feedIteratorProvider.GetFeedIterator(query);
    while (feedIterator.HasMoreResults) list.AddRange(await feedIterator.ReadNextAsync());

    return list;
}

Just an FYI, I am able to get the replayId from cosmos db using the above code but the handshake is not happening.

kathariyasunny16 commented 1 year ago

Hi @kdcllc,

I debugged this and got to know that the issue is not with an async call to the cosmos repository to get the replayid. I think the handshake is not happening when I am using the below code in Azure function startup to resolve the dependency of the cosmos repository and helper.

Code used in startup to resolve the dependency:

builder.Services.AddTransient<IOperatingParametersRepository>(s => new OperatingParametersRepository(
            new CosmosHelper<SalesForceReplayIdRunDetail>(s.GetService<CosmosClient>(),
                Settings.Cosmos.DatabaseName, Settings.Cosmos.OperatingParametersContainer,
                s.GetService<IFeedIteratorProvider>(), s.GetService<ISerializer>(),
                s.GetService<ILogger<CosmosHelper<SalesForceReplayIdRunDetail>>>())));

Cosmos Repository Constructor:

public OperatingParametersRepository(ICosmosHelper<SalesForceReplayIdRunDetail> cosmosHelper)
    {
        _cosmosHelper = cosmosHelper;
    }

Cosmos Helper Constructor:

public CosmosHelper(CosmosClient cosmosClient, string databaseName, string containerName,
        IFeedIteratorProvider feedIteratorProvider, ISerializer serializer, ILogger<CosmosHelper<T>> logger)
        : base(cosmosClient, containerName, serializer, logger)
    {
        _cosmosClient = cosmosClient;
        _databaseName = databaseName;
        _containerName = containerName;
        _feedIteratorProvider = feedIteratorProvider;
    }
kathariyasunny16 commented 1 year ago

@kdcllc @martin-podlubny @r-matias @jesbacon, Any guidance on this?

kdcllc commented 1 year ago

Hi @kdcllc,

I debugged this and got to know that the issue is not with an async call to the cosmos repository to get the replayid. I think the handshake is not happening when I am using the below code in Azure function startup to resolve the dependency of the cosmos repository and helper.

Code used in startup to resolve the dependency:

builder.Services.AddTransient<IOperatingParametersRepository>(s => new OperatingParametersRepository(
            new CosmosHelper<SalesForceReplayIdRunDetail>(s.GetService<CosmosClient>(),
                Settings.Cosmos.DatabaseName, Settings.Cosmos.OperatingParametersContainer,
                s.GetService<IFeedIteratorProvider>(), s.GetService<ISerializer>(),
                s.GetService<ILogger<CosmosHelper<SalesForceReplayIdRunDetail>>>())));

Cosmos Repository Constructor:

public OperatingParametersRepository(ICosmosHelper<SalesForceReplayIdRunDetail> cosmosHelper)
    {
        _cosmosHelper = cosmosHelper;
    }

Cosmos Helper Constructor:

public CosmosHelper(CosmosClient cosmosClient, string databaseName, string containerName,
        IFeedIteratorProvider feedIteratorProvider, ISerializer serializer, ILogger<CosmosHelper<T>> logger)
        : base(cosmosClient, containerName, serializer, logger)
    {
        _cosmosClient = cosmosClient;
        _databaseName = databaseName;
        _containerName = containerName;
        _feedIteratorProvider = feedIteratorProvider;
    }

@kathariyasunny16 Does this code work as console application and not as azure function?

What operating system are you using?

kathariyasunny16 commented 1 year ago

@kdcllc We are using Windows 10 and haven't tried the console app as we need to use this in the Azure function.

kathariyasunny16 commented 11 months ago

Hello @kdcllc @martin-podlubny @r-matias , I have a quick question. How long the long-polling connection will be alive when I am subscribed to one channel? My subscriber is able to receive the events for 3 hours after subscribing and it automatically reconnects after 2 minutes for 3 hours. But it's not able to receive the events after 3 hours and instead of /cometd/54.0/connect request, it sends /cometd/54.0/handshake after 3 hours and then stops the subscription. So, I just want to know whether I need to run my function which subscribes to the channel again after 3 hours. If yes, then when I try to run the function it's not able to make a successful handshake. Please suggest some solution. Thanks in advance.

kathariyasunny16 commented 11 months ago

Hi @kdcllc @martin-podlubny @r-matias @jesbacon , Please reply to the above comment. I am stuck on this and need your support.

jordanmcdonald-rh commented 5 months ago

@kathariyasunny16 when I use a lastReplayId or a -2 I get none of the missed responses. Is it working for you?

What does your SalesForceMessageListener class look like?