rabbitmq / ra

A Raft implementation for Erlang and Elixir that strives to be efficient and make it easier to use multiple Raft clusters in a single system.

Handle calls and queries in receive_snapshot state #466

Closed · the-mikedavis closed this 1 month ago

the-mikedavis commented 1 month ago

In a situation where a follower is behind the rest of the cluster, it might receive local queries or calls when it enters the receive_snapshot state. Currently this results in a {unsupported_call,_} error being returned and a "receive_snapshot received unhandled msg" debug log. Instead we should redirect calls and handle queries as we do in other states such as await_condition.
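
To make the intent concrete, here is a minimal standalone gen_statem sketch (not Ra's actual code; the module, message and field names are made up) of the behaviour described above: redirect leader calls and postpone local queries while in receive_snapshot, the same way await_condition already handles them.

```erlang
%% Illustrative sketch only; all names here are placeholders, not Ra's API.
-module(receive_snapshot_sketch).
-behaviour(gen_statem).

-export([start_link/0, leader_call/2]).
-export([init/1, callback_mode/0, receive_snapshot/3, follower/3]).

start_link() ->
    gen_statem:start_link(?MODULE, [], []).

%% Client helper that follows a {redirect, Leader} reply, as a Ra client
%% would re-dispatch a leader_call to the member named in the redirect.
leader_call(Pid, Cmd) ->
    case gen_statem:call(Pid, {leader_call, Cmd}) of
        {redirect, Leader} -> {redirected_to, Leader};
        Reply -> Reply
    end.

callback_mode() -> state_functions.

init([]) ->
    %% Pretend we booted straight into snapshot reception from some leader.
    {ok, receive_snapshot, #{leader => {some_server, node()}}}.

%% Instead of throwing {unsupported_call, _}, point the caller at the
%% current leader and stay in receive_snapshot.
receive_snapshot({call, From}, {leader_call, _Cmd}, #{leader := Leader} = Data) ->
    {keep_state, Data, [{reply, From, {redirect, Leader}}]};
%% Local queries can be postponed until the snapshot is installed and the
%% server transitions back to follower.
receive_snapshot({call, _From}, {local_query, _Fun}, Data) ->
    {keep_state, Data, [postpone]};
receive_snapshot(cast, snapshot_installed, Data) ->
    {next_state, follower, Data};
receive_snapshot(_Type, _Msg, Data) ->
    {keep_state, Data}.

%% Once back in follower, the postponed query is retried and answered.
follower({call, From}, {local_query, _Fun}, Data) ->
    {keep_state, Data, [{reply, From, {ok, local_state}}]};
follower(_Type, _Msg, Data) ->
    {keep_state, Data}.
```

In the real fix the redirect/postpone logic would of course live next to the existing await_condition clauses in ra_server_proc.erl rather than in a toy module like this.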

@mkuratczyk found a way to reproduce this in RabbitMQ with Khepri enabled. He starts a definition import on rabbit-1 in a three-node cluster and then runs: `rabbitmqctl -n rabbit-3 stop_app; sleep 30; rabbitmqctl -n rabbit-3 start_app`.

Sample log output:

```
2024-08-19 15:56:12.339233+02:00 [debug] <0.2731.0> ra_log_segment_writer: cleaning closed table for 'RABBITCRB202QBKTNV' range: 86019-88056
2024-08-19 15:56:12.339309+02:00 [debug] <0.2728.0> segment_writer: deleting wal file: 00000003.wal
2024-08-19 15:56:12.385548+02:00 [debug] <0.2735.0> RabbitMQ metadata store: ra_log:init recovered last_index_term {88056,1} first index 86019
2024-08-19 15:56:12.457920+02:00 [debug] <0.2735.0> RabbitMQ metadata store: post_init -> recover in term: 1 machine version: 1
2024-08-19 15:56:12.457995+02:00 [debug] <0.2735.0> RabbitMQ metadata store: recovering state machine version 1:1 from index 86018 to 88013
2024-08-19 15:56:12.458260+02:00 [info] <0.2698.0> Waiting for Khepri leader for 30000 ms, 9 retries left
2024-08-19 15:56:12.550954+02:00 [debug] <0.2735.0> RabbitMQ metadata store: recovery of state machine version 1:1 from index 86018 to 88013 took 92ms
2024-08-19 15:56:12.551032+02:00 [debug] <0.2735.0> RabbitMQ metadata store: scanning for cluster changes 88014:88056
2024-08-19 15:56:12.551410+02:00 [debug] <0.2735.0> RabbitMQ metadata store: recover -> recovered in term: 1 machine version: 1
2024-08-19 15:56:13.045204+02:00 [debug] <0.2735.0> RabbitMQ metadata store: recovered -> follower in term: 1 machine version: 1
2024-08-19 15:56:13.045310+02:00 [debug] <0.2735.0> RabbitMQ metadata store: is not new, setting election timeout.
2024-08-19 15:56:13.045883+02:00 [info] <0.2698.0> Khepri leader elected
2024-08-19 15:56:13.045929+02:00 [info] <0.2698.0> Waiting for Khepri projections for 30000 ms, 9 retries left
2024-08-19 15:56:13.055748+02:00 [debug] <0.2735.0> RabbitMQ metadata store: begin_accept snapshot at index 1130499 in term 2
2024-08-19 15:56:13.104175+02:00 [debug] <0.2735.0> RabbitMQ metadata store: follower -> receive_snapshot in term: 2 machine version: 1
2024-08-19 15:56:13.104268+02:00 [debug] <0.2735.0> RabbitMQ metadata store: receiving snapshot chunk: 1 / next, index 1130499, term 2
2024-08-19 15:56:13.104855+02:00 [debug] <0.2735.0> RabbitMQ metadata store: receive_snapshot received unhandled msg: {leader_call,{command,normal,{...}}}
2024-08-19 15:56:13.105042+02:00 [error] <0.2698.0>
2024-08-19 15:56:13.105042+02:00 [error] <0.2698.0> BOOT FAILED
2024-08-19 15:56:13.105042+02:00 [error] <0.2698.0> ===========
2024-08-19 15:56:13.105042+02:00 [error] <0.2698.0> Exception during startup:
2024-08-19 15:56:13.105042+02:00 [error] <0.2698.0>
2024-08-19 15:56:13.105042+02:00 [error] <0.2698.0> throw:{unsupported_call,{leader_call,{command,normal,{'$usr',{register_projection,[rabbit_db_exchange,exchanges,{if_name_matches,any,undefined},{if_name_matches,any,undefined}],{khepri_projection,rabbit_khepri_exchange,copy,[{keypos,2},named_table]}},{await_consensus,#{reply_from => local}}}}}}
2024-08-19 15:56:13.105042+02:00 [error] <0.2698.0>
2024-08-19 15:56:13.105042+02:00 [error] <0.2698.0> rabbit_khepri:-register_projections/0-lc$^9/1-0-/1, line 1140
2024-08-19 15:56:13.105042+02:00 [error] <0.2698.0> rabbit_khepri:register_projections/0, line 1141
2024-08-19 15:56:13.105042+02:00 [error] <0.2698.0> rabbit_khepri:wait_for_register_projections/2, line 319
2024-08-19 15:56:13.105042+02:00 [error] <0.2698.0> rabbit_khepri:setup/1, line 267
2024-08-19 15:56:13.105042+02:00 [error] <0.2698.0> rabbit:run_prelaunch_second_phase/0, line 389
2024-08-19 15:56:13.105042+02:00 [error] <0.2698.0> rabbit:start/2, line 911
2024-08-19 15:56:13.105042+02:00 [error] <0.2698.0> application_master:start_it_old/4, line 293
```

The reproduction case isn't reliable: the restarted node needs to enter the receive_snapshot state and then receive the projection registration commands or local queries (via rabbit_khepri:wait_for_leader/0), both of which currently happen on init (though that will change soon).

the-mikedavis commented 1 month ago

I'm not sure there's a straightforward way to test this: we need a follower to enter the receive_snapshot state and then send it commands while it is still in that state. Maybe we could pause the leader process to stop it from sending the snapshot in the first place and then send the commands/queries?
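
Rough sketch of what that could look like (not a working Ra test: the cluster setup, server ids and command payload are placeholders, and getting the follower to actually sit in receive_snapshot at the right moment is exactly the unreliable part):

```erlang
-module(pause_leader_sketch).
-export([pause_leader_and_poke_follower/2]).

%% Suspend the leader's ra server process so it stops sending snapshot
%% chunks, then poke the lagging follower with a command and a local query.
%% Today a follower stuck in receive_snapshot answers these with
%% {unsupported_call, _} / an "unhandled msg" debug log.
pause_leader_and_poke_follower({LeaderName, LeaderNode}, FollowerId) ->
    LeaderPid = erpc:call(LeaderNode, erlang, whereis, [LeaderName]),
    ok = sys:suspend(LeaderPid),
    try
        Cmd = ra:process_command(FollowerId, some_command),
        Query = ra:local_query(FollowerId, fun(MacState) -> MacState end),
        {Cmd, Query}
    after
        ok = sys:resume(LeaderPid)
    end.
```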

michaelklishin commented 1 month ago

@the-mikedavis we can try pausing the process. If that turns out to be too flaky or complex, we can "delegate" testing of this to Khepri and RabbitMQ, which are tested in many different ways, including semi-automated ones.

the-mikedavis commented 1 month ago

OK, I looked into it a bit more and I think it would be pretty complex to test this reliably in Ra. The code itself is very similar to what we already do for await_condition: https://github.com/rabbitmq/ra/blob/e2cffb73b5e5838427e4f44573aa44631c539822/src/ra_server_proc.erl#L936-L947

So I think in this case it'd be OK to verify it only with the reproduction steps above against a running server. What do you think @kjnilsson?

michaelklishin commented 1 month ago

@the-mikedavis go for it.