ninjasource / embedded-bacnet

A library to read and write bacnet packets for embedded devices
Apache License 2.0

Working with the lifetimes on DataLink #3

Open mnbbrown opened 3 months ago

mnbbrown commented 3 months ago

Caveat: relatively new to Rust, familiar with BACnet

Do you have a suggestion for a container that can be used to move DataLink<'a> around with its underlying buffer safely? For example, consider this task handling incoming datagrams:

let incoming_packet_handler = tokio::spawn({
    // clone Arc<Mutex<HashMap<u8, oneshot::Sender>> of pending confirmed requests
    let pending = pending_transactions.clone();

    async move {
        while let Some(datagram) = rx.recv().await {
            let mut reader = Reader::new_with_len(datagram.len());
            let decoded = DataLink::decode(&mut reader, &datagram);

            match decoded {
                Ok(data_link) => {
                    if let Some(npdu) = data_link.npdu {
                        if let NetworkMessage::Apdu(ApplicationPdu::ComplexAck(ack)) =
                            npdu.network_message
                        {
                            if let Some(tx) = pending.lock().await.remove(&ack.invoke_id) {
                                tx.send(Ok(ack));
                            }
                        }
                    };
                }
                Err(_) => todo!("implement decoding error handling"),
            }
        }
    }
});

It currently fails to compile with "datagram dropped here while still borrowed". This makes sense given the DataLink has borrowed a reference to datagram which is dropped at the end of this context.

Do you have any suggestion on how I might move the ownership of datagram into a container with DataLink to avoid this?

P.S. sorry if some of the ownership nomenclature is wrong..

ninjasource commented 3 months ago

Don't be sorry, this is one of the trickiest areas of Rust to get your head around. It is part of the reason async was so difficult to implement and why we need the Pin API, but I digress! The reason you can't store a DataLink together with its buffer in its own isolated struct (without lifetimes) is that Rust won't let you have self-referential structs (unless the memory is pinned). A self-referential struct is incompatible with having no runtime and performing fast memory moves: moving the struct would leave its internal reference pointing at the old location. That is the price we pay for zero-cost memory management.

However, you've got me thinking about a way I can make this BACnet library more ergonomic to use in the way you want to use it. I already use a cursor (the Reader), so there should (theoretically) be no need for the DataLink or ComplexAck to have a lifetime linking it to the input buffer. It currently stores slices into the input buffer, but I don't see why these can't simply be represented as indices. One complication I can think of with this approach is how to represent BitString data types. Another downside is that I won't be able to implement the IntoIterator trait (into_iter()) on lists, so it may be bad for the API. I'll give it a think.
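The index idea could look roughly like the sketch below. It is a toy and not the embedded-bacnet API: `IndexedAck`, `decode_indexed`, and the two-byte header layout are all hypothetical names and assumptions made for illustration.

```rust
use std::ops::Range;

/// Hypothetical decoded header that stores a byte range instead of a slice,
/// so it carries no lifetime tied to the input buffer.
#[derive(Debug, Clone, PartialEq)]
pub struct IndexedAck {
    pub invoke_id: u8,
    /// Where the payload lives inside the original datagram.
    pub payload: Range<usize>,
}

impl IndexedAck {
    /// Resolve the stored range against a buffer. Returns None instead of
    /// panicking if the buffer is shorter than the range expects.
    pub fn payload_in<'a>(&self, buf: &'a [u8]) -> Option<&'a [u8]> {
        buf.get(self.payload.clone())
    }
}

/// Toy layout (an assumption, not BACnet): [invoke_id, payload_len, payload...].
pub fn decode_indexed(buf: &[u8]) -> Option<IndexedAck> {
    let invoke_id = *buf.first()?;
    let len = *buf.get(1)? as usize;
    let end = 2usize.checked_add(len)?;
    if end > buf.len() {
        return None;
    }
    Some(IndexedAck { invoke_id, payload: 2..end })
}
```

An `IndexedAck` can be stored or sent anywhere, but nothing ties it to the right buffer: resolving it against a different or shorter buffer yields the wrong bytes or `None`.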

In the meantime, you can always decode the message twice. Once to get the invoke_id so you can match it up, then again on the receiving end of your tx ack channel. That data structure would have the entire datagram in it, no lifetimes. This might seem like a waste but decoding up to the ComplexAck (remember that this is not decoding the entire message - think of it as a header of sorts) is very fast and probably not where your bottleneck will be.
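A minimal sketch of the decode-twice idea, using only the standard library. The one-byte header, `AckView`, `peek_invoke_id`, `decode_full`, and `route_and_decode` are hypothetical stand-ins, and `std::sync::mpsc` replaces the tokio channel purely to keep the example self-contained.

```rust
use std::sync::mpsc;
use std::thread;

/// Borrowed view over a datagram; stands in for a type like ComplexAck<'a>.
pub struct AckView<'a> {
    pub invoke_id: u8,
    pub payload: &'a [u8],
}

/// First pass: read only the header field needed for routing.
pub fn peek_invoke_id(datagram: &[u8]) -> Option<u8> {
    datagram.first().copied()
}

/// Second pass: full decode, done wherever the owned bytes end up.
pub fn decode_full(datagram: &[u8]) -> Option<AckView<'_>> {
    let (&invoke_id, payload) = datagram.split_first()?;
    Some(AckView { invoke_id, payload })
}

/// Route an owned datagram to a waiting receiver, which decodes it again.
pub fn route_and_decode(datagram: Vec<u8>, expected_id: u8) -> Option<Vec<u8>> {
    let (tx, rx) = mpsc::channel::<Vec<u8>>();

    // The receiving side owns the bytes, so the borrowed view never has to
    // cross the thread boundary.
    let worker = thread::spawn(move || {
        let bytes = rx.recv().ok()?;
        let ack = decode_full(&bytes)?;
        Some(ack.payload.to_vec())
    });

    // First decode: the borrow ends before the Vec is moved into the channel.
    if peek_invoke_id(&datagram) == Some(expected_id) {
        tx.send(datagram).ok()?;
    }
    drop(tx); // closing the channel lets the worker finish if nothing was sent
    worker.join().ok()?
}
```

Only a `Vec<u8>` travels through the channel, so no lifetime appears in the channel's type.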

Alternatively, do all your BACnet processing inside that incoming_packet_handler. Passing something with a lifetime across a thread boundary is not common unless that lifetime is 'static. Think of the DataLink struct as a codec rather than the final resting place of your data. At some point you're going to want to put things in a fully owned data structure. Perhaps I could add an "alloc" feature to this BACnet library to expose fully owned structs rather than what we have now with lifetimes and iterators.
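The "codec first, owned data afterwards" approach might be sketched like this. `PropertyView`, `OwnedProperty`, and `handle_datagram` are hypothetical names, and the one-byte header is an assumption rather than real BACnet encoding.

```rust
use std::thread;

/// Borrowed, zero-copy view; stands in for a decoded type with a lifetime.
#[derive(Debug)]
pub struct PropertyView<'a> {
    pub invoke_id: u8,
    pub value: &'a [u8],
}

/// Fully owned counterpart with no lifetime parameter.
#[derive(Debug, Clone, PartialEq)]
pub struct OwnedProperty {
    pub invoke_id: u8,
    pub value: Vec<u8>,
}

impl PropertyView<'_> {
    /// Copy the borrowed bytes so the result no longer depends on the buffer.
    pub fn to_owned_property(&self) -> OwnedProperty {
        OwnedProperty { invoke_id: self.invoke_id, value: self.value.to_vec() }
    }
}

/// Decode inside the handler, convert, and only then hand the data to another thread.
pub fn handle_datagram(datagram: Vec<u8>) -> Option<OwnedProperty> {
    let owned = {
        let (&invoke_id, value) = datagram.split_first()?;
        let view = PropertyView { invoke_id, value };
        view.to_owned_property()
    }; // the borrow of `datagram` ends here
    drop(datagram);

    // OwnedProperty is Send + 'static, so it can cross the thread boundary.
    thread::spawn(move || owned).join().ok()
}
```

The cost is one copy of the value bytes per message, paid at the point where the data leaves the handler.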

One last thing you could try is to use a buffer pool which hands out buffers with static lifetimes. Then you can put your datalink into a struct like so:

struct MyStruct {
    datalink: DataLink<'static>,
    my_buffer: BufferPoolBuffer<'static>,   
}

Then again, that may not work because DataLink is not Send.
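One simple way to obtain a buffer with a 'static lifetime is `Box::leak`, sketched below. This is not a real pool (leaked buffers are never freed or recycled), and `FrameView`, `leak_buffer`, `decode_static`, and `body_len_on_other_thread` are hypothetical. The toy view is Send only because it holds nothing but a shared slice; whether the real DataLink is Send is a separate question.

```rust
/// Borrowed view; stands in for DataLink<'a>.
pub struct FrameView<'a> {
    pub function: u8,
    pub body: &'a [u8],
}

/// Leak a heap buffer to obtain a slice that lives for the rest of the program.
/// A real pool would recycle buffers; this sketch never frees them.
pub fn leak_buffer(bytes: Vec<u8>) -> &'static [u8] {
    Box::leak(bytes.into_boxed_slice())
}

/// Decode against a 'static buffer, so the view is 'static as well.
pub fn decode_static(buf: &'static [u8]) -> Option<FrameView<'static>> {
    let (&function, body) = buf.split_first()?;
    Some(FrameView { function, body })
}

/// A 'static view can be moved into a spawned thread.
pub fn body_len_on_other_thread(view: FrameView<'static>) -> usize {
    std::thread::spawn(move || view.body.len()).join().unwrap()
}
```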

mnbbrown commented 3 months ago

Amazingly detailed answer - really appreciate it! Wrapping my head around lifetimes (and references not just being pointers) is proving challenging but I've made a lot of progress in 48 hours.

I've gone for the option of decoding twice:

  1. Once to get the invoke_id and whether any more APDUs are still to come in a segmented message
  2. The actual decoding.

I think this is the cleanest option - then I'm just passing Vecs around - and it's easier for my nascent Rust brain to grok.

        let incoming_packet_handler = tokio::spawn({
            let pending_tx = pending_transactions.clone();
            let packet_buffer = pending_datagrams.clone();

            async move {
                while let Some(datagram) = rx.recv().await {
                    let mut reader = Reader::new_with_len(datagram.len());
                    let decoded = DataLink::decode(&mut reader, &datagram);

                    match decoded {
                        Ok(data_link) => {
                            if let Some(npdu) = data_link.npdu {
                                if let Some((has_more, invoke_id)) = match npdu.network_message {
                                    NetworkMessage::Apdu(ApplicationPdu::ComplexAck(ack)) => {
                                        Some((false, ack.invoke_id))
                                    }
                                    NetworkMessage::Apdu(ApplicationPdu::SimpleAck(ack)) => {
                                        Some((false, ack.invoke_id))
                                    }
                                    // NOTE: using ack.more_follows implies packets arrived in order
                                    // TODO: send acks where we have received max_window_size
                                    NetworkMessage::Apdu(ApplicationPdu::Segment(ack)) => {
                                        Some((ack.more_follows, ack.invoke_id))
                                    }
                                    _ => None,
                                } {
                                    if let Some(buffer) =
                                        packet_buffer.lock().await.get_mut(&invoke_id)
                                    {
                                        buffer.push(datagram);
                                    }

                                    if !has_more {
                                        if let Some(tx) = pending_tx.lock().await.remove(&invoke_id)
                                        {
                                            tx.send(Ok(()));
                                        }
                                    }
                                }
                            };
                        }
                        Err(_) => todo!("implement decoding error handling"),
                    }
                }
            }
        });
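The bookkeeping in the handler above can be isolated into a small synchronous structure, which makes it easier to test. This is a sketch under assumptions: `SegmentStore` is a hypothetical name, it merges the two maps into one, and like the handler it assumes segments arrive in order.

```rust
use std::collections::HashMap;

/// Collects raw datagrams per invoke_id until the final segment arrives.
#[derive(Default)]
pub struct SegmentStore {
    pending: HashMap<u8, Vec<Vec<u8>>>,
}

impl SegmentStore {
    /// Register interest in an invoke_id before the request is sent.
    pub fn expect(&mut self, invoke_id: u8) {
        self.pending.entry(invoke_id).or_default();
    }

    /// Store a datagram. Returns all datagrams for the transaction once
    /// `more_follows` is false, or None while more are expected or the
    /// invoke_id is unknown.
    pub fn push(
        &mut self,
        invoke_id: u8,
        more_follows: bool,
        datagram: Vec<u8>,
    ) -> Option<Vec<Vec<u8>>> {
        self.pending.get_mut(&invoke_id)?.push(datagram);
        if more_follows {
            None
        } else {
            self.pending.remove(&invoke_id)
        }
    }
}
```

In the async handler, a store like this would sit behind the same `Arc<Mutex<...>>`, and a `Some` result would be the signal to notify the waiting transaction.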

Re ergonomics - I'm not sure indices are better. Don't they end up just kind of being poor man's refs? If your underlying buffer has been dropped it's just an index to nothing.

mnbbrown commented 3 months ago

That all said, I think I'm going to end up implementing something akin to this:

"Alternatively, do all your bacnet processing inside that incoming_packet_handler. Passing something with a lifetime across a thread boundary is not common unless that lifetime is 'static. Think of the DataLink struct as a codec rather than the final resting place of your data. At some point you're going to want to put things in a fully owned data structure. Perhaps I could add an "alloc" feature to this bacnet library to expose fully owned structs rather than what we have now with lifetimes and iterators."

So I can implement something like this:

    pub async fn do_confirmed<'b>(
        self,
        address: SocketAddr,
        service: ConfirmedRequestService<'_>,
        timeout: u64,
    ) -> Result<ComplexAck<'b>> {
        let invoke_id = self.invoke_id_store.next().await;
        let tx = self.new_transaction(invoke_id).await;

        // build service request
        let apdu = ApplicationPdu::ConfirmedRequest(ConfirmedRequest::new(invoke_id, service));
        let message = NetworkMessage::Apdu(apdu);
        let npdu = NetworkPdu::new(None, None, true, MessagePriority::Normal, message);
        let bvlc = DataLink::new(DataLinkFunction::OriginalUnicastNpdu, Some(npdu));

        // send service request
        // TODO: how long should buffer be, supporting segmented send
        let mut buf = vec![0; 1400];
        let mut writer = Writer::new(buf.as_mut_slice());
        bvlc.encode(&mut writer);
        let buffer = writer.to_bytes();
        self.transport.send_to(buffer, address).await?;

        // wait for service ack
        match tokio::time::timeout(tokio::time::Duration::from_millis(timeout), tx.recv()).await {
            Ok(Ok(_)) => {
                if let Some(datagrams) = self.pending_datagrams.lock().await.remove(&invoke_id) {
                    // merge (if segmented) and decode..
                }
            }
            Ok(Err(e)) => Err(e),
            Err(_) => Err(ClientError::Timeout),
        }
    }

ninjasource commented 3 months ago

Nice one, glad I could help!

" // TODO: how long should buffer be, supporting segmented send"

I've always gone with buffer lengths of 1500, but that doesn't leave enough space for the IPv4 and UDP headers, so something like 1440 bytes might be safer. I haven't really experimented with BACnet segmentation, so I don't know how it behaves.
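The arithmetic behind that suggestion, as a small sketch. It assumes a standard 1500-byte Ethernet MTU, an IPv4 header of 20 bytes without options (up to 60 with options), and an 8-byte UDP header; the constant and function names are made up for illustration.

```rust
/// Typical Ethernet MTU in bytes (an assumption about the link).
pub const ETHERNET_MTU: usize = 1500;
/// IPv4 header without options.
pub const IPV4_HEADER_MIN: usize = 20;
/// IPv4 header with the maximum amount of options.
pub const IPV4_HEADER_MAX: usize = 60;
/// UDP header size.
pub const UDP_HEADER: usize = 8;

/// Largest UDP payload that fits in one unfragmented frame.
pub const fn max_udp_payload(mtu: usize, ip_header: usize) -> usize {
    mtu - ip_header - UDP_HEADER
}
```

Under these assumptions the payload limit is 1472 bytes with a minimal IPv4 header and 1432 bytes with a maximal one, so 1440 leaves some headroom in the common case. Tunnels or VPNs on the path would lower the limit further.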

Btw, if you plan to return a ComplexAck like that instead of an owned data structure then you'll probably want to pass in the buffer that makes it up too or else you will get borrow checker compilation errors.

Something like this should work:

    pub async fn do_confirmed<'b>(
        self,
        result_buffer: &'b mut [u8],
        address: SocketAddr,
        service: ConfirmedRequestService<'_>,
        timeout: u64,
    ) -> Result<ComplexAck<'b>> {
    // some code
    }
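Why passing the buffer in helps can be shown with a standalone sketch: the caller owns the storage, so the returned view can borrow from it for 'b. `AckView` and `decode_into` are hypothetical stand-ins for ComplexAck<'b> and the body of do_confirmed, and the one-byte header is not real BACnet encoding.

```rust
/// Borrowed result; stands in for ComplexAck<'b>.
#[derive(Debug, PartialEq)]
pub struct AckView<'b> {
    pub invoke_id: u8,
    pub payload: &'b [u8],
}

/// Copy the response into the caller's buffer and return a view into it.
/// Because the caller owns `result_buffer`, the view can outlive this call.
pub fn decode_into<'b>(result_buffer: &'b mut [u8], response: &[u8]) -> Option<AckView<'b>> {
    // Fails with None if the caller's buffer is too small for the response.
    let dst = result_buffer.get_mut(..response.len())?;
    dst.copy_from_slice(response);
    // Downgrade to a shared borrow that lasts for the whole lifetime 'b.
    let filled: &'b [u8] = dst;
    let (&invoke_id, payload) = filled.split_first()?;
    Some(AckView { invoke_id, payload })
}
```

The caller cannot touch `result_buffer` again until the returned view is dropped, which is exactly the guarantee the borrow checker was asking for.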

ninjasource commented 3 months ago

"Re ergonomics - I'm not sure indices are better. Don't they end up just kind of being poor man's refs? If your underlying buffer has been dropped it's just an index to nothing."

You're right, using indices would be kind of pointless. An out-of-range index wouldn't return corrupt memory, but it would panic.

ninjasource commented 2 months ago

After a major refactor I have added the alloc feature to the crate and enabled it by default. This means that all BACnet objects are fully owned (the lifetime attached to them is 'static, by the way). This was a fairly tricky change because I wanted to avoid as much duplication as possible. To this end I added a 'static reference to any owned BACnet object that required it, using the PhantomData technique (a zero-sized type, so it costs nothing at runtime). At some point I'll remove it, but I need to make sure that code duplication and conditional compilation do not get out of hand.

This should make your segment work a lot easier because you can store each segment separately without the buffer that was used to decode it.
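The PhantomData technique in general looks something like the sketch below. It illustrates the pattern only and is not a copy of how the crate does it: `Segment`, its fields, and `decode_and_store` are hypothetical.

```rust
use std::marker::PhantomData;

/// Owned data that still carries a lifetime parameter, so borrowed and owned
/// builds could share one type signature.
#[derive(Debug, Clone, PartialEq)]
pub struct Segment<'a> {
    pub invoke_id: u8,
    pub data: Vec<u8>,
    /// Zero-sized marker that "uses" the otherwise unused lifetime.
    _marker: PhantomData<&'a ()>,
}

impl Segment<'static> {
    /// Build a fully owned segment; nothing in it borrows from a buffer.
    pub fn new(invoke_id: u8, data: Vec<u8>) -> Self {
        Segment { invoke_id, data, _marker: PhantomData }
    }
}

/// Segments can be stored on their own, long after the decode buffer is gone.
pub fn decode_and_store(store: &mut Vec<Segment<'static>>, datagram: Vec<u8>) {
    if let Some((&invoke_id, rest)) = datagram.split_first() {
        store.push(Segment::new(invoke_id, rest.to_vec()));
    }
    // `datagram` is dropped here; the stored segment is unaffected.
}
```

Because the marker is zero-sized, the lifetime parameter exists only at the type level and adds no bytes to the struct.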