minghuaw / toy-rpc

An async RPC in rust-lang that mimics golang's net/rpc

A question about that RPC hangs #40

Closed: drmingdrmer closed this issue 1 year ago

drmingdrmer commented 1 year ago

I've used toy-rpc 0.8.6 in an example Raft-based application that sends RPCs between nodes: https://github.com/datafuselabs/openraft/tree/4b555c83a023ce92922d98f045f09eea47cc254e

Everything had been working well, but recently an RPC appears to block forever when running:

cd examples/raft-kv-rocksdb
RUST_LOG=trace cargo test

It happens on my M1 Mac running macOS 12.0.1.

The toy-rpc service is defined as follows: https://github.com/datafuselabs/openraft/blob/4b555c83a023ce92922d98f045f09eea47cc254e/examples/raft-kv-rocksdb/src/network/raft.rs#L16-L36

#[export_impl]
impl Raft {
    #[export_method]
    pub async fn append(
        &self,
        req: AppendEntriesRequest<ExampleTypeConfig>,
    ) -> Result<AppendEntriesResponse<u64>, toy_rpc::Error> {
        tracing::debug!("handle append");
        self.app.raft.append_entries(req).await.map_err(|e| toy_rpc::Error::Internal(Box::new(e)))
    }
    // ... (other exported methods elided; see the link above)
}
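
For reference, here is a minimal sketch of how such an exported method can be invoked from the client side with toy-rpc's string-addressed call API. This is not the exact code from the linked example; the address and error handling are placeholders:

use toy_rpc::Client;

// Minimal sketch, assuming a websocket transport feature (e.g. ws_async_std)
// is enabled. The address below is a placeholder, not the example's real config.
async fn call_append(
    req: AppendEntriesRequest<ExampleTypeConfig>,
) -> Result<AppendEntriesResponse<u64>, toy_rpc::Error> {
    let client = Client::dial_websocket("ws://127.0.0.1:22002").await?;

    // "Raft.append" matches the service_method that shows up in the log below.
    let resp: AppendEntriesResponse<u64> = client.call("Raft.append", req).await?;
    Ok(resp)
}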

The tracing log shows that some messages are sent, but nothing is received on the server side: this log line is never printed: https://github.com/datafuselabs/openraft/blob/4b555c83a023ce92922d98f045f09eea47cc254e/examples/raft-kv-rocksdb/src/network/raft.rs#L34

        tracing::debug!("handle append");

Is this an issue in toy-rpc, or does the trace log suggest something else is going wrong?

This issue is found in:

Full log is attached here.

The tracing log when the hang happens:

2023-02-17T03:58:42.905691Z DEBUG ThreadId(11) new{id=1 cluster=foo}:RaftCore{id=1 cluster=foo}:replication{id=1 target=2}:main{session=vote:1-1:committed/Some(1-1-2) target=2 cluster=foo}:send_log_entries: openraft::replication: start sending append_entries, timeout: 250 payload=vote=vote:1-1:committed, prev_log_id=None, leader_commit=Some(1-1-2), entries=0-0-0:membership: members:[{1:{ExampleNode { rpc_addr: "127.0.0.1:22001", api_addr: "127.0.0.1:21001" }}}],learners:[],1-1-1:blank,1-1-2:membership: members:[{1:{ExampleNode { rpc_addr: "127.0.0.1:22001", api_addr: "127.0.0.1:21001" }}}],learners:[2:{ExampleNode { rpc_addr: "127.0.0.1:22002", api_addr: "127.0.0.1:21002" }}]
2023-02-17T03:58:42.905742Z DEBUG ThreadId(11) new{id=1 cluster=foo}:RaftCore{id=1 cluster=foo}:replication{id=1 target=2}:main{session=vote:1-1:committed/Some(1-1-2) target=2 cluster=foo}:send_log_entries:send_append_entries: raft_kv_rocksdb::network::raft_network_impl: send_append_entries req=AppendEntriesRequest { vote: Vote { leader_id: LeaderId { term: 1, node_id: 1 }, committed: true }, prev_log_id: None, entries: [Entry { log_id: LogId { leader_id: LeaderId { term: 0, node_id: 0 }, index: 0 }, payload: Membership(Membership { configs: [{1}], nodes: {1: ExampleNode { rpc_addr: "127.0.0.1:22001", api_addr: "127.0.0.1:21001" }} }) }, Entry { log_id: LogId { leader_id: LeaderId { term: 1, node_id: 1 }, index: 1 }, payload: Blank }, Entry { log_id: LogId { leader_id: LeaderId { term: 1, node_id: 1 }, index: 2 }, payload: Membership(Membership { configs: [{1}], nodes: {1: ExampleNode { rpc_addr: "127.0.0.1:22001", api_addr: "127.0.0.1:21001" }, 2: ExampleNode { rpc_addr: "127.0.0.1:22002", api_addr: "127.0.0.1:21002" }} }) }], leader_commit: Some(LogId { leader_id: LeaderId { term: 1, node_id: 1 }, index: 2 }) }
2023-02-17T03:58:42.905812Z DEBUG ThreadId(11) new{id=1 cluster=foo}:RaftCore{id=1 cluster=foo}:replication{id=1 target=2}:main{session=vote:1-1:committed/Some(1-1-2) target=2 cluster=foo}:send_log_entries:send_append_entries: raft_kv_rocksdb::network::raft_network_impl: got connection
2023-02-17T03:58:42.905841Z DEBUG ThreadId(11) new{id=1 cluster=foo}:RaftCore{id=1 cluster=foo}:replication{id=1 target=2}:main{session=vote:1-1:committed/Some(1-1-2) target=2 cluster=foo}:send_log_entries:send_append_entries: raft_kv_rocksdb::network::raft_network_impl: got raft
2023-02-17T03:58:42.905945Z TRACE ThreadId(23) async_io::driver: block_on: sleep until notification    
2023-02-17T03:58:42.905967Z TRACE ThreadId(23) polling: Poller::notify()    
2023-02-17T03:58:42.905967Z TRACE ThreadId(18) async_std::task::builder: spawn    
2023-02-17T03:58:42.905983Z TRACE ThreadId(23) polling::kqueue: notify: kqueue_fd=6    
2023-02-17T03:58:42.906007Z TRACE ThreadId(16) polling::kqueue: new events: kqueue_fd=6, res=1    
2023-02-17T03:58:42.906013Z DEBUG ThreadId(23) toy_rpc::client::writer: Request { id: 0, service_method: "Raft.append", timeout: 10s }    
2023-02-17T03:58:42.906027Z TRACE ThreadId(18) polling: Poller::notify()    
2023-02-17T03:58:42.906037Z TRACE ThreadId(16) async_io::reactor: process_timers: 0 ready wakers    
2023-02-17T03:58:42.906043Z TRACE ThreadId(18) polling::kqueue: notify: kqueue_fd=6    
2023-02-17T03:58:42.906054Z TRACE ThreadId(16) async_io::reactor: react: 0 ready wakers    
2023-02-17T03:58:42.906064Z TRACE ThreadId(23) tungstenite::protocol: Frames still in queue: 0    
2023-02-17T03:58:42.906070Z TRACE ThreadId(16) async_io::driver: block_on: notified    
2023-02-17T03:58:42.906074Z TRACE ThreadId(18) async_io::driver: block_on: sleep until notification    
2023-02-17T03:58:42.906085Z TRACE ThreadId(23) tungstenite::protocol: Frames still in queue: 1    
2023-02-17T03:58:42.906089Z TRACE ThreadId(24) async_io::driver: main_loop: waiting on I/O    
2023-02-17T03:58:42.906096Z TRACE ThreadId(16) async_io::driver: block_on: sleep until notification    
2023-02-17T03:58:42.906110Z TRACE ThreadId(24) async_io::reactor: process_timers: 0 ready wakers    
2023-02-17T03:58:42.906125Z TRACE ThreadId(24) polling: Poller::wait(_, Some(9.9999155s))    
2023-02-17T03:58:42.906139Z TRACE ThreadId(24) polling::kqueue: wait: kqueue_fd=6, timeout=Some(9.9999155s)    
2023-02-17T03:58:42.906155Z TRACE ThreadId(24) polling::kqueue: new events: kqueue_fd=6, res=1    
2023-02-17T03:58:42.906147Z TRACE ThreadId(23) tungstenite::protocol: Sending frame: Frame { header: FrameHeader { is_final: true, rsv1: false, rsv2: false, rsv3: false, opcode: Data(Binary), mask: Some([86, 99, 205, 246]) }, payload: [0, 0, 11, 82, 97, 102, 116, 46, 97, 112, 112, 101, 110, 100, 10, 0] }    
2023-02-17T03:58:42.906180Z TRACE ThreadId(24) async_io::reactor: process_timers: 0 ready wakers    
2023-02-17T03:58:42.906196Z TRACE ThreadId(24) async_io::reactor: react: 0 ready wakers    
2023-02-17T03:58:42.906186Z TRACE ThreadId(23) tungstenite::protocol::frame: writing frame 
<FRAME>
final: true
reserved: false false false
opcode: BINARY
length: 22
payload length: 16
payload: 0x00b526166742e617070656e64a0

2023-02-17T03:58:42.906209Z TRACE ThreadId(24) async_io::driver: main_loop: sleeping for 50 us    
2023-02-17T03:58:42.906247Z TRACE ThreadId(23) tungstenite::protocol: Frames still in queue: 0    
2023-02-17T03:58:42.906295Z TRACE ThreadId(24) async_io::driver: main_loop: waiting on I/O    
2023-02-17T03:58:42.906311Z TRACE ThreadId(24) async_io::reactor: process_timers: 0 ready wakers    
2023-02-17T03:58:42.906325Z TRACE ThreadId(24) polling: Poller::wait(_, Some(9.999712375s))    
2023-02-17T03:58:42.906339Z TRACE ThreadId(24) polling::kqueue: wait: kqueue_fd=6, timeout=Some(9.999712375s)    
2023-02-17T03:58:42.906355Z TRACE ThreadId(24) polling::kqueue: new events: kqueue_fd=6, res=1    
2023-02-17T03:58:42.906356Z TRACE ThreadId(23) async_io::driver: block_on: sleep until notification    
2023-02-17T03:58:42.906375Z TRACE ThreadId(24) async_io::reactor: react: 1 ready wakers    
2023-02-17T03:58:42.906397Z TRACE ThreadId(24) async_io::driver: main_loop: sleeping for 50 us    
2023-02-17T03:58:42.906411Z TRACE ThreadId(23) tungstenite::protocol: Frames still in queue: 0    
2023-02-17T03:58:42.906422Z TRACE ThreadId(16) async_io::driver: block_on: waiting on I/O    
2023-02-17T03:58:42.906434Z TRACE ThreadId(23) tungstenite::protocol::frame::frame: Parsed headers [130, 144]    
2023-02-17T03:58:42.906441Z TRACE ThreadId(16) async_io::reactor: process_timers: 0 ready wakers    
2023-02-17T03:58:42.906454Z TRACE ThreadId(23) tungstenite::protocol::frame::frame: First: 10000010    
2023-02-17T03:58:42.906457Z TRACE ThreadId(16) polling: Poller::wait(_, Some(9.9995835s))    
2023-02-17T03:58:42.906473Z TRACE ThreadId(23) tungstenite::protocol::frame::frame: Second: 10010000    
2023-02-17T03:58:42.906479Z TRACE ThreadId(24) async_io::driver: main_loop: sleeping for 75 us    
2023-02-17T03:58:42.906484Z TRACE ThreadId(16) polling::kqueue: wait: kqueue_fd=6, timeout=Some(9.9995835s)    
2023-02-17T03:58:42.906489Z TRACE ThreadId(23) tungstenite::protocol::frame::frame: Opcode: Data(Binary)    
2023-02-17T03:58:42.906503Z TRACE ThreadId(23) tungstenite::protocol::frame::frame: Masked: true    
2023-02-17T03:58:42.906530Z TRACE ThreadId(23) tungstenite::protocol::frame: received frame 
<FRAME>
final: true
reserved: false false false
opcode: BINARY
length: 22
payload length: 16
payload: 0x5663c6a4375b9d83713bd93387c7f6

2023-02-17T03:58:42.906567Z TRACE ThreadId(23) tungstenite::protocol: Received message ���Raft.append
�    
2023-02-17T03:58:42.906594Z TRACE ThreadId(24) async_io::driver: main_loop: sleeping for 100 us    
2023-02-17T03:58:42.906612Z DEBUG ThreadId(23) toy_rpc::server::reader: Request { id: 0, service_method: "Raft.append", timeout: 10s }    
2023-02-17T03:58:42.906634Z TRACE ThreadId(23) tungstenite::protocol: Frames still in queue: 0    
2023-02-17T03:58:42.906655Z TRACE ThreadId(23) polling::kqueue: add: kqueue_fd=6, fd=48, ev=Event { key: 8, readable: true, writable: false }    
2023-02-17T03:58:42.906687Z TRACE ThreadId(23) async_io::driver: block_on: sleep until notification    
2023-02-17T03:58:42.906739Z TRACE ThreadId(24) async_io::driver: main_loop: sleeping for 250 us    
2023-02-17T03:58:42.907079Z TRACE ThreadId(24) async_io::driver: main_loop: sleeping for 500 us    
2023-02-17T03:58:42.907735Z TRACE ThreadId(24) async_io::driver: main_loop: sleeping for 750 us    
2023-02-17T03:58:42.908698Z TRACE ThreadId(24) async_io::driver: main_loop: sleeping for 1000 us    
2023-02-17T03:58:42.909972Z TRACE ThreadId(24) async_io::driver: main_loop: sleeping for 2500 us    
2023-02-17T03:58:42.913119Z TRACE ThreadId(24) async_io::driver: main_loop: sleeping for 5000 us    
2023-02-17T03:58:42.919386Z TRACE ThreadId(24) async_io::driver: main_loop: sleeping for 10000 us    
2023-02-17T03:58:43.158346Z TRACE ThreadId(23) polling: Poller::notify()    
2023-02-17T03:58:43.158328Z DEBUG ThreadId(10) new{id=1 cluster=foo}:RaftCore{id=1 cluster=foo}:replication{id=1 target=2}:main{session=vote:1-1:committed/Some(1-1-2) target=2 cluster=foo}:send_log_entries: openraft::replication: append_entries res: Err(Elapsed(()))
2023-02-17T03:58:43.158677Z TRACE ThreadId(23) polling::kqueue: notify: kqueue_fd=6    
2023-02-17T03:58:43.158784Z  WARN ThreadId(10) new{id=1 cluster=foo}:RaftCore{id=1 cluster=foo}:replication{id=1 target=2}:main{session=vote:1-1:committed/Some(1-1-2) target=2 cluster=foo}: openraft::replication: error replication to target=2 error=timeout after 250ms when AppendEntries 1->2
2023-02-17T03:58:43.158977Z ERROR ThreadId(10) new{id=1 cluster=foo}:RaftCore{id=1 cluster=foo}:replication{id=1 target=2}:main{session=vote:1-1:committed/Some(1-1-2) target=2 cluster=foo}: openraft::replication: RPCError err=timeout after 250ms when AppendEntries 1->2
minghuaw commented 1 year ago

Let me take a look

minghuaw commented 1 year ago

@drmingdrmer I have added a comment (https://github.com/datafuselabs/openraft/issues/677#issuecomment-1434334562) with what I think is the cause and the potential solution(s)

drmingdrmer commented 1 year ago

@minghuaw Got it! Thank you!

drmingdrmer commented 1 year ago

After replacing the default serde_bincode codec with serde_json, the hang is gone. :heart:
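
For anyone else hitting this, the change is just a codec feature swap on the toy-rpc dependency. A rough sketch of the Cargo.toml change, assuming the usual toy-rpc feature names (check the crate docs for the exact transport/runtime features your setup needs):

[dependencies.toy-rpc]
version = "0.8.6"
default-features = false
features = [
    "serde_json",    # instead of the default serde_bincode codec
    "ws_async_std",  # assumed websocket + async-std transport; adjust to your setup
    "server",
    "client",
]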