lemunozm / message-io

Fast and easy-to-use event-driven network library.
Apache License 2.0

Node API: zero-copy write/read message #61

Closed lemunozm closed 3 years ago

lemunozm commented 3 years ago

Problem

To read a message, you use:

let (mut network, mut events) = Network::split();
network.listen(Transport::FramedTcp, "0.0.0.0:0").unwrap();
loop {
    match events.receive() {
        NetEvent::Message(endpoint, data) => { /* data has been copied here */ },
        _ => (),
    }
}

Although the current API is quite simple, it has a drawback: to pass a message into the EventQueue, the message must be copied. This is why NetEvent::Message(Endpoint, Vec<u8>) carries an allocated vector instead of a reference like &[u8]. The copy is necessary because once the data is sent into the queue, the lifetime of the referenced data is lost: the internal socket buffer can be overwritten by a new incoming message before you read the previous one.
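The need for the copy can be reproduced with the standard library alone. The sketch below is only an illustration under assumptions: `receive_into_queue` and `socket_buffer` are hypothetical names, an `mpsc` channel stands in for the EventQueue, and none of this is message-io code.

```rust
use std::sync::mpsc;

/// Hypothetical model of an adapter that reuses one input buffer for every
/// incoming message and forwards each message into a queue.
fn receive_into_queue(messages: &[&[u8]]) -> Vec<Vec<u8>> {
    let (sender, receiver) = mpsc::channel::<Vec<u8>>();
    let mut socket_buffer = [0u8; 16]; // reused (overwritten) for each message

    for message in messages {
        // Simulate the socket writing the next message into the shared buffer.
        let len = message.len().min(socket_buffer.len());
        socket_buffer[..len].copy_from_slice(&message[..len]);

        // The queue outlives this loop iteration, so it needs owned data.
        // Queuing a borrowed slice would keep `socket_buffer` borrowed while
        // the next iteration overwrites it, which the borrow checker rejects.
        sender.send(socket_buffer[..len].to_vec()).unwrap();
    }

    drop(sender); // close the channel so the iteration below terminates
    receiver.into_iter().collect()
}
```

Calling `receive_into_queue` with two messages yields two independent vectors, at the cost of one allocation and one copy per message. That cost is what the callback-based approach avoids.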

To avoid this copy, you can skip the EventQueue and process the message directly from the AdapterEvent, whose signature references the internal input buffer: AdapterEvent::Data(Endpoint, &[u8]). You can achieve this using the Network::new() constructor:

let mut network = Network::new(|adapter_event| {
    match adapter_event { 
        AdapterEvent::Data(endpoint, data) => { /* data directly from the adapter */ },
        _ => (),
    }
});
network.listen(Transport::FramedTcp, "0.0.0.0:0").unwrap();

Although this gives the desired performance, it restricts how the API can be used. For example, as the snippet below shows, the network cannot be used to send from inside the callback.

These restrictions force you to split your application logic, which obfuscates the code: some events are processed in the Network callback and others in the EventQueue loop:

let mut events = EventQueue::new();
let sender = events.sender().clone();
let mut network = Network::new(move |adapter_event| {
    match adapter_event { 
        AdapterEvent::Data(endpoint, data) => {
            // data directly from the adapter
            let response = process_data(data);
            // The network cannot be used to send here; the send must happen outside the callback.
            sender.send(UserEvent::SendResponse(endpoint, response));
        },
        _ => (),
    }
});

network.listen(Transport::FramedTcp, "0.0.0.0:0").unwrap();

loop {
    match events.receive() {
        UserEvent::SendResponse(endpoint, response) => {
            network.send(endpoint, response);
        },
        _ => (),
    }
}

Solution

To solve this problem and let users process all their events in the callback alone, some additions are needed:

Example 1

Signals as part of the network.

enum UserSignal {
    Tick(usize),
    Close,
    // other signals
}

let network = Network::new(|network, event| {
    match event {
        NetEvent::Data(endpoint, data) => {
            // data directly from the adapter
            network.send(endpoint, data);
            network.self_signal(UserSignal::Tick(1), Duration::from_millis(50));
        },
        NetEvent::Signal(signal) => match signal {
            UserSignal::Tick(_) => { /* send another signal, call a network action, etc. */ },
            UserSignal::Close => network.stop(), // The callback will never be called again.
        },
        NetEvent::Connected(..) => (),
        NetEvent::Disconnected(..) => (),
    }
});
network.listen(Transport::FramedTcp, "0.0.0.0:0").unwrap();
network.self_signal(UserSignal::Close, Duration::from_secs(3));
network.wait_to_close();
// You can still make any network call and send signals outside the callback.
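To make the idea of delayed self-signals concrete, here is a minimal std-only sketch of a signal queue ordered by delay. It is an assumption-laden toy, not the proposed implementation: `ToySignals` and `next_due` are hypothetical names, and every delay is measured from a single virtual start time instead of a real clock.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::time::Duration;

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
enum UserSignal {
    Tick(usize),
    Close,
}

/// Hypothetical toy queue: signals come out ordered by their delay.
struct ToySignals {
    // Reverse turns the max-heap into a min-heap: shortest delay first.
    // The u64 is an insertion counter used to break ties in FIFO order.
    pending: BinaryHeap<Reverse<(Duration, u64, UserSignal)>>,
    next_id: u64,
}

impl ToySignals {
    fn new() -> Self {
        Self { pending: BinaryHeap::new(), next_id: 0 }
    }

    /// Schedules `signal` to be delivered after `delay` (virtual time).
    fn self_signal(&mut self, signal: UserSignal, delay: Duration) {
        self.pending.push(Reverse((delay, self.next_id, signal)));
        self.next_id += 1;
    }

    /// Returns the pending signal with the shortest delay, if any.
    fn next_due(&mut self) -> Option<UserSignal> {
        self.pending.pop().map(|Reverse((_, _, signal))| signal)
    }
}
```

With this model, scheduling Close after 3 seconds and then Tick(1) after 50 milliseconds delivers the tick first, which matches the ordering the example above relies on. A real implementation would also need a timer thread or poll timeout to wake up when the next signal is due.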

Example 2

Node concept: the node contains the network and the signals, and manages the internal thread. The node can be used both inside and outside the callback.

enum UserSignal {
    Tick(usize),
    Close,
    // other signals
}

let node = Node::new(|node, event| {
    match event {
        Event::Network(net_event) => match net_event {
            NetEvent::Data(endpoint, data) => {
                // data directly from the adapter
                node.network.send(endpoint, data);
                node.signals.send(UserSignal::Tick(1), Duration::from_millis(50));
            },
            NetEvent::Connected(..) => (),
            NetEvent::Disconnected(..) => (),
        },
        Event::Signal(signal) => match signal {
            UserSignal::Tick(_) => { /* send another signal, call a network action, etc. */ },
            UserSignal::Close => node.stop(),
        },
    }
});
// At this point the node is already running.
node.network.listen(Transport::FramedTcp, "0.0.0.0:0").unwrap();
node.signals.send(UserSignal::Close, Duration::from_secs(3));
node.await(); // Waits until the node is stopped.
// You can still make any network call and send signals outside the callback.
// ...
let node = Node::new(...);
node.network.listen(Transport::FramedTcp, "0.0.0.0:0").unwrap();
std::thread::sleep(Duration::from_secs(3));
node.stop();

Example 3 candidate

Split the node into a NodeHandler and a NodeListener. The NodeHandler manages the network and the signals, and can stop the internal thread. The NodeListener dispatches the received events. The NodeHandler can be used both inside and outside the callback.

enum UserSignal {
    Tick(usize),
    Close,
    // other signals
}

let (handler, listener) = Node::split();
handler.network.listen(Transport::FramedTcp, "0.0.0.0:0").unwrap();
handler.signals.send(UserSignal::Close, Duration::from_secs(3));
let node_task = listener.for_each(|event| {
    match event {
        NodeEvent::Network(net_event) => match net_event {
            NetEvent::Data(endpoint, data) => {
                // data directly from the adapter
                handler.network.send(endpoint, data);
                handler.signals.send(UserSignal::Tick(1), Duration::from_millis(50));
            },
            NetEvent::Connected(..) => (),
            NetEvent::Disconnected(..) => (),
        },
        NodeEvent::Signal(signal) => match signal {
            UserSignal::Tick(_) => { /* send another signal, call a network action, etc. */ },
            UserSignal::Close => handler.stop(),
        },
    }
});
// At this point the node is already running.
// You can still make any network call and send signals outside the callback.
handler.network.listen(Transport::Udp, "0.0.0.0:0").unwrap();
handler.signals.send(UserSignal::Tick(1), Duration::from_secs(1));
drop(node_task); // Waits until handler.stop() is called.
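The control flow of the handler/listener split can be modeled with std threads and channels. The following is a rough sketch under assumptions, not the library's implementation: `ToyHandler`, `ToyListener`, `ToyEvent`, `inject` and `split` are all hypothetical names, and this toy copies data into a channel, so it illustrates only how a cloneable handler can be used inside and outside the callback, not the zero-copy read path.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread::{self, JoinHandle};

// Hypothetical event type for this toy model.
enum ToyEvent {
    Data(Vec<u8>),
    Stop,
}

/// Cloneable handle that can be used inside and outside the callback.
#[derive(Clone)]
struct ToyHandler {
    sender: Sender<ToyEvent>,
}

impl ToyHandler {
    /// Simulates the arrival of a message (stands in for the network).
    fn inject(&self, data: &[u8]) {
        let _ = self.sender.send(ToyEvent::Data(data.to_vec()));
    }

    /// Asks the internal thread to finish.
    fn stop(&self) {
        let _ = self.sender.send(ToyEvent::Stop);
    }
}

struct ToyListener {
    receiver: Receiver<ToyEvent>,
}

impl ToyListener {
    /// Dispatches events to `callback` on an internal thread until a
    /// Stop event arrives or every handler has been dropped.
    fn for_each<F>(self, mut callback: F) -> JoinHandle<()>
    where
        F: FnMut(&[u8]) + Send + 'static,
    {
        thread::spawn(move || {
            for event in self.receiver {
                match event {
                    ToyEvent::Data(data) => callback(&data),
                    ToyEvent::Stop => break,
                }
            }
        })
    }
}

/// Creates a connected handler/listener pair.
fn split() -> (ToyHandler, ToyListener) {
    let (sender, receiver) = mpsc::channel();
    (ToyHandler { sender }, ToyListener { receiver })
}
```

A caller would clone the handler, move the clone into the closure passed to `for_each`, and keep the original to inject events or call `stop()` from outside. Because the closure itself holds a handler, the channel never closes on its own, which is why an explicit Stop event is used in this sketch.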