xavierjefferson / Snork.AspNet.SignalR.FluentNHibernate

This is a SignalR backplane that works with FluentNHibernate-compatible RDBMS engines.
GNU Lesser General Public License v3.0

Invoking SignalR Function throws GenericADOException PostgreSQL #2

Open leutenecker opened 4 years ago

leutenecker commented 4 years ago

Hello Xavier, I am evaluating your great backplane code for classic SignalR. I am using a PostgreSQL database. All tables are created, the backplane is working, and SignalR messages are synchronized between two running instances of my application (an existing console app with SignalR already working). The problem seems to arise when both instances try to invoke the same SignalR function at the same time. The following exception is then thrown from your NuGet package:

NHibernate.Exceptions.GenericADOException
  HResult=0x80131600
  Message = could not update: [Snork.AspNet.SignalR.FluentNHibernate.Domain.Messages_0_Id#1][SQL: UPDATE SignalR_Messages_0_Id SET PayloadId = ? WHERE RowId = ?]
  Source = NHibernate
  Stack trace:
   at NHibernate.Persister.Entity.AbstractEntityPersister.Update(Object id, Object[] fields, Object[] oldFields, Object rowId, Boolean[] includeProperty, Int32 j, Object oldVersion, Object obj, SqlCommandInfo sql, ISessionImplementor session)
   at NHibernate.Persister.Entity.AbstractEntityPersister.UpdateOrInsert(Object id, Object[] fields, Object[] oldFields, Object rowId, Boolean[] includeProperty, Int32 j, Object oldVersion, Object obj, SqlCommandInfo sql, ISessionImplementor session)
   at NHibernate.Persister.Entity.AbstractEntityPersister.Update(Object id, Object[] fields, Int32[] dirtyFields, Boolean hasDirtyCollection, Object[] oldFields, Object oldVersion, Object obj, Object rowId, ISessionImplementor session)
   at NHibernate.Action.EntityUpdateAction.Execute()
   at NHibernate.Engine.ActionQueue.Execute(IExecutable executable)
   at NHibernate.Engine.ActionQueue.ExecuteActions(IList list)
   at NHibernate.Engine.ActionQueue.ExecuteActions()
   at NHibernate.Event.Default.AbstractFlushingEventListener.PerformExecutions(IEventSource session)
   at NHibernate.Event.Default.DefaultFlushEventListener.OnFlush(FlushEvent event)
   at NHibernate.Impl.SessionImpl.Flush()
   at NHibernate.Transaction.AdoTransaction.Commit()
   at Snork.AspNet.SignalR.FluentNHibernate.FNHSender`2.InsertMessage(IList`1 messages) in E:\users\xavierj\repos\Snork.AspNet.SignalR.FluentNHibernate\Snork.AspNet.SignalR.FluentNHibernate\FNHSender.cs:line 104
   at Snork.AspNet.SignalR.FluentNHibernate.FNHSender`2.Send(IList`1 messages) in E:\users\xavierj\repos\Snork.AspNet.SignalR.FluentNHibernate\Snork.AspNet.SignalR.FluentNHibernate\FNHSender.cs:line 33
   at Snork.AspNet.SignalR.FluentNHibernate.FNHStream`2.Send(IList`1 messages) in E:\users\xavierj\repos\Snork.AspNet.SignalR.FluentNHibernate\Snork.AspNet.SignalR.FluentNHibernate\FNHStream.cs:line 62
   at Snork.AspNet.SignalR.FluentNHibernate.FNHMessageBus.Send(Int32 streamIndex, IList`1 messages) in E:\users\xavierj\repos\Snork.AspNet.SignalR.FluentNHibernate\Snork.AspNet.SignalR.FluentNHibernate\FNHMessageBus.cs:line 47
   at Microsoft.AspNet.SignalR.Messaging.ScaleoutStreamManager.Send(Object state)
   at Microsoft.AspNet.SignalR.Messaging.ScaleoutStreamManager.<>c.b__12_0(Object state)
   at Microsoft.AspNet.SignalR.Messaging.ScaleoutStream.SendContext.InvokeSend()

This exception was originally thrown at this call stack:
   Npgsql.NpgsqlConnector.ReadMessage.ReadMessageLong|0(Npgsql.DataRowLoadingMode, bool, bool)
   System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   Npgsql.NpgsqlConnector.ReadMessage.ReadMessageLong|0(Npgsql.DataRowLoadingMode, bool, bool)
   System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(System.Threading.Tasks.Task)
   System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(System.Threading.Tasks.Task)
   Npgsql.NpgsqlDataReader.NextResult(bool, bool)
   System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(System.Threading.Tasks.Task)
   System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(System.Threading.Tasks.Task)
   Npgsql.NpgsqlDataReader.NextResult()
   Npgsql.NpgsqlCommand.ExecuteDbDataReader(System.Data.CommandBehavior, bool, System.Threading.CancellationToken)
   ... [call stack truncated]

Inner Exception 1: PostgresException: 40001: could not serialize access due to concurrent update

Any ideas about this problem? Greetings, Moritz

leutenecker commented 4 years ago

Hi, I think I resolved the issue myself. The problem arises with a PostgreSQL database and isolation level RepeatableRead, in combination with the PayloadId handling in FNHSender.cs, as soon as tx.Commit() is executed. If another instance has updated the PayloadId counter row in the meantime, PostgreSQL rejects the update with the serialization failure shown above (40001), and NHibernate surfaces it as an NHibernate.ADOException. To compensate for that, I simply retry the transaction part. The patch is marked in the code below with begin/end comments:

        private int InsertMessage(IList<Message> messages)
        {
            try
            {
                _trace.TraceVerbose("inserting");
                int result;
                using (var session = _sessionFactory.OpenSession())
                {
                    long newPayloadId = 0;

                    // patch begins

                    // The upper bound is an arbitrary choice; without one,
                    // a persistent database error would make this loop forever.
                    const int maxAttempts = 5;
                    var attempt = 0;
                    bool retryInsert;

                    do
                    {
                        retryInsert = false;
                        attempt++;

                        using (var tx = session.BeginTransaction(IsolationLevel.RepeatableRead))
                        {
                            try
                            {
                                var messageId = session.Query<TIdType>().FirstOrDefault();
                                if (messageId == null)
                                {
                                    messageId = new TIdType { PayloadId = 1, RowId = 1 };
                                    session.Save(messageId);
                                }
                                else
                                {
                                    messageId.PayloadId++;
                                    session.Save(messageId);
                                }
                                newPayloadId = messageId.PayloadId;
                                session.Save(new TMessageType
                                {
                                    Payload = FNHPayload.ToBytes(messages),
                                    PayloadId = newPayloadId
                                });
                                tx.Commit();
                            }
                            catch (NHibernate.ADOException exS)
                                when (exS.InnerException != null && attempt < maxAttempts)
                            {
                                // PostgreSQL with IsolationLevel.RepeatableRead aborts the transaction
                                // (SQLSTATE 40001) when another instance has concurrently updated the PayloadId row.
                                // Resolution: drop the stale in-memory entities and retry the insert transaction.
                                // Any other exception, or a failure on the last attempt, is no longer swallowed
                                // and propagates to the outer catch block.
                                session.Clear();
                                retryInsert = true;
                            }
                        }
                    }
                    while (retryInsert);

                    // patch ends

                    result = 1;
                    var maxTableSize = 10000;
                    var blockSize = 2500;
                    if (newPayloadId % blockSize == 0)
                    {
                        using (var tx = session.BeginTransaction(IsolationLevel.ReadCommitted))
                        {
                            var queryable = session.Query<TMessageType>();
                            var aggregates = queryable
                                .Select(m => new {Count = queryable.Count(), Min = queryable.Min(i => i.PayloadId)})
                                .First();

                            var rowCount = aggregates.Count;
                            var minPayloadId = aggregates.Min;
                            if (rowCount > maxTableSize)
                            {
                                var overMaxBy = rowCount - maxTableSize;
                                var endPayloadId = minPayloadId + blockSize - overMaxBy;
                                var sql = string.Format("delete from `{0}` where {1} between :min and :max",
                                    typeof(TMessageType).Name,
                                    nameof(MessagesItemBase.PayloadId));
                                result = session.CreateQuery(sql)
                                    .SetParameter("min", minPayloadId)
                                    .SetParameter("max", endPayloadId)
                                    .ExecuteUpdate();
                            }
                            tx.Commit();
                        }
                    }
                }
                return result;
            }
            catch (Exception ex)
            {
                _trace.TraceError("Issue with send", ex);
                throw;
            }
        }
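
A possible refinement, as a sketch only: the retry could be factored into a small helper that retries only on PostgreSQL serialization failures (SQLSTATE 40001) and gives up after a few attempts. The names `TransientRetry`, `IsSerializationFailure`, `Execute` and `FakeSerializationException` are hypothetical and not part of the library. The `SqlState` lookup uses reflection and assumes the provider exception exposes a string property of that name, as Npgsql's PostgresException does. The attempt count and delay are arbitrary values.

```csharp
using System;
using System.Threading;

// Stand-in for a provider exception such as Npgsql's PostgresException,
// used only so that this sketch compiles without an Npgsql reference.
public sealed class FakeSerializationException : Exception
{
    public string SqlState { get { return "40001"; } }
}

public static class TransientRetry
{
    // Walks the InnerException chain and reports whether any exception
    // carries a string property "SqlState" with the value "40001".
    public static bool IsSerializationFailure(Exception ex)
    {
        for (var e = ex; e != null; e = e.InnerException)
        {
            var prop = e.GetType().GetProperty("SqlState");
            if (prop != null
                && prop.PropertyType == typeof(string)
                && (string)prop.GetValue(e, null) == "40001")
            {
                return true;
            }
        }
        return false;
    }

    // Runs work() up to maxAttempts times. Only exceptions accepted by
    // isTransient are retried; everything else propagates immediately,
    // and so does the failure of the last attempt.
    public static T Execute<T>(Func<T> work, Func<Exception, bool> isTransient,
        int maxAttempts = 5, int baseDelayMs = 20)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return work();
            }
            catch (Exception ex) when (attempt < maxAttempts && isTransient(ex))
            {
                // Linear backoff, so that competing instances are less
                // likely to collide again on the next attempt.
                Thread.Sleep(baseDelayMs * attempt);
            }
        }
    }
}
```

In InsertMessage, the delegate passed to `Execute` could open a fresh session, run the RepeatableRead transaction and return the new PayloadId, so that every attempt starts without leftover state from a failed one.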

I hope this also helps other people using your great library.

Xavier, do you see any drawbacks or improvements to this workaround?

Greetings Moritz