ninjasource / embedded-bacnet

A library to read and write bacnet packets for embedded devices
Apache License 2.0

extend simple client to ignore mismatching invoke_ids #2

Closed mnbbrown closed 2 months ago

mnbbrown commented 2 months ago

This extends send_and_receive_(complex|simple)_ack to ignore messages with mismatched invoke_ids. This can happen when the client sends more than one request concurrently.

Note: this seems to be subject to the issue described in this blog post: https://ntietz.com/blog/my-reference-was-dropped-why-is-the-compiler-complaining-about-multiple-borrows/ - I have asked about it here: https://users.rust-lang.org/t/work-around-multiple-borrows-in-loop-borrow-checker-limitation/113650
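
For illustration, here is a minimal sketch of a receive loop that skips replies with a mismatched invoke_id while staying clear of that borrow checker limitation. It is not the code from this PR: `recv_matching`, the `recv` closure and the frame layout (invoke_id in byte 0) are assumptions made up for the example. Only a length leaves the loop, and the returned slice is borrowed once after the loop has ended.

```rust
/// Hypothetical frame layout for this sketch only: byte 0 holds the
/// invoke_id. Real BACnet APDUs are laid out differently.
///
/// `recv` is a stand-in for a socket read: it fills the buffer and returns
/// the number of bytes written, or `None` when no more frames will arrive.
fn recv_matching<'a>(
    buf: &'a mut [u8],
    expected_invoke_id: u8,
    mut recv: impl FnMut(&mut [u8]) -> Option<usize>,
) -> Option<&'a [u8]> {
    // Only a length escapes the loop, so no borrow of `buf` has to live
    // across iterations.
    let len = loop {
        let n = recv(&mut *buf)?;
        if n > 0 && buf[0] == expected_invoke_id {
            break n;
        }
        // Mismatched invoke_id (or empty frame): ignore it and keep waiting.
    };
    // Borrow the matching frame once, after the loop has finished.
    Some(&buf[..len])
}
```

Note that the skipped frames are simply discarded here, which is the behaviour this PR proposes.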

ninjasource commented 2 months ago

Hi, thank you for bringing this use-case to my attention. Regardless of the borrow checker issue with your solution, I think that dropping incoming messages is probably not a good idea. Since UDP is a connectionless protocol, if you intend to fire off a whole lot of requests simultaneously you are going to need to implement some form of packet reordering scheme. If you have that then you need to implement ACK and rate control, and before you know it you have re-implemented the TCP protocol - but only from one end. The "simple" module in this bacnet library was not intended for such complexity. I think that a cleaner way to support a fire-hose of requests would be to use an MPSC (multi producer single consumer) channel and leave matching up invoke_ids to the user. Then use this bacnet library as a codec. An example of using this library as a pure codec can be seen in the "simple" module. Since you are using tokio I assume that this is quite a high-level application (probably has an allocator at least), so you have the luxury of using the heap to pass data around and avoid those nasty borrow checker issues you're having.
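
A rough sketch of what "leave matching up invoke_ids to the user" could look like, using only the standard library. Everything here is hypothetical and not part of this crate: `Reply` stands in for an already decoded message, and `Pending` is an invented helper that routes replies from one shared channel to whoever registered the matching invoke_id.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical decoded reply. A real application would decode the packet
/// with the library and read the invoke_id from the result.
pub struct Reply {
    pub invoke_id: u8,
    pub payload: Vec<u8>,
}

/// Requests that are still waiting for a reply, keyed by invoke_id.
#[derive(Default)]
pub struct Pending {
    waiters: HashMap<u8, Sender<Reply>>,
}

impl Pending {
    /// Register interest in `invoke_id` and get a receiver for its reply.
    pub fn register(&mut self, invoke_id: u8) -> Receiver<Reply> {
        let (tx, rx) = channel();
        self.waiters.insert(invoke_id, tx);
        rx
    }

    /// Route one reply to its waiter. Returns false if nobody was waiting
    /// or the waiter has already gone away.
    pub fn dispatch(&mut self, reply: Reply) -> bool {
        match self.waiters.remove(&reply.invoke_id) {
            Some(tx) => tx.send(reply).is_ok(),
            None => false,
        }
    }

    /// Drain everything currently queued on the shared channel (the single
    /// consumer side) and return how many replies could not be delivered.
    pub fn drain(&mut self, incoming: &Receiver<Reply>) -> usize {
        let mut unmatched = 0;
        for reply in incoming.try_iter() {
            if !self.dispatch(reply) {
                unmatched += 1;
            }
        }
        unmatched
    }
}
```

In an async application the same shape would likely use the runtime's own channel types instead of `std::sync::mpsc`; the point is only that out-of-order replies get delivered rather than dropped.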

mnbbrown commented 2 months ago

Thanks! This was a bit of an "experiment", so I'll close the PR.

The MPSC channel approach is what I'm using in my application. Once I've got it working I'll share it somehow (maybe as a follow-up comment on this PR).

ninjasource commented 2 months ago

Ok great, good luck with the implementation and let me know if you encounter any other issues!

mnbbrown commented 2 months ago

Here's the channel-based UDP tx/rx implementation (the actual BVLC sending and receiving is still WIP), but others might find this useful anyway.

use std::{net::SocketAddr, sync::Arc};

use tokio::{
    net::{ToSocketAddrs, UdpSocket},
    sync::mpsc::{self, Receiver, Sender},
    task::JoinHandle,
};

use super::error::Result;

type PacketBuffer = Vec<u8>;

pub struct UDPTransport {
    max_packet_size: usize,
    socket: Arc<UdpSocket>,
    rx_task: JoinHandle<()>,
}

impl UDPTransport {
    pub async fn new<T: ToSocketAddrs>(
        address: T,
        max_packet_size: usize,
    ) -> Result<(UDPTransport, Receiver<PacketBuffer>)> {
        let socket = Arc::new(UdpSocket::bind(address).await?);
        socket.set_broadcast(true)?;

        let (rx_sender, rx_recv) = mpsc::channel(100); // TODO: should this be stream?
        let rx_task = tokio::spawn({
            let recv = UDPReciever {
                max_packet_size,
                socket: socket.clone(),
                chan: rx_sender,
            };

            // start recv loop
            async move { recv.run().await }
        });

        Ok((
            UDPTransport {
                max_packet_size,
                socket,
                rx_task,
            },
            rx_recv,
        ))
    }

    // Takes `&self` rather than `self` so the transport can be reused for further sends.
    pub async fn send_to(&self, buf: &[u8], target: SocketAddr) -> Result<usize> {
        Ok(self.socket.send_to(buf, target).await?)
    }
}

struct UDPReciever {
    max_packet_size: usize,
    socket: Arc<UdpSocket>,
    chan: Sender<PacketBuffer>,
}

impl UDPReciever {
    async fn run(mut self) {
        loop {
            let result = self.receive().await;
            if let Ok(buffer) = result {
                if !buffer.is_empty() {
                    if self.chan.send(buffer).await.is_err() {
                        return;
                    }
                }
            }
        }
    }

    async fn receive(&mut self) -> Result<Vec<u8>> {
        let mut buffer = vec![0u8; self.max_packet_size];
        let size = self.socket.recv(buffer.as_mut_slice()).await?;
        buffer.truncate(size);
        Ok(buffer)
    }
}

ninjasource commented 2 months ago

I don't know what your performance requirements are, but the bytes crate is handy for this kind of networking stuff. That's if you want to limit allocations and memory copies and such. Might be overkill though!
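
As a point of comparison, here is a small standard-library sketch of one way to cut down on per-packet allocation size. It does not use the bytes crate. `ScratchReceiver` and the `recv` closure are hypothetical names for this example: a single scratch buffer is allocated once and reused for every read, and each packet is copied into a `Vec` sized to the bytes actually received, instead of allocating and zeroing `max_packet_size` bytes per packet.

```rust
/// Reads packets into one reusable scratch buffer and hands out copies
/// sized to the received data.
pub struct ScratchReceiver {
    scratch: Vec<u8>,
}

impl ScratchReceiver {
    /// Allocate the scratch buffer once, up front.
    pub fn new(max_packet_size: usize) -> Self {
        Self {
            scratch: vec![0u8; max_packet_size],
        }
    }

    /// `recv` stands in for a socket read: it fills the slice and returns
    /// the number of bytes written.
    pub fn receive_with<E>(
        &mut self,
        recv: impl FnOnce(&mut [u8]) -> Result<usize, E>,
    ) -> Result<Vec<u8>, E> {
        let size = recv(self.scratch.as_mut_slice())?;
        // Guard against a reader that reports more than the buffer holds.
        let size = size.min(self.scratch.len());
        // One copy and one allocation sized to the packet.
        Ok(self.scratch[..size].to_vec())
    }
}
```

This trades an extra copy for smaller allocations, so whether it helps depends on packet sizes and throughput.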