akkadotnet / Akka.Persistence.Sql

Linq2Db implementation of Akka.Persistence.Sql. Common implementation for SQL Server, Sqlite, Postgres, Oracle, and MySql.
Apache License 2.0

High memory usage #346

Open tudorFrizeru opened 5 months ago

tudorFrizeru commented 5 months ago

Version Information

AKKA 1.5.16. Modules: AKKA.Persistence.SQL 1.5.13, AKKA Streams 1.5.16, AKKA Serilog 1.5.12.1

Describe the performance issue

After migrating to AKKA 1.5, .NET 8 and the new AKKA.Persistence.SQL, memory usage has increased to 1 GB (sometimes 2 GB between separate runs, with no changes).

Before the upgrade, the project was running AKKA 1.4.49, .NET 6 and AKKA.Persistence.SqlServer 1.4.35, and memory allocation flattened out at 400 MB (mainly due to ASP.NET Core hosting).

As part of the upgrade I performed the complete database schema upgrade, migrating Tags to the separate table and enabling the UUID in the EventJournal.

Data and Specs

Memory dump, sample persistence configuration and a screenshot with the diagnostic events available here

Expected behavior

Similar memory allocation as before.

Actual behavior

Memory allocation rises to 1 GB and then stops increasing (as opposed to 400 MB before the upgrade); sometimes, for unexplained reasons, it caps at 2 GB instead.

Environment

Running on Windows 11, .NET 8, SQL Server 2022, hosting the ActorSystem in an ASP.NET Core environment. I am not using the new AKKA Hosting; for historical reasons I create the ActorSystem through a delegate when the ASP.NET server starts. No Cluster, no Remoting.

Additional context

Since the migration, I keep seeing this line in the debug console log, every 1-2 seconds:

[22:54:19 DBG] Start timer [Akka.Persistence.Sql.Query.InternalProtocol.QueryOrderingIdsTimerKey] with generation [9]
[22:54:21 DBG] Start timer [Akka.Persistence.Sql.Query.InternalProtocol.QueryOrderingIdsTimerKey] with generation [10]
[22:54:22 DBG] Start timer [Akka.Persistence.Sql.Query.InternalProtocol.QueryOrderingIdsTimerKey] with generation [11]
[22:54:27 DBG] Start timer [Akka.Persistence.Sql.Query.InternalProtocol.QueryOrderingIdsTimerKey] with generation [12]
[22:54:29 DBG] Start timer [Akka.Persistence.Sql.Query.InternalProtocol.QueryOrderingIdsTimerKey] with generation [13]
[22:54:30 DBG] Start timer [Akka.Persistence.Sql.Query.InternalProtocol.QueryOrderingIdsTimerKey] with generation [14]

I don't know if it is relevant, the Serilog also has a sink to a Loki instance, in addition to the Console sink.

to11mtm commented 5 months ago

@tudorFrizeru I've taken a look at your dump file and it appears that the growth may be in/around unmanaged memory which should not typically be an issue with Akka.Persistence.Sql.

If I had to guess, I'm betting this is related to a couple of bugs in .NET 8.0 around native thread-local storage cleanup (https://github.com/dotnet/runtime/pull/95722); the fix won't be released until 2024/02/13 (about a week from now). Examples of other people running into similar issues on .NET 8: https://github.com/dotnet/runtime/issues/96581 , https://github.com/dotnet/runtime/issues/95922

Will keep digging in repo code however.

tudorFrizeru commented 5 months ago

@to11mtm thank you for your quick response! I downgraded to .NET 6 again and the problem remains the same.

Here's a screenshot with the managed memory allocations. I can upload a new memory dump file if required.

Screenshot from .NET 6

to11mtm commented 5 months ago

@tudorFrizeru If you could, that would help. There are a few other possibilities, and a new dump may narrow things down.

tudorFrizeru commented 5 months ago

@to11mtm I added 2 new dump files at the same link.

I left them running for a long time, and they both pertain to the .NET 6 build.

to11mtm commented 5 months ago

🤔

Question, is this running in IIS or via console?

tudorFrizeru commented 5 months ago

The application is hosted as a Windows service (using TopShelf for Windows service hosting, running http.sys as a web server). We do not use IIS. The dump files you have seen above are from the service running in console mode, from Visual Studio.

tudorFrizeru commented 4 months ago

@to11mtm FYI, I managed to isolate the problem to the Akka.Persistence.Sql package: I reverted to Akka.Persistence.SqlServer, still on .NET 8, and I have no memory issues anymore.

I will be happy to test again once there's a fix available! Thank you for your support and for your great work!

to11mtm commented 4 months ago

> @to11mtm FYI, I managed to isolate the problem to the Akka.Persistence.Sql package : I reverted back to using the Akka.persistence.SqlServer, with .net 8, and I have no memory issues anymore.
>
> I will be happy to test again once there's a fix available! Thank you for your support and for your great work!

Thank you for that feedback!

I do have a PR up that I believe will solve some issues.

Was anything in your app running an EventsByTag query with a large number of matching ('in-filter') events, i.e. thousands to tens of thousands, present after the starting offset? One issue I found is a missing limit on the EventsByTag query, so cases like the one I described could put a lot of pressure on the Large Object Heap (which would likely explain quite a bit!).
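
As a rough illustration of why a missing limit matters, here is a minimal sketch that compares an unbounded read with a paged read. It does not use the plugin's code: the in-memory `journal` array stands in for the journal table, and `ReadAll`, `ReadPage` and the limit of 1,000 are hypothetical names and values chosen only for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the journal: 25,000 ordering values that all match the tag.
long[] journal = Enumerable.Range(1, 25_000).Select(i => (long)i).ToArray();
const int Limit = 1_000; // assumed per-query limit, not a value taken from the plugin

long offset = 0;
int pages = 0;
int largestPage = 0;
while (true)
{
    // Each call materializes at most Limit rows, then resumes from the last offset seen.
    List<long> page = ReadPage(journal, offset, Limit);
    if (page.Count == 0) break;
    largestPage = Math.Max(largestPage, page.Count);
    offset = page[^1];
    pages++;
}

// Without a limit, every matching row after the offset lands in a single list.
int unboundedRows = ReadAll(journal, 0).Count;

Console.WriteLine($"unbounded read: {unboundedRows} rows in one list");
Console.WriteLine($"bounded read: {pages} pages, largest page {largestPage} rows");

// Unbounded query: no Take(), so the result size depends entirely on the data.
static List<long> ReadAll(long[] source, long afterOffset) =>
    source.Where(o => o > afterOffset).ToList();

// Bounded query: Take(limit) caps the size of each materialized result.
static List<long> ReadPage(long[] source, long afterOffset, int limit) =>
    source.Where(o => o > afterOffset).Take(limit).ToList();
```

With a cap in place, the largest buffer allocated per query depends on the limit rather than on how many matching events happen to exist after the starting offset.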

Aaronontheweb commented 4 months ago

@to11mtm has worked on some memory optimization improvements here around closures: https://github.com/akkadotnet/Akka.Persistence.Sql/pull/347

MagusDraconis commented 4 months ago

Hello everyone, I encountered a similar issue and managed to resolve it in my scenario. The problem stemmed from loading Entity Framework 8's DbContext in the constructor.

To avoid this, refrain from utilizing the DbContext property directly within the class. Instead, opt to use it within methods.

The recommended approach is to instantiate a new DbContext within each method call. You can achieve this either by creating a new instance explicitly (new DbContext()), or by retrieving it from the services (services.GetService()).

This practice ensures that each method operates with a fresh DbContext, preventing potential conflicts and ensuring better management of resources.

Aaronontheweb commented 4 months ago

> The problem stemmed from loading Entity Framework 8's DbContext in the constructor.

What does that have to do with this project? We use Linq2Db, not EF.

MagusDraconis commented 4 months ago

> > The problem stemmed from loading Entity Framework 8's DbContext in the constructor.
>
> What does that have to do with this project? We use Linq2Db, not EF.

I think it has nothing to do with Entity Framework itself, but with .NET 8 and the GC. In my case, the DbContext cache was not released by the GC as long as the class was in use. On .NET Framework 4.8 it worked as expected. I thought there might be a similar problem with Linq2Db.

anpin commented 2 months ago

I think in the Query DAO all callbacks for DbStateHolder.ExecuteWithTransactionAsync could be made static which should reduce allocations.

Some of them already are static https://github.com/akkadotnet/Akka.Persistence.Sql/blob/fe49a6bd028763d99b07c889d787d1ffa58acb81/src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs#L117

While others are not https://github.com/akkadotnet/Akka.Persistence.Sql/blob/fe49a6bd028763d99b07c889d787d1ffa58acb81/src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs#L151

https://github.com/akkadotnet/Akka.Persistence.Sql/blob/fe49a6bd028763d99b07c889d787d1ffa58acb81/src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs#L60

It seems to me that ExecuteWithTransactionAsync was specifically designed to accept a transient state argument and pass it to a static query.
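
To make the "state argument plus static callback" idea concrete, here is a minimal sketch. `ExecuteWithStateAsync` is a hypothetical helper written for this example; it only mirrors the general shape described above and is not the signature of `ExecuteWithTransactionAsync`.

```csharp
using System;
using System.Threading.Tasks;

int take = 100;

// Capturing lambda: it closes over the local `take`, so the compiler has to
// allocate a closure object and a fresh delegate for this call.
Task<int> capturing = ExecuteWithStateAsync(0, _ => Task.FromResult(take * 2));

// Static lambda: the `static` modifier turns any capture into a compile error,
// so everything the callback needs must arrive through the state parameter.
Task<int> nonCapturing = ExecuteWithStateAsync(take, static t => Task.FromResult(t * 2));

Console.WriteLine(await capturing);
Console.WriteLine(await nonCapturing);

// Hypothetical helper: forwards the caller's state to the callback.
static Task<TResult> ExecuteWithStateAsync<TState, TResult>(
    TState state, Func<TState, Task<TResult>> callback) => callback(state);
```

Both calls produce the same result; the difference is only in what gets allocated per call. An `async` callback still allocates its task and state machine either way, so the saving from removing the closure may be small next to the cost of materializing query results.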

to11mtm commented 2 months ago

> It seems to me that ExecuteWithTransactionAsync was specifically designed to accept transient state argument and pass it to a static query.

Agreed and I thought I started working on fixing this at one point. >_<

@anpin If you'd like to submit a PR I'll happily review, or I can try to check my branches (switched laptops over weekend so a bit disorganized at the moment, apologies.)

Another thing that needs to be changed is the default number of sequences grabbed in a fetch.

At one point we were grabbing up to 10,000 ordering values at a time in JournalSequenceActor, which has a tendency to pollute the LOH. I thought I fixed that... but if the statics are still there, that might be missing as well.

tl;dr - max ordering per query should be a number where the final array won't pollute the LOH. Given the size, I'd say 2000 is a safer-ish number in general, around 32 KiB. 4000 might be OK as well but eh?
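
For a rough sense of scale, the sketch below estimates the backing-array size for a few batch sizes. It rests on assumptions that are not from the thread: each ordering value is a single 8-byte `long`, results are collected into a `List<long>` that starts at capacity 4 and doubles when full, and the runtime uses the default LOH threshold of 85,000 bytes. Real rows that carry more than one `long` per entry will be larger, so these numbers need not match the estimate in the comment above.

```csharp
using System;

// Default LOH threshold on .NET: objects of 85,000 bytes or more go to the large object heap.
const int LohThresholdBytes = 85_000;

foreach (int count in new[] { 2_000, 4_000, 10_000 })
{
    long bytes = BackingArrayBytes(count);
    Console.WriteLine(
        $"{count} values -> capacity {ListCapacityFor(count)}, ~{bytes / 1024} KiB, on LOH: {bytes >= LohThresholdBytes}");
}

// Capacity a List<long> reaches after `count` Add calls,
// assuming it starts at 4 and doubles whenever it is full.
static int ListCapacityFor(int count)
{
    int capacity = 4;
    while (capacity < count) capacity *= 2;
    return capacity;
}

// Approximate payload of the backing long[] (the small array header is ignored).
static long BackingArrayBytes(int count) => (long)ListCapacityFor(count) * sizeof(long);
```

Under these assumptions, 10,000 values round up to a capacity of 16,384 (128 KiB), which crosses the threshold, while 2,000 and 4,000 values stay well below it.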

anpin commented 2 months ago

> if you'd like to submit a PR I'll happily review

I definitely should, but need to run benchmarks first

anpin commented 2 months ago

I tried making these calls static, but it doesn't seem to make any difference. In fact, it even makes things slightly worse in places.


BenchmarkDotNet v0.13.12, NixOS 24.05 (Uakari)
Intel Xeon W-10855M CPU 2.80GHz, 1 CPU, 12 logical and 6 physical cores
.NET SDK 8.0.203
  [Host]     : .NET 8.0.3 (8.0.324.11423), X64 RyuJIT AVX2
  DefaultJob : .NET 8.0.3 (8.0.324.11423), X64 RyuJIT AVX2

Before

| Method          | TagMode  | Mean         | Error      | StdDev     | Gen0      | Gen1    | Allocated    |
|-----------------|----------|-------------:|-----------:|-----------:|----------:|--------:|-------------:|
| QueryByTag10    | Csv      | 1,445.347 ms | 15.4952 ms | 14.4943 ms | -         | -       | 399.56 KB    |
| QueryByTag100   | Csv      | 1,455.931 ms | 14.5265 ms | 13.5881 ms | -         | -       | 1368.77 KB   |
| QueryByTag1000  | Csv      | 1,431.524 ms | 9.0378 ms  | 8.4540 ms  | -         | -       | 10592.08 KB  |
| QueryByTag10000 | Csv      | 1,522.753 ms | 10.4946 ms | 9.8167 ms  | 1000.0000 | -       | 102297.37 KB |
| QueryByTag10    | TagTable | 1.426 ms     | 0.0225 ms  | 0.0188 ms  | 3.9063    | -       | 344.6 KB     |
| QueryByTag100   | TagTable | 2.530 ms     | 0.0415 ms  | 0.0388 ms  | 7.8125    | -       | 1254.35 KB   |
| QueryByTag1000  | TagTable | 15.248 ms    | 0.2141 ms  | 0.2003 ms  | 125.0000  | 62.5000 | 10584.34 KB  |
| QueryByTag10000 | TagTable | 145.401 ms   | 2.9079 ms  | 4.6957 ms  | 1000.0000 | -       | 102578.96 KB |

After

| Method          | TagMode  | Mean         | Error      | StdDev     | Gen0      | Gen1    | Allocated    |
|-----------------|----------|-------------:|-----------:|-----------:|----------:|--------:|-------------:|
| QueryByTag10    | Csv      | 1,442.252 ms | 13.4158 ms | 12.5492 ms | -         | -       | 399.33 KB    |
| QueryByTag100   | Csv      | 1,442.605 ms | 8.8208 ms  | 7.8194 ms  | -         | -       | 1307.99 KB   |
| QueryByTag1000  | Csv      | 1,430.370 ms | 11.1805 ms | 10.4582 ms | -         | -       | 10653.91 KB  |
| QueryByTag10000 | Csv      | 1,530.338 ms | 11.4300 ms | 10.6916 ms | 1000.0000 | -       | 102245.6 KB  |
| QueryByTag10    | TagTable | 1.421 ms     | 0.0269 ms  | 0.0251 ms  | 3.9063    | -       | 344.39 KB    |
| QueryByTag100   | TagTable | 2.489 ms     | 0.0311 ms  | 0.0276 ms  | 7.8125    | -       | 1256.02 KB   |
| QueryByTag1000  | TagTable | 14.875 ms    | 0.2083 ms  | 0.1948 ms  | 125.0000  | 62.5000 | 10581.61 KB  |
| QueryByTag10000 | TagTable | 149.890 ms   | 2.9124 ms  | 3.9865 ms  | 1000.0000 | -       | 102516.67 KB |

Changes

diff --git a/src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs b/src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs
index d062b73..512a3bf 100644
--- a/src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs
+++ b/src/Akka.Persistence.Sql/Query/Dao/BaseByteReadArrayJournalDao.cs
@@ -47,17 +47,14 @@ namespace Akka.Persistence.Sql.Query.Dao
             _deserializeFlow = serializer.DeserializeFlow();
         }

-        public Source<string, NotUsed> AllPersistenceIdsSource(long max)
-        {
-            var maxTake = MaxTake(max);
-
-            return AsyncSource<string>.FromEnumerable(
-                new { _dbStateHolder, maxTake },
-                static async input =>
+        public Source<string, NotUsed> AllPersistenceIdsSource(long max) => 
+            AsyncSource<string>.FromEnumerable(
+                new { _dbStateHolder, maxTake = MaxTake(max) },
+                static async  input =>
                 {
                     return await input._dbStateHolder.ExecuteWithTransactionAsync(
                         input.maxTake,
-                        async (connection, token,take) =>
+                        static async (connection, token, take) =>
                         {
                             return await connection
                                 .GetTable<JournalRow>()
@@ -68,7 +65,6 @@ namespace Akka.Persistence.Sql.Query.Dao
                                 .ToListAsync(token);
                         });
                 });
-        }

         public Source<Try<(IPersistentRepresentation, IImmutableSet<string>, long)>, NotUsed> EventsByTag(
             string tag,
@@ -83,11 +79,9 @@ namespace Akka.Persistence.Sql.Query.Dao
             {
                 TagMode.Csv => AsyncSource<JournalRow>
                     .FromEnumerable(
-                        new { args= new QueryArgs(offset,maxOffset,maxTake,
-                            $"{separator}{tag}{separator}"), _dbStateHolder },
+                        new { _dbStateHolder, args = new QueryArgs(offset,maxOffset,maxTake,$"{separator}{tag}{separator}") },
                         static async input =>
                         {
-                            //var tagValue = input.tag;
                             return await input._dbStateHolder.ExecuteWithTransactionAsync(
                                 input.args,
                                 static async (connection, token, inVals) =>
@@ -109,7 +103,7 @@ namespace Akka.Persistence.Sql.Query.Dao

                 TagMode.TagTable => AsyncSource<JournalRow>
                     .FromEnumerable(
-                        new { _dbStateHolder, args= new QueryArgs(offset,maxOffset,maxTake,tag)},
+                        new {_dbStateHolder, args = new QueryArgs(offset,maxOffset,maxTake,tag) },
                         static async input =>
                         {
                             return await input._dbStateHolder.ExecuteWithTransactionAsync(
@@ -143,12 +137,12 @@ namespace Akka.Persistence.Sql.Query.Dao
             => Task.FromResult(
                 AsyncSource<JournalRow>
                     .FromEnumerable(
-                        new {  persistenceId, fromSequenceNr, toSequenceNr, toTake = MaxTake(max), _dbStateHolder },
+                        new { _dbStateHolder, persistenceId, fromSequenceNr, toSequenceNr,  toTake = MaxTake(max) },
                         static async state =>
                         {
                             return await state._dbStateHolder.ExecuteWithTransactionAsync(
                                 state,
-                                async (connection, token, txState) =>
+                                static async (connection, token, txState) =>
                                 {
                                     var query = connection
                                         .GetTable<JournalRow>()
@@ -182,7 +176,7 @@ namespace Akka.Persistence.Sql.Query.Dao
         public Source<long, NotUsed> JournalSequence(long offset, long limit)
         {
             return AsyncSource<long>.FromEnumerable(
-                new { maxTake = MaxTake(limit), offset, _dbStateHolder },
+                new {_dbStateHolder,  offset, maxTake = MaxTake(limit) },
                 async input =>
                 {
                     return await input._dbStateHolder.ExecuteWithTransactionAsync(
@@ -207,7 +201,7 @@ namespace Akka.Persistence.Sql.Query.Dao
             return await ConnectionFactory.ExecuteWithTransactionAsync(
                 ReadIsolationLevel,
                 ShutdownToken,
-                async (connection, token) =>
+                static async (connection, token) =>
                 {
                     // persistence-jdbc does not filter deleted here.
                     var result = await connection
@@ -233,18 +227,15 @@ namespace Akka.Persistence.Sql.Query.Dao
             var maxTake = MaxTake(max);

             return AsyncSource<JournalRow>.FromEnumerable(
-                new {_dbStateHolder , args=new QueryArgs(offset,maxOffset,maxTake) },
-                static async input =>
-                {
-                    return await ExecuteEventQuery(input._dbStateHolder, input._dbStateHolder.Mode, input.args);
-                }
-            ).Via(_deserializeFlow);
+                new {_dbStateHolder , args =  new QueryArgs(offset,maxOffset,maxTake) },
+                  static args => ExecuteEventQuery(args._dbStateHolder,args.args)
+                ).Via(_deserializeFlow);
         }
-        
-        
-        internal static async Task<List<JournalRow>> ExecuteEventQuery(DbStateHolder stateHolder, TagMode tagMode, QueryArgs queryArgs)
+
+
+        internal static async Task<IEnumerable<JournalRow>> ExecuteEventQuery(DbStateHolder stateHolder, QueryArgs queryArgs)
         {
-            return tagMode != TagMode.TagTable
+            return stateHolder.Mode != TagMode.TagTable
                 ? await ExecuteEventQueryNonTagTable(stateHolder, queryArgs)
                 : await ExecuteEventQueryTagTable(stateHolder, queryArgs);
         }