bluesky / event-model

data model for event-based data collection and analysis
https://blueskyproject.io/event-model
BSD 3-Clause "New" or "Revised" License

Add new document for "bulk" external resources #219

Closed: tacaswell closed this 1 year ago

tacaswell commented 2 years ago

Currently the implementation of external data is limited to filling in data on a per-event, per-field basis, basically the smallest quantum of data the system can express. While this works extremely well for something like image stacks of equal length, it works very poorly in cases where the data is natively ragged (single-photon detectors) or where there are many relatively small data sets (e.g. scanned fluorescence data). The difficulties range from fully breaking the ability to put the data in an xarray to pathological performance problems.

One way out of this is to expand what can be "external".

option A:

Expand the notion of "external" to be able to replace all of the rows in a field in a stream at once. This would help significantly in cases like scanned fluorescence (because we can read out all of the data en masse), but it would make it much harder (though not impossible) to pull out a single event's worth of data. You could imagine saying "this col_datum is for indices N -> N+m" so you get chunk-level access. A wrinkle is that we currently have the ability to emit multiple events with the same event number and silently ignore all but the last one (so if you re-take points it "does the right thing" 😱 . This probably was a poor choice, but it made sense at the time).

The splicing to merge these into tiled is probably doable. We already do the splicing on a per-row basis, and tiled already has a deep sense of chunking. The rule that the col_datum has to go out before any event that refers to it would still hold, and I guess we would just duplicate the same col_datum_uid in all of the events.

A pro of this plan is that we can still use the structure of the Events to do the association between the "big" data and the "small" data and collate between multiple data sources in a very structural way. That is, an Event that says "I have a motor position, an image, and a spectrum that are all 'at the same time for science'" is a much more powerful statement than "there are 3 streams in this Run (good luck guessing the names!) that should be the same length and you can zip them... good luck!", in the same way that saying "this is a data frame" is a more powerful statement than "this is a dict of numpy arrays that I promise are all the same length".
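To make the chunk-level idea concrete, a hypothetical col_datum for option A might look something like this. The document name col_datum and every field shown here are illustrative only, not part of any existing schema:

# Hypothetical "column datum" covering a contiguous run of events in one field.
col_datum = {
    "datum_id": "col-datum-uid-0",   # referenced from Events as col_datum_uid
    "resource": "resource-uid",      # points back at the external resource
    "field": "fluorescence",         # which column of the stream this fills
    "first_index": 0,                # N: first event (seq_num) covered
    "last_index": 99,                # N + m: last event covered
    "datum_kwargs": {},              # whatever the handler needs to load this chunk
}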

option B:

The same as option A, but also replace multiple fields. Again, this would help significantly with multi-channel spectral data, as we could read the whole x, y, channel, E hypercube up at once. You could still get the nice chunking, as in option A, but maybe even better. This has all of the complexity of the above, but now spread across columns. It also has the same pros.

At some point, this turns into an implementation of pandas' block manager.

option C:

Scale the notion up to be "the entire stream".

This gets around all of the splicing-related issues (because it is everything). It would handle cases where we have some detector/device that outputs table-like data in large volumes. Imagine we had analogs of resource and datum, say stream_resource and maybe stream_datum. (You could say "it is the whole stream, you'll only ever have one datum, so stuff everything in the resource", but I think this is wrong, as you may have a detector that writes an unknown number of fixed-size files. It is better to positively say "and I expect another file" rather than "just glob all of those", because you will be able to detect when a file is missing (the document claimed it was there but you cannot find it!) and be resilient against the detector later writing unrelated files with the same file prefix.)

This would also be very useful for single-photon detectors (where you do not know the size of the output until (well) after the fact) and for complicated flyscans that may already be writing data out to (multiple) files.

A downside of this approach is that we would lose the ability to associate the rows with an Event in another stream via indexing (but other methods, like time-stamping or including an "event index" column, would still work). The latter may be the best approach to dealing with "ragged" data anyway.

option D:

Option C but provide a bunch of streams.

The downside of this is that it is a bunch of extra bookkeeping / semantics, but an upside is that complicated files can provide multiple streams and we will be able to open them only once.


I think we should implement option D and maybe B, but do D first. B has the possibility of providing some big speed improvements, but I think it is actually much more complex to implement. On the other hand, D will provide a very good escape hatch for some problems that have come up at BNL and as DLS is thinking about adapting / wrapping / extending Malcolm.

Going down the path of D, we should do our best to keep "feature" parity with the current scheme, so we should still get a descriptor per out-sourced stream and we should make a best effort to include event counts in the stop document. To make this work we would have to add two new documents, stream_resource and stream_datum. I propose the following schema (an illustrative pair of example documents follows the list):

- stream_resource:
   - spec : str  # mapping to how to read this
   - root : str  # the "root" part of the resource path
   - resource_path : str  # the rest of the resource path, may want to call this 'uri'?
   - path_semantics : {unix | windows}  # \ vs /, maybe, maybe not?
   - uid : str  # a uuid
   - run_start : str  # mapping to the run start this belongs to
   - stream_names : List[str]  # list of the stream names this provides
   - resource_kwargs : Dict[str, Any]  # anything to pass into the handler
- stream_datum:
   - resource_id : str  # reference back to the stream_resource
   - uid : str  # a uuid
   - stream_name : str  # the name of the stream this is providing a block of
   - block_id : int  # the order in the stream of this block
   - datum_kwargs : Dict[str, Any]  # what to pass to call the handler with
   - event_offset : int  # the sequence number of the first event in this block
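
For illustration, here is a sketch of what a matching pair of documents might look like under this proposal (all values, including the spec name, are made up):

# A stream_resource describing one HDF5 file that backs two streams.
stream_resource = {
    "spec": "AD_HDF5_STREAM",          # illustrative spec name
    "root": "/nsls2/data",
    "resource_path": "2022/03/scan_0001.h5",
    "path_semantics": "unix",
    "uid": "resource-uid-0",
    "run_start": "run-start-uid",
    "stream_names": ["primary", "monitor"],
    "resource_kwargs": {"frames_per_point": 1},
}

# The first block of the "primary" stream provided by that resource.
stream_datum = {
    "resource_id": stream_resource["uid"],
    "uid": "stream-datum-uid-0",
    "stream_name": "primary",
    "block_id": 0,                     # blocks count 0, 1, 2, ... with no gaps
    "datum_kwargs": {},
    "event_offset": 1,                 # seq_num of the first event in this block
}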

and signatures:

class StreamHandler:
    def __init__(self, uri_path: str, **resource_kwargs):
        # resource_kwargs comes from stream_resource["resource_kwargs"]
        ...

    def get_block(self, stream_name: str, block_id: int, **datum_kwargs):
        # datum_kwargs comes from stream_datum["datum_kwargs"]
        ...
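
As a rough sketch of how a filler-like consumer might drive this interface, assuming a handler registry keyed by spec (the registry name and the joining of root and resource_path are illustrative, not part of the proposal):

import os

# Hypothetical registry mapping stream_resource["spec"] to a StreamHandler subclass.
HANDLER_REGISTRY = {}

def fill_stream(stream_resource, stream_datum_docs):
    handler_class = HANDLER_REGISTRY[stream_resource["spec"]]
    # Build the handler once per stream_resource ...
    handler = handler_class(
        os.path.join(stream_resource["root"], stream_resource["resource_path"]),
        **stream_resource["resource_kwargs"],
    )
    # ... and ask it for each block in block_id order.
    for datum in stream_datum_docs:
        block = handler.get_block(
            datum["stream_name"], datum["block_id"], **datum["datum_kwargs"]
        )
        yield datum["event_offset"], block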

The ordering rules would be: a descriptor must come before a stream_resource, and a stream_resource must come before any stream_datum that refers to it. I think we should prohibit emitting more than one descriptor when using external streams (because otherwise spelling out at what point to change descriptors gets too hard). You can have as many stream_resource documents describing a stream as you want, and each stream_resource can be referred to by as many stream_datum documents as you want, but block_id must be a unique run of integers with no gaps, emitted in order. event_offset must be unique and increasing, but can (obviously) have gaps.
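
Putting those rules together, one valid emission order for a run with a single externally-backed stream would look like this (document bodies omitted, offsets illustrative):

# start -> descriptor -> stream_resource -> stream_datum* -> stop,
# with block_id counting 0, 1, 2, ... and event_offset increasing (gaps allowed).
valid_order = [
    "start",
    "descriptor",       # exactly one descriptor for the externally-backed stream
    "stream_resource",
    "stream_datum",     # block_id=0, event_offset=1
    "stream_datum",     # block_id=1, event_offset=101
    "stream_datum",     # block_id=2, event_offset=257
    "stop",
]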

cjtitus commented 2 years ago

Thanks for writing this all up. My one question is -- in option D, what will the data look like when it is finally "filled"? And what will the syntax be?

Normally you have run.primary.read() to give you the data in tabular form. I think the whole point of the stream handler is that your data may not be in a form that is amenable to shoving in one table, because it may be ragged, etc.

So for some stream "external", would we have run.external? What would this be? A node that has been created via a Tiled adapter that is determined from the spec?

tacaswell commented 2 years ago

It would be run.external_stream.read(), just like it is now. From the client's point of view you should not be able to tell that the data is stored externally; it will just look like another stream.

There is currently no node type in tiled that "natively" handles ragged data correctly (we need to add the ability to expose awkward arrays). For these cases the best current approach is probably to go with a "long form", with one of the columns being the "group" each row belongs to, as that is something we can currently express (and it is more-or-less how awkward handles it under the hood, though awkward provides a nice API on top that is effectively variable striding, but I digress).
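
As a small illustration of the "long form" idea (column names and values are made up):

import pandas as pd

# One row per photon; the "event" column is the grouping key, so each event can
# contribute a different number of rows without any padding.
long_form = pd.DataFrame(
    {
        "event": [0, 0, 0, 1, 2, 2],
        "pixel": [3, 7, 7, 1, 4, 9],
        "photon_energy": [930.1, 931.4, 929.8, 930.7, 932.0, 931.1],
    }
)

# Recover the ragged, per-event structure when it is needed.
per_event = long_form.groupby("event")["photon_energy"].apply(list)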

At the end of the day, because tiled is a service, the server needs to be able to tell the client "this is what I am about to send you" in a way we can serialize (as opposed to in Python, where you can say "that function will return 'a numpy array'" and leave it at that level of detail).

cjtitus commented 2 years ago

Ok, cool. I actually already do a long form with one column being "pixel" and the other columns being timestamp and photon energy, so I think that will work nicely for my detector and other multi-element photon-counting detectors. This is an approach that I, personally, would be happy with, and it sounds like it would translate well to awkward array in the future while being useful before native support is implemented.

maffettone commented 2 years ago

Some hopefully quick questions for @tacaswell :

  1. Can you point me at something that is directly using event model, so I get a feeling for the use case? Is the run engine processing Msgs to call functions like event_model.compose_run?
  2. Is "stream" the right name to use here? It may get confusing/degenerate with other things called streams. Would sequential_resource and sequential_datum be more adequate? Or is sequential too limiting of a description.
  3. Compatibility... This one might not be quick. My instinct is to extend core classes like DocumentRouter and Filler to have methods for the new documents. NamedTuple classes like ComposeRunBundle and their associated unit tests will start to fail if I extend them because of positional unpacking. I'm timid about editing tests and prefer to add new ones, but I don't think that'll be possible in this case, which makes me nervous about breaking downstream bluesky functionality that depends on this behavior...
tacaswell commented 2 years ago

Is the run engine processing Msgs to call functions like event_model.compose_run?

No, event_model is a refactoring of the logic that was scattered through the RE. There is an idea to make the RE actually use event_model, but I do not think we have done that yet. The best reference may be either the event-model tests or https://github.com/bluesky/bluesky/blob/master/bluesky/tests/test_callbacks.py#L518

Is "stream" the right name to use here?

I think so. The goal is to make it so that data produced via these documents and data produced via making queries to mongo are indistinguishable from the user's point of view. For example, imagine you have a parquet file on disk; using these documents you should be able to graft it onto a run as a single stream.

My instinct is to extend core classes like DocumentRouter and Filler to have methods for the new documents

Yes, that is the way I would go (and we have done that before).

NamedTuple classes like ComposeRunBundle and their associated unit tests will start to fail if I extend them

Oh, bad. Using a named tuple was an easy way to get dotted-name access, but it does carry with it this order and length problem. In hindsight maybe these should have been dataclasses instead (side comment: the spread operator in JS is very nice, kinda wish we had that in Python).

Leaving aside whether we want to abandon named tuples for now, I would break ComposeRunBundle by adding a new factory family on the end. This at least keeps the breakage in one place and makes it impossible to have a mix of Events and these new documents in the same stream (which in principle there is nothing wrong with, but let's get it working at all and then have someone ask for that before we build the most complex thing we can imagine...).

We could do terrible things to the named-tuple subclass to only iterate through the first N things, but if we went that way it should be a stepping stone to dropping positional unpacking altogether.

The trade-off I see here is breaking some stuff now, vs maintaining internally inconsistent compat code ("so it has N+1 keys, but I can only iterate over N of them?!"), vs building a second full set of method families.

I could also see a case for making compose_run take a flag like include_whole_stream_tools (note: rename this) that controls whether it returns the current RunBundler or an ExtendedRunBundler. It is a bit of type instability, but only in the case of positional unpacking, and it lets users opt into the breaking change. If it defaults to False, it also gives us a chance to start warning that it will change. Instead of this being a bool, we could do it as extension_level=0 (with 0 being the current state, 1 being what you are adding now, and if we add more in the future we could increment it further).
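
A toy sketch of that opt-in shape, just to show the dispatch (the names extension_level and ExtendedRunBundle come from this discussion; the stand-in factories are obviously not the real ones):

from collections import namedtuple

ComposeRunBundle = namedtuple(
    "ComposeRunBundle", "start_doc compose_descriptor compose_resource compose_stop"
)
ExtendedRunBundle = namedtuple(
    "ExtendedRunBundle",
    "start_doc compose_descriptor compose_resource compose_stop compose_stream_resource",
)

def compose_run(extension_level=0):
    def factory(*args, **kwargs):
        ...  # stand-in for the real document factories

    if extension_level == 0:
        # current behavior: safe to unpack into exactly four names
        return ComposeRunBundle({}, factory, factory, factory)
    # opt-in: one extra factory on the end, so old positional unpacking would break
    return ExtendedRunBundle({}, factory, factory, factory, factory)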

maffettone commented 2 years ago

Sounds good. I rely on unit tests to guide my quest, and will call things stream.

As for the breaking problem, and the future-proofing, I think we may be best off with something like an extended dataclass. It's roughly similar to doing terrible things to a namedtuple subclass, but I think it's defensible. It will be fully backward compatible with the namedtuple implementation, and allow extension to new types of documents without breaking anything. This also avoids having new flags to think about, or a global variable to track how many new document types have been added since 2022.

The notion here is that if you are going to unpack this, you are going to get a simplified set of documents that will meet most use cases. If you want to work with more document types, you should start to be explicit. If this had existed pre v3.10, I'd consider plugging it in (https://docs.python.org/3.10/library/dataclasses.html#dataclasses.KW_ONLY).

from dataclasses import dataclass
from typing import Callable, Optional

@dataclass
class ComposeBundle:
    start_doc: dict
    compose_descriptor: Callable
    compose_resource: Callable
    compose_stop: Callable
    # New document factories are attribute-only extras with defaults; they do not
    # participate in positional unpacking, so existing 4-name unpacking keeps working.
    compose_stream_resource: Optional[Callable] = None

    def __iter__(self):
        # Only the original four members are yielded, preserving backward
        # compatibility with the namedtuple-style unpacking.
        return iter(
            (
                self.start_doc,
                self.compose_descriptor,
                self.compose_resource,
                self.compose_stop,
            )
        )

def foo():
    pass

if __name__ == "__main__":
    # Old-style positional unpacking still works and sees only four items.
    start_doc, compose_descriptor, compose_resource, compose_stop = ComposeBundle(
        {}, foo, foo, foo
    )

    # New document factories are reached explicitly, by attribute access.
    bundle = ComposeBundle({}, foo, foo, foo, compose_stream_resource=foo)
    (
        start_doc,
        compose_descriptor,
        compose_resource,
        compose_stream_resource,
        compose_stop,
    ) = (
        bundle.start_doc,
        bundle.compose_descriptor,
        bundle.compose_resource,
        bundle.compose_stream_resource,
        bundle.compose_stop,
    )