CCSDSPy / ccsdspy

I/O interface and utilities for CCSDS binary spacecraft data in Python. Library used in flight missions at NASA, NOAA, and SWRI
https://ccsdspy.org
BSD 3-Clause "New" or "Revised" License

Feature Request: load from byte array for live parsing #83

Closed scandey closed 10 months ago

scandey commented 11 months ago

It seems like CCSDSpy would be viable for live plotting of telemetry if I could pass in a byte string/array of chunks of mixed binary data and process a second's worth of data at a time.

Absolutely reasonable to say this is out of scope, but it feels close to being viable with the existing structure. I see that _load wants a numpy array of bytes and I wish I could just dump that array in directly rather than saving to file first. For my particular data stream, I can guarantee even splits between packets so hopefully it'd be relatively clean.

In the longer term, allowing such a thing might benefit from a little bit of overhead to handle the case of missing bytes or extra bytes (if a packet is split across a chunk boundary). Both cases would preferably (to me) still return the successful packets up to that point and then return the incomplete packets or extra bytes for handling at a higher level.

ddasilva commented 11 months ago

Hi @scandey ,

This is an interesting idea, and I think it's simpler to do than we might think. I think we could make this work by modifying utils.iter_packet_bytes() to incrementally call .read(n) on a file-like object to read the next n bytes as needed, blocking for more input when it's not available. Then, we could do something like this to handle streaming telemetry over a socket:

  1. Open a socket as a file-like object
  2. For each packet_bytes in utils.iter_packet_bytes(socket_file)
    • Get the APID with utils.read_primary_headers(packet_bytes)
    • Parse the packet bytes with the correct packet definition for the given APID
    • Do something with that parsed packet
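
The steps above can be sketched with the standard library alone. This is a minimal illustration, not CCSDSPy's implementation: `iter_stream_packets` and `make_packet` are hypothetical helpers, and the sketch assumes a blocking, buffered file-like object (for example, one returned by `socket.makefile("rb")`) carrying well-formed CCSDS space packets with no sync markers. A `BytesIO` stands in for the socket here.

```python
import io
import struct

PRIMARY_HEADER_LEN = 6  # bytes in a CCSDS space packet primary header


def iter_stream_packets(stream):
    """Yield (apid, packet_bytes) for each complete packet read from `stream`.

    Hypothetical helper, not part of CCSDSPy. Stops at end of stream or
    when the final packet is truncated.
    """
    while True:
        header = stream.read(PRIMARY_HEADER_LEN)
        if len(header) < PRIMARY_HEADER_LEN:
            return  # end of stream, or only a partial header arrived
        first_word, _sequence, data_len = struct.unpack(">HHH", header)
        apid = first_word & 0x07FF  # APID is the low 11 bits of the first word
        body = stream.read(data_len + 1)  # length field stores (data bytes - 1)
        if len(body) < data_len + 1:
            return  # truncated final packet
        yield apid, header + body


def make_packet(apid, payload):
    """Build a minimal test packet (version 0, no secondary header)."""
    return struct.pack(">HHH", apid, 0xC000, len(payload) - 1) + payload


# Stand-in for a socket: two packets with different APIDs.
demo_stream = io.BytesIO(
    make_packet(0x123, b"\x01\x02") + make_packet(0x045, b"\xff")
)
received = list(iter_stream_packets(demo_stream))
```

Each yielded `packet_bytes` could then be dispatched on its APID to the matching packet definition, as in the list above.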

Do you have a use case in mind for your project? It would be cool to actually prove this works end-to-end.

> In the longer term, allowing such a thing might benefit from a little bit of overhead to handle the case of missing bytes or extra bytes (if a packet is split across a chunk boundary). Both cases would preferably (to me) still return the successful packets up to that point and then return the incomplete packets or extra bytes for handling at a higher level.

Right now a number of functions will issue a warning if bytes are missing at the end of the stream to complete the length stated in the last packet (utils.iter_packet_bytes() does this). I have been thinking it actually makes more sense to report extra bytes instead of missing bytes. If you have an incomplete header at the end of the file, it's not quite possible to determine how many bytes are missing from the last packet.

scandey commented 11 months ago

I'm working on internal calibration for the electric fields instrument on the TRACERS spacecraft (project/SOC at UIowa, instrument at UC Berkeley). I have lots of relatively simple CCSDS packets streaming in that I currently chunk into files of a few minutes at a time to look at with CCSDSpy. I currently split the files by APID into new files and read them with CCSDSpy, but it would be nice to iterate over packets, grab the ones I want into byte arrays by APID using utils.iter_packet_bytes and utils.split_by_apid, and only do the full loading process on a subset (without making a series of files). That would also serve nicely for pulling in live data from a socket (though I'd still chunk the data elsewhere for archive).

I agree that reporting the number of extra bytes is better than reporting missing bytes, and even better if there's an option to return the bytes themselves (so they can be easily tacked on to the top of the next array/file as desired). All of this still assumes that the packets are generally well formed and that eventually well-formed packets are guaranteed to arrive (so no state machines needed).
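
A minimal sketch of that "return the extra bytes" idea, using only the standard library. `split_complete_packets` is a hypothetical function, not an existing CCSDSPy API, and it assumes the buffer starts on a packet boundary and contains well-formed CCSDS packets.

```python
import struct


def split_complete_packets(buffer):
    """Return (packets, leftover) for a bytes-like `buffer`.

    `packets` is a list of complete packets; `leftover` holds any trailing
    bytes that do not yet form a whole packet. Hypothetical helper.
    """
    packets = []
    offset = 0
    while offset + 6 <= len(buffer):
        # Bytes 4-5 of the primary header store (data field length - 1).
        (data_len,) = struct.unpack_from(">H", buffer, offset + 4)
        end = offset + 6 + data_len + 1
        if end > len(buffer):
            break  # the last packet is incomplete
        packets.append(bytes(buffer[offset:end]))
        offset = end
    return packets, bytes(buffer[offset:])


# Two chunks, with the second packet split across the chunk boundary.
packet_a = struct.pack(">HHH", 0x010, 0xC000, 1) + b"\xaa\xbb"
packet_b = struct.pack(">HHH", 0x020, 0xC001, 2) + b"\x01\x02\x03"
stream = packet_a + packet_b
chunk1, chunk2 = stream[:11], stream[11:]

packets1, leftover = split_complete_packets(chunk1)
# Prepend the leftover bytes to the next chunk before parsing it.
packets2, leftover2 = split_complete_packets(leftover + chunk2)
```

Returning the leftover bytes rather than a count of missing bytes also covers the case of a partial header, where the expected packet length is not yet known.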

ddasilva commented 11 months ago

Thanks for explaining @scandey ! Does the solution I posted above with iter_packet_bytes() work for you? If not, do you have any ideas for what an API for this might look like?

scandey commented 11 months ago

That's close to what I'd like to do in practice, but I already have a Python-based command and telemetry system that provides bytearrays of raw CCSDS packets, and it would be nice to drop those into utils.iter_packet_bytes() directly (and then split, process, and return as you described).

ddasilva commented 11 months ago

Thanks @scandey . Can you be more specific as to how these byte arrays are represented in your system and how they stream in? Does a file just grow on disk, or something else? I might have been premature in assuming it was streamed over a socket.

scandey commented 11 months ago

Sorry for confusing things! There are two very similar use-cases I'm working with at the moment. One of them is indeed a socket-based version for real-time analysis. I'm currently using an internal python-based command/data handling system which could easily dump well-formed CCSDS packets over a socket for real time analysis. The other use-case is mostly an extension of the first: I have plain CCSDS packets (no frame/sync info) going into a file (all the data from that same socket concept, but saved to a fresh file every 10 minutes). Packets are guaranteed to not split across file boundaries and I'm pretty confident that a half-saved file will also have all well-formed CCSDS packets (assuming I am handling the flush operations correctly).

For either/both of these cases, I'd like to harness the nice packet definition system of CCSDSpy to quickly inspect and collect packets one at a time in the style of iter_packet_bytes. I'd like one byte array (as provided by iter_packet_bytes) to be passed directly into a packet.load (looking at just the primary header, for instance) and then, depending on the APID, pass the same byte array into a different packet.load. I realize this intentionally defeats the purpose of the efficient numpy array processing (by only operating on one packet at a time). It also might end up taking up a lot of memory for larger packets, depending on implementation. I'm tentatively hopeful that it is doable, even if it isn't the most efficient option for long-term processing.

The big benefit to me is that I can go straight from a mixed-APID file to data analysis in one step, rather than having to save out a file for each APID and then load that file back in. Pseudo-python logic for the chunked file as I imagine it:

from collections import defaultdict

byte_arrays = defaultdict(list)  # per-APID lists of single-packet byte arrays
# file is a mixed-APID file, assumed to contain only well-formed packets
for byte_array in utils.iter_packet_bytes(file):
    # imagined API: load() accepting a byte array directly
    # there's only one value in the returned array
    apid = primary_header_packet.load(byte_array)['CCSDS_APID'][0]
    if apid == my_favorite_packet_apid:
        packet_dict = my_favorite_packet.load(byte_array)
        # do something cool with the single packet (same dictionary of numpy array format)
        cool_thing(packet_dict)
        # and maybe save byte array to a larger per-APID collection held in memory for later use?
        byte_arrays[apid].append(byte_array)

# later in data analysis, once you've built up a long byte array for each APID
all_my_favorite_packets_dict = my_favorite_packet.load(b"".join(byte_arrays[my_favorite_packet_apid]))
# do something even cooler with many packets
cooler_thing(all_my_favorite_packets_dict)

EDIT: utils.split_by_apid can replace a lot of my logic above, so the simplified code would look like:

# file is a mixed APID, assumed to all be well-formed packets
streams_by_apid = utils.split_by_apid(file)
favorite_packet_byte_array = numpy.frombuffer(streams_by_apid[my_favorite_packet_apid].getbuffer(), "u1")
all_my_favorite_packets_dict = my_favorite_packet.load(favorite_packet_byte_array)

I'm not sure the BytesIO from split_by_apid can be directly read as numpy.frombuffer and whether reading it like that would prevent the BytesIO object from being mutated later (to add more data if desired).

EDIT2: I went ahead and forked and adjusted the load/_load parameters (quick and messy change) to allow for passing numpy byte arrays directly. It seems to work okay at first glance, though the BytesIO objects cannot be resized later (not sure how to get rid of the view provided by bytesio_object.getbuffer()). Since at the moment split_by_apid doesn't take existing BytesIO streams, the buffers are fixed once returned by split_by_apid and so this problem is moot.
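
On the resizing question: in CPython, a `BytesIO` refuses writes while a view from `getbuffer()` is still exported, and accepts them again once the view is released. The sketch below shows this with the standard library only. With `numpy.frombuffer`, the array keeps the view alive, so the array (and any other references to the view) would need to be dropped first; copying with `getvalue()` avoids the lock at the cost of a copy.

```python
import io

stream = io.BytesIO(b"\x00\x01")
view = stream.getbuffer()  # exported view locks the buffer against writes
stream.seek(0, io.SEEK_END)

try:
    stream.write(b"\x02")
    blocked = False
except BufferError:
    blocked = True  # writes fail while the view is exported

view.release()  # drop the export; the stream can be written again
stream.write(b"\x02")
result = stream.getvalue()  # independent copy, holds no export
```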

ddasilva commented 10 months ago

Hey Scott, just got back from vacation.

It should be easier to use utils.get_packet_apid() to determine the APID: https://docs.ccsdspy.org/en/latest/api/ccsdspy.utils.get_packet_apid.html#ccsdspy.utils.get_packet_apid
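
For reference, the APID occupies the low 11 bits of the first two bytes of the CCSDS primary header. The sketch below shows that extraction in plain Python; `packet_apid` is a hypothetical stand-in for illustration and is not claimed to be how utils.get_packet_apid() is implemented.

```python
def packet_apid(packet_bytes):
    """Return the 11-bit APID from the first two bytes of a CCSDS packet.

    Hypothetical helper for illustration only.
    """
    if len(packet_bytes) < 2:
        raise ValueError("need at least two bytes of primary header")
    first_word = (packet_bytes[0] << 8) | packet_bytes[1]
    return first_word & 0x07FF  # mask off version, type, and sec. header flag


# Version 0, type 0, secondary header flag set, APID 0x123, one data byte.
example_packet = bytes([0x09, 0x23, 0xC0, 0x00, 0x00, 0x00, 0x7F])
apid = packet_apid(example_packet)
```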

I'm not sure why you are using numpy.frombuffer() on the BytesIO instance before passing it to .load(). The .load() methods, as well as utils.split_by_apid() and utils.iter_packet_bytes(), will all take file-like objects directly (BytesIO is a file-like object). Maybe this is easier than you thought?

Does this solve your issue?

scandey commented 10 months ago

I guess I was making it massively overcomplicated, that seems to work. I'm not sure now how I got stuck on that idea of passing buffers around as opposed to BytesIO objects... maybe the issue that prompted that choice will turn up again but for now I'll close this massively overly-complicated issue. Thank you for taking the time to walk me through the logic!

(I'll split out the network / live feed idea into a separate issue and the extra bytes not missing bytes into another separate issue, since this one is quite... overloaded by my confusion from last week).