Accumulate NAL unit fragments into a ring buffer instead of via ref-counted Bytes fragments.
Mirrored ring buffer?
My first idea: use a mirrored ring buffer (unsafe VM tricks), via slice-deque or similar, for the read buffer.
Advantages:
It'd be very easy to use large enough read buffers (#5) without memcpying to get contiguous slices.
We wouldn't need the bytes crate, which I think might be more a tokio-only thing than something familiar to async-std folks or folks using threads with pure std.
No allocations of bytes::BytesMut or their behind-the-scenes Shared. The only regularly occurring allocations would be for final frames.
the pieces array in h264::Depacketizer would be smaller: 16 bytes per element rather than the current (on 64-bit platforms) 32 bytes for a bytes::Bytes (plus another 32 bytes for the Shared mentioned above). Some CPU cache efficiency improvements.
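As a quick check of the sizes quoted above (a sketch only; it assumes the replacement element would be something like a Range<u64> of absolute input byte positions, and it needs the bytes crate as a dependency):

```rust
use std::ops::Range;

fn main() {
    // On a 64-bit platform, bytes::Bytes is four words (ptr, len, data, vtable)...
    assert_eq!(std::mem::size_of::<bytes::Bytes>(), 32);
    // ...while a range of positions into one shared input buffer is two words.
    assert_eq!(std::mem::size_of::<Range<u64>>(), 16);
}
```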
Disadvantages:
slice-deque isn't well-maintained now, unfortunately (gnzlbg/slice_deque#94), but there are enough people who care about it that it might be forked if necessary.
Creating and destroying sessions would be more expensive, but I don't think this happens often enough to care about.
Some internal implementation complexity as described below.
Here's how it would work (a rough code sketch follows this list):
read from the TcpStream into the uninitialized portion of this buffer via tokio::io::AsyncRead::poll_read.
rather than rtsp_types::Message<Bytes>, we'd use rtsp_types::Message<Handle> or something which has a ref to the input buffer and the range within it.
depacketizers could keep around Range<u64>s of input byte positions; Depacketizer::pull would get a shared reference to the input buffer, which it could use to resolve those ranges into slices of the buffer.
depacketizers would need to advertise their low-water position (the earliest byte they still need). E.g., in the H.264 depacketizer, this would be self.pieces.first().map(|p| p.0).
depacketizers would need to copy stuff before yielding it in the Stream (as it's impractical for the consumer to communicate its low water position back in this way), but I decided to do that anyway (#4).
the Stream that returns PacketItems would also need to copy for the same reason. I don't expect this stream to be commonly used anyway.
retina::client::Demuxed would use a BinaryHeap to track the lowest of the low water positions of each depacketizer, lazily updated. It'd drop data from the front of the input buffer accordingly.
the two streams (Demuxed which returns CodecItems, and the PacketItem stream currently built into Session<Playing>) would be implemented via a common ancestor instead of the former being implemented in terms of the latter.
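Here's the sketch mentioned above. Everything in it (InputBuffer, the Depacketizer struct, discard_consumed) is a hypothetical stand-in for illustration, not retina's actual types:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::ops::Range;

/// The shared input buffer, addressed by absolute stream position.
/// (A plain Vec stands in for the mirrored/plain ring buffer.)
struct InputBuffer {
    base_pos: u64, // absolute stream position of buf[0]
    buf: Vec<u8>,
}

impl InputBuffer {
    /// Resolve a range of absolute positions into a contiguous slice.
    fn slice(&self, r: Range<u64>) -> &[u8] {
        let start = (r.start - self.base_pos) as usize;
        let end = (r.end - self.base_pos) as usize;
        &self.buf[start..end]
    }

    /// Drop everything before `low_water`; no depacketizer needs it anymore.
    fn discard_until(&mut self, low_water: u64) {
        let n = (low_water - self.base_pos) as usize;
        self.buf.drain(..n);
        self.base_pos = low_water;
    }
}

/// A depacketizer keeps ranges of input positions rather than Bytes.
struct Depacketizer {
    pieces: Vec<Range<u64>>,
}

impl Depacketizer {
    /// Low-water position: the earliest byte still needed, analogous to
    /// self.pieces.first().map(|p| p.0) in the H.264 depacketizer.
    fn low_water(&self) -> Option<u64> {
        self.pieces.first().map(|p| p.start)
    }

    /// pull() gets a reference to the input buffer, resolves the accumulated
    /// ranges, and copies into the final frame allocation before yielding.
    fn pull(&mut self, input: &InputBuffer) -> Vec<u8> {
        let mut frame = Vec::new();
        for r in self.pieces.drain(..) {
            frame.extend_from_slice(input.slice(r));
        }
        frame
    }
}

/// Demuxed-style bookkeeping: find the minimum low-water position across the
/// depacketizers via a min-heap (rebuilt here for brevity; the real design
/// would keep the heap around and lazily drop stale entries) and discard the
/// input behind it.
fn discard_consumed(input: &mut InputBuffer, depacketizers: &[Depacketizer], consumed_pos: u64) {
    let heap: BinaryHeap<Reverse<u64>> = depacketizers
        .iter()
        .filter_map(|d| d.low_water())
        .map(Reverse)
        .collect();
    let min = heap.peek().map(|&Reverse(p)| p).unwrap_or(consumed_pos);
    input.discard_until(min);
}
```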
safe ring buffer
But actually we could use a plain safe ring buffer. The only problematic case is when a read wraps around the end of the ring. In that case we can either:
do a contiguous read (just the end piece), and copy any incomplete portion over to the beginning of the ring before doing a second read.
do a writev style read (read into the end and beginning), and likewise if the beginning read is incomplete, memmove the second piece to make some extra space (expanding if necessary), then copy over the first piece.
The overhead from the extra copy is less than that of a single read, so it's not a big deal. A rough sketch of the first option follows.
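This sketch assumes a hypothetical compacting buffer where `read` is the low-water position (nothing before it is still referenced); it only ever reads into the contiguous tail, copying any incomplete portion back to the front first:

```rust
use std::io::Read;

struct RingBuffer {
    data: Vec<u8>,
    read: usize,  // index of the first byte still needed (low-water position)
    write: usize, // index one past the last filled byte
}

impl RingBuffer {
    fn fill_from(&mut self, src: &mut impl Read) -> std::io::Result<usize> {
        if self.write == self.data.len() {
            // Wrapped: memmove the unconsumed (possibly incomplete) portion to
            // the beginning of the buffer so the next read stays contiguous.
            self.data.copy_within(self.read..self.write, 0);
            self.write -= self.read;
            self.read = 0;
        }
        // Contiguous read into the free tail of the buffer.
        let n = src.read(&mut self.data[self.write..])?;
        self.write += n;
        Ok(n)
    }

    /// All buffered-but-unconsumed bytes, always a single contiguous slice.
    fn filled(&self) -> &[u8] {
        &self.data[self.read..self.write]
    }
}
```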
slab for UDP
The ring buffer approach makes sense for TCP (both requests/replies and interleaved data messages). Less sense for UDP, particularly if using recvmmsg on Linux to support reading multiple messages at once. The problem is that we don't know the byte length of the packets, and the biggest possible packet (65,535 bytes - 20 (IP header size) - 8 (UDP header size) = 65,507 bytes) is much larger than a typical packet (non-fragmented, 1,500-byte MTU - 28 bytes of headers = 1,472 bytes), and I don't want 97+% waste.
So for UDP, here's another idea: have a slab of packet buffers, read into one of them with recvmsg or maybe multiple simultaneously with recvmmsg. Use scatter/gather into a slab-based primary buffer that's of reasonable length and an overflow buffer for the remainder. When overflow happens, we extend the primary buffer with the overflow. "Reasonable length" can be initially our PMTU length (to the peer, on the assumption the path is roughly symmetrical), then based on the largest length we've seen.
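A sketch of that scatter/gather read via the libc crate; the function name and the surrounding slab management are made up for illustration. `primary` is a slab-backed buffer pre-sized to the "reasonable length", and `overflow` covers the rest of the maximum possible datagram:

```rust
use std::os::unix::io::RawFd;

fn recv_with_overflow(
    fd: RawFd,
    primary: &mut Vec<u8>,
    overflow: &mut [u8],
) -> std::io::Result<usize> {
    // Scatter/gather: fill the primary buffer first, then spill into overflow.
    let mut iov = [
        libc::iovec {
            iov_base: primary.as_mut_ptr() as *mut libc::c_void,
            iov_len: primary.len(),
        },
        libc::iovec {
            iov_base: overflow.as_mut_ptr() as *mut libc::c_void,
            iov_len: overflow.len(),
        },
    ];
    let mut hdr: libc::msghdr = unsafe { std::mem::zeroed() };
    hdr.msg_iov = iov.as_mut_ptr();
    hdr.msg_iovlen = iov.len() as _;
    let n = unsafe { libc::recvmsg(fd, &mut hdr, 0) };
    if n < 0 {
        return Err(std::io::Error::last_os_error());
    }
    let n = n as usize;
    if n > primary.len() {
        // Overflow happened: extend the primary buffer with the spilled bytes
        // so the datagram is contiguous. The caller can also use `n` to pick a
        // larger "reasonable length" for subsequent reads.
        let spill = n - primary.len();
        primary.extend_from_slice(&overflow[..spill]);
    }
    Ok(n)
}
```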
buffer size limits and API changes
We don't really have any limit on how much stuff we buffer right now. It's even worse when using TCP and the ring buffer and Demuxed, because if one stream stops mid-access unit, we'll grow and grow even if the other streams are all behaving. We should have a limit on the total size of the TCP ring buffer. Also for UDP we should have a limit on the total size we keep around. Perhaps within SessionOptions and UdpTransportOptions, respectively.
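One hypothetical shape for those limits (retina's actual SessionOptions and UdpTransportOptions might expose them quite differently):

```rust
/// Hypothetical option structs, for illustration only.
pub struct SessionOptions {
    /// Cap on the TCP interleaved-message ring buffer, in bytes; exceeding it
    /// would fail the session rather than growing without bound.
    max_tcp_buffer_bytes: usize,
}

pub struct UdpTransportOptions {
    /// Cap on the total bytes of UDP packet buffers retained in the slab.
    max_buffered_bytes: usize,
}

impl SessionOptions {
    /// Builder-style setter, matching how other session options are configured.
    pub fn max_tcp_buffer_bytes(mut self, limit: usize) -> Self {
        self.max_tcp_buffer_bytes = limit;
        self
    }
}
```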
Related open API question: do we want to just allow access to the most recent packet (with the packet-level API) or frame (with the Demuxed API)? It then probably makes sense to stop using futures::Stream (which, because Rust doesn't have GATs, can't return items borrowed from the stream). Or do we want to allow the caller to hold onto things for longer (subject to the limits above)? If the latter, I think we need some sort of extra step where you check if the item you got is still good. And if we continue to have these be Send/Sync, we need some sort of internal locking.
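To make the futures::Stream point concrete, a hypothetical lending-style API would hand out a borrow of the demuxer's internal buffer, which is exactly what Stream::Item can't express without GATs:

```rust
use std::ops::Range;

/// Hypothetical lending-style demuxer: next_frame() hands out a borrow of the
/// internal buffer, so the caller can only hold the most recent frame.
struct Demuxer {
    buf: Vec<u8>,             // simplified stand-in for the input ring buffer
    last_frame: Range<usize>, // range of the most recently demuxed frame
}

impl Demuxer {
    /// Returns the most recent frame as a slice borrowed from `self`; the
    /// borrow must end before the next call, so the buffer can be reused.
    fn next_frame(&mut self) -> Option<&[u8]> {
        // ...read, depacketize into self.buf, update self.last_frame...
        self.buf.get(self.last_frame.clone())
    }
}
```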