bluesky / event-model

data model for event-based data collection and analysis
https://blueskyproject.io/event-model
BSD 3-Clause "New" or "Revised" License

Add ability for ComposeDescriptor to accept arbitrary kwargs to create descriptor #286

Closed rosesyrett closed 10 months ago

rosesyrett commented 10 months ago

Description

Currently, the DAQ-Core team at Diamond needs to be able to pass metadata to descriptor documents, primarily for NeXus file writing.

Motivation and Context

In our case, we want the descriptor documents from streams to be easily identifiable and to match the NeXus application definitions for the experiments they represent (e.g. for tomography, detectors require an image key DataKey field).

How Has This Been Tested?

I've written a test confirming that extra kwargs are passed to the resulting descriptor document.

rosesyrett commented 10 months ago

At the moment I'm seeing mypy errors:

event_model/__init__.py:2427: error: Unsupported type "dict[str, Any]" for ** expansion in TypedDict  [typeddict-item]
Found 1 error in 1 file (checked 4 source files)

This is because I'm unpacking kwargs into EventDescriptor, which is a TypedDict. The JSON schema suggests that arbitrary extra fields are allowed, but the Python TypedDict is restrictive about which keys it accepts. Is there a nice way to deal with this?
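For illustration, here is a minimal, self-contained sketch of one way around the restriction, under stated assumptions. `MiniDescriptor` and `compose` are hypothetical stand-ins, not event-model's real `EventDescriptor` or `ComposeDescriptor`. The idea is to build the TypedDict from its declared keys only and then merge the extras afterwards, viewing the document as a plain `Dict[str, Any]` via `typing.cast` so that a type checker should not object to the undeclared keys. A TypedDict is an ordinary `dict` at runtime, so the merge itself is unrestricted.

```python
from typing import Any, Dict, Optional, TypedDict, cast


class MiniDescriptor(TypedDict):
    # Hypothetical, cut-down stand-in for event_model's EventDescriptor.
    name: str
    uid: str


def compose(
    name: str, uid: str, metadata: Optional[Dict[str, Any]] = None
) -> MiniDescriptor:
    # Construct the document from the declared keys only, instead of
    # expanding **metadata inside the TypedDict constructor.
    doc = MiniDescriptor(name=name, uid=uid)
    if metadata:
        # cast() has no runtime effect: it returns the same dict object,
        # so updating through it mutates doc in place.
        cast(Dict[str, Any], doc).update(metadata)
    return doc


doc = compose("primary", "abc123", metadata={"a": "b"})
print(doc)
```

The trade-off is that the extra keys are invisible to the type checker, so code that later reads them gets no static checking.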

evalott100 commented 10 months ago

I deleted some previous comments on this; they also result in mypy errors down the line. We should maybe discuss solutions at the meeting.

evalott100 commented 10 months ago

This passes mypy on event-model:


# =================== __init__.py
class ComposeDescriptor:
    start: RunStart
    streams: dict
    event_counters: Dict[str, int]

    def __call__(
        self,
        name,
        data_keys,
        hints=None,
        configuration=None,
        object_keys=None,
        time=None,
        uid=None,
        validate=True,
        metadata=None,
    ) -> ComposeDescriptorBundle:
        if time is None:
            time = ttime.time()
        if uid is None:
            uid = str(uuid.uuid4())
        if hints is None:
            hints = {}
        if configuration is None:
            configuration = {}
        if object_keys is None:
            object_keys = {}

        doc = EventDescriptor(
            configuration=configuration,
            data_keys=data_keys,
            name=name,
            object_keys=object_keys,
            run_start=self.start["uid"],
            time=time,
            uid=uid,
            hints=hints,
        )
        if metadata:
            doc.update(metadata)

        if validate:
            if name in self.streams and self.streams[name] != set(data_keys):
                raise EventModelValidationError(
                    "A descriptor with the name {} has already been composed with "
                    "data_keys {}. The requested data_keys were {}. All "
                    "descriptors in a given stream must have the same "
                    "data_keys.".format(name, self.streams[name], set(data_keys))
                )
            schema_validators[DocumentNames.descriptor].validate(doc)

        if name not in self.streams:
            self.streams[name] = set(data_keys)
            self.event_counters[name] = 1

        return ComposeDescriptorBundle(
            descriptor_doc=doc,
            compose_event=ComposeEvent(
                descriptor=doc, event_counters=self.event_counters
            ),
            compose_event_page=ComposeEventPage(
                descriptor=doc, event_counters=self.event_counters
            ),
        )

# =================== test.py
run_doc, compose_descriptor, compose_resource, compose_stop = compose_run()

descriptor_doc, compose_event, compose_event_page = compose_descriptor(
    "name", {}, metadata={"a": "b"}, validate=True
)

descriptor_doc["data_keys"]
descriptor_doc["run_start"]
descriptor_doc["a"]

def foo(descriptor_doc: EventDescriptor):
    ...

foo(descriptor_doc)

DiamondJoseph commented 10 months ago

We decided to place this metadata on the StartDocument, with some mapping object. The assumption is that enough information is available at run start to resolve onto any eventual stream creations (even if creating streams procedurally).
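As a rough sketch of what such a mapping could look like (the thread does not specify its shape), the start document might carry a dictionary keyed by stream name, which a consumer resolves against each descriptor's `name`. The key `stream_metadata`, the helper `metadata_for_descriptor` and the example values are all hypothetical and are not part of event-model.

```python
from typing import Any, Dict, Mapping

# Hypothetical key name; the thread does not say what the mapping is called.
STREAM_METADATA_KEY = "stream_metadata"


def metadata_for_descriptor(
    start_doc: Mapping[str, Any], descriptor_doc: Mapping[str, Any]
) -> Dict[str, Any]:
    """Look up per-stream metadata stored on the start document."""
    mapping = start_doc.get(STREAM_METADATA_KEY, {})
    # Descriptors carry their stream name under "name"; streams without
    # an entry in the mapping resolve to an empty dict.
    return dict(mapping.get(descriptor_doc["name"], {}))


# Illustrative documents only; the field values are made up.
start = {"uid": "run-1", STREAM_METADATA_KEY: {"primary": {"image_key": 0}}}
descriptor = {"name": "primary", "run_start": "run-1"}
print(metadata_for_descriptor(start, descriptor))
```

With this layout the descriptor document itself stays within its existing schema, and the file writer does the lookup when it receives each descriptor.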