zzzprojects / EntityFramework-Extensions

Entity Framework Bulk Operations | Improve Entity Framework performance with Bulk SaveChanges, Insert, update, delete and merge for SQL Server, SQL Azure, SQL Compact, MySQL and SQLite.
https://entityframework-extensions.net

Retry operation for resiliency strategy is not working #596

Open RaHorusFreak opened 1 month ago

RaHorusFreak commented 1 month ago

Description

I have a method that executes several operations against a PostgreSQL database from an API in .NET 8. The database context is configured with the following resiliency strategy:

protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) => optionsBuilder.UseNpgsql(connection, x =>
{
    x.UseNetTopologySuite();
    if(retrier?.Any() == true)
        if(retrier.Full())
            x.EnableRetryOnFailure(retrier.Attempts.Value, TimeSpan.FromSeconds(retrier.NextRetrySeconds), retrier.ErrorCodes);
        else if(retrier.AnyAttemp())
            x.EnableRetryOnFailure(retrier.Attempts.Value);
        else
            x.EnableRetryOnFailure(retrier.ErrorCodes);
});

We also configured all EntityFramework-Extensions options so that they use the same retry settings:

BulkOperationOptions<IndicatorDatum> bulkOptionsNoReturn = new BulkOperationOptions<IndicatorDatum>()
{
    Transaction = transaction.GetDbTransaction(),
    AutoMapOutputDirection = false
};
bulkOptionsNoReturn._ApplyConnectionRetrier(context.Value.ConnectionRetrier);

public static void _ApplyConnectionRetrier(this BulkOperationOptions options, ConnectionRetrier retrier)
{
    if(retrier?.AnyAttemp() == true)
    {
        options.RetryCount = retrier.Attempts.Value;
        options.RetryInterval = TimeSpan.FromSeconds(retrier.NextRetrySeconds);
    }
}

The following code is an extract of the method I use to delete a very large amount of data (among other operations):

await context.Value.Database.CreateExecutionStrategy().Execute(async () =>
{
    using(IDbContextTransaction transaction = context.Value.Database.BeginTransaction())
    {
        transaction.CreateSavepoint(crudTransaction);
        try
        {
            bulkOptionsReturn._ApplyConnectionRetrier(context.Value.ConnectionRetrier);
            BulkOperationOptions<IndicatorDatum> bulkOptionsNoReturn = new BulkOperationOptions<IndicatorDatum>()
            {
                Transaction = transaction.GetDbTransaction(),
                AutoMapOutputDirection = false
            };
            bulkOptionsNoReturn._ApplyConnectionRetrier(context.Value.ConnectionRetrier);
            if(removeAll)
                await context.Value.IndicatorData.BulkDeleteAsync(context.Value.IndicatorData.AsNoTracking().Where(idat => idat.IndicatorId == indicatorId), bulkOptionsNoReturn, ct);
            else
            {
                if(geoDomainId != null)
                    foreach(IEnumerable<IndicatorDatum> sublist in dataForInsert.Select(cursor => cursor.subitems.Select(sub => BuildIndicatorDatum(indicatorId, dimDom, cursor.ddoms, cursor.idat.LevelDomainIdIdx2, geoDomainId))).SelectMany(x => x)._LazyBatch(100))
                        await context.Value.IndicatorData.BulkDeleteAsync(context.Value.IndicatorData.AsNoTracking().Where(CreateExistsFilter(indicatorId, null, sublist)), bulkOptionsNoReturn, ct);
                foreach(IEnumerable<IndicatorDatum> sublist in dataForInsert.Select(cursor => BuildIndicatorDatum(indicatorId, dimDom, cursor.ddoms))._LazyBatch(100))
                    await context.Value.IndicatorData.BulkDeleteAsync(context.Value.IndicatorData.AsNoTracking().Where(CreateExistsFilter(indicatorId, null, sublist)), bulkOptionsNoReturn, ct);
                foreach(IEnumerable<IndicatorDatum> sublist in combinations.Select(cursor => BuildIndicatorDatum(indicatorId, dimDom, cursor))._LazyBatch(100))
                    await context.Value.IndicatorData.BulkDeleteAsync(context.Value.IndicatorData.AsNoTracking().Where(CreateExistsFilter(indicatorId, null, sublist)), bulkOptionsNoReturn, ct);
            }
            ...
            transaction.Commit();
        }
        catch
        {
            transaction.RollbackToSavepoint(crudTransaction);
            throw;
        }
    }
});

The problem is that when the connection fails (randomly, or forced for testing), an exception is always thrown and the operations are never retried.

Exception

System.InvalidOperationException: Connection is not open
   at Npgsql.ThrowHelper.ThrowInvalidOperationException(String message)
   at Npgsql.NpgsqlCommand.ExecuteReader(Boolean async, CommandBehavior behavior, CancellationToken cancellationToken)
   at Npgsql.NpgsqlCommand.ExecuteNonQuery(Boolean async, CancellationToken cancellationToken)
   at Npgsql.NpgsqlCommand.ExecuteNonQuery()
   at Microsoft.EntityFrameworkCore.Storage.RelationalTransaction.RollbackToSavepoint(String name)
   at SEAMIND.Datamodels.Logic.Model.IndicatorDatumLogic.Finalization(IDbContextTransaction transaction, Exception exc, List`1 logs, Action`1 replicator, CancellationToken& ct)
   at SEAMIND.Datamodels.Logic.Model.IndicatorDatumLogic.<>cDisplayClass13_0.<b2>d.MoveNext()
--- End of stack trace from previous location ---
   at SEAMIND.Datamodels.Logic.Model.IndicatorDatumLogic.ProcessData_INE(Int32 userId, Int32 indicatorId, DateTime date, IndicatorUploadStatus iuStatus, Int32 iuTypeId, IEnumerable`1 rows, DDomainEntityCodeContainer dimDom, Boolean isEn, PreserverDictionary`2 map, IndicatorImportData importData, Action`1 setTotal, Action`1 setPart, Action`1 setSkip, Action`1 replicator, CancellationToken ct)

Placing breakpoints and shutting down the connection to the database server shows that no retry logic is executed:

if(geoDomainId != null)
    foreach(IEnumerable<IndicatorDatum> sublist in dataForInsert.Select(cursor => cursor.subitems.Select(sub => BuildIndicatorDatum(indicatorId, dimDom, cursor.ddoms, cursor.idat.LevelDomainIdIdx2, geoDomainId))).SelectMany(x => x)._LazyBatch(100))
        await context.Value.IndicatorData.BulkDeleteAsync(context.Value.IndicatorData.AsNoTracking().Where(CreateExistsFilter(indicatorId, null, sublist)), bulkOptionsNoReturn, ct);
foreach(IEnumerable<IndicatorDatum> sublist in dataForInsert.Select(cursor => BuildIndicatorDatum(indicatorId, dimDom, cursor.ddoms))._LazyBatch(100))
    await context.Value.IndicatorData.BulkDeleteAsync(context.Value.IndicatorData.AsNoTracking().Where(CreateExistsFilter(indicatorId, null, sublist)), bulkOptionsNoReturn, ct);
foreach(IEnumerable<IndicatorDatum> sublist in combinations.Select(cursor => BuildIndicatorDatum(indicatorId, dimDom, cursor))._LazyBatch(100))
    await context.Value.IndicatorData.BulkDeleteAsync(context.Value.IndicatorData.AsNoTracking().Where(CreateExistsFilter(indicatorId, null, sublist)), bulkOptionsNoReturn, ct);

I’ve tried isolated EF queries, and the retry policy works as expected. I put the following code inside an infinite loop, and when I closed the database connection, the operation started retrying:

var id = context.Value.IndicatorData.ToArray();

Thank you in advance for any help you can provide.

JonathanMagnan commented 1 month ago

Hello @RaHorusFreak ,

The retry logic currently works only for transient errors in SQL Server.

So it currently has no effect on PostgreSQL, which confirms your observation.

We could change that, but even then I'm not sure it would cover your scenario 100%, because our library just retries the same operation. It does not currently try to re-open the connection (which should still be open, I believe, since you are inside a transaction). We will just retry the same operation in case of a transient error (or an error code you choose).

If that's enough for you, then we can look at adding support for PostgreSQL as well.

Best Regards,

Jon

RaHorusFreak commented 1 month ago

Hello @JonathanMagnan, thank you very much for your time. We managed to change the logic of the method, and we are now handling the rollback process ourselves because of https://github.com/dotnet/efcore/issues/34214. So if the bulk operations could retry the same instruction in a transient failure scenario (no transaction involved) for PostgreSQL, that would be great for us. Since you are extending EF, you could try calling

context.Database.OpenConnection(); 

at the beginning of each batch, paying attention to the IsTransient property of the DbException that occurred. Regarding which error codes to treat as transient, remember that Npgsql for EF Core handles error codes as strings rather than as numbers (the traditional approach for most DBMSs); see https://www.postgresql.org/docs/current/errcodes-appendix.html. Any help will be greatly appreciated. Best regards.
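
To illustrate the suggestion above, here is a minimal sketch of retrying one batch when the failure looks transient. It is not the library's implementation: TransientRetry, ShouldRetry, ExecuteAsync, and extraSqlStates are hypothetical names, and the sketch relies only on the DbException.IsTransient and DbException.SqlState properties from System.Data.Common (.NET 5 and later). The operation delegate is assumed to open the connection itself (for example with context.Database.OpenConnection()) before running the batch.

```csharp
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper; not part of EF Core, Npgsql, or EntityFramework-Extensions.
public static class TransientRetry
{
    // Retry when the provider flags the error as transient, or when its
    // SQLSTATE (a five-character string in PostgreSQL) is in the caller's list.
    public static bool ShouldRetry(Exception ex, ISet<string> extraSqlStates)
    {
        if (ex is not DbException dbEx) return false;
        if (dbEx.IsTransient) return true;
        return dbEx.SqlState is string state && extraSqlStates.Contains(state);
    }

    // Runs the operation once, then retries it up to retryCount more times,
    // waiting retryInterval between attempts. Other exceptions propagate at once.
    public static async Task ExecuteAsync(
        Func<CancellationToken, Task> operation,
        int retryCount,
        TimeSpan retryInterval,
        ISet<string> extraSqlStates,
        CancellationToken ct = default)
    {
        for (int attempt = 0; ; attempt++)
        {
            try
            {
                await operation(ct);
                return;
            }
            catch (Exception ex) when (attempt < retryCount && ShouldRetry(ex, extraSqlStates))
            {
                await Task.Delay(retryInterval, ct);
            }
        }
    }
}
```

Note that the "Connection is not open" error in the stack trace above is a System.InvalidOperationException, not a DbException, so this sketch would not retry it. Covering that case would require either re-opening the connection before each attempt or extending ShouldRetry. SQLSTATE values such as "40001" (serialization_failure) or "57P01" (admin_shutdown) could be passed in extraSqlStates as strings.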

JonathanMagnan commented 1 month ago

Thank you for the additional information,

I cannot promise you anything yet, but we will surely look into it.

Best Regards,

Jon