akkadotnet / Akka.Persistence.Sql

Linq2Db implementation of Akka.Persistence.Sql. Common implementation for SQL Server, Sqlite, Postgres, Oracle, and MySql.
Apache License 2.0

Running custom journals does not work with Akka.Persistence.Query #403

Closed Aaronontheweb closed 3 months ago

Aaronontheweb commented 3 months ago

Version Information

Version of Akka.NET? latest
Which Akka.NET Modules? Akka.Persistence.Sql

Describe the bug

When using the following configuration:

  builder.WithSqlPersistence(
            autoInitialize: true,
            connectionString: connectionStringForJournalAndSnapshots,
            isDefaultPlugin: false,
            providerName: ProviderName.PostgreSQL,
            pluginIdentifier: OwnershipReadJournal.TYPE_NAME,
            journalBuilder: journalBuilder =>
            {
                journalBuilder.AddWriteEventAdapter<OwnershipDomainTagger>("ownership-domain-tagger",
                    new[] { typeof(IOwnershipDomain) });
            });

The generated HOCON ends up looking like this, which is slightly wrong:

    persistence : {
      query : {
        journal : {
          ownership : {
            class : "Akka.Persistence.Sql.Query.SqlReadJournalProvider, Akka.Persistence.Sql"
            write-plugin : 
            max-buffer-size : 500
            refresh-interval : 1s
            connection-string : 
            tag-read-mode : TagTable
            journal-sequence-retrieval : {
              batch-size : 10000
              max-tries : 10
              query-delay : 1s
              max-backoff-query-delay : 60s
              ask-timeout : 1s
            }
            provider-name : 
            table-mapping : default
            buffer-size : 5000
            batch-size : 100
            replay-batch-size : 1000
            parallelism : 3
            max-row-by-row-size : 100
            use-clone-connection : true
            tag-separator : ;
            read-isolation-level : unspecified
            dao : "Akka.Persistence.Sql.Journal.Dao.ByteArrayJournalDao, Akka.Persistence.Sql"
            default : {
              schema-name : 
              journal : {
                use-writer-uuid-column : true
                table-name : journal
                columns : {
                  ordering : ordering
                  deleted : deleted
                  persistence-id : persistence_id
                  sequence-number : sequence_number
                  created : created
                  tags : tags
                  message : message
                  identifier : identifier
                  manifest : manifest
                  writer-uuid : writer_uuid
                }
              }
              metadata : {
                table-name : journal_metadata
                columns : {
                  persistence-id : persistence_id
                  sequence-number : sequence_number
                }
              }
              tag : {
                table-name : tags
                columns : {
                  ordering-id : ordering_id
                  tag-value : tag
                  persistence-id : persistence_id
                  sequence-nr : sequence_nr
                }
              }
            }
            sql-server : {
              schema-name : dbo
              journal : {
                use-writer-uuid-column : false
                table-name : EventJournal
                columns : {
                  ordering : Ordering
                  deleted : IsDeleted
                  persistence-id : PersistenceId
                  sequence-number : SequenceNr
                  created : Timestamp
                  tags : Tags
                  message : Payload
                  identifier : SerializerId
                  manifest : Manifest
                }
              }
              metadata : {
                table-name : Metadata
                columns : {
                  persistence-id : PersistenceId
                  sequence-number : SequenceNr
                }
              }
            }
            sqlite : {
              schema-name : 
              journal : {
                use-writer-uuid-column : false
                table-name : event_journal
                columns : {
                  ordering : ordering
                  deleted : is_deleted
                  persistence-id : persistence_id
                  sequence-number : sequence_nr
                  created : timestamp
                  tags : tags
                  message : payload
                  identifier : serializer_id
                  manifest : manifest
                }
              }
              metadata : {
                table-name : journal_metadata
                columns : {
                  persistence-id : persistence_id
                  sequence-number : sequence_nr
                }
              }
            }
            postgresql : {
              schema-name : public
              journal : {
                use-writer-uuid-column : false
                table-name : event_journal
                columns : {
                  ordering : ordering
                  deleted : is_deleted
                  persistence-id : persistence_id
                  sequence-number : sequence_nr
                  created : created_at
                  tags : tags
                  message : payload
                  identifier : serializer_id
                  manifest : manifest
                }
              }
              metadata : {
                table-name : metadata
                columns : {
                  persistence-id : persistence_id
                  sequence-number : sequence_nr
                }
              }
            }
            mysql : {
              schema-name : 
              journal : {
                use-writer-uuid-column : false
                table-name : event_journal
                columns : {
                  ordering : ordering
                  deleted : is_deleted
                  persistence-id : persistence_id
                  sequence-number : sequence_nr
                  created : created_at
                  tags : tags
                  message : payload
                  identifier : serializer_id
                  manifest : manifest
                }
              }
              metadata : {
                table-name : metadata
                columns : {
                  persistence-id : persistence_id
                  sequence-number : sequence_nr
                }
              }
            }
          }
        }
      }
      snapshot-store : {
        ownership : {
          class : "Akka.Persistence.Sql.Snapshot.SqlSnapshotStore, Akka.Persistence.Sql"
          plugin-dispatcher : akka.persistence.dispatchers.default-plugin-dispatcher
          connection-string : "Host=localhost;Database=phoenix-persistence-ownership-development;Username=postgres;Password=postgres;Port=5432;Maximum Pool Size=100;Pooling=True"
          provider-name : PostgreSQL
          use-clone-connection : true
          table-mapping : default
          serializer : 
          dao : "Akka.Persistence.Sql.Snapshot.ByteArraySnapshotDao, Akka.Persistence.Sql"
          auto-initialize : on
          warn-on-auto-init-fail : true
          read-isolation-level : unspecified
          write-isolation-level : unspecified
          default : {
            schema-name : 
            snapshot : {
              table-name : snapshot
              columns : {
                persistence-id : persistence_id
                sequence-number : sequence_number
                created : created
                snapshot : snapshot
                manifest : manifest
                serializerId : serializer_id
              }
            }
          }
          sql-server : {
            schema-name : dbo
            snapshot : {
              table-name : SnapshotStore
              columns : {
                persistence-id : PersistenceId
                sequence-number : SequenceNr
                created : Timestamp
                snapshot : Snapshot
                manifest : Manifest
                serializerId : SerializerId
              }
            }
          }
          sqlite : {
            schema-name : 
            snapshot : {
              table-name : snapshot
              columns : {
                persistence-id : persistence_id
                sequence-number : sequence_nr
                snapshot : payload
                manifest : manifest
                created : created_at
                serializerId : serializer_id
              }
            }
          }
          postgresql : {
            schema-name : public
            snapshot : {
              table-name : snapshot_store
              columns : {
                persistence-id : persistence_id
                sequence-number : sequence_nr
                snapshot : payload
                manifest : manifest
                created : created_at
                serializerId : serializer_id
              }
            }
          }
          mysql : {
            schema-name : 
            snapshot : {
              table-name : snapshot_store
              columns : {
                persistence-id : persistence_id
                sequence-number : sequence_nr
                snapshot : snapshot
                manifest : manifest
                created : created_at
                serializerId : serializer_id
              }
            }
          }
        }
      }
      journal : {
        ownership : {
          class : "Akka.Persistence.Sql.Journal.SqlWriteJournal, Akka.Persistence.Sql"
          plugin-dispatcher : akka.persistence.dispatchers.default-plugin-dispatcher
          connection-string : "Host=localhost;Database=phoenix-persistence-ownership-development;Username=postgres;Password=postgres;Port=5432;Maximum Pool Size=100;Pooling=True"
          provider-name : PostgreSQL
          delete-compatibility-mode : false
          table-mapping : default
          buffer-size : 5000
          batch-size : 100
          db-round-trip-max-batch-size : 1000
          prefer-parameters-on-multirow-insert : false
          replay-batch-size : 1000
          parallelism : 3
          max-row-by-row-size : 100
          use-clone-connection : true
          materializer-dispatcher : akka.actor.default-dispatcher
          tag-write-mode : TagTable
          tag-separator : ;
          auto-initialize : on
          warn-on-auto-init-fail : true
          dao : "Akka.Persistence.Sql.Journal.Dao.ByteArrayJournalDao, Akka.Persistence.Sql"
          serializer : 
          read-isolation-level : unspecified
          write-isolation-level : unspecified
          default : {
            schema-name : 
            journal : {
              use-writer-uuid-column : true
              table-name : journal
              columns : {
                ordering : ordering
                deleted : deleted
                persistence-id : persistence_id
                sequence-number : sequence_number
                created : created
                tags : tags
                message : message
                identifier : identifier
                manifest : manifest
                writer-uuid : writer_uuid
              }
            }
            metadata : {
              table-name : journal_metadata
              columns : {
                persistence-id : persistence_id
                sequence-number : sequence_number
              }
            }
            tag : {
              table-name : tags
              columns : {
                ordering-id : ordering_id
                tag-value : tag
                persistence-id : persistence_id
                sequence-nr : sequence_nr
              }
            }
          }
          sql-server : {
            schema-name : dbo
            journal : {
              use-writer-uuid-column : false
              table-name : EventJournal
              columns : {
                ordering : Ordering
                deleted : IsDeleted
                persistence-id : PersistenceId
                sequence-number : SequenceNr
                created : Timestamp
                tags : Tags
                message : Payload
                identifier : SerializerId
                manifest : Manifest
              }
            }
            metadata : {
              table-name : Metadata
              columns : {
                persistence-id : PersistenceId
                sequence-number : SequenceNr
              }
            }
          }
          sqlserver : {
            schema-name : dbo
            journal : {
              use-writer-uuid-column : false
              table-name : EventJournal
              columns : {
                ordering : Ordering
                deleted : IsDeleted
                persistence-id : PersistenceId
                sequence-number : SequenceNr
                created : Timestamp
                tags : Tags
                message : Payload
                identifier : SerializerId
                manifest : Manifest
              }
            }
            metadata : {
              table-name : Metadata
              columns : {
                persistence-id : PersistenceId
                sequence-number : SequenceNr
              }
            }
          }
          sqlite : {
            schema-name : 
            journal : {
              use-writer-uuid-column : false
              table-name : event_journal
              columns : {
                ordering : ordering
                deleted : is_deleted
                persistence-id : persistence_id
                sequence-number : sequence_nr
                created : timestamp
                tags : tags
                message : payload
                identifier : serializer_id
                manifest : manifest
              }
            }
            metadata : {
              table-name : journal_metadata
              columns : {
                persistence-id : persistence_id
                sequence-number : sequence_nr
              }
            }
          }
          postgresql : {
            schema-name : public
            journal : {
              use-writer-uuid-column : false
              table-name : event_journal
              columns : {
                ordering : ordering
                deleted : is_deleted
                persistence-id : persistence_id
                sequence-number : sequence_nr
                created : created_at
                tags : tags
                message : payload
                identifier : serializer_id
                manifest : manifest
              }
            }
            metadata : {
              table-name : metadata
              columns : {
                persistence-id : persistence_id
                sequence-number : sequence_nr
              }
            }
          }
          mysql : {
            schema-name : 
            journal : {
              use-writer-uuid-column : false
              table-name : event_journal
              columns : {
                ordering : ordering
                deleted : is_deleted
                persistence-id : persistence_id
                sequence-number : sequence_nr
                created : created_at
                tags : tags
                message : payload
                identifier : serializer_id
                manifest : manifest
              }
            }
            metadata : {
              table-name : metadata
              columns : {
                persistence-id : persistence_id
                sequence-number : sequence_nr
              }
            }
          }
          event-adapters : {
            ownership-domain-tagger : "netfile.phoenix.ownership.actors.OwnershipDomainTagger, netfile.phoenix.ownership.actors"
          }
          event-adapter-bindings : {
            "netfile.phoenix.ownership.domain.IOwnershipDomain, netfile.phoenix.ownership.domain" : [ownership-domain-tagger]
          }
        }
      }
    }

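The suspicious-looking part is the Akka.Persistence.Query settings: the write-plugin, connection-string, and provider-name values under query.journal.ownership are all empty, so the read journal has no provider or connection string. The empty provider name matches the "DataProvider '' not found" error in the log below.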
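For comparison, here is a minimal sketch of what the query section would presumably need to contain for the read journal to connect. This is an assumption drawn from the journal section of the dump above, not confirmed output of the plugin: the write-plugin path in particular is a guess at the intended value, and the remaining keys are omitted because they already look populated.

```hocon
# Hypothetical expected settings; NOT actual output from Akka.Persistence.Sql.
akka.persistence.query.journal.ownership {
  class = "Akka.Persistence.Sql.Query.SqlReadJournalProvider, Akka.Persistence.Sql"

  # Assumption: the read journal should point back at the custom write journal.
  write-plugin = "akka.persistence.journal.ownership"

  # Assumption: same values as akka.persistence.journal.ownership in the dump.
  connection-string = "Host=localhost;Database=phoenix-persistence-ownership-development;Username=postgres;Password=postgres;Port=5432;Maximum Pool Size=100;Pooling=True"
  provider-name = "PostgreSQL"
}
```

If the generated configuration carried these three values, the empty provider name passed to Linq2Db would no longer be empty, which is the lookup that fails in the stack trace.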

[11:13:40 ERR] An exception occured inside SelectAsync while executing Task. Supervision strategy: Stop
System.AggregateException: One or more errors occurred. (DataProvider '' not found.)
 ---> LinqToDB.LinqToDBException: DataProvider '' not found.
   at LinqToDB.Data.DataConnection.GetDataProviderEx(String providerName, String connectionString)
   at LinqToDB.Data.DataConnection.ConfigurationApplier.Apply(DataConnection dataConnection, ConnectionOptions options)
   at LinqToDB.Data.ConnectionOptions.LinqToDB.Common.IApplicable<LinqToDB.Data.DataConnection>.Apply(DataConnection obj)
   at LinqToDB.DataOptions.Apply(DataConnection dataConnection)
   at LinqToDB.Data.DataConnection..ctor(DataOptions options)
   at Akka.Persistence.Sql.Db.AkkaPersistenceDataConnectionFactory.<>c__DisplayClass4_0.<.ctor>b__0()
   at System.Lazy`1.ViaFactory(LazyThreadSafetyMode mode)
--- End of stack trace from previous location ---
   at System.Lazy`1.CreateValue()
   at Akka.Persistence.Sql.Db.AkkaPersistenceDataConnectionFactory.GetConnection()
   at Akka.Persistence.Sql.Extensions.ConnectionFactoryExtensions.ExecuteWithTransactionAsync[TState,T](AkkaPersistenceDataConnectionFactory factory, TState state, IsolationLevel level, CancellationToken token, Func`4 handler)
   at Akka.Persistence.Sql.Query.Dao.BaseByteReadArrayJournalDao.<>c.<<JournalSequence>b__7_0>d.MoveNext()
   --- End of inner exception stack trace ---
   at Akka.Actor.PipeToSupport.PipeTo[T](Task`1 taskToPipe, ICanTell recipient, Boolean useConfigureAwait, IActorRef sender, Func`2 success, Func`2 failure)

This happens while attempting to run Akka.Persistence.Query to do projections.

Expected behavior

The non-default journal should be able to run queries normally.

// uses the same custom plugin id, but fully qualified: "akka.persistence.query.journal.ownership"
var readJournal = Context.System.ReadJournalFor<SqlReadJournal>(OwnershipReadJournal.Identifier);

Actual behavior

The application crashes at startup with the exception shown above.

Additional context

Worth noting: this code also crashes when we have a default Akka.Persistence.Sql journal specified in addition to this custom journal. When we make our custom journal the default, everything works.

Aaronontheweb commented 3 months ago

We need two tests:

  1. That a custom journal can run, including snapshots and Akka.Persistence.Query, without a default journal being specified
  2. That multiple custom journals can run independently, each with its own snapshot store, metadata table, and event journal, and that Akka.Persistence.Query works in those scenarios too