LSEG-API-Samples / Example.DataLibrary.DotNet

Example projects demonstrating access to the Refinitiv Data Platform using the Refinitiv Data Library for .NET.

Unhandled Exception in Session Management Causing Application Crash `[ERROR] [EndpointDefinition] GetDataAsync failed for endpoint: https://api.refinitiv.com/auth/cloud-credentials/v1/. Data services unavailable. Session is closed` #2

Open ArturWincenciak opened 1 month ago

ArturWincenciak commented 1 month ago

We have encountered an unhandled exception in our application that caused a crash. The log below was recorded just before the crash, and it originates from your library:

[ERROR] [253] [EndpointDefinition] GetDataAsync failed for endpoint: https://api.refinitiv.com/auth/cloud-credentials/v1/. Data services unavailable.  Session is closed
Unhandled exception. System.InvalidOperationException: Data services unavailable.  Session is closed
   at Refinitiv.Data.Delivery.Request.EndpointDefinition.GetDataAsync(ISession session, Action`3 cb, CancellationToken cancellationToken)
   at Refinitiv.Data.Delivery.Queue.QueueNode.RefreshCloudCredentialsAsync()
   at Refinitiv.Data.Delivery.Queue.QueueNode.CloudRefreshTimerHandler(Object source, ElapsedEventArgs e)
   at System.Threading.Tasks.Task.<>c.<ThrowAsync>b__128_1(Object state)
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()

We are unable to catch this exception in our code, and we would like to request that you implement exception handling within your library to prevent such exceptions from crashing the entire application.
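The stack trace passes through Task.ThrowAsync, which is consistent with an exception escaping an async void timer handler. That is an inference from the trace, not something confirmed from the library's source. The sketch below illustrates the general .NET pattern only: RefreshAsync, UnsafeHandler, SafeRefreshAsync and Attach are hypothetical names, not library APIs. It contrasts a handler that lets the exception escape with one that awaits inside try/catch and reports the failure through a callback.

```csharp
using System;
using System.Threading.Tasks;
using System.Timers;

public static class SafeTimerDemo
{
    // Hypothetical stand-in for a refresh call that fails once the session is closed.
    public static Task RefreshAsync() =>
        Task.FromException(new InvalidOperationException("Session is closed"));

    // Unsafe shape: an exception that escapes an async void method is rethrown
    // on the thread pool (when no SynchronizationContext is present), where no
    // caller's try/catch can observe it, so the process terminates.
    public static async void UnsafeHandler(object sender, ElapsedEventArgs e)
    {
        await RefreshAsync();
    }

    // Safer shape: await inside try/catch and hand the failure to a callback.
    // Returns true when the refresh succeeded.
    public static async Task<bool> SafeRefreshAsync(Func<Task> refresh, Action<Exception> onError)
    {
        try
        {
            await refresh();
            return true;
        }
        catch (Exception ex)
        {
            onError(ex);
            return false;
        }
    }

    // Wires the safe shape to a timer; the lambda is still async void, but
    // nothing can escape it unless onError itself throws.
    public static void Attach(System.Timers.Timer timer, Func<Task> refresh, Action<Exception> onError)
    {
        timer.Elapsed += async (sender, args) => await SafeRefreshAsync(refresh, onError);
    }
}
```

On the application side, a handler on AppDomain.CurrentDomain.UnhandledException can log such a failure before the process exits, but in modern .NET it cannot prevent the termination.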

To provide more context, below is how we create the session, queue, and subscription:

public async Task Subscribe(Func<IQueueResponse, CancellationToken, Task> callback, CancellationToken ct)
{
    _session = PlatformSession.Definition()
        .AppKey(options.AppKey)
        .OAuthGrantType(new GrantPassword()
            .UserName(options.UserName)
            .Password(options.Password))
        .TakeSignonControl(true)
        .GetSession()
        .OnState((state, msg, _) =>
            logger.LogInformation("On session state changed: {state}. {msg}", state, msg))
        .OnEvent((eventCode, msg, _) =>
            logger.LogInformation("Event: {eventCode}. {msg}", eventCode, msg));

    var openSessionStatus = await _session.OpenAsync(ct);
    if (openSessionStatus != Session.State.Opened)
        logger.LogWarning("Session is not open, status: {status}", openSessionStatus);

    var queueDefinition = Queue.Definition(options.Endpoint);
    var queueManager = queueDefinition
        .CreateQueueManager()
        .OnError((err, _) => logger.LogError("Error: {error}", err));

    var queueCriteria = new JObject {{prop, value}};
    var queue = await queueManager.CreateQueueAsync(queueCriteria, Queue.CloudType.AWS, ct);

    var subscriber = queueDefinition.CreateAWSSubscriber(queue);
    var status = await subscriber.StartPollingAsync((response, _) =>
    {
        callback(response, ct).GetAwaiter().GetResult();
    });

    if (!status)
        logger.LogWarning("Start polling failed");
}

The synchronization point in our application is the subscriber.StartPollingAsync method call. This is where message processing begins, and we are able to wrap this line in a try { } catch { } block to catch exceptions, as shown below:

try
{
    await client.Subscribe(async (news, ct) =>
    {
        await pipeline.Handle(news, ct); // Perform business logic
    }, cancellationToken);
}
catch (Exception ex)
{
    switch (ex)
    {
        case AmazonSQSException sqsEx:
            // Log the exception
            // Close the current session and queue subscription
            // Re-create the session and re-subscribe to the queue
            break;
        default:
            // Log any other exceptions
            break;
    }
}

However, the exception that caused our application to crash was not caught. It appears to have originated elsewhere, leading to the application's unexpected and permanent termination.

The _session.OpenAsync method does not allow us to catch the exception logged as [ERROR] [253] [EndpointDefinition]. If this call blocked further processing, it would be possible to catch all exceptions related to the session implementation there.

To give more context: in our application, we also make HTTP API calls in the following way:

try
{
    var response = await EndpointRequest.Definition(endpointUrl).GetDataAsync(ct);
    // Perform business logic
}
catch (Exception ex)
{
    // Log the exception
}

Under the hood, this static method EndpointRequest.Definition implicitly uses the previously created session.

As we understand it, the session functions like a singleton that all parts of your library implicitly rely on. Given the log message [ERROR] [253] [EndpointDefinition] that was recorded before the crash, it seems the problem might be related to the EndpointRequest.Definition method. However, no exception was caught or logged in the try { } catch { } block.

I suspect that the log [ERROR] [253] [EndpointDefinition] originates from the implementation of the EndpointRequest.Definition method. However, in this case, the method was not invoked by our logic but by some internal mechanism in the library responsible for session/token renewal.

I would also like to address the AmazonSQSException, which occasionally occurs in our application. Despite a correct setup, this exception still appears. As we understand it, your library handles the responsibility for token refresh logic, session management, and session recovery. However, we observe situations where this exception occurs, ranging from several times a day to once every few days. When it does happen, we close the old session, unsubscribe from the queue, clean up, and then re-create the session and re-subscribe to the queues. We would like to ask if this is the correct approach and if you could provide any guidance or best practices to avoid these exceptions.

If our approach is correct, this information might help identify areas in your library that could be improved.

Lastly, on a related but separate note, could you provide an overload of the subscriber.StartPollingAsync method that accepts an asynchronous callback, such as:

Task<bool> StartPollingAsync(Func<IQueueResponse, CancellationToken, Task> cb);

Currently, the only available method is:

Task<bool> StartPollingAsync(Action<IQueueResponse, IQueueSubscriber> cb);

This requires us to create a synchronization point for the callback using GetAwaiter and GetResult like this:

var status = await subscriber.StartPollingAsync((response, _) =>
{
    callback(response, ct).GetAwaiter().GetResult();
});

Providing an asynchronous version of this method would simplify our code and could help in better handling exceptions.
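Until such an overload exists, one possible workaround is to hand each message from the synchronous callback to an asynchronous consumer through a channel. This is a minimal sketch, assuming in-order, single-consumer processing is acceptable. AsyncCallbackBridge and its members are hypothetical names, not part of the library, and the type parameter stands in for IQueueResponse.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public sealed class AsyncCallbackBridge<T>
{
    // Unbounded buffer between the synchronous producer and the async consumer.
    private readonly Channel<T> _channel = Channel.CreateUnbounded<T>(
        new UnboundedChannelOptions { SingleReader = true });

    // Pass this to an API that accepts only a synchronous callback.
    // It returns immediately; the message is processed later by RunAsync.
    public void OnMessage(T message) => _channel.Writer.TryWrite(message);

    // Signals that no more messages will arrive, letting RunAsync finish.
    public void Complete() => _channel.Writer.TryComplete();

    // Invokes the async handler for each message in arrival order.
    // An exception thrown by the handler surfaces to whoever awaits RunAsync.
    public async Task RunAsync(Func<T, CancellationToken, Task> handler, CancellationToken ct)
    {
        await foreach (var message in _channel.Reader.ReadAllAsync(ct))
        {
            await handler(message, ct);
        }
    }
}
```

With this in place, the callback passed to StartPollingAsync would reduce to (response, _) => bridge.OnMessage(response), and the task returned by RunAsync can be awaited inside an ordinary try/catch. The trade-off is that the polling callback no longer waits for processing to finish, and the buffer is unbounded, so a slow handler lets messages accumulate in memory.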

cristian-baciuwahl-lseg commented 1 month ago

Hello Artur,

Thank you for notifying us about this issue.

Please create a post on the LSEG Developer Portal describing the issue you encountered. You can find the portal here: Refinitiv Developer Community (https://community.developers.refinitiv.com/index.html). Below the search box, click “Ask a question” and follow the posting process. This will help us start an investigation and provide a solution to the problem at hand.

Best regards, Baciu Wahl Cristian

From: Artur Wincenciak Teo.Vincent @.> Sent: Friday, August 9, 2024 6:01 PM To: LSEG-API-Samples/Example.DataLibrary.DotNet @.> Cc: Subscribed @.***> Subject: [LSEG-API-Samples/Example.DataLibrary.DotNet] Unhandled Exception in Session Management Causing Application Crash `[ERROR] [EndpointDefinition] GetDataAsync failed for endpoint: https://api.refinitiv.com/auth/cloud-credentials/v1...

Please read these warnings and restrictions: This e-mail transmission is strictly confidential and intended solely for the ordinary user of the e-mail address to which it was addressed. It may contain legally privileged and/or CONFIDENTIAL information. The unauthorised use, disclosure, distribution and/or copying of this e-mail or any information it contains is prohibited and could, in certain circumstances, constitute a criminal offence. If you have received this e-mail in error or are not an intended recipient please inform London Stock Exchange Group (“LSEG”) immediately by return e-mail or telephone 020 7797 1000. LSEG may collect, process and retain your personal information for its business purposes. For more information please see our Privacy Policy. We advise that in keeping with good computing practice the recipient of this e-mail should ensure that it is virus free. We do not accept responsibility for any virus that may be transferred by way of this e-mail. E-mail may be susceptible to data corruption, interception and unauthorised amendment, and we do not accept liability for any such corruption, interception or amendment or any consequences thereof. Calls to London Stock Exchange Group may be recorded to enable LSEG to carry out its regulatory responsibilities. For more details on the LSEG group of companies click here London Stock Exchange Group plc 10 Paternoster Square London EC4M 7LS Registered in England and Wales No 05369106


cristian-baciuwahl-lseg commented 1 month ago

Hello Artur,

When a queue is created using credentials, the credentials are refreshed at the level of the queue node using an Endpoint Definition request. When the session is not Open, the credentials data (including the expiration time used for the timer) cannot be refreshed, which leads to an InvalidOperationException.

Given the information above, I have a clarification question. If you have a platform session open and you are polling messages, what do you expect to happen when the session closes without the polling being stopped?

Other recommendations: in your code, please verify whether other functionality might be closing the session. Also make sure that you create definitions and request data only after the session is Open. When working with queues, it is recommended to stop the polling before stopping the session. The same applies to streams: our recommendation is to stop any running streams before stopping the session.
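The ordering recommended above (stop polling and streams first, close the session last) can be enforced with a small helper that runs stop actions in reverse registration order. This is only a sketch: OrderedShutdown is a hypothetical helper, not part of the library, and the actual stop and close calls are left to the caller as delegates.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper: runs registered stop actions in reverse registration
// order, so whatever was started last is stopped first.
public sealed class OrderedShutdown
{
    private readonly Stack<Func<Task>> _stopActions = new Stack<Func<Task>>();

    // Register a stop action right after the matching resource is started.
    public void Register(Func<Task> stopAction) => _stopActions.Push(stopAction);

    // Runs every stop action, newest first. A failing action does not prevent
    // the remaining ones from running; failures are reported together at the end.
    public async Task StopAllAsync()
    {
        var failures = new List<Exception>();
        while (_stopActions.Count > 0)
        {
            var stop = _stopActions.Pop();
            try
            {
                await stop();
            }
            catch (Exception ex)
            {
                failures.Add(ex);
            }
        }

        if (failures.Count > 0)
            throw new AggregateException(failures);
    }
}
```

Registering the session's close action right after the session opens, and the polling stop action right after polling starts, means StopAllAsync stops the polling before it closes the session.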

A kind reminder to create a post on the Developer Portal, using the “Ask a question” link and following the process. The portal can be found at Refinitiv Developer Community (https://community.developers.refinitiv.com/index.html).

Best regards, Baciu Wahl Cristian

ArturWincenciak commented 1 month ago

Hi, thank you for your response and quick reaction. We have created a ticket on the specified portal. Here is the link to the ticket: https://community.developers.refinitiv.com/questions/120419/net-refinitivdata-unhandled-exception-in-session-m.html. We will continue the correspondence on the portal as recommended.