midenok / mariadb

MariaDB server is a community developed fork of MySQL server. Started by core members of the original MySQL team, MariaDB actively works with outside developers to deliver the most featureful, stable, and sanely licensed open SQL server in the industry.
GNU General Public License v2.0
0 stars 0 forks source link

SAMU-64 Allow administrators to enable or disable parallel replication on a per-table basis #109

Open midenok opened 1 year ago

midenok commented 1 year ago

Links

Plan of actions

  1. Get table name from Log_event. See rpl_parallel::do_event()
2714        Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
2715        uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ||
2716                           rli->mi->parallel_mode <= SLAVE_PARALLEL_MINIMAL ?
2717                           0 : gtid_ev->domain_id);
2718        if (!(e= find(domain_id)))
....
2724        current= e;
...
2746      /*
2747        Find a worker thread to queue the event for.
2748        Prefer a new thread, so we maximise parallelism (at least for the group
2749        commit). But do not exceed a limit of --slave-domain-parallel-threads;
2750        instead re-use a thread that we queued for previously.
2751      */
2752      cur_thread=
2753        e->choose_thread(serial_rgi, &did_enter_cond, &old_stage,
2754                         typ != GTID_EVENT);
....
(rr) p e
$17 = (rpl_parallel_entry *) 0x7f80200535e8
  2. According to the parallel policy of the table choose the corresponding thread.
  3. Construct parallel policy registry from configuration.
  4. Maybe do everything at the master and push the parallel policy with log event?

Info

enum enum_slave_parallel_mode {
  SLAVE_PARALLEL_NONE,
  SLAVE_PARALLEL_MINIMAL,
  SLAVE_PARALLEL_CONSERVATIVE,
  SLAVE_PARALLEL_OPTIMISTIC,
  SLAVE_PARALLEL_AGGRESSIVE
};

2715        uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ||
2716                           rli->mi->parallel_mode <= SLAVE_PARALLEL_MINIMAL ?
2717                           0 : gtid_ev->domain_id);

(rr) p rli
$21 = (Relay_log_info *) 0x5562fab1d8a8
(rr) p rli->mi
$22 = (Master_info *) 0x5562fab1bbe0

Absence of FL_ALLOW_PARALLEL disables parallel replication

Controlled by skip_parallel_replication variable.

        if (!(gtid_flags & Gtid_log_event::FL_TRANSACTIONAL) ||
            ( (!(gtid_flags & Gtid_log_event::FL_ALLOW_PARALLEL) ||
               (gtid_flags & Gtid_log_event::FL_WAITED)) &&
              (mode < SLAVE_PARALLEL_AGGRESSIVE)))
        {
          /*
            This transaction should not be speculatively run in parallel with
            what came before, either because it cannot safely be rolled back in
            case of a conflict, or because it was marked as likely to conflict
            and require expensive rollback and retry.

            Here we mark it as such, and then the worker thread will do a
            wait_for_prior_commit() before starting it. We do not introduce a
            new group_commit_orderer, since we still want following transactions
            to run in parallel with transactions prior to this one.
          */
          speculation= rpl_group_info::SPECULATE_WAIT;
        }

On thread pool

rpl_parallel_entry is the per-domain reservation of threads, controlled by slave_domain_parallel_threads. The total number of worker threads is set by slave_parallel_threads.

What if all the domains reserve the same portion of threads? By what algorithm do the portions intersect?

On GTID

Out-of-order execution is achieved by using different domain IDs. A dedicated domain for serialized transactions could have only one thread. Another solution would be to have a dedicated thread independent of domains.

Existing options

midenok commented 1 year ago

RPL filter check on check_table_map() (RBR only)

#0  Rpl_filter::is_on (this=0x55f8e0137b40) at ../src/sql/rpl_filter.cc:266
#1  0x000055f8dd32ae27 in check_table_map (rgi=0x55f8e0162810, table_list=0x55f8e06e4108) at ../src/sql/log_event.cc:12942
#2  0x000055f8dd32aaa1 in Table_map_log_event::do_apply_event (this=0x55f8e06a85e8, rgi=0x55f8e0162810) at ../src/sql/log_event.cc:13020
#3  0x000055f8dcd0dcda in Log_event::apply_event (this=0x55f8e06a85e8, rgi=0x55f8e0162810) at ../src/sql/log_event.h:1492
#4  0x000055f8dccfd730 in apply_event_and_update_pos_apply (ev=0x55f8e06a85e8, thd=0x55f8e067d968, rgi=0x55f8e0162810, reason=0) at ../src/sql/slave.cc:3807
#5  0x000055f8dccfd352 in apply_event_and_update_pos (ev=0x55f8e06a85e8, thd=0x55f8e067d968, rgi=0x55f8e0162810) at ../src/sql/slave.cc:3969
#6  0x000055f8dcd06b4c in exec_relay_log_event (thd=0x55f8e067d968, rli=0x55f8e067a118, serial_rgi=0x55f8e0162810) at ../src/sql/slave.cc:4286
#7  0x000055f8dccf4edb in handle_slave_sql (arg=0x55f8e0678450) at ../src/sql/slave.cc:5460

frame 2

12939     if ((rgi->thd->slave_thread /* filtering is for slave only */ ||
12940           IF_WSREP((WSREP(rgi->thd) && rgi->thd->wsrep_applier), 0)) &&
12941         (!rli->mi->rpl_filter->db_ok(table_list->db.str) ||
12942          (rli->mi->rpl_filter->is_on() && !rli->mi->rpl_filter->tables_ok("", table_list))))
12943       res= FILTERED_OUT;
midenok commented 1 year ago

rpl_filter->tables_ok() check

In check_table_map() (RBR only)

--- sql/log_event.cc
+++ sql/log_event.cc
@@ -12954,7 +12955,10 @@ check_table_map(rpl_group_info *rgi, RPL_TABLE_LIST *table_list)
         IF_WSREP((WSREP(rgi->thd) && rgi->thd->wsrep_applier), 0)) &&
       (!rli->mi->rpl_filter->db_ok(table_list->db.str) ||
        (rli->mi->rpl_filter->is_on() && !rli->mi->rpl_filter->tables_ok("", table_list))))
+  {
+    DBUG_ASSERT(0);
     res= FILTERED_OUT;
+  }
   else
   {
     RPL_TABLE_LIST *ptr= static_cast<RPL_TABLE_LIST*>(rgi->tables_to_lock);

rpl.rpl_replicate_do

#0  Rpl_filter::tables_ok (this=0x55877a981640, db=0x55877723afa7 "", tables=0x55877aeab548) at ../src/sql/rpl_filter.cc:105
#1  0x00005587778cd8d1 in check_table_map (rgi=0x55877ad98eb0, table_list=0x55877aeab548) at /home/midenok/src/mariadb/10.4/src/sql/log_event.cc:12957
#2  0x00005587778cd511 in Table_map_log_event::do_apply_event (this=0x55877aeab458, rgi=0x55877ad98eb0) at /home/midenok/src/mariadb/10.4/src/sql/log_event.cc:13038
#3  0x0000558777a065ea in Log_event::apply_event (this=0x55877aeab458, rgi=0x55877ad98eb0) at /home/midenok/src/mariadb/10.4/src/sql/log_event.h:1492
#4  0x00005587779f6150 in apply_event_and_update_pos_apply (ev=0x55877aeab458, thd=0x55877ae82988, rgi=0x55877ad98eb0, reason=0) at /home/midenok/src/mariadb/10.4/src/sql/slave.cc:3809
#5  0x00005587779f5d72 in apply_event_and_update_pos (ev=0x55877aeab458, thd=0x55877ae82988, rgi=0x55877ad98eb0) at /home/midenok/src/mariadb/10.4/src/sql/slave.cc:3971
#6  0x00005587779ffdfc in exec_relay_log_event (thd=0x55877ae82988, rli=0x55877ae7f128, serial_rgi=0x55877ad98eb0) at /home/midenok/src/mariadb/10.4/src/sql/slave.cc:4290
#7  0x00005587779ed8fb in handle_slave_sql (arg=0x55877ae7d460) at /home/midenok/src/mariadb/10.4/src/sql/slave.cc:5476

Tests

rpl.rpl_auto_increment rpl.rpl_replicate_do rpl.rpl_row_lcase_tblnames rpl.rpl_row_annotate_do rpl.rpl_row_annotate_dont rpl.rpl_row_corruption rpl.rpl_multi_delete2 rpl.rpl_ignore_table_update rpl.rpl_err_ignoredtable rpl.rpl_multi_update4 rpl.rpl_ignore_grant rpl.rpl_parallel rpl.rpl_replicate_ignore_db rpl.rpl_ignore_revoke rpl.ignore_table_autoinc-9737

In mysql_execute_command() (STMT only)

rpl.rpl_filter_tables_dynamic

#0  Rpl_filter::tables_ok (this=0x5612364ec640, db=0x7ffa4404b9a8 "test", tables=0x7ffa440729d0) at ../src/sql/rpl_filter.cc:96
#1  0x0000561232e81bb0 in all_tables_not_ok (thd=0x7ffa440516b8, tables=0x7ffa440729d0) at /home/midenok/src/mariadb/10.4/src/sql/sql_parse.cc:408
#2  0x0000561232e6cbf5 in mysql_execute_command (thd=0x7ffa440516b8) at /home/midenok/src/mariadb/10.4/src/sql/sql_parse.cc:3616
#3  0x0000561232e66a6b in mysql_parse (thd=0x7ffa440516b8, rawbuf=0x7ffa4405bf93 "UPDATE t7 LEFT JOIN (t1, t4, t2) ON (t7.id=t1.id and t7.id=t4.id and t7.id=t2.id) SET a=0, b=0, d=0, g=0 where t7.id=1", length=118, parser_state=0x7ffa48061898, is_com_multi=false, is_next_command=false) at /home/midenok/src/mariadb/10.4/src/sql/sql_parse.cc:7984
#4  0x0000561232bcab4b in Query_log_event::do_apply_event (this=0x7ffa4405bae8, rgi=0x7ffa44049a30, query_arg=0x7ffa4405bf93 "UPDATE t7 LEFT JOIN (t1, t4, t2) ON (t7.id=t1.id and t7.id=t4.id and t7.id=t2.id) SET a=0, b=0, d=0, g=0 where t7.id=1", q_len_arg=118) at /home/midenok/src/mariadb/10.4/src/sql/log_event.cc:5737
#5  0x0000561232bc988a in Query_log_event::do_apply_event (this=0x7ffa4405bae8, rgi=0x7ffa44049a30) at /home/midenok/src/mariadb/10.4/src/sql/log_event.cc:5415
#6  0x0000561232d195fa in Log_event::apply_event (this=0x7ffa4405bae8, rgi=0x7ffa44049a30) at /home/midenok/src/mariadb/10.4/src/sql/log_event.h:1492
#7  0x0000561232d09160 in apply_event_and_update_pos_apply (ev=0x7ffa4405bae8, thd=0x7ffa440516b8, rgi=0x7ffa44049a30, reason=0) at /home/midenok/src/mariadb/10.4/src/sql/slave.cc:3809
#8  0x0000561232d08d82 in apply_event_and_update_pos (ev=0x7ffa4405bae8, thd=0x7ffa440516b8, rgi=0x7ffa44049a30) at /home/midenok/src/mariadb/10.4/src/sql/slave.cc:3971
#9  0x0000561232d12e0c in exec_relay_log_event (thd=0x7ffa440516b8, rli=0x5612369ea308, serial_rgi=0x7ffa44049a30) at /home/midenok/src/mariadb/10.4/src/sql/slave.cc:4290
#10 0x0000561232d0090b in handle_slave_sql (arg=0x5612369e8640) at /home/midenok/src/mariadb/10.4/src/sql/slave.cc:5476

In Load_log_event::do_apply_event() (not tested by rpl suite)

--- sql/log_event.cc
+++ sql/log_event.cc
@@ -7430,6 +7430,7 @@ int Load_log_event::do_apply_event(NET* net, rpl_group_info *rgi,
       // TODO: this is a bug - this needs to be moved to the I/O thread
       if (net)
         skip_load_data_infile(net);
+      DBUG_ASSERT(0);
     }
     else
     {

Tests

No tests!

midenok commented 1 year ago

Let P be the set of tables whose replication can be parallel. Let O be the set of tables whose replication must be ordered. Any transaction that includes any of O tables is considered O-transaction no matter how many of P tables it includes. Any other transaction includes only P tables and is considered P-transaction.

How it works now

Each replication event has a GTID (non-GTID events are not processed by parallel replication). Each GTID has a domain ID (the first number). Each domain can have up to slave_domain_parallel_threads threads from the global pool of slave_parallel_threads. A domain takes a thread from the global pool at the thread selection and event queueing stage (rpl_parallel::do_event()). Threads in a domain can be reused, so their queues grow. If there are not enough threads in the global pool, the domain waits until one is freed. A thread is returned to the global pool when its queue becomes empty.

To achieve parallelism a domain utilizes its pool of threads to the maximum: it assigns events to the threads of the domain pool in round-robin fashion. The only exception to that are non-GTID events: those simply do not trigger a round-robin switch.

Now we get to an interesting observation: even if we disable parallel replication by setting skip_parallel_replication, the domain will nonetheless utilize its whole pool of threads. That means events wait for all previous events in the domain to be committed, i.e. the threads wait for each other.

How and what to implement

First, we have to distinguish O-transactions and P-transactions. We have some rules built from configuration directives. According to those rules we may assign a table to O or P. But on the slave side this is difficult or impossible to implement:

  1. There is no knowledge about the tables on thread selection and event queue stage;
  2. There must be two different implementations for RBR and SBR;
  3. No feasible solution for SBR as we get the knowledge about the tables at SQL parsing stage and that is too late (event was already queued).

On master side this can be achieved similarly to how replicate_do_table is implemented on slave side: see all_tables_not_ok() in mysql_execute_command(). After we found that transaction is O-transaction we pass !FL_ALLOW_PARALLEL flag with the event to the slave (i.e. we drop FL_ALLOW_PARALLEL flag). This is similar to what skip_parallel_replication does. Note that this flag is overridden by aggressive parallel mode. So this does not work in aggressive mode!

According to the SAMU-64 description, P is a relatively small number of tables compared to O. If the same proportion holds for the number of transactions, we cannot schedule O-transactions to the whole pool of threads: it will just overload the domain thread pool and threads will be stuck waiting for each other. To avoid that we need a single thread for O-transactions. That can be achieved in two variants:

Variant 1: Dedicated thread

Each domain has its own dedicated thread for processing O-transactions (and maybe non-GTID transactions). Next to the domain thread pool we may keep one more thread for processing O-transactions. Getting the FL_ALLOW_PARALLEL value before the thread selection routine (choose_thread()) seems to be feasible. Any events that have !FL_ALLOW_PARALLEL can go to the dedicated thread. A possible drawback is that an O-transaction waits for any prior P-transactions. This may or may not be wanted. For example, when a table from P gets first into P-transaction P1 and then into O-transaction O2, O2 must wait for P1.

Another effect of that is that skip_parallel_replication and O tables will work the same and will differ only by affected scope: session vs tables. Whether it is strength or weakness depends on specific use cases.

Variant 2: Dedicated domain

Dedicated domain processes O-transactions. Slave knows this domain is special and makes its pool contain only one thread. Possible advantages to this:

  1. O-transactions run completely independent from P-transactions;
  2. FL_ALLOW_PARALLEL flag is not needed to mark O-transaction. Domain ID may be used instead.
  3. This solution can be scaled to multiple dedicated domains, so several groups of tables run independently from each other. This can be even more flexible with assigning multiple dedicated domains by adding a mask to the original domain ID.

Disadvantages:

  1. Table T1 from P got into O-transaction O1 and into P-transaction P1. P1 and O1 are unrelated, the order is undefined and the result of DML will differ for T1 on master and slave (MASTER_USE_GTID= slave_pos) or the transactions will block each other (MASTER_USE_GTID= no).
  2. If we don't distinguish our O-transactions from skip_parallel_replication this can have the risk of incompatibility. To avoid that additional configuration option should be introduced on slave that will identify the domain for ordered transactions.

Conclusion

While Variant 2 looks more versatile it may lead to unexpected dead-locks or data inconsistencies depending on the configuration. To avoid that some additional measures must be taken which multiplies task complexity. As of the current state of software I would suggest to stay with the simpler Variant 1.

midenok commented 1 year ago

Thread released

--- sql/rpl_parallel.cc
+++ sql/rpl_parallel.cc
@@ -2094,6 +2094,7 @@ rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner,
 void
 rpl_parallel_thread_pool::release_thread(rpl_parallel_thread *rpt)
 {
+  DBUG_ASSERT(0);
   rpl_parallel_thread *list;

   mysql_mutex_assert_owner(&rpt->LOCK_rpl_thread);
#7  0x00005570999d9587 in rpl_parallel_thread_pool::release_thread (this=0x55709a720310 <global_rpl_thread_pool>, rpt=0x7fa99c015a00) at ../src/sql/rpl_parallel.cc:2097
#8  0x00005570999d6b9f in handle_rpl_parallel_thread (arg=0x7fa99c015a00) at ../src/sql/rpl_parallel.cc:1489
#9  0x0000557099c004b5 in pfs_spawn_thread (arg=0x7fa99c016da8) at ../src/storage/perfschema/pfs.cc:1869
#10 0x00007fa9bf4902e2 in start_thread (arg=<optimized out>) at ./nptl/pthread_create.c:442
#11 0x00007fa9bf51e3f4 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:100

frame 8

1488          if (!rpt->stop)
1489            rpt->pool->release_thread(rpt);
midenok commented 1 year ago

Tests that involve parallel and non-GTID events

rpl.rpl_mdev6020 rpl.rpl_mark_optimize_tbl_ddl rpl.parallel_backup rpl.rpl_parallel_partition rpl.rpl_mdev6386 rpl.rpl_parallel_conflicts rpl.rpl_delayed_slave2 rpl.rpl_domain_id_filter_parallel rpl.rpl_mdev10863 rpl.rpl_old_master rpl.rpl_parallel rpl.rpl_parallel_mdev6589 rpl.rpl_parallel_optimistic rpl.rpl_parallel_retry rpl.rpl_parallel_optimistic_until rpl.rpl_relay_max_extension rpl.rpl_parallel_temptable rpl.rpl_parallel_no_log_slave_updates rpl.a rpl.rpl_parallel2 rpl.rpl_parallel_ignored_errors rpl.rpl_mdev8193 rpl.rpl_parallel_charset rpl.rpl_parallel_multilevel rpl.rpl_parallel_multilevel2 rpl.rpl_parallel_optimistic_nobinlog rpl.rpl_slave_shutdown_mdev20821

Tests sensitive to separate thread (thread-pool - 1)

rpl.parallel_backup rpl.rpl_parallel_optimistic_until rpl.rpl_parallel_ignored_errors rpl.rpl_parallel

Tests that involve skip_parallel_replication

rpl.rpl_parallel_optimistic rpl.rpl_parallel_multilevel

Debugging

 mtr --debug-sync-timeout=5 --mysqld=--debug=d,Query,debug_sync,debug_sync_exec rpl.par,stmt
midenok commented 1 year ago

1. count_queued_event_groups incremented

#0  0x00005634217efa5d in rpl_parallel::do_event (this=0x563425b3d998, serial_rgi=0x7f987001a7c0, ev=0x7f9870035af8, event_size=42) at ../src/sql/rpl_parallel.cc:2919
#1  0x0000563421498493 in exec_relay_log_event (thd=0x7f987001b298, rli=0x563425b3a778, serial_rgi=0x7f987001a7c0) at ../src/sql/slave.cc:4206
#2  0x000056342148633b in handle_slave_sql (arg=0x563425b38ab0) at ../src/sql/slave.cc:5476

frame 0

2891        if (new_gco)
2892        {
2893          /*
2894            Do not run this event group in parallel with what came before; instead
2895            wait for everything prior to at least have started its commit phase, to
2896            avoid any risk of performing any conflicting action too early.
2897
2898            Remember the count that marks the end of the previous batch of event
2899            groups that run in parallel, and allocate a new gco.
2900          */
2901          uint64 count= e->count_queued_event_groups;
2902
2903          if (!(gco= cur_thread->get_gco(count, gco, e->current_sub_id)))
2904          {
....
2910            return 1;
2911          }
2912          gco->flags|= force_switch_flag;
2913          e->current_gco= gco;
2914        }
2915        rgi->gco= gco;
....
2919        ++e->count_queued_event_groups;
2920      }

2. wait_count set

#0  0x00005634217edabb in rpl_parallel_thread::get_gco (this=0x7f9870024508, wait_count=2, prev=0x7f987003cfc8, prior_sub_id=8) at ../src/sql/rpl_parallel.cc:1989
#1  0x00005634217ef974 in rpl_parallel::do_event (this=0x563425b3d998, serial_rgi=0x7f987001a7c0, ev=0x7f987003d698, event_size=44) at ../src/sql/rpl_parallel.cc:2903
#2  0x0000563421498493 in exec_relay_log_event (thd=0x7f987001b298, rli=0x563425b3a778, serial_rgi=0x7f987001a7c0) at ../src/sql/slave.cc:4206
#3  0x000056342148633b in handle_slave_sql (arg=0x563425b38ab0) at ../src/sql/slave.cc:5476

frame 0

1974    group_commit_orderer *
1975    rpl_parallel_thread::get_gco(uint64 wait_count, group_commit_orderer *prev,
1976                                 uint64 prior_sub_id)
1977    {
....
1989      gco->wait_count= wait_count

frame 1

2901          uint64 count= e->count_queued_event_groups;
2902
2903          if (!(gco= cur_thread->get_gco(count, gco, e->current_sub_id)))

3. rpl_parallel_start_waiting_for_prior triggered

#0  do_gco_wait (rgi=0x7f987003acb0, gco=0x7f987003d728, did_enter_cond=0x7f9866452c97, old_stage=0x7f9866452c78) at ../src/sql/rpl_parallel.cc:358
#1  0x00005634217e93b5 in handle_rpl_parallel_thread (arg=0x7f9870024508) at ../src/sql/rpl_parallel.cc:1243
#2  0x0000563421cf91c5 in pfs_spawn_thread (arg=0x7f988805c4a8) at ../src/storage/perfschema/pfs.cc:1869
#3  0x00007f989f8902e2 in start_thread (arg=<optimized out>) at ./nptl/pthread_create.c:442
#4  0x00007f989f91e3f4 in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:100
midenok commented 1 year ago

Tracing patch

diff --git sql/rpl_parallel.cc sql/rpl_parallel.cc
index 2c431ccc12d..c83773cd16f 100644
--- sql/rpl_parallel.cc
+++ sql/rpl_parallel.cc
@@ -2591,6 +2591,11 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
   enum Log_event_type typ;
   bool is_group_event;
   bool did_enter_cond= false;
+  bool new_gco= true;
+  uint8 force_switch_flag= 0;
+  enum rpl_group_info::enum_speculation speculation= rpl_group_info::SPECULATE_NO;
+  Gtid_log_event *gtid_ev;
+
   PSI_stage_info old_stage;

   DBUG_EXECUTE_IF("slave_crash_if_parallel_apply", DBUG_SUICIDE(););
@@ -2723,7 +2728,7 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
   if (typ == GTID_EVENT)
   {
     rpl_gtid gtid;
-    Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
+    gtid_ev= static_cast<Gtid_log_event *>(ev);
     uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ||
                        rli->mi->parallel_mode <= SLAVE_PARALLEL_MINIMAL ?
                        0 : gtid_ev->domain_id);
@@ -2782,12 +2787,9 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
   if (typ == GTID_EVENT)
   {
     Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev);
-    bool new_gco;
     enum_slave_parallel_mode mode= rli->mi->parallel_mode;
     uchar gtid_flags= gtid_ev->flags2;
     group_commit_orderer *gco;
-    uint8 force_switch_flag;
-    enum rpl_group_info::enum_speculation speculation;

     if (!(rgi= cur_thread->get_rgi(rli, gtid_ev, e, event_size)))
     {
@@ -2814,9 +2816,6 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
     rgi->wait_commit_sub_id= e->current_sub_id;
     rgi->wait_commit_group_info= e->current_group_info;

-    speculation= rpl_group_info::SPECULATE_NO;
-    new_gco= true;
-    force_switch_flag= 0;
     gco= e->current_gco;
     if (likely(gco))
     {
@@ -2969,6 +2968,22 @@ rpl_parallel::do_event(rpl_group_info *serial_rgi, Log_event *ev,
     qev->rgi= e->current_group_info;
   }

+  if (typ == GTID_EVENT)
+  {
+    DBUG_PRINT("rpl",
+              ("pos: %llu  type: G  idx: %u  spcl: %u  fsf: %u  ng: %u  "
+               "seq: %llu  com: %llu  domain: %u  groups_q: %llu",
+               ev->log_pos, e->rpl_thread_idx, speculation, force_switch_flag, new_gco,
+               gtid_ev->seq_no, gtid_ev->commit_id, gtid_ev->domain_id,
+               e->count_queued_event_groups));
+  }
+  else
+  {
+    DBUG_PRINT("rpl",
+              ("pos: %llu  type: %u  idx: %u",
+               ev->log_pos, typ, e->rpl_thread_idx));
+  }
+
   /*
     Queue the event for processing.
   */
midenok commented 1 year ago

Progress update

Dedicated thread for ordered events was made. The thread is reserved inside the domain pool of threads and is used only for SPECULATE_WAIT events. The queue of that thread should be longer and should be configured separately.

So far no code problems were detected with the tests. Most of the tests just required tweaking slave_parallel_threads, as the number of parallel threads is now 1 less. But most of the problems are inside rpl_parallel.test. This is an overloaded test that tests parallel replication in a rather hackish and bug-prone way that causes failures in the test itself*. So far I have progressed ~40% through this test and it didn't reveal any code problems. I still need to finish the rest to be sure it cannot reveal any bugs in my code. While doing so I'm getting to know the parallel replication code better and getting ideas of how it is generally tested.

Time estimation. This week I'm working on the test and starting to add configuration options. So by next Tuesday I expect to have a working prototype. Until mid-February there should be load and functional testing.

* In later versions rpl_parallel.test was refactored into a set of smaller tests, but still has unfixed bugs.

midenok commented 1 year ago

Tests

par.zip

diff -u /tmp/rpl.g /tmp/rpl.b

--- /tmp/rpl.g  2023-01-25 12:49:53.813340122 +0300
+++ /tmp/rpl.b  2023-01-25 12:50:22.123361507 +0300
@@ -1,8 +1,6 @@
-exec_relay_log_event: rpl: pos: 415 [373] GTID 0-1-1  cid=0  idx: 1  spcl: 0  fsf: 2  ng: 1  groups_q: 1
-exec_relay_log_event: rpl: pos: 538  type: 2  idx: 1
-exec_relay_log_event: rpl: pos: 580 [538] GTID 0-1-2  cid=0  idx: 2  spcl: 0  fsf: 0  ng: 1  groups_q: 2
-exec_relay_log_event: rpl: pos: 675  type: 2  idx: 2
-exec_relay_log_event: rpl: pos: 706  type: 16  idx: 2
+exec_relay_log_event: rpl: pos: 580 [538] GTID 0-1-2  cid=0  idx: 1  spcl: 0  fsf: 0  ng: 1  groups_q: 1
+exec_relay_log_event: rpl: pos: 675  type: 2  idx: 1
+exec_relay_log_event: rpl: pos: 706  type: 16  idx: 1
 exec_relay_log_event: rpl: pos: 748 [706] GTID 2-1-1  cid=0  idx: 1  spcl: 0  fsf: 0  ng: 1  groups_q: 1
 exec_relay_log_event: rpl: pos: 912  type: 2  idx: 1
 exec_relay_log_event: rpl: pos: 1129  type: 2  idx: 1
@@ -19,8 +17,14 @@
 exec_relay_log_event: rpl: pos: 2387  type: 2  idx: 1
 exec_relay_log_event: rpl: pos: 2418  type: 16  idx: 1
 exec_relay_log_event: rpl: pos: 2460 [2418] GTID 0-1-4  cid=0  idx: 2  spcl: 0  fsf: 0  ng: 1  groups_q: 2
-exec_relay_log_event: rpl: pos: 2642  type: 2  idx: 2
-exec_relay_log_event: rpl: pos: 2673  type: 16  idx: 2
-exec_relay_log_event: rpl: pos: 2717 [2673] GTID 0-1-5  cid=229  idx: 3  spcl: 0  fsf: 0  ng: 1  groups_q: 3
-exec_relay_log_event: rpl: pos: 2827  type: 2  idx: 3
-exec_relay_log_event: rpl: pos: 2858  type: 16  idx: 3
+exec_relay_log_event: rpl: pos: 2704  type: 2  idx: 2
+exec_relay_log_event: rpl: pos: 2735  type: 16  idx: 2
+exec_relay_log_event: rpl: pos: 2779 [2735] GTID 0-1-5  cid=229  idx: 3  spcl: 0  fsf: 0  ng: 1  groups_q: 3
+exec_relay_log_event: rpl: pos: 2889  type: 2  idx: 3
+exec_relay_log_event: rpl: pos: 2920  type: 16  idx: 3
+exec_relay_log_event: rpl: pos: 2964 [2920] GTID 0-1-6  cid=229  idx: 4  spcl: 0  fsf: 0  ng: 0  groups_q: 4
+exec_relay_log_event: rpl: pos: 3074  type: 2  idx: 4
+exec_relay_log_event: rpl: pos: 3105  type: 16  idx: 4
+exec_relay_log_event: rpl: pos: 3149 [3105] GTID 0-1-7  cid=229  idx: 5  spcl: 0  fsf: 0  ng: 0  groups_q: 5
+exec_relay_log_event: rpl: pos: 3259  type: 2  idx: 5
+exec_relay_log_event: rpl: pos: 3290  type: 16  idx: 5

Good: rpl_parallel_start_waiting_for_prior was hit

1: GTID 0-1-2

#0  do_gco_wait (rgi=0x7f2ff003a940, gco=0x7f2ff003b338, did_enter_cond=0x7f2fd1dbbc97, old_stage=0x7f2fd1dbbc78) at ../src/sql/rpl_parallel.cc:358
#1  0x0000556d83b603b5 in handle_rpl_parallel_thread (arg=0x7f2ff001b3f8) at ../src/sql/rpl_parallel.cc:1243
#2  0x0000556d840702c5 in pfs_spawn_thread (arg=0x7f2ff001cd48) at ../src/storage/perfschema/pfs.cc:1869
frame 1

```c++
(rr) p *((Gtid_log_event *)qev->ev)
$2 = {
  <Log_event> = {
    ...
    log_pos = 580,
    ...
    exec_time = 0,
    data_written = 42,
    server_id = 1,
    flags = 8,
    cache_type = 0,
    slave_exec_mode = 11936128518282651045,
    writer = 0xa5a5a5a5a5a5a5a5,
    thd = 0x0,
    ...
  },
  members of Gtid_log_event:
  seq_no = 2,
  commit_id = 0,
  domain_id = 0,
  flags2 = 12 '\f',
  ...
}
```

2: GTID 0-1-5

frame 1

```c++
(rr) p *((Gtid_log_event *)qev->ev)
$4 = {
  <Log_event> = {
    _vptr$Log_event = 0x556d84f8f4f0,
    log_pos = 2717,
    temp_buf = 0x7f2fdc03d4d8 "\027+\320c\242\001",
    event_owns_temp_buf = true,
    when = 1674586903,
    when_sec_part = 18446744073709551615,
    exec_time = 0,
    data_written = 44,
    server_id = 1,
    flags = 8,
    cache_type = 0,
    slave_exec_mode = 11936128518282651045,
    writer = 0xa5a5a5a5a5a5a5a5,
    thd = 0x0,
    checksum_alg = BINLOG_CHECKSUM_ALG_CRC32
  },
  members of Gtid_log_event:
  seq_no = 5,
  commit_id = 229,
  domain_id = 0,
  flags2 = 14 '\016',
  static FL_STANDALONE = 1 '\001',
  static FL_GROUP_COMMIT_ID = 2 '\002',
  static FL_TRANSACTIONAL = 4 '\004',
  static FL_ALLOW_PARALLEL = 8 '\b',
  static FL_WAITED = 16 '\020',
  static FL_DDL = 32 ' '
}
```

Bad: rpl_parallel_start_waiting_for_prior was hit

1: GTID 0-1-5

2: GTID 0-1-7

3: GTID 0-1-6

Cause

GTID 0-1-2 was not hit.

midenok commented 1 year ago

Master log for good and bad

#230124 22:01:19 server id 1  end_log_pos 415 CRC32 0xef79f951  GTID 0-1-1 ddl
...
CREATE TABLE t3 (a INT PRIMARY KEY, b INT) ENGINE=InnoDB
#230124 22:01:20 server id 1  end_log_pos 580 CRC32 0xecb524a0  GTID 0-1-2 trans
...
INSERT INTO t3 VALUES (59,0)

GTID 0-1-2 looks unrelated, as it consists of initialization commands and there are no signals on INSERT(59).

midenok commented 1 year ago

#0  Rpl_filter::get_do_db (this=0x557dd0de2e90, str=0x7f9452cae5a0) at /home/midenok/src/mariadb/10.4/src/sql/rpl_filter.cc:817
#1  0x0000557dcde2ec06 in Sys_var_rpl_filter::global_value_ptr (this=0x557dcec0dd18 <Sys_replicate_do_db>, thd=0x7f9434000d68, base_name=0x7f9434012770) at /home/midenok/src/mariadb/10.4/src/sql/sys_vars.cc:5112
#2  0x0000557dcdaac1ff in sys_var::value_ptr (this=0x557dcec0dd18 <Sys_replicate_do_db>, thd=0x7f9434000d68, type=SHOW_OPT_GLOBAL, base=0x7f9434012770) at /home/midenok/src/mariadb/10.4/src/sql/set_var.cc:282
#3  0x0000557dcd8d421b in Item_func_get_system_var::fix_length_and_dec (this=0x7f94340126a0) at /home/midenok/src/mariadb/10.4/src/sql/item_func.cc:5767
#4  0x0000557dcd8bcfcd in Item_func::fix_fields (this=0x7f94340126a0, thd=0x7f9434000d68, ref=0x7f94340127d0) at /home/midenok/src/mariadb/10.4/src/sql/item_func.cc:373
#5  0x0000557dcd86b1fb in Item::fix_fields_if_needed (this=0x7f94340126a0, thd=0x7f9434000d68, ref=0x7f94340127d0) at /home/midenok/src/mariadb/10.4/src/sql/item.h:966
#6  0x0000557dcd86ec39 in Item::fix_fields_if_needed_for_scalar (this=0x7f94340126a0, thd=0x7f9434000d68, ref=0x7f94340127d0) at /home/midenok/src/mariadb/10.4/src/sql/item.h:970
#7  0x0000557dcdb54954 in setup_fields (thd=0x7f9434000d68, ref_pointer_array=..., fields=..., column_usage=MARK_COLUMNS_READ, sum_func_list=0x7f9434013450, pre_fix=0x7f9434012368, allow_sum_func=true) at /home/midenok/src/mariadb/10.4/src/sql/sql_base.cc:7716
#8  0x0000557dcdc8fa50 in JOIN::prepare (this=0x7f9434013130, tables_init=0x0, wild_num=0, conds_init=0x0, og_num=0, order_init=0x0, skip_order_by=false, group_init=0x0, having_init=0x0, proc_param_init=0x0, select_lex_arg=0x7f9434012208, unit_arg=0x7f9434004c80) at /home/midenok/src/mariadb/10.4/src/sql/sql_select.cc:1303
#9  0x0000557dcdc8b2d7 in mysql_select (thd=0x7f9434000d68, tables=0x0, wild_num=0, fields=..., conds=0x0, og_num=0, order=0x0, group=0x0, having=0x0, proc_param=0x0, select_options=2147748608, result=0x7f9434013108, unit=0x7f9434004c80, select_lex=0x7f9434012208) at /home/midenok/src/mariadb/10.4/src/sql/sql_select.cc:4758
#10 0x0000557dcdc8abf7 in handle_select (thd=0x7f9434000d68, lex=0x7f9434004bc0, result=0x7f9434013108, setup_tables_done_option=0) at /home/midenok/src/mariadb/10.4/src/sql/sql_select.cc:437
#11 0x0000557dcdc2ee3b in execute_sqlcom_select (thd=0x7f9434000d68, all_tables=0x0) at /home/midenok/src/mariadb/10.4/src/sql/sql_parse.cc:6452
#12 0x0000557dcdc24898 in mysql_execute_command (thd=0x7f9434000d68) at /home/midenok/src/mariadb/10.4/src/sql/sql_parse.cc:3966
#13 0x0000557dcdc1dbeb in mysql_parse (thd=0x7f9434000d68, rawbuf=0x7f9434012170 "select @@global.replicate_do_db", length=31, parser_state=0x7f9452cb2490, is_com_multi=false, is_next_command=false) at /home/midenok/src/mariadb/10.4/src/sql/sql_parse.cc:7984
midenok commented 1 year ago

How rgi->wait_commit_sub_id assigned

0. last_committed_sub_id updated

#0  finish_event_group (rpt=0x7f2abc015b60, sub_id=7, entry=0x7f2abc028148, rgi=0x7f2abc0299f0) at ../src/sql/rpl_parallel.cc:208
#1  0x0000561577b55a06 in handle_rpl_parallel_thread (arg=0x7f2abc015b60) at ../src/sql/rpl_parallel.cc:1428
192         /*
193           Record that this event group has finished (eg. transaction is
194           committed, if transactional), so other event groups will no longer
195           attempt to wait for us to commit. Once we have increased
196           entry->last_committed_sub_id, no other threads will execute
197           register_wait_for_prior_commit() against us. ...
...
206         */
207         entry->last_committed_sub_id= sub_id;

1. get_rgi() acquires new GTID gev and generates sub_id per gev->domain_id

#0  0x0000561577ab7f72 in event_group_new_gtid (rgi=0x7f2abc024b30, gev=0x7f2abc02ca78) at ../src/sql/rpl_rli.cc:2187
#1  0x0000561577b5748e in rpl_parallel_thread::get_rgi (this=0x7f2abc015f70, rli=0x56157a932a68, gtid_ev=0x7f2abc02ca78, e=0x7f2abc028148, event_size=42) at ../src/sql/rpl_parallel.cc:1956
#2  0x0000561577b5970c in rpl_parallel::do_event (this=0x56157a935c80, serial_rgi=0x7f2abc00bc20, ev=0x7f2abc02ca78, event_size=42) at ../src/sql/rpl_parallel.cc:2811
#3  0x0000561577818b8a in exec_relay_log_event (thd=0x7f2abc00c708, rli=0x56157a932a68, serial_rgi=0x7f2abc00bc20) at ../src/sql/slave.cc:4202
#4  0x000056157781ca08 in handle_slave_sql (arg=0x56157a930da0) at ../src/sql/slave.cc:5460
2179    event_group_new_gtid(rpl_group_info *rgi, Gtid_log_event *gev)
2180    {
2181      uint64 sub_id= rpl_global_gtid_slave_state->next_sub_id(gev->domain_id);
....
2187      rgi->gtid_sub_id= sub_id;
2188      rgi->current_gtid.domain_id= gev->domain_id;
2189      rgi->current_gtid.server_id= gev->server_id;
2190      rgi->current_gtid.seq_no= gev->seq_no;
2191      rgi->commit_id= gev->commit_id;
2192      rgi->gtid_pending= true;
2193      return 0;
2194    }
restart 8

2. e->current_sub_id is assigned from rgi->gtid_sub_id

#0  0x0000561577b59aee in rpl_parallel::do_event (this=0x56157a935c80, serial_rgi=0x7f2abc00bc20, ev=0x7f2abc02ca78, event_size=42) at ../src/sql/rpl_parallel.cc:2945
#1  0x0000561577818b8a in exec_relay_log_event (thd=0x7f2abc00c708, rli=0x56157a932a68, serial_rgi=0x7f2abc00bc20) at ../src/sql/slave.cc:4202
#2  0x000056157781ca08 in handle_slave_sql (arg=0x56157a930da0) at ../src/sql/slave.cc:5460

(rr) p rgi
$10 = (rpl_group_info *) 0x7f2abc024b30
(rr) p e
$11 = (rpl_parallel_entry *) 0x7f2abc028148
restart 7

3. rgi->wait_commit_sub_id is assigned from e->current_sub_id

#0  0x0000561577b59788 in rpl_parallel::do_event (this=0x56157a935c80, serial_rgi=0x7f2abc00bc20, ev=0x7f2abc02d3c8, event_size=42) at ../src/sql/rpl_parallel.cc:2833
#1  0x0000561577818b8a in exec_relay_log_event (thd=0x7f2abc00c708, rli=0x56157a932a68, serial_rgi=0x7f2abc00bc20) at ../src/sql/slave.cc:4202
#2  0x000056157781ca08 in handle_slave_sql (arg=0x56157a930da0) at ../src/sql/slave.cc:5460
2833        rgi->wait_commit_sub_id= e->current_sub_id;
2834        rgi->wait_commit_group_info= e->current_group_info;

(rr) p e
$8 = (rpl_parallel_entry *) 0x7f2abc028148
(rr) p rgi
$9 = (rpl_group_info *) 0x7f2abc02d8b0
restart 6

4. The wait is registered via register_wait_for_prior_commit()

#0  wait_for_commit::register_wait_for_prior_commit (this=0x7f2abc02d8e8, waitee=0x7f2abc028538, dep=true) at ../src/sql/sql_class.cc:7435
#1  0x0000561577b52981 in register_wait_for_prior_event_group_commit (rgi=0x7f2abc02d8b0, entry=0x7f2abc028148) at ../src/sql/rpl_parallel.cc:327
#2  0x0000561577b55298 in handle_rpl_parallel_thread (arg=0x7f2abc015958) at ../src/sql/rpl_parallel.cc:1266
318       if (rgi->wait_commit_sub_id > entry->last_committed_sub_id)
319       {
320         if ((rgi->speculation == rpl_group_info::SPECULATE_DEPEND) &&
321             !(rgi->gtid_ev_flags2 & Gtid_log_event::FL_ALLOW_PARALLEL))
322         {
323           if (entry->last_committed_sub_id < rgi->wait_noptim_sub_id)
324           {
325             wait_for_commit *waitee=
326               &rgi->wait_noptim_group_info->commit_orderer;
327             rgi->commit_orderer.register_wait_for_prior_commit(waitee, true);
328           }
(rr) p rgi
$4 = (rpl_group_info *) 0x7f2abc02d8b0
(rr) p entry
$5 = (rpl_parallel_entry *) 0x7f2abc028148
(rr) p rgi->wait_commit_sub_id
$6 = 10
(rr) p entry->last_committed_sub_id
$7 = 7
restart 5

5. The wait for the prior commit happens

#0  wait_for_commit::wait_for_prior_commit2 (this=0x7f2abc02d8e8, thd=0x7f2ac4002138) at ../src/sql/sql_class.cc:7495
#1  0x0000561577b59f94 in wait_for_commit::wait_for_prior_commit (this=0x7f2abc02d8e8, thd=0x7f2ac4002138) at ../src/sql/sql_class.h:2094
#2  0x0000561577b5a04e in THD::wait_for_prior_commit (this=0x7f2ac4002138) at ../src/sql/sql_class.h:4651
#3  0x0000561577b553aa in handle_rpl_parallel_thread (arg=0x7f2abc015958) at ../src/sql/rpl_parallel.cc:1303