minghuaw / toy-rpc

An async RPC in rust-lang that mimics golang's net/rpc

Weird pub/sub "race condition" / lack of delivery? #38

Open brandonros opened 1 year ago

brandonros commented 1 year ago

I have a relatively unique use case: I need to run async Rust from a Windows DLL export, which is typically a plain fn, not an async fn.

client.rs:

use std::sync::Arc;
use std::os::raw::c_void;
use common::rpc_pubsub::*;
use common::structs::*;
use common::logger;
use futures::StreamExt;
use tokio::sync::Mutex;
use toy_rpc::Client;

#[no_mangle]
#[allow(non_snake_case, unsupported_calling_conventions)]
extern "stdcall" fn PassThruOpen(name: *const c_void, device_id: *mut u32) -> i32 {
    // logging
    logger::init_logger();
    // runtime
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    return runtime.block_on(async move {
        // client + subscriber
        let addr = "ws://127.0.0.1:23333";
        let mut client = Client::builder().set_ack_mode_none().dial_websocket(addr).await.unwrap();
        let subscriber = client.subscriber::<Message>(None).unwrap();
        let client = Arc::new(Mutex::new(client));
        let subscriber = Arc::new(Mutex::new(subscriber));
        // subscriber
        log::info!("spawning subscriber thread");
        let rt_handle = tokio::runtime::Handle::current();
        rt_handle.spawn(async move {
            log::info!("getting subscriber");
            let mut subscriber = subscriber.lock().await;
            log::info!("got subscriber");
            loop {
                log::info!("calling next");
                let result = subscriber.next().await;
                log::info!("{:?}", result);
            }
        });
        log::info!("spawned");
        // rpc call
        log::info!("name = {:?} device_id = {:?}", name, device_id);
        let request = PassThruOpenRequest {
            name: String::from("") // TODO: from FFI parameter
        };
        let client_clone = client.clone();
        let client = client_clone.lock().await;
        let response: PassThruOpenResponse = client.call("J2534Service.PassThruOpen", request).await.unwrap();
        // map response back to FFI
        unsafe { *device_id = response.device_id };
        return response.ret_val;
    });
}
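In `PassThruOpen` above, a new current-thread runtime is built on every call and dropped when the function returns. With a current-thread runtime, spawned tasks only make progress while `block_on` is driving it, and dropping the runtime drops any tasks still pending, so the spawned subscriber loop can only run for the duration of the RPC call. A common pattern for exports that must stay synchronous is to create the runtime once per process and reuse it. The sketch below is std-only, so a background thread fed by an `mpsc` channel stands in for a long-lived Tokio runtime; `sync_entry_point` is a hypothetical name for an exported function.

```rust
use std::sync::mpsc::{self, Sender};
use std::sync::{Mutex, OnceLock};
use std::thread;

// Each request carries a one-shot reply channel for its result.
type Reply = Sender<u32>;

// Created on first use and kept for the lifetime of the process, standing in
// for a long-lived async runtime that outlives any single FFI call.
static WORKER: OnceLock<Mutex<Sender<Reply>>> = OnceLock::new();

fn worker() -> &'static Mutex<Sender<Reply>> {
    WORKER.get_or_init(|| {
        let (tx, rx) = mpsc::channel::<Reply>();
        thread::spawn(move || {
            // State owned by the worker persists across calls, the way a
            // spawned subscriber task would keep running between exports.
            let mut calls = 0u32;
            for reply in rx {
                calls += 1;
                let _ = reply.send(calls);
            }
        });
        Mutex::new(tx)
    })
}

/// Hypothetical synchronous export: hands work to the shared worker and
/// blocks until it answers, without creating or tearing anything down per call.
pub fn sync_entry_point() -> u32 {
    let (reply_tx, reply_rx) = mpsc::channel();
    worker()
        .lock()
        .unwrap()
        .send(reply_tx)
        .expect("worker thread is running");
    reply_rx.recv().expect("worker sent a reply")
}
```

Calling `sync_entry_point()` twice returns 1 and then 2, because the worker and its counter survive between calls. In a real DLL the static would more likely hold a `tokio::runtime::Runtime` (for example a multi-threaded one built once), so tasks spawned on it keep running after the export returns.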

server.rs:

use common::logger;
use common::rpc_services::*;
use common::rpc_pubsub::*;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use std::sync::Arc;
use toy_rpc::Server;
use futures::SinkExt;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // logger
    logger::init_logger();
    // server
    let addr = "127.0.0.1:23333";
    let j2534_service = J2534Service{};
    let j2534_service = Arc::new(j2534_service);
    let server = Server::builder()
        .register(j2534_service)
        .build();
    let listener = TcpListener::bind(addr).await.unwrap();
    let server_future = server.accept_websocket(listener);
    // publisher
    let publisher = server.publisher::<Message>();
    let publisher = Arc::new(Mutex::new(publisher));
    let rt_handle = tokio::runtime::Handle::current();
    rt_handle.spawn(async move {
        loop {
            let item = Message {
                foo: String::from("hello, world")
            };
            let mut publisher = publisher.lock().await;
            publisher.send(item).await.unwrap();
            log::info!("wrote");
            tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; 
        }
    });
    // block
    server_future.await;
}

common.rs:

pub mod ffi {
    use std::os::raw::c_void;

    pub type PassThruOpenFn = unsafe extern "stdcall" fn(name: *const c_void, device_id: *mut u32) -> i32;
}

pub mod structs {
    use serde::{Serialize, Deserialize};

    #[derive(Serialize, Deserialize, Debug)]
    pub struct PassThruOpenRequest {
        pub name: String
    }

    #[derive(Serialize, Deserialize, Debug)]
    pub struct PassThruOpenResponse {
        pub ret_val: i32,
        pub device_id: u32
    }
}

pub mod constants {
    pub const ERR_FAILED: i32 = 0x07;
}

pub mod logger {
    pub fn init_logger() {
        simple_logger::init_with_level(log::Level::Info).unwrap();
    }
}

pub mod rpc_services {
    use toy_rpc::macros::{export_impl};

    use crate::structs::*;

    pub struct J2534Service { }

    #[export_impl]
    impl J2534Service {
        #[export_method]
        pub async fn PassThruOpen(&self, req: PassThruOpenRequest) -> Result<PassThruOpenResponse, String> {
            log::info!("{:?}", req);
            let response = PassThruOpenResponse {
                device_id: 0,
                ret_val: 0
            };
            Ok(response)
        }
    }
}

pub mod rpc_pubsub {
    use serde::{Serialize, Deserialize};
    use toy_rpc::pubsub::Topic;

    #[derive(Debug, Serialize, Deserialize)]
    pub struct Message {
        pub foo: String
    }

    impl Topic for Message {
        type Item = Message;

        fn topic() -> String {
            "Message".into()
        }
    }
}

I see the messages being published from the server, but I don't receive them in the client.

I've experimented with it for a bit and seen odd behavior where the messages do arrive, but only on the second call, and then they come in one giant burst.

It could very well be my implementation or my understanding rather than an issue with the library. If that is the case, I'd appreciate some guidance if you can spare it.
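One general mechanism that produces a burst like this is buffering: if messages are queued somewhere while the receiving side isn't being polled, they all become available together the next time it is. A std-only illustration, with a hypothetical function name; it makes no claim about where, or whether, toy_rpc buffers.

```rust
use std::sync::mpsc;

/// Publish `n` messages into a channel while nothing is reading it, then
/// drain whatever has accumulated in a single pass and return the batch size.
pub fn buffered_then_drained(n: usize) -> usize {
    let (tx, rx) = mpsc::channel::<String>();
    for i in 0..n {
        // The channel is unbounded, so sends succeed with no active reader.
        tx.send(format!("hello, world #{i}")).unwrap();
    }
    // When the reader finally runs, every queued message is available at once.
    rx.try_iter().count()
}
```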

minghuaw commented 1 year ago

Thanks for raising the issue. I haven't done much testing on a Windows machine. I will look at this on my Windows machine and get back to you ASAP :)

brandonros commented 1 year ago

The trace logs are interesting:

     Running `target/debug/diag_test`
2023-02-03T22:43:52.461Z INFO  [diag_test] calling...
2023-02-03T22:43:52.693Z TRACE [mio::poll] registering event source with poller: token=Token(2147483649), interests=READABLE
2023-02-03T22:43:52.694Z TRACE [mio::poll] registering event source with poller: token=Token(0), interests=READABLE | WRITABLE
2023-02-03T22:43:52.695Z TRACE [tungstenite::handshake::client] Request: "GET / HTTP/1.1\r\nHost: 127.0.0.1:23333\r\nConnection: Upgrade\r\nUpgrade: websocket\r\nSec-WebSocket-Version: 13\r\nSec-WebSocket-Key: FOIjzE+9fLTGYDZ60FCnYA==\r\n\r\n"
2023-02-03T22:43:52.695Z TRACE [tungstenite::handshake::client] Client handshake initiated.
2023-02-03T22:43:52.695Z TRACE [tungstenite::handshake::machine] Doing handshake round.
2023-02-03T22:43:52.695Z TRACE [tungstenite::handshake::machine] Doing handshake round.
2023-02-03T22:43:52.695Z TRACE [tungstenite::handshake::machine] Doing handshake round.
2023-02-03T22:43:52.695Z TRACE [tungstenite::handshake::machine] Doing handshake round.
2023-02-03T22:43:52.695Z DEBUG [tungstenite::handshake::client] Client handshake done.
2023-02-03T22:43:52.695Z INFO  [vpi] spawning subscriber thread
2023-02-03T22:43:52.696Z INFO  [vpi] spawned
2023-02-03T22:43:52.696Z INFO  [vpi] name = 0x0 device_id = 0x16b996400
2023-02-03T22:43:52.696Z TRACE [tungstenite::protocol] Frames still in queue: 0
2023-02-03T22:43:52.696Z INFO  [vpi] getting subscriber
2023-02-03T22:43:52.696Z INFO  [vpi] got subscriber
2023-02-03T22:43:52.696Z INFO  [vpi] calling next
2023-02-03T22:43:52.696Z DEBUG [toy_rpc::client::writer] Subscribe { id: 1, topic: "Message" }
2023-02-03T22:43:52.696Z TRACE [tungstenite::protocol] Frames still in queue: 0
2023-02-03T22:43:52.696Z TRACE [tungstenite::protocol] Frames still in queue: 1
2023-02-03T22:43:52.696Z TRACE [tungstenite::protocol] Sending frame: Frame { header: FrameHeader { is_final: true, rsv1: false, rsv2: false, rsv3: false, opcode: Data(Binary), mask: Some([183, 255, 86, 170]) }, payload: [4, 1, 7, 77, 101, 115, 115, 97, 103, 101] }
2023-02-03T22:43:52.696Z TRACE [tungstenite::protocol::frame] writing frame 
<FRAME>
final: true
reserved: false false false
opcode: BINARY
length: 16
payload length: 10
payload: 0x4174d657373616765

2023-02-03T22:43:52.696Z TRACE [tungstenite::protocol] Frames still in queue: 0
2023-02-03T22:43:52.696Z TRACE [tungstenite::protocol] Frames still in queue: 0
2023-02-03T22:43:52.696Z TRACE [tungstenite::protocol] Frames still in queue: 1
2023-02-03T22:43:52.696Z TRACE [tungstenite::protocol] Sending frame: Frame { header: FrameHeader { is_final: true, rsv1: false, rsv2: false, rsv3: false, opcode: Data(Binary), mask: Some([174, 153, 137, 58]) }, payload: [] }
2023-02-03T22:43:52.696Z TRACE [tungstenite::protocol::frame] writing frame 
<FRAME>
final: true
reserved: false false false
opcode: BINARY
length: 6
payload length: 0
payload: 0x

2023-02-03T22:43:52.696Z TRACE [tungstenite::protocol] Frames still in queue: 0
2023-02-03T22:43:52.696Z DEBUG [toy_rpc::client::writer] Request { id: 0, service_method: "J2534Service.PassThruOpen", timeout: 10s }
2023-02-03T22:43:52.696Z TRACE [tungstenite::protocol] Frames still in queue: 0
2023-02-03T22:43:52.696Z TRACE [tungstenite::protocol] Frames still in queue: 1
2023-02-03T22:43:52.696Z TRACE [tungstenite::protocol] Sending frame: Frame { header: FrameHeader { is_final: true, rsv1: false, rsv2: false, rsv3: false, opcode: Data(Binary), mask: Some([88, 245, 255, 113]) }, payload: [0, 0, 25, 74, 50, 53, 51, 52, 83, 101, 114, 118, 105, 99, 101, 46, 80, 97, 115, 115, 84, 104, 114, 117, 79, 112, 101, 110, 10, 0] }
2023-02-03T22:43:52.696Z TRACE [tungstenite::protocol::frame] writing frame 
<FRAME>
final: true
reserved: false false false
opcode: BINARY
length: 36
payload length: 30
payload: 0x00194a32353334536572766963652e50617373546872754f70656ea0

2023-02-03T22:43:52.696Z TRACE [tungstenite::protocol] Frames still in queue: 0
2023-02-03T22:43:52.696Z TRACE [tungstenite::protocol] Frames still in queue: 0
2023-02-03T22:43:52.696Z TRACE [tungstenite::protocol] Frames still in queue: 1
2023-02-03T22:43:52.696Z TRACE [tungstenite::protocol] Sending frame: Frame { header: FrameHeader { is_final: true, rsv1: false, rsv2: false, rsv3: false, opcode: Data(Binary), mask: Some([52, 44, 97, 128]) }, payload: [0] }
2023-02-03T22:43:52.696Z TRACE [tungstenite::protocol::frame] writing frame 
<FRAME>
final: true
reserved: false false false
opcode: BINARY
length: 7
payload length: 1
payload: 0x0

2023-02-03T22:43:52.696Z TRACE [tungstenite::protocol] Frames still in queue: 0
2023-02-03T22:43:52.697Z TRACE [tungstenite::protocol] Frames still in queue: 0
2023-02-03T22:43:52.697Z TRACE [tungstenite::protocol::frame::frame] Parsed headers [130, 3]
2023-02-03T22:43:52.697Z TRACE [tungstenite::protocol::frame::frame] First: 10000010
2023-02-03T22:43:52.697Z TRACE [tungstenite::protocol::frame::frame] Second: 11
2023-02-03T22:43:52.697Z TRACE [tungstenite::protocol::frame::frame] Opcode: Data(Binary)
2023-02-03T22:43:52.697Z TRACE [tungstenite::protocol::frame::frame] Masked: false
2023-02-03T22:43:52.697Z TRACE [tungstenite::protocol::frame] received frame 
<FRAME>
final: true
reserved: false false false
opcode: BINARY
length: 5
payload length: 3
payload: 0x101

2023-02-03T22:43:52.697Z TRACE [tungstenite::protocol] Received message 
2023-02-03T22:43:52.697Z DEBUG [toy_rpc::client::reader] Response { id: 0, is_ok: true }
2023-02-03T22:43:52.697Z TRACE [tungstenite::protocol] Frames still in queue: 0
2023-02-03T22:43:52.697Z TRACE [tungstenite::protocol] Frames still in queue: 0
2023-02-03T22:43:52.697Z TRACE [tungstenite::protocol::frame::frame] Parsed headers [130, 2]
2023-02-03T22:43:52.697Z TRACE [tungstenite::protocol::frame::frame] First: 10000010
2023-02-03T22:43:52.697Z TRACE [tungstenite::protocol::frame::frame] Second: 10
2023-02-03T22:43:52.697Z TRACE [tungstenite::protocol::frame::frame] Opcode: Data(Binary)
2023-02-03T22:43:52.697Z TRACE [tungstenite::protocol::frame::frame] Masked: false
2023-02-03T22:43:52.697Z TRACE [tungstenite::protocol::frame] received frame 
<FRAME>
final: true
reserved: false false false
opcode: BINARY
length: 4
payload length: 2
payload: 0x00

2023-02-03T22:43:52.697Z TRACE [tungstenite::protocol] Received message 
2023-02-03T22:43:52.697Z TRACE [tungstenite::protocol] Frames still in queue: 0
2023-02-03T22:43:52.697Z TRACE [mio::poll] deregistering event source from poller
2023-02-03T22:43:52.697Z INFO  [diag_test] device_id = 0
2023-02-03T22:43:52.697Z INFO  [diag_test] ret_val = 0
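
The frame dumps above can be checked by hand against RFC 6455. The first header byte carries FIN and the opcode (130 = 0b1000_0010: final frame, opcode 2, binary); the second carries the MASK bit and a 7-bit payload length (3 and 2 for the two received frames). Frames sent by the client also carry a 4-byte masking key, so each outgoing `length` is the payload length plus 6 (16 = 10 + 6, 36 = 30 + 6), while the unmasked server frames are only 2 bytes longer (5 = 3 + 2). The sketch below decodes those fields. Reading the payloads as strings with a one-byte length prefix is an assumption inferred from the bytes in this log, not documented toy_rpc wire format.

```rust
/// First two bytes of a WebSocket frame header (RFC 6455, section 5.2).
#[derive(Debug, PartialEq)]
pub struct Header {
    pub fin: bool,
    pub opcode: u8,
    pub masked: bool,
    /// 7-bit length field; 126 and 127 mean an extended length follows.
    pub payload_len7: u8,
}

/// Decode FIN, opcode, MASK and the 7-bit payload length.
pub fn parse_header(b0: u8, b1: u8) -> Header {
    Header {
        fin: (b0 & 0x80) != 0,
        opcode: b0 & 0x0F,
        masked: (b1 & 0x80) != 0,
        payload_len7: b1 & 0x7F,
    }
}

/// On-the-wire size of a small frame (payload < 126 bytes): 2 header bytes,
/// plus a 4-byte masking key on client-to-server frames.
pub fn small_frame_len(payload_len: usize, masked: bool) -> usize {
    assert!(payload_len < 126, "extended payload lengths are not handled here");
    let mask_key = if masked { 4 } else { 0 };
    2 + mask_key + payload_len
}

/// Read a string stored as a one-byte length followed by UTF-8 bytes.
/// ASSUMPTION: layout inferred from the logged payloads, not from toy_rpc docs.
pub fn read_len_prefixed(payload: &[u8], offset: usize) -> Option<&str> {
    let len = *payload.get(offset)? as usize;
    let bytes = payload.get(offset + 1..offset + 1 + len)?;
    std::str::from_utf8(bytes).ok()
}
```

Applied to the Subscribe payload `[4, 1, 7, 77, ...]` at offset 2, `read_len_prefixed` yields `"Message"`; applied to the request payload at offset 2, it yields `"J2534Service.PassThruOpen"` (25 bytes).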

brandonros commented 1 year ago

Do you have any guess as to why, with

rt_handle.spawn(async move {
    loop {
        log::info!("getting subscriber");
        let mut subscriber = subscriber.lock().await;
        log::info!("got subscriber");
        log::info!("calling next");
        let result = subscriber.next().await;
        log::info!("{:?}", result);
    }
});

the trace logs show there are messages in the queue but .next().await never fires? Am I calling it incorrectly?
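For comparison, the usual shape of a consumer loop is to take ownership of the receiving end once and loop until it reports the end of the stream, e.g. `while let Some(item) = subscriber.next().await { ... }` with `futures::StreamExt`, instead of re-locking a mutex on every iteration and polling again after `None`. The sketch below shows the same shape with std only: a thread and an `mpsc` channel stand in for the spawned task and the subscriber stream, and `consume_all` is a hypothetical helper.

```rust
use std::sync::mpsc;
use std::thread;

/// Feed `messages` to a consumer thread and return what it received.
/// The consumer owns the receiver outright, so no lock is taken per message.
pub fn consume_all(messages: Vec<String>) -> Vec<String> {
    let (tx, rx) = mpsc::channel::<String>();
    let consumer = thread::spawn(move || {
        let mut seen = Vec::new();
        // `recv` returns Err once every Sender is dropped, ending the loop,
        // much like a stream yielding `None`.
        while let Ok(msg) = rx.recv() {
            seen.push(msg);
        }
        seen
    });
    for msg in messages {
        tx.send(msg).expect("consumer is still running");
    }
    drop(tx); // closing the channel lets the consumer loop finish
    consumer.join().expect("consumer thread panicked")
}
```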

brandonros commented 1 year ago

For what it's worth, I only mentioned the stdcall extern fn requirement to explain the unusual architecture. I'm actually testing on Mac OS X for now, and the problem still happens there.

minghuaw commented 1 year ago

Good to know. Let me try this on my own machine.

minghuaw commented 1 year ago

I am a little confused. Were you able to get some subscribed message or nothing at all?

The pubsub on the server is a rather naive implementation that doesn't hold messages in a queue when there is no subscriber. The server-side publisher doesn't wait for a subscriber, so every message it publishes before the subscriber connects is lost.
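A toy model of the behaviour described above, assuming a hub that forwards each publish only to the subscribers registered at that moment. `NaiveHub` is hypothetical and std-only; it is not toy_rpc's implementation.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, Sender};

/// Hypothetical hub that only forwards to subscribers present at publish time.
#[derive(Default)]
pub struct NaiveHub {
    subscribers: HashMap<String, Vec<Sender<String>>>,
}

impl NaiveHub {
    pub fn subscribe(&mut self, topic: &str) -> Receiver<String> {
        let (tx, rx) = mpsc::channel();
        self.subscribers.entry(topic.to_string()).or_default().push(tx);
        rx
    }

    /// Returns how many subscribers got the item; with none, it is simply dropped.
    pub fn publish(&mut self, topic: &str, item: &str) -> usize {
        match self.subscribers.get_mut(topic) {
            Some(subs) => {
                // Forget subscribers whose receiver has gone away.
                subs.retain(|tx| tx.send(item.to_string()).is_ok());
                subs.len()
            }
            None => 0,
        }
    }
}
```

Publishing before `subscribe` returns 0 and the item is gone; only later publishes reach the new receiver.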

minghuaw commented 1 year ago

The pubsub is not a proper message queue. It was really just a hack for a feature request. I could probably make it behave more like a proper message queue in an update.

Otherwise, I feel like a proper message queue broker should be used. I have another crate https://github.com/minghuaw/fe2o3-amqp that you can use to talk to message brokers that speak AMQP 1.0.
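As a rough illustration of what behaving "more like a proper message queue" could mean, here is a hypothetical hub that retains messages for a topic while it has no live subscriber and flushes the backlog to the first one that arrives. Real brokers add acknowledgements, persistence and per-consumer state; this std-only sketch shows only the retention idea, and none of its names are toy_rpc API.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::mpsc::{self, Receiver, Sender};

/// Hypothetical hub that holds messages until someone subscribes.
#[derive(Default)]
pub struct RetainingHub {
    backlog: HashMap<String, VecDeque<String>>,
    subscribers: HashMap<String, Vec<Sender<String>>>,
}

impl RetainingHub {
    pub fn subscribe(&mut self, topic: &str) -> Receiver<String> {
        let (tx, rx) = mpsc::channel();
        // Deliver anything published while nobody was listening, in order.
        if let Some(queued) = self.backlog.remove(topic) {
            for item in queued {
                let _ = tx.send(item);
            }
        }
        self.subscribers.entry(topic.to_string()).or_default().push(tx);
        rx
    }

    pub fn publish(&mut self, topic: &str, item: &str) {
        let subs = self.subscribers.entry(topic.to_string()).or_default();
        subs.retain(|tx| tx.send(item.to_string()).is_ok());
        if subs.is_empty() {
            // No live subscriber: keep the item instead of dropping it.
            self.backlog
                .entry(topic.to_string())
                .or_default()
                .push_back(item.to_string());
        }
    }
}
```

A subscriber that joins after two publishes receives both queued items first, followed by anything published afterwards.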

minghuaw commented 1 year ago

If you are able to receive some published messages, but only those published after the subscriber connects, then this is likely caused by my poor implementation. Another mistake on my part is that I forgot to mention this behaviour in the documentation.