zilverline / sequent

CQRS & event sourcing framework for Ruby
https://sequent.io
MIT License

Impossibility to set number_of_replay_processes more than 1 #383

Closed smilyalexey closed 1 year ago

smilyalexey commented 1 year ago

Hey there. Here is another issue that I briefly mentioned here https://github.com/zilverline/sequent/pull/376#issuecomment-1604367235

I currently have more than 1M events, and running the online migration in production is a big pain. I can see that number_of_replay_processes is initially 4. When I run it I see errors like this:

D, [2023-07-10T11:14:52.470193 #33] DEBUG -- :   TRANSACTION (5.7ms)  COMMIT
D, [2023-07-10T11:14:52.473739 #33] DEBUG -- :    (2.4ms)  insert into sequent_replayed_ids (event_id) values (169649),(169653),(62640),(62642),(95980),(95982),(45169),(45173),(45529),(45531),(45205),(45207),(353084),(353086),(339147),(339149),(339130),(339151),(110607),(110609),(110611),(110613),(110627),(110629),(110615),(110617),(110619),(343597),(343599),(343615),(343617),(343588),(343590),(343592),(343594),(193851),(193857),(193538),(193540),(193377),(193381),(194029),(194037),(193904),(193910),(193885),(193889),(193769),(193775),(193746),(193748),(193664),(193666),(193962),(193966),(23937),(23939),(23970),(23972),(23941),(23943),(23922),(23924),(23931),(23933),(23959),(23964),(163067),(163069),(163179),(163181),(163144),(163146),(163093),(162974),(162976),(163107),(163109),(163183),(163185)
D, [2023-07-10T11:14:52.667463 #19] DEBUG -- :   Sequent::Migrations::ViewSchema::Versions Load (1.0ms)  SELECT "sequent_versions".* FROM "sequent_versions" ORDER BY version desc LIMIT 1
D, [2023-07-10T11:14:52.670828 #19] DEBUG -- :    (2.9ms)  select table_name from information_schema.tables where table_schema = 'view_schema' and table_name LIKE '%_-10'
D, [2023-07-10T11:14:52.672767 #19] DEBUG -- :    (1.6ms)  select table_name from information_schema.tables where table_schema = 'view_schema' and table_name LIKE '%_-9'
D, [2023-07-10T11:14:52.677993 #19] DEBUG -- :    (1.5ms)  select table_name from information_schema.tables where table_schema = 'view_schema' and table_name LIKE '%_-8'
D, [2023-07-10T11:14:52.679998 #19] DEBUG -- :    (1.5ms)  select table_name from information_schema.tables where table_schema = 'view_schema' and table_name LIKE '%_-7'
D, [2023-07-10T11:14:52.681809 #19] DEBUG -- :    (1.5ms)  select table_name from information_schema.tables where table_schema = 'view_schema' and table_name LIKE '%_-6'
D, [2023-07-10T11:14:52.683528 #19] DEBUG -- :    (1.5ms)  select table_name from information_schema.tables where table_schema = 'view_schema' and table_name LIKE '%_-5'
D, [2023-07-10T11:14:52.685563 #19] DEBUG -- :    (1.8ms)  select table_name from information_schema.tables where table_schema = 'view_schema' and table_name LIKE '%_-4'
D, [2023-07-10T11:14:52.687216 #19] DEBUG -- :    (1.4ms)  select table_name from information_schema.tables where table_schema = 'view_schema' and table_name LIKE '%_-3'
D, [2023-07-10T11:14:52.689047 #19] DEBUG -- :    (1.6ms)  select table_name from information_schema.tables where table_schema = 'view_schema' and table_name LIKE '%_-2'
D, [2023-07-10T11:14:52.690774 #19] DEBUG -- :    (1.5ms)  select table_name from information_schema.tables where table_schema = 'view_schema' and table_name LIKE '%_-1'
D, [2023-07-10T11:14:52.692439 #19] DEBUG -- :    (1.4ms)  select table_name from information_schema.tables where table_schema = 'view_schema' and table_name LIKE '%_0'
D, [2023-07-10T11:14:52.694204 #19] DEBUG -- :    (1.5ms)  select table_name from information_schema.tables where table_schema = 'view_schema' and table_name LIKE '%_1'
D, [2023-07-10T11:14:52.695929 #19] DEBUG -- :    (1.5ms)  select table_name from information_schema.tables where table_schema = 'view_schema' and table_name LIKE '%_2'
D, [2023-07-10T11:14:52.697737 #19] DEBUG -- :    (1.5ms)  select table_name from information_schema.tables where table_schema = 'view_schema' and table_name LIKE '%_3'
D, [2023-07-10T11:14:52.699452 #19] DEBUG -- :    (1.5ms)  select table_name from information_schema.tables where table_schema = 'view_schema' and table_name LIKE '%_4'
D, [2023-07-10T11:14:52.701220 #19] DEBUG -- :    (1.5ms)  select table_name from information_schema.tables where table_schema = 'view_schema' and table_name LIKE '%_5'
D, [2023-07-10T11:14:52.702903 #19] DEBUG -- :    (1.4ms)  select table_name from information_schema.tables where table_schema = 'view_schema' and table_name LIKE '%_6'
D, [2023-07-10T11:14:52.705534 #19] DEBUG -- :    (2.4ms)  select table_name from information_schema.tables where table_schema = 'view_schema' and table_name LIKE '%_7'
D, [2023-07-10T11:14:52.726099 #19] DEBUG -- :    (20.2ms)  DROP TABLE view_schema.rentals_search_records_7 CASCADE
D, [2023-07-10T11:14:52.737039 #19] DEBUG -- :    (10.5ms)  DROP TABLE view_schema.payment_page_visit_records_7 CASCADE
D, [2023-07-10T11:14:52.748019 #19] DEBUG -- :    (10.4ms)  DROP TABLE view_schema.rentals_search_displayed_rentals_records_7 CASCADE
D, [2023-07-10T11:14:52.779897 #19] DEBUG -- :    (31.4ms)  truncate table sequent_replayed_ids
rake aborted!
Parallel::UndumpableException: Parallel::Kill: Parallel::Kill
/usr/local/bundle/bundler/gems/sequent-ea740010386c/lib/sequent/migrations/view_schema.rb:316:in `rescue in block (3 levels) in replay!'
/usr/local/bundle/bundler/gems/sequent-ea740010386c/lib/sequent/migrations/view_schema.rb:304:in `block (3 levels) in replay!'
/usr/local/bundle/gems/parallel-1.23.0/lib/parallel.rb:627:in `call_with_index'
/usr/local/bundle/gems/parallel-1.23.0/lib/parallel.rb:597:in `process_incoming_jobs'
/usr/local/bundle/gems/parallel-1.23.0/lib/parallel.rb:577:in `block in worker'
/usr/local/bundle/gems/parallel-1.23.0/lib/parallel.rb:568:in `fork'
/usr/local/bundle/gems/parallel-1.23.0/lib/parallel.rb:568:in `worker'
/usr/local/bundle/gems/parallel-1.23.0/lib/parallel.rb:559:in `block in create_workers'
/usr/local/bundle/gems/parallel-1.23.0/lib/parallel.rb:558:in `each'
/usr/local/bundle/gems/parallel-1.23.0/lib/parallel.rb:558:in `each_with_index'
/usr/local/bundle/gems/parallel-1.23.0/lib/parallel.rb:558:in `create_workers'
/usr/local/bundle/gems/parallel-1.23.0/lib/parallel.rb:497:in `work_in_processes'
/usr/local/bundle/gems/parallel-1.23.0/lib/parallel.rb:291:in `map'
/usr/local/bundle/gems/parallel-1.23.0/lib/parallel.rb:300:in `map_with_index'
/usr/local/bundle/bundler/gems/sequent-ea740010386c/lib/sequent/migrations/view_schema.rb:300:in `block (2 levels) in replay!'
/usr/local/bundle/bundler/gems/sequent-ea740010386c/lib/sequent/util/timer.rb:8:in `time'
/usr/local/bundle/bundler/gems/sequent-ea740010386c/lib/sequent/migrations/view_schema.rb:291:in `block in replay!'
/usr/local/bundle/bundler/gems/sequent-ea740010386c/lib/sequent/migrations/view_schema.rb:408:in `with_sequent_config'
/usr/local/bundle/bundler/gems/sequent-ea740010386c/lib/sequent/migrations/view_schema.rb:288:in `replay!'
/usr/local/bundle/bundler/gems/sequent-ea740010386c/lib/sequent/migrations/view_schema.rb:207:in `migrate_online'
/usr/local/bundle/bundler/gems/sequent-ea740010386c/lib/sequent/rake/migration_tasks.rb:111:in `block (3 levels) in register_tasks!'
/usr/local/bundle/gems/sentry-ruby-5.9.0/lib/sentry/rake.rb:26:in `execute'
/usr/local/bundle/gems/ddtrace-1.12.1/lib/datadog/tracing/contrib/rake/instrumentation.rb:30:in `execute'
/usr/local/bundle/gems/bundler-2.4.14/lib/bundler/cli/exec.rb:58:in `load'
/usr/local/bundle/gems/bundler-2.4.14/lib/bundler/cli/exec.rb:58:in `kernel_load'
/usr/local/bundle/gems/bundler-2.4.14/lib/bundler/cli/exec.rb:23:in `run'
/usr/local/bundle/gems/bundler-2.4.14/lib/bundler/cli.rb:492:in `exec'
/usr/local/bundle/gems/bundler-2.4.14/lib/bundler/vendor/thor/lib/thor/command.rb:27:in `run'
/usr/local/bundle/gems/bundler-2.4.14/lib/bundler/vendor/thor/lib/thor/invocation.rb:127:in `invoke_command'
/usr/local/bundle/gems/bundler-2.4.14/lib/bundler/vendor/thor/lib/thor.rb:392:in `dispatch'
/usr/local/bundle/gems/bundler-2.4.14/lib/bundler/cli.rb:34:in `dispatch'
/usr/local/bundle/gems/bundler-2.4.14/lib/bundler/vendor/thor/lib/thor/base.rb:485:in `start'
/usr/local/bundle/gems/bundler-2.4.14/lib/bundler/cli.rb:28:in `start'
/usr/local/bundle/gems/bundler-2.4.14/exe/bundle:37:in `block in <top (required)>'
/usr/local/bundle/gems/bundler-2.4.14/lib/bundler/friendly_errors.rb:117:in `with_friendly_errors'
/usr/local/bundle/gems/bundler-2.4.14/exe/bundle:29:in `<top (required)>'
/usr/local/bundle/bin/bundle:25:in `load'
/usr/local/bundle/bin/bundle:25:in `<main>'
Tasks: TOP => sequent:migrate:online
(See full trace by running task with --trace)

I am not sure how to debug such errors. On the staging environment, where I don't have that many events, simply re-running the online migration a few times helped. On production I was only able to run it with number_of_replay_processes=1.

I can also see some errors in the logs like this:

E, [2023-07-10T14:40:31.473537 #434] ERROR -- : +++++++++++++++ CAUSE +++++++++++++++
E, [2023-07-10T14:40:31.473611 #434] ERROR -- : ERROR:  relation "rentals_search_records_7" does not exist

or like this:

E, [2023-07-10T14:40:31.472640 #434] ERROR -- : +++++++++++++++ CAUSE +++++++++++++++
E, [2023-07-10T14:40:31.472680 #434] ERROR -- : Sequent::Core::EventPublisher::PublishEventError

or like this:

E, [2023-07-10T14:40:31.472348 #434] ERROR -- : +++++++++++++++ CAUSE +++++++++++++++
E, [2023-07-10T14:40:31.472407 #434] ERROR -- : ERROR:  current transaction is aborted, commands ignored until end of transaction block

I am not sure how to debug that.

smilyalexey commented 1 year ago

After waiting for about an hour with number_of_replay_processes=1 I got this error:

E, [2023-07-10T15:50:13.607758 #457] ERROR -- : Replaying failed for ids: ^4ae - 4ae
E, [2023-07-10T15:50:13.607825 #457] ERROR -- : +++++++++++++++ ERROR +++++++++++++++
E, [2023-07-10T15:50:13.607889 #457] ERROR -- : PG::InFailedSqlTransaction: ERROR:  current transaction is aborted, commands ignored until end of transaction block
lvonk commented 1 year ago

Hi,

When a projector fails during replay, for example:

class Projector < Sequent::Projector
  on Created do
    fail 'bar'
  end
end

It will print something like:

I [sequent] 20230710 18:21:48 - group_exponent: 3
I [sequent] 20230710 18:21:48 - Start replaying events
I [sequent] 20230710 18:21:48 - Number of groups 4096
E [sequent] 20230710 18:21:49 - Replaying failed for ids: ^236 - 236
E [sequent] 20230710 18:21:49 - +++++++++++++++ ERROR +++++++++++++++
E [sequent] 20230710 18:21:49 - Sequent::Core::EventPublisher::PublishEventError
/Users/vonk/.rbenv/versions/3.1.1/lib/ruby/gems/3.1.0/bundler/gems/sequent-ee9e9d3ebe3d/lib/sequent/core/event_publisher.rb:59:in `rescue in block in process_event'
... OMITTED
E [sequent] 20230710 18:21:49 - +++++++++++++++ CAUSE +++++++++++++++
E [sequent] 20230710 18:21:49 - bar

/Users/vonk/checkouts/projector.rb:3:in `block in <class:Projector>'

A PublishEventError should be followed by the real cause.
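The "followed by the real cause" behavior comes from Ruby's built-in exception cause chain: re-raising inside a rescue block implicitly records the original error as `cause`. A minimal plain-Ruby sketch (no Sequent involved; the wrapping error here is only an analogy for PublishEventError):

```ruby
# Re-raising inside a rescue automatically sets Exception#cause,
# which is why the wrapper error in the log is followed by the real one.
def publish_event
  raise 'bar' # the projector's actual failure
rescue
  # wrapping error, analogous to Sequent's PublishEventError (assumed analogy)
  raise StandardError, 'PublishEventError'
end

begin
  publish_event
rescue StandardError => e
  wrapper = e.message       # the wrapping error: "PublishEventError"
  cause   = e.cause.message # the underlying failure: "bar"
end
```

So when reading the logs, the `+++ CAUSE +++` block under a PublishEventError is the part worth debugging.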

smilyalexey commented 1 year ago

Hi @lvonk. Thanks for your reply. Right now we don't include the migration in our deployment script, so I deployed the app, went to the console, and ran this manually:

SEQUENT_MIGRATION_SCHEMAS="sequent_schema,view_schema" REPLAY_PROCESSES=5 bundle exec rake sequent:migrate:online

My sequent.rb looks like this:

require_relative "../../db/sequent_migrations"

Rails.application.reloader.to_prepare do
  Sequent.configure do |config|
    config.migrations_class_name = "SequentMigrations"
    config.number_of_replay_processes = ENV.fetch("REPLAY_PROCESSES", 4).to_i

    config.command_handlers = [
    ].map(&:new)

    config.event_handlers = [
    ].map(&:new)

    config.database_config_directory = "config"

    # this is the location of your sql files for your view_schema
    config.migration_sql_files_directory = "db/sequent"
  end

  Sequent::Core::EventRecord.table_name = "sequent_schema.event_records"
  Sequent::Core::CommandRecord.table_name = "sequent_schema.command_records"
  Sequent::Core::StreamRecord.table_name = "sequent_schema.stream_records"
end

There is nothing in it about online_replay_persistor_class.

PS: I specified the table names to work around this problem: https://github.com/zilverline/sequent/issues/375 (I opened that issue from another GitHub account of mine).

Here is my database.yml:

default: &default
  adapter: postgresql
  encoding: unicode
  pool: <%= ENV["DB_POOL"] || ENV.fetch("RAILS_MAX_THREADS") { 5 } %>
  url: <%= ENV["DATABASE_URL"] %>
  advisory_locks: <%= ENV.fetch("MIGRATION_ADVISORY_LOCKS", true) %>
  prepared_statements: <%= ENV.fetch("ENABLE_PREPARED_STATEMENTS", true) %>
  schema_search_path: "<%= ENV['SEQUENT_MIGRATION_SCHEMAS'] || 'public, sequent_schema, view_schema' %>"

development:
  <<: *default

test:
  <<: *default

staging:
  <<: *default

production:
  <<: *default

There are a few foreign indexes expecting uniqueness. Can they cause it? I don't see any errors about that in the logs, and I believe I saw such errors before when that was the issue.

But what I also found is this:

/usr/local/bundle/gems/bundler-2.4.14/lib/bundler/cli.rb:28:in `start'
/usr/local/bundle/gems/bundler-2.4.14/exe/bundle:37:in `block in <top (required)>'
/usr/local/bundle/gems/bundler-2.4.14/lib/bundler/friendly_errors.rb:117:in `with_friendly_errors'
/usr/local/bundle/gems/bundler-2.4.14/exe/bundle:29:in `<top (required)>'
/usr/local/bundle/bin/bundle:25:in `load'
/usr/local/bundle/bin/bundle:25:in `<main>'
E, [2023-07-10T15:53:48.313824 #500] ERROR -- : +++++++++++++++ CAUSE +++++++++++++++
E, [2023-07-10T15:53:48.313882 #500] ERROR -- : ERROR:  duplicate key value violates unique constraint "event_id_pk"
DETAIL:  Key (event_id)=(55782) already exists.

I don't see this error on every run, but that's what I saw in my last attempt, when I also specified SEQUENT_MIGRATION_SCHEMAS.

lvonk commented 1 year ago

There are a few foreign indexes expecting uniqueness. Can they cause it?

Yes, but I would expect a clear (Postgres) error when this occurs. You can't use foreign keys in the view schema, since the order in which events are replayed is based on aggregate_id, which does not guarantee the "correct" order for foreign keys. The same applies to belongs_to, which expects the ParentRecord to be there (this is an open issue).

So for instance:

class ParentProjector
  manages_tables ParentRecord
end

class ChildProjector
  manages_tables ChildRecord
end

class ParentRecord
  has_many :child_records, foreign_key: :parent_aggregate_id, primary_key: :aggregate_id
end

When migrating both projectors, if the child_records.sql has a foreign key on parent_aggregate_id, there is no guarantee that the parent record has already been inserted.

The only guarantee Sequent gives is that events for a single aggregate are replayed in order.
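This ordering guarantee can be sketched in plain Ruby (illustrative only, not Sequent internals): per aggregate, events are replayed in sequence_number order, but there is no ordering promise across different aggregates.

```ruby
# Toy replay ordering: group by aggregate, sort each group by
# sequence_number. Within one aggregate the order is deterministic.
events = [
  { aggregate_id: 'user-1', sequence_number: 2, type: :UserNameUpdated },
  { aggregate_id: 'user-1', sequence_number: 1, type: :UserCreated },
]

replay_order = events
  .group_by { |e| e[:aggregate_id] }
  .flat_map { |_id, evts| evts.sort_by { |e| e[:sequence_number] } }
  .map { |e| e[:type] }
# replay_order == [:UserCreated, :UserNameUpdated]
```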

The event-sourced way is to ensure the correctness of relations in the Aggregates, so a foreign key constraint in the view schema is not needed. You can of course create an index if needed.

duplicate key value violates unique constraint "event_id_pk" DETAIL: Key (event_id)=(55782) already exists.

This is a strange error, since it indicates one event is being replayed twice. Did you maybe start two bundle exec rake sequent:migrate:online runs at the same time, or perhaps one was still running without you knowing?

smilyalexey commented 1 year ago

@lvonk sorry, I am a bit confused here. Are we talking about foreign keys or about associations in general? We use read models to build some reports for QuickSight. So we have something like this:

CREATE TABLE table_1 (
    id BIGSERIAL NOT NULL,
    aggregate_id uuid NOT NULL
);
CREATE UNIQUE INDEX table_1_index ON table_1 USING btree (aggregate_id);

CREATE TABLE table_2 (
    id BIGSERIAL NOT NULL,
    aggregate_id uuid NOT NULL,
    table_1_aggregate_id uuid NOT NULL
);
CREATE UNIQUE INDEX table_2_index ON table_2 USING btree (aggregate_id);
CREATE INDEX table_2_table_1_index ON table_2 USING btree (aggregate_id, table_1_aggregate_id);

CREATE TABLE table_3 (
    id BIGSERIAL NOT NULL,
    aggregate_id uuid NOT NULL,
    table_1_aggregate_id uuid NOT NULL,
    table_2_aggregate_id uuid NOT NULL
);
CREATE UNIQUE INDEX table_3_index ON table_3 USING btree (aggregate_id);
CREATE INDEX table_3_table_1_index ON table_3 USING btree (aggregate_id, table_1_aggregate_id);
CREATE INDEX table_3_table_2_table_1_index ON table_3 USING btree (aggregate_id, table_2_aggregate_id, table_1_aggregate_id);

But at the same time our read models are empty:

class Table1Record < Sequent::ApplicationRecord
end

class Table2Record < Sequent::ApplicationRecord
end

class Table3Record < Sequent::ApplicationRecord
end

So we build reports in QuickSight using inner/left/right joins over these tables. We don't use these associations directly in the read models, like you mentioned in your message. We use unique indexes only on aggregate_id, but we do build SQL queries through these fields. Do you see any potential issues with that?

smilyalexey commented 1 year ago

I am not sure if this is the cause, but I also specify several projectors in the new migration version:

'7' => [ Table1Projector, Table2Projector, Table3Projector]
lvonk commented 1 year ago

Are we talking about foreign keys

Foreign key constraints, like ALTER TABLE table_name ADD CONSTRAINT tablefk FOREIGN KEY (foo) REFERENCES foos (foo).

Indices are not a problem.

Another thing to pay attention to is that a projector should not depend on a Record that it doesn't manage.

So something like:

class Table1Projector < Sequent::Projector
  manages_tables [Table1Record]
  on Created do |event|
    table2 = Table2Record.find_by(aggregate_id: event.table2_aggregate_id)
    create_records(Table1Record.new(table2_name: table2.name))
  end
end

This will not work when replaying, since it is not guaranteed that the Table2Record exists at that moment.

smilyalexey commented 1 year ago

Well, we use the Aggregate for that. We fetch it by id and take the name. Something like this:

aggregate = Sequent.aggregate_repository.load_aggregate(event.aggregate_id, Table2Aggregate)
create_records(Table1Record.new(table2_name: aggregate.name))

I guess it's safe in that case, isn't it?


And I also have one more question. If I expect some errors on the projector side, is it possible that the number of read-model records won't be the same after running the online migration? For example, if I have something like this:

class Projector < Sequent::Projector
  on Created do
    SomeService.call # it raises an error
  rescue StandardError
    nil # swallow the error
  end
end

could it be that before running the online and offline migrations the number of records is 100, and after that it is 90?

lvonk commented 1 year ago

well, we use Aggregate for that. We fetch it by id and put the name.

Yes, this will work, but I wouldn't recommend it. It will slow down replaying considerably. More importantly, projections should be built based on events, not by querying the domain. In this case I would probably apply an event with the data needed, or make sure I can join the two records with each other.

To make the example more descriptive than table1 and table2: let's say you have an Invoice and a Customer. Whenever you want to make a relation between them, it should be dealt with within the command handler:

on SetCustomerOnInvoice do |command|
  invoice = Sequent.aggregate_repository.load_aggregate(command.aggregate_id, Invoice)
  customer = Sequent.aggregate_repository.load_aggregate(command.customer_id, Customer)
  invoice.set_recipient(customer.id)
end

The projectors would then look like:

class InvoiceProjector < Sequent::Projector
  on InvoiceRecipientSet do |event|
    update_all_records(InvoiceRecord, {aggregate_id: event.aggregate_id }, { customer_id: event.recipient_id })
  end
end

class InvoiceRecord < Sequent::ApplicationRecord
  belongs_to :customer_record, foreign_key: :recipient_id, primary_key: :aggregate_id
end

Don't call any services in Projectors, since those can fail and will lead to unpredictable results. The idea of a projector is to have a Projection of (some) state of your aggregates that you need, for instance in some views. It should be based solely on Events, since those already happened. Calling outside services breaks that promise.
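The point about determinism can be sketched in plain Ruby (a toy illustration, not Sequent code): replaying the same event list twice must produce the same projection, which only holds if the handler is a pure function of the event.

```ruby
# A projection built purely from events is deterministic under replay.
# As soon as a handler consults anything outside the event (a service,
# the clock, the domain), two replays can diverge.
events = [{ name: 'Alice' }, { name: 'Bob' }]

project = ->(evts) { evts.map { |e| { name: e[:name] } } }

first  = project.call(events)
second = project.call(events)
# first == second, so replaying is repeatable
```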

smilyalexey commented 1 year ago

Ok. Thanks for the info. What about create and update events? Let's say we have 1 aggregate with 2 events:

    class User < Sequent::AggregateRoot
      attr_accessor :name, :lastname

      def initialize(command)
        super(command.aggregate_id)

        apply UserCreated, name: command.name, lastname: command.lastname
      end

      on UserCreated do |event|
        @name = event.name
        @lastname = event.lastname
      end

      on UserNameUpdated do |event|
         @name = event.name
      end
    end

How will the online migration process these events for a projector like this?

class UserProjector < Sequent::Projector
    manages_tables UserRecord

    on User::UserCreated do |event|
      create_record(UserRecord, {
          aggregate_id: event.aggregate_id,
          name: event.name,
        }
      )
    end

    on UserNameUpdated do |event|
       if record = UserRecord.find_by(aggregate_id: event.aggregate_id)
         record.update!(name: event.name)
       end
    end
end

I guess it may lose some records during online migration due to different event order, right?

And I can also see in the docs that you suggest using the update_all_records and get_record! methods. Are they safer for such needs?

lvonk commented 1 year ago

I guess it may lose some records during online migration due to different event order, right?

No, events of a single aggregate are guaranteed to be replayed in order of sequence_number.

on UserNameUpdated do |event|
  if record = UserRecord.find_by(aggregate_id: event.aggregate_id)
    record.update!(name: event.name)
  end
end

Although this works, it is not recommended, since it does a select and then an update (2 statements) and is therefore slower.

Better is:

on UserNameUpdated do |event|
  update_all_records(
    UserRecord,
    {aggregate_id: event.aggregate_id},
    {name: event.name}
  )
end
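At this event volume the extra round trip adds up. A back-of-the-envelope calculation (all numbers below are assumptions for illustration, not measurements from this thread):

```ruby
# With ~1M events and ~1 ms per extra SELECT round trip (assumed),
# the find-then-update pattern adds roughly a quarter hour to a replay.
events = 1_000_000
extra_roundtrip_ms = 1.0 # assumption

extra_minutes = events * extra_roundtrip_ms / 1000.0 / 60.0
# extra_minutes ≈ 16.7
```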
lvonk commented 1 year ago

Closing this ticket due to inactivity. If there is still an issue please feel free to re-open or open a new ticket.