lemunozm / message-io

Fast and easy-to-use event-driven network library.
Apache License 2.0

Blocking on for_each_async #111

Closed fabracht closed 2 years ago

fabracht commented 2 years ago

I'm trying to implement message-io as a tcp client within the actix framework. This is my current implementation:

use actix::Message;
use actix::{
    Actor, ActorContext, AsyncContext, Context, Handler, Running, StreamHandler, WrapFuture,
};
use log::info;
use message_io::network::{NetEvent, Transport};
use message_io::node::{
    self, NodeEvent, NodeHandler, NodeListener, NodeTask, StoredNetEvent, StoredNodeEvent,
};

pub struct TcpClientActor {
    handler: NodeHandler<String>,
    listener: Option<NodeListener<String>>,
}

impl TcpClientActor {
    pub fn new() -> Self {
        let (handler, listener) = node::split::<String>();
        TcpClientActor {
            handler,
            listener: Some(listener),
        }
    }
}

impl Actor for TcpClientActor {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        let message = serde_json::json!({
            "op": "subscribe",
            "topic": "/client_count"
        });
        // Connect to the remote server over TCP.
        let (server, socket_address) = self
            .handler
            .network()
            .connect(Transport::Tcp, "192.168.43.217:9092")
            .unwrap();
    }

    fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
        ctx.stop();
        Running::Stop
    }
}

#[derive(Message)]
#[rtype(result = "()")]
pub struct Listen {}

impl Handler<Listen> for TcpClientActor {
    type Result = ();

    fn handle(&mut self, msg: Listen, ctx: &mut Self::Context) -> Self::Result {
        let listener = self.listener.take().unwrap();
        listener
            .for_each_async(move |event| {
                match event {
                    NodeEvent::Network(net_event) => match net_event {
                        NetEvent::Connected(_endpoint, _ok) => {
                            info!("Connected");
                            // handler.signals().send(message.to_string());
                        }
                        NetEvent::Accepted(_, _) => unreachable!(), // Only generated by listening
                        NetEvent::Message(_endpoint, data) => {
                            println!("Received: {}", String::from_utf8_lossy(data));
                        }
                        NetEvent::Disconnected(_endpoint) => (),
                    },
                    NodeEvent::Signal(signal) => match signal {
                        _ => {
                            // computed every second
                            info!("Signal Received: {}", signal);
                            // handler.signals().send_with_timer(signal, Duration::from_secs(1));
                        }
                    },
                }
            })
            .wait();
    }
}

The problem with the current implementation is that whenever the listener.for_each_async() call runs, the thread gets blocked and the entire program freezes. I thought for_each_async was non-blocking. Am I misinterpreting something here? Thank you

PS: If you need any information regarding the actix framework or anything else, just let me know and I'll write it down. I didn't want to overwhelm the issue with information that might not be needed.

sigmaSd commented 2 years ago

I remember I was also surprised by this, but it's documented, and what you need to do is documented too. Just check out the for_each_async docs ;)

sigmaSd commented 2 years ago

Maybe it would be better if it had a #[must_use] attribute

lemunozm commented 2 years ago

Hi @fabracht,

I think the problem is that you are calling wait() after the for_each_async() method. The wait() method blocks the thread. Maybe what you want to do is to store the ~listener~ node task to avoid destroying it.

Regarding what @sigmaSd says, I remember that must_use was already introduced. Probably you use wait() to avoid that error, but in your case, the correct way is to store the ~listener~ node task to avoid blocking.
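To make the difference concrete, here is a minimal, stdlib-only sketch of the pattern. `BackgroundTask` and `Client` are hypothetical stand-ins, not message-io or actix types: `spawn()` plays the role of `for_each_async()`, `stop()` of `NodeHandler::stop()`, and `wait()` of `NodeTask::wait()`. It assumes, as I understand the `for_each_async` docs, that dropping the task handle also waits for the task to finish.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical stand-in for `NodeTask`: a handle that owns a worker thread.
pub struct BackgroundTask {
    running: Arc<AtomicBool>,
    thread: Option<JoinHandle<u32>>,
}

impl BackgroundTask {
    /// Starts the worker and returns immediately.
    pub fn spawn() -> Self {
        let running = Arc::new(AtomicBool::new(true));
        let flag = Arc::clone(&running);
        let thread = thread::spawn(move || {
            let mut ticks = 0u32;
            // Keep "processing events" until asked to stop.
            while flag.load(Ordering::SeqCst) {
                ticks += 1;
                thread::sleep(Duration::from_millis(1));
            }
            ticks
        });
        BackgroundTask { running, thread: Some(thread) }
    }

    /// Asks the worker to finish; does not block.
    pub fn stop(&self) {
        self.running.store(false, Ordering::SeqCst);
    }

    /// Blocks until the worker has finished. Returns `None` if already joined.
    pub fn wait(&mut self) -> Option<u32> {
        self.thread.take().map(|t| t.join().expect("worker thread panicked"))
    }
}

impl Drop for BackgroundTask {
    fn drop(&mut self) {
        // Dropping also waits, so discarding the handle right away would block.
        let _ = self.wait();
    }
}

/// Hypothetical stand-in for the actor: it stores the task instead of waiting.
pub struct Client {
    task: Option<BackgroundTask>,
}

impl Client {
    pub fn new() -> Self {
        Client { task: None }
    }

    /// Non-blocking: starts the task once and keeps the handle alive in `self`.
    pub fn listen(&mut self) {
        if self.task.is_none() {
            self.task = Some(BackgroundTask::spawn());
        }
    }

    pub fn is_listening(&self) -> bool {
        self.task.is_some()
    }

    /// Stops the worker first, then waits; returns its tick count if it was running.
    pub fn shutdown(&mut self) -> Option<u32> {
        let mut task = self.task.take()?;
        task.stop();
        task.wait()
    }
}
```

In the actor, the equivalent would be a field such as `task: Option<NodeTask>` (the field name is just an illustration) filled in by the `Listen` handler, with `wait()` reserved for shutdown, after the node has been stopped.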

fabracht commented 2 years ago

Thanks for the reply. By storing the listener, do you mean something like this?

impl Handler<Listen> for TcpClientActor {
    type Result = ();

    fn handle(&mut self, msg: Listen, ctx: &mut Self::Context) -> Self::Result {
        if let Some(listener) = &self.listener {
            listener
                .for_each_async(move |event| {
                    match event {
                        NodeEvent::Network(net_event) => match net_event {
                            NetEvent::Connected(_endpoint, _ok) => {
                                info!("Connected");
                                // handler.signals().send(message.to_string());
                            }
                            NetEvent::Accepted(_, _) => unreachable!(), // Only generated by listening
                            NetEvent::Message(_endpoint, data) => {
                                println!("Received: {}", String::from_utf8_lossy(data));
                            }
                            NetEvent::Disconnected(_endpoint) => (),
                        },
                        NodeEvent::Signal(signal) => match signal {
                            _ => {
                                // computed every second
                                info!("Signal Received: {}", signal);
                                // handler.signals().send_with_timer(signal, Duration::from_secs(1));
                            }
                        },
                    }
                });
        }
    }
}

This way, instead of taking the value out of the Option, I use a reference to it. But that gives me an error:

cannot move out of `*listener` which is behind a shared reference [E0507]
move occurs because `*listener` has type `NodeListener<std::string::String>`, which does not implement the `Copy` trait

sigmaSd commented 2 years ago

pub fn for_each_async(
    self,
    event_callback: impl FnMut(NodeEvent<'_, S>) + Send + 'static
) -> NodeTask

for_each_async consumes self, that's why you have to take ownership of the listener

What you need to store is not the listener but the NodeTask that this function returns, something like:

self.node = listener.for_each_async(|_event| { /* handle events here */ });

As long as node is alive, the task will continue and it won't block.

I'm not sure, though, how this will work with actix: since you have to take ownership of the listener, Handler::handle can only be called once. The second time it will panic because the listener has already been moved.
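One possible way around the second-call panic, sketched with stdlib-only stand-ins: take the listener out of the `Option` without `unwrap()` and treat `None` as "already listening". `Listener`, `Task`, `Client`, `start()` and `handle_listen()` are hypothetical names for illustration; in the real actor they would correspond to `NodeListener`, `NodeTask`, the actor struct, `for_each_async()` and `Handler::handle`.

```rust
/// Hypothetical stand-in for `NodeListener`: it can only be consumed once.
pub struct Listener;

/// Hypothetical stand-in for `NodeTask`.
pub struct Task;

impl Listener {
    /// Takes `self` by value, like `for_each_async`, so the listener is gone afterwards.
    pub fn start(self) -> Task {
        Task
    }
}

/// Hypothetical stand-in for the actor.
pub struct Client {
    listener: Option<Listener>,
    task: Option<Task>,
}

impl Client {
    pub fn new() -> Self {
        Client { listener: Some(Listener), task: None }
    }

    /// Returns `true` if this call started the task, `false` if the listener
    /// had already been consumed by an earlier call. It never panics.
    pub fn handle_listen(&mut self) -> bool {
        match self.listener.take() {
            Some(listener) => {
                // Store the returned task so it stays alive with the actor.
                self.task = Some(listener.start());
                true
            }
            None => false,
        }
    }

    pub fn is_running(&self) -> bool {
        self.task.is_some()
    }
}
```

With this shape a repeated `Listen` message becomes a no-op instead of a panic; whether it should be silently ignored or logged is a design choice for the actor.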

lemunozm commented 2 years ago

I think the issue can be closed, if not, please feel free to reopen it 😃