bluesky / ophyd

hardware abstraction in Python with an emphasis on EPICS
https://blueskyproject.io/ophyd
BSD 3-Clause "New" or "Revised" License

Helper function to chain functions that return statuses #1121

Open · DominicOram opened this issue 1 year ago

DominicOram commented 1 year ago

On MX at DLS we have a use case where we want to kick off a series of tasks on the device that must be done in series, e.g.

class MyDevice(Device):
    def do_1(self) -> Status:
        ...

    def do_2(self) -> Status:
        ...

    # Some more do_x

    def do_all(self) -> Status:
        status_1 = self.do_1()
        status_1.add_callback(self.do_2)
        # ...chain each remaining do_x off the status of the previous one...
        return status_x  # status of the final step

@olliesilvester has written a helper function for do_all that takes a generic list and performs everything in order. We think it may be generically useful enough to live in ophyd, so they will add a PR for it here.
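The PR code itself is not shown in this thread, but here is a rough sketch of the shape such a helper could take, assuming ophyd's StatusBase API (add_callback, success, set_finished, set_exception); the name chain_statuses and the error handling below are illustrative, not the actual implementation:

from ophyd.status import Status, StatusBase


def chain_statuses(steps) -> StatusBase:
    """Call each zero-argument callable in ``steps`` in order; each must
    return a Status.  The returned Status finishes when the last step
    finishes, or errors out as soon as any step fails."""
    combined = Status()
    seq = iter(steps)

    def run_next(previous=None):
        # `previous` is the just-finished Status (None on the first call)
        if previous is not None and not previous.success:
            combined.set_exception(RuntimeError(f"step failed: {previous!r}"))
            return
        try:
            step = next(seq)
        except StopIteration:
            combined.set_finished()
            return
        step().add_callback(run_next)

    run_next()
    return combined

With something like this, do_all() could simply return chain_statuses([self.do_1, self.do_2]) and hand a single status back to the caller.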

tacaswell commented 1 year ago

Could this sequencing be lifted up to be a plan that the RE manages?

DominicOram commented 1 year ago

That would be cleaner, but to do so would require the RE to be multithreaded. It wasn't obvious to me whether there is any support for this in the RE at the moment, and if not it seems like a potentially large refactor? I guess from a user perspective it would look something like:

def my_async_plan():
    yield from bps.set(.., group="A")
    yield from bps.wait("A") # Note this shouldn't block main thread just this plan
    ...

def my_actual_plan():
    yield from bps.run_async_plan(my_async_plan, group="async_plan") # New message that's basically create a thread
    yield from bps.set(.., group="B")
    yield from bps.wait("B") # Should not block async thread
    ...
    yield from bps.wait("async_plan") # Effectively a join here
tacaswell commented 1 year ago

This is a feature that I have wanted to write for a while now (but I would call it multi-task, not multi-threaded)! There is a mostly working version of this in https://github.com/bluesky/ophyd/issues/1121 (which was motivated by wanting to better isolate the pre/post plans for suspenders).

At least at NSLS-II we have gotten away without doing this sort of thing, because in the cases where we did need something like it we could either write the sequencing "inline" or make use of the per_step and per_shot parameters on most of the "standard" plans.


I have concerns about starting to do too much sequencing "natively" in ophyd. If the sequencing happens below Python (that is, in the IOCs or what have you), ophyd needs to provide a way to expose it upward (which may require adding some additional methods to ophyd objects and verbs to the RE, but between set, trigger, and kickoff/complete/collect I think we can cover almost everything); that is something we have to handle. But once callback chaining becomes a fully supported thing in ophyd (not that we can stop anyone from doing it...), it becomes unclear where the sequencing should be done.
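To illustrate that first case (a sketch only, not code from this thread): a device whose sequence actually runs in the IOC can expose it through the kickoff/complete verbs, for example with a SubscriptionStatus that finishes when the IOC reports completion. The MySequencer class and its PV names are made up, and collect/describe_collect are omitted for brevity.

from ophyd import Component as Cpt, Device, EpicsSignal, EpicsSignalRO
from ophyd.status import SubscriptionStatus


class MySequencer(Device):
    # hypothetical PVs: writing 1 to "Start" kicks off the IOC-side sequence,
    # and the IOC sets "Done" to 1 once the whole sequence has finished
    start = Cpt(EpicsSignal, "Start")
    done = Cpt(EpicsSignalRO, "Done")

    def kickoff(self):
        # the returned status completes once the start request is accepted
        return self.start.set(1)

    def complete(self):
        # finish when the IOC reports that the sequence is done
        def sequence_done(*, old_value, value, **kwargs):
            return value == 1

        return SubscriptionStatus(self.done, sequence_done)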

I have not seen the code, but I am guessing it looks like:

def sequencer(items):

    seq = iter(items)

    def inner(item):
        if item is None:  # sequence exhausted
            return
        status = item()
        # Status callbacks receive the finished status object as an argument
        status.add_callback(lambda _status: inner(next(seq, None)))

    inner(next(seq, None))

which is (I think) the callback-centric version of the v0 RE in https://blueskyproject.io/bluesky/run_engine.html#minimal-runengine. I have concerns that if we start down this route we will keep picking up more and more complexity until we end up with a second version of the RE (but implemented via callbacks).
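For contrast, here is roughly what the same sequencing looks like when it is driven from above rather than by chained callbacks (my sketch, in the spirit of that minimal RE; StatusBase.wait() blocks until the status finishes and re-raises if it failed):

def run_sequence(steps):
    # drive the sequence from a plain loop instead of chaining callbacks
    for step in steps:
        status = step()
        status.wait()  # blocks until done; raises if the step failed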


The refactoring I think we need on the RE is:

  1. wrap _run, the message stacks, and the "continue permit" logic into a class (like we did for the bundlers). This should mostly be state-shuffling: pushing RE-singleton state onto an object the RE holds.
  2. make the dictionary of allowed messages something that is passed into _run rather than pulled from the parent (this will require a bit of management around how registering messages on the fly works, but should still be possible).
  3. add a message that creates new sub _runs on demand.

I think all of this can be done in a backwards-compatible way (a rough sketch of the state object from items 1 and 2 is below).
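Purely to make the shape of items 1 and 2 concrete (none of these names exist in bluesky today; this is only an illustration of "push RE-singleton state onto an object the RE holds"):

from collections import deque
from dataclasses import dataclass, field


@dataclass
class _RunTask:
    """Illustrative per-_run state pulled off the RunEngine singleton."""
    message_stack: deque = field(default_factory=deque)
    continue_permitted: bool = True
    # item 2: the allowed-message dictionary is handed to each _run
    # rather than inherited from the parent RunEngine
    allowed_commands: dict = field(default_factory=dict)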

coretl commented 1 year ago
> def my_async_plan():
>     yield from bps.set(.., group="A")
>     yield from bps.wait("A") # Note this shouldn't block main thread just this plan
>     ...
>
> def my_actual_plan():
>     yield from bps.run_async_plan(my_async_plan, group="async_plan") # New message that's basically create a thread
>     yield from bps.set(.., group="B")
>     yield from bps.wait("B") # Should not block async thread
>     ...
>     yield from bps.wait("async_plan") # Effectively a join here

@warejosephb has written a task graph implementation that solves this for sub-plans that end by returning a status object. I might have got this wrong, but I think you can construct a task graph that says "do A, B, C in parallel, then do D", and if A, B and C all do some setup and return a Status object, it will run them all in quick succession, wait for the three Status objects, and then do D.
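This is not @warejosephb's implementation (it isn't shown in this thread), but for comparison, the "A, B, C in parallel, then D" shape expressed with plain plan stubs and a group looks roughly like the following, assuming a, b, c and d are settable devices and 1 is just a placeholder setpoint:

import bluesky.plan_stubs as bps


def a_b_c_then_d(a, b, c, d):
    # start A, B and C without waiting; their Status objects share one group
    for device in (a, b, c):
        yield from bps.abs_set(device, 1, group="setup")
    # wait for all three statuses to finish, then do D
    yield from bps.wait(group="setup")
    yield from bps.abs_set(d, 1, wait=True)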

> On MX at DLS we have a use case where we want to kick off a series of tasks on the device that must be done in series

For things that are "prepare for and do a scan" I think we are covered, with kickoff and complete doing the sequencing and multiple flyers for the distinct Devices taking part in the scan. However, for arbitrary sequencing of unrelated events it is not so clear. If we wanted to do a background robot load while simultaneously doing an alignment scan, then it sounds like the tasks should actually be independent.

What's your use case?

tacaswell commented 10 months ago

I meant to link to https://github.com/bluesky/bluesky/pull/541 above