Closed EvanKirshenbaum closed 10 months ago
This issue was referenced by the following commits before migration:
Listing the changes we want to make:
* Add `after` to `Path.teleport_into`
* Remove `after` from `[Operation,CombinedOperation,CombinedStaticOperation]._schedule_for`
* Remove `after` from `OpScheduler.[WaitAt,Reach,WaitFor]._schedule_for`
* Remove `after` from `[StaticOperation,CombinedStaticOperation]._schedule`
* Replace `after: Optional[DelayType] = None` with `after: WaitCondition = NO_WAIT`
* Drop `on_future` from `schedule`
* Make `Delayed` or `Trigger` a valid `WaitCondition`
* Change `Operation(Generic[T, V], ABC)` to `Operation(Generic[CS, V], ABC)`
Started work in this branch. Added `after` to `Path.teleport_into` and propagated it to `Path.TeleportInStep` and then to `Drop.TeleportInTo`; see this commit. But `Drop.TeleportInTo._schedule` already has an `after` argument. What shall we do about the `_schedule` `after` argument and the new `TeleportInTo.__init__` `after` argument?
Interesting question. My initial intention was to say that `after` isn't (and shouldn't be) an attribute of a `StaticOperation` (or `Operation`). It's a parameter to the `schedule()` (or `schedule_for()`) method. What you want to do is add it as an attribute to `StartStep` (as `MiddleStep` and `EndStep` have it) and propagate it to the `Path.Start`'s `first_step.op` when you schedule the path.
But `Path.Start` is a `StaticOperation`, and it schedules its `first_step.op` within its `_schedule()` (which is called from `StaticOperation`'s `schedule()`), and `on_future` is taken into account in the `schedule()` method before the call to `_schedule()`, so the mechanism will have to be changed a bit to take this into account.
In any case, this is why I proposed (in the initial comment):
An arguably cleaner redesign would be to punt on_future entirely and generalize after to take a sequence of WaitableTypes (or a single one).
- This would allow a Delayed or a Trigger (including a Barrier) to be used, in addition to a Time or a Ticks value.
- By making it allow a sequence and peeling off one layer at a time in schedule() or schedule_for(), you can require several conditions, in sequence, which will allow this single keyword argument to encapsulate both on_future and (the old) after, which could co-occur.
This would lobby for `schedule()` (and likely `Operation.schedule_for()`) be able to ask the object for any inherent `after` arguments, which would be combined into a sequence with any provided to the call and
`device.py`) that does a general "Call this `Callable` after all of these wait conditions have completed".
`StaticOperation.schedule()` and `Operation.schedule_for()` and remove the `after` parameter from the abstract `_schedule()` and `_schedule_for()` methods.

Just to clarify, `Operation`/`StaticOperation` will have the `after` attribute, right?
My inclination would be to say no. Instead, I'd add a method that could be overridden that would combine the provided wait condition with any intrinsic ones, but which only has an effect for things like the path steps which might have one of their own. Something like
WaitCondition = ...  # All of the things you can wait on

class StaticOperation:
    def schedule(self, *,
                 after: Optional[WaitCondition] = None,
                 post_result: bool = True,
                 ) -> Delayed[V]:
        after = self.wait_condition(after)
        return after_wait(after, lambda: self._schedule(post_result=post_result))

    def wait_condition(self, after: Optional[WaitCondition]) -> Optional[WaitCondition]:
        return after

class Path:
    class Start:
        after: Final[Optional[WaitCondition]]

        def wait_condition(self, after: Optional[WaitCondition]) -> Optional[WaitCondition]:
            return extend_wait_condition(self.after, after)
(We'll need to decide whether the intrinsic ones should come first or the ones provided in the call. I can see arguments for both strategies.)
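To make the ordering question concrete, here is a minimal sketch of one way the two conditions might be composed. The thread names `extend_wait_condition` but never shows its body, so everything below is an assumption: wait conditions are modelled as plain strings standing in for `Time`, `Ticks`, `Delayed`, or `Trigger` values, a sequence is a tuple, and the first argument is simply placed ahead of the second.

```python
from typing import Optional, Union

# Stand-in for the real waitable types (assumption for illustration only).
Waitable = str
WaitCondition = Union[Waitable, tuple[Waitable, ...]]


def as_tuple(cond: Optional[WaitCondition]) -> tuple[Waitable, ...]:
    """Normalize None, a single waitable, or a tuple into a tuple."""
    if cond is None:
        return ()
    if isinstance(cond, tuple):
        return cond
    return (cond,)


def extend_wait_condition(first: Optional[WaitCondition],
                          second: Optional[WaitCondition]) -> Optional[WaitCondition]:
    """Hypothetical helper: wait for `first`, then for `second`."""
    combined = as_tuple(first) + as_tuple(second)
    if not combined:
        return None          # nothing to wait for
    if len(combined) == 1:
        return combined[0]   # keep single conditions unwrapped
    return combined


# Intrinsic condition first, then the one supplied by the caller.
print(extend_wait_condition("3 ticks", ("barrier", "2 s")))
```

Swapping the two arguments at the call site is all it would take to put the caller's condition first, so the ordering decision stays in one place.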
Thinking about it a bit, I might get rid of the `Optional` and add something like
class NoWait(Enum):
    SINGLETON = auto()

    def __repr__(self) -> str:
        return "NO_WAIT"

NO_WAIT: Final[NoWait] = NoWait.SINGLETON
with `NoWait` being one of the options in `WaitCondition` and `NO_WAIT` being the default value used for `after` arguments.
This allows you to more easily distinguish between "I don't know" (e.g., when looking up in a dictionary) and "I know there's no wait".
The ability to make this distinction on lookups and arguments where `None` is a reasonable non-default argument is why I added `MISSING`, following this paradigm. If you say
MissingOr = Union[Missing,T]

def foo(arg: MissingOr[Time]) -> Time:
    if arg is not MISSING:
        return arg
    return the_default_value
MyPy is smart enough to know that `MISSING` is the unique value of `Missing`, and so if `arg` isn't `MISSING`, it isn't a `Missing` and must, therefore, be a `Time`.
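The sentinel pattern described above can be sketched as a self-contained example. The names `Missing` and `MISSING` come from the thread; the function `timeout_or_default` and its `float` timeout are hypothetical, chosen only to show a case where `None` is itself a meaningful argument and so cannot double as "not supplied".

```python
from enum import Enum, auto
from typing import Final, Optional, Union


class Missing(Enum):
    """Single-member enum used as a sentinel type."""
    SINGLETON = auto()

    def __repr__(self) -> str:
        return "MISSING"


MISSING: Final[Missing] = Missing.SINGLETON


def timeout_or_default(arg: Union[Missing, Optional[float]],
                       default: float = 1.0) -> Optional[float]:
    # Identity check against the only member of Missing; per the thread,
    # this is what lets MyPy narrow `arg` to Optional[float] here.
    if arg is not MISSING:
        return arg
    return default


print(timeout_or_default(MISSING))  # argument not supplied: default is used
print(timeout_or_default(None))     # None is a real value ("no timeout")
```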
Aren't we overcomplicating this? Wouldn't it be cleaner if there were a single way of specifying the `WaitCondition`[s]? If they can come from different places (i.e., the provided wait condition and the intrinsic one), then we have to decide which goes first, and it will probably be hard to maintain and debug.
After this fix, could we simplify calls like:
schedule_for(obj: Union[T, Delayed[T]], *, after: Optional[Union[Ticks, Time]] = None,
to
schedule_for(after: WaitCondition[s])
`MiddleStep` and `EndStep` have a `_schedule_after` method. I guess they are used to sequence the path. Could the `after` param be used to accomplish this and get rid of these `_schedule_after` methods?
Have a look at the latest version of the issue.091 branch to see if I'm getting it right. Here is what I did:
* Added `WaitCondition` and `NO_WAIT`
* Replaced some occurrences of `after: Optional[DelayType] = None` with `after: WaitCondition = NO_WAIT`
* Drop `on_future` from `schedule`
* Removed `after` from `MiddleStep` and `EndStep` as we will move them to `Path.[Start,Middle,End,Full]`
* Add `after` to `Path.Start`
* Add `after` to `teleport_into`

I'm not fully getting it yet. Not sure what we will do with `_schedule_after` and how `after_wait` is to be implemented.
> After this fix, could we simplify calls like:
> schedule_for(obj: Union[T, Delayed[T]], *, after: Optional[Union[Ticks, Time]] = None,
> to
> schedule_for(after: WaitCondition[s])

`schedule_for()` needs to know the object that is the input for the `Operation`. That's what the `obj` parameter is doing. It's a `Union` so that you can say "It's this object or the object that will eventually be posted to this future." It's not a wait condition other than in that if we're handed a future, we necessarily have to wait until it gets a value to know what to operate on.
> Have a look at the latest version of the issue.091 branch to see if I'm getting it right.

It's a bit hard to tell. There are a lot of errors (including a bunch that show up after syntax errors (e.g., the lack of colon in the definition of `NoWait`) are fixed).

> Here is what I did:
> * Added `WaitCondition` and `NO_WAIT`
> * Replaced some occurrences of `after: Optional[DelayType] = None` with `after: WaitCondition = NO_WAIT`
> * Drop `on_future` from `schedule`
> * Removed `after` from `MiddleStep` and `EndStep` as we will move them to `Path.[Start,Middle,End,Full]`

I don't understand how that can work. If you say something like
path = Path.dispense_from(w).to_col(5).to_row(7, after=3*ticks)
the point of the `after` is that you want to start walking to row 7 three ticks after you get to column 5. Putting it on `path` (a `Path.Start`) isn't going to accomplish that. It pretty much has to be on the step. Or am I misunderstanding what you're saying?

> * Add `after` to `Path.Start`
> * Add `after` to `teleport_into`
>
> I'm not fully getting it yet. Not sure what we will do with `_schedule_after` and how `after_wait` is to be implemented.

The point of `_schedule_after()` is precisely to be able to chain the scheduling so that each step operates on the (eventually produced) value of the step before, after whatever intrinsic delay may have been specified, returning a future that will either be the overall result or the input to the next step.
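The chaining idea can be illustrated with a toy model. This is only a sketch under stated assumptions: `Future`, `Clock`, and `Step` are hypothetical stand-ins (not the project's `Delayed`, `Postable`, board clock, or `Path.MiddleStep`), delays are whole ticks, and each step's work is an ordinary function.

```python
from typing import Any, Callable


class Future:
    """Tiny stand-in for Delayed/Postable: callbacks run once a value is posted."""
    def __init__(self) -> None:
        self._has_value = False
        self._value: Any = None
        self._callbacks: list[Callable[[Any], None]] = []

    def post(self, value: Any) -> None:
        self._has_value, self._value = True, value
        callbacks, self._callbacks = self._callbacks, []
        for cb in callbacks:
            cb(value)

    def when_value(self, cb: Callable[[Any], None]) -> None:
        if self._has_value:
            cb(self._value)
        else:
            self._callbacks.append(cb)


class Clock:
    """Hypothetical tick clock that runs callbacks a number of ticks from now."""
    def __init__(self) -> None:
        self.now = 0
        self._pending: list[tuple[int, Callable[[], None]]] = []

    def call_in(self, ticks: int, cb: Callable[[], None]) -> None:
        if ticks <= 0:
            cb()
        else:
            self._pending.append((self.now + ticks, cb))

    def tick(self) -> None:
        self.now += 1
        due = [cb for when, cb in self._pending if when <= self.now]
        self._pending = [(w, cb) for w, cb in self._pending if w > self.now]
        for cb in due:
            cb()


class Step:
    """Hypothetical path step: a function plus an intrinsic delay in ticks."""
    def __init__(self, fn: Callable[[Any], Any], after: int = 0) -> None:
        self.fn, self.after = fn, after

    def schedule_after(self, prev: Future, clock: Clock) -> Future:
        result = Future()
        def run(value: Any) -> None:
            # Start counting the intrinsic delay only once the previous step is done.
            clock.call_in(self.after, lambda: result.post(self.fn(value)))
        prev.when_value(run)
        return result


clock, start, log = Clock(), Future(), []
future = start
for step in [Step(lambda p: p + ["col 5"]), Step(lambda p: p + ["row 7"], after=3)]:
    future = step.schedule_after(future, clock)   # each step consumes the previous future
future.when_value(log.append)
start.post(["dispensed"])   # first step runs at once; second waits 3 ticks
```

In this toy model the `after=3` on the second step is measured from the moment the first step's result is posted, which mirrors the `to_row(7, after=3*ticks)` example above.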
By "have a look" I did not mean run MyPy or the code. I literally meant that you have a look at the general trend of the code and see if it is going in the direction you envision. Once the direction is good, I'll polish it.
See your comment [comment by @EvanKirshenbaum on Jun 15, 2022 at 2:26 PM PDT]:
class Path:
    class Start:
        after: Final[Optional[WaitCondition]]
so I figured we don't want `after` in both `Start` and `StartStep`, hence the removal.

We arguably don't need it in `StartStep`. I don't see any way we can avoid it in `MiddleStep` and `EndStep`.
So, are we going to have?
class MiddleStep:
    after : ...
class EndStep:
    after : ...
class Start:
    after : ...

Well, I'd put it on `StartStep` rather than on `Start` (and `Full`) for consistency.
Note that wherever you put it, you're going to have to deal with both an `after` being specified by the caller of `schedule()` and `schedule_for()` and one having been specified for the first step by whoever built the path. So, you're either going to have to compose them or else do something like add an invisible dummy operation before the first step when the path gets scheduled. (It's not as general, but it's probably easier. That way, you just schedule the dummy op with the `after` passed in and each of the step ops with their intrinsic `after`. When the dummy fires, it simply posts its output to its future, triggering the next step.)
Look at how `drop.MotionOp` handles the `after`
Just to confirm, we are removing the `after` argument from the following methods:
* `[Operation,CombinedOperation,CombinedStaticOperation]._schedule_for`
* `OpScheduler.[WaitAt,Reach,WaitFor]._schedule_for`
* `[StaticOperation,CombinedStaticOperation]._schedule`

But keep it in for the following methods:
* `Operation.schedule_for`
* `[Operation,StaticOperation].then`
* `CommunicationScheduler.schedule_communication`
* `CommunicationScheduler.delayed`
* `[OpScheduler,StaticOperation].schedule`
* `Delayed.then_schedule`
* `schedule`

Do we want to add the `after` argument to most of the `Path` class methods? For example:
* `dispense_from`
* `appear_at`
* `walk`
* `to_col`
* `to_row`
* `to_pad`
* `start`
* `join`
That sounds right. I hadn't thought about how `CommunicationScheduler` plays in all of this (i.e., whether we want to widen its `after` parameters or leave them as `Time` or `Ticks`), but widening them may simplify things.
Note that the `WaitAt`, et al.,'s `_schedule_for()` will get it removed just like all of the other concrete `Operation`s and `StaticOperation`s in the code base. Also, you will want to change (and possibly already have changed) `StaticOperation.schedule()` to remove `on_future` in favor of just using a `Delayed` as an `after` value.
[re: the `Path` methods]: It looks as though they already have it, which seems right.
I made the update to `Path.*Step` as we discussed: there is a superclass `Path.Step` to hold the common `after` attribute.
I started on `after_wait`; I have this:
def after_wait(after: WaitCondition, callback: Callback):
    if isinstance(after, Ticks):
        board.after_tick(callback, delta=after)
    elif isinstance(after, Time):
        board.call_after(after, callback)
    # TODO other `after` types
but this won't work as is since I don't have a `board`.
Remind me why we need `after_wait()` on `Path.Step`. Doesn't the `after` value just get passed to the `op` during `_schedule_after` or `_schedule`? If you've got a way to compose delays (e.g., by creating a tuple), can't you just let the underlying (first) operation take care of it?
Oh, wait. Maybe I do see the problem (or at least a problem). The actual delayed call happens (or, at least, should happen) in `System.delayed()`:
def delayed(self, function: Callable[[], T], *,
            after: Optional[DelayType]) -> Delayed[T]:
    if after is None:
        return Delayed.complete(function())
    future = Postable[T]()
    def run_then_post() -> None:
        future.post(function())
    if isinstance(after, Time):
        if after > Time.ZERO:
            self.call_after(after, run_then_post)
        else:
            return Delayed.complete(function())
    else:
        if after > Ticks.ZERO:
            self.before_tick(run_then_post, delta = after)
        else:
            return Delayed.complete(function())
    return future
which can be generalized to handle other delays or sequences of delays.
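One possible shape for that generalization is to peel off one condition at a time and recurse on the rest. The sketch below is an illustration only: `Trigger`, `TickScheduler`, and `after_all` are hypothetical names, delays are integer tick counts, and completion is signalled through a plain callback rather than the `Delayed` future the real `delayed()` returns.

```python
from typing import Callable, Sequence, Union


class Trigger:
    """Hypothetical one-shot trigger: waiting callbacks run when fire() is called."""
    def __init__(self) -> None:
        self.fired = False
        self._waiting: list[Callable[[], None]] = []

    def wait(self, cb: Callable[[], None]) -> None:
        if self.fired:
            cb()
        else:
            self._waiting.append(cb)

    def fire(self) -> None:
        self.fired = True
        waiting, self._waiting = self._waiting, []
        for cb in waiting:
            cb()


Wait = Union[int, Trigger]   # an int means "this many ticks"


class TickScheduler:
    def __init__(self) -> None:
        self.now = 0
        self._pending: list[tuple[int, Callable[[], None]]] = []

    def tick(self) -> None:
        self.now += 1
        due = [cb for when, cb in self._pending if when <= self.now]
        self._pending = [(w, cb) for w, cb in self._pending if w > self.now]
        for cb in due:
            cb()

    def after_all(self, waits: Sequence[Wait], fn: Callable[[], None]) -> None:
        """Run fn once every condition in waits has completed, in order."""
        if not waits:
            fn()
            return
        first, rest = waits[0], waits[1:]

        def proceed() -> None:
            self.after_all(rest, fn)   # peel off one layer, handle the rest

        if isinstance(first, Trigger):
            first.wait(proceed)
        elif first > 0:
            self._pending.append((self.now + first, proceed))
        else:
            proceed()


sched, barrier, done = TickScheduler(), Trigger(), []
sched.after_all([barrier, 2], lambda: done.append(sched.now))
sched.tick()      # tick 1: still waiting on the barrier
barrier.fire()    # the 2-tick wait starts counting from tick 1
sched.tick()
sched.tick()      # tick 3: both conditions satisfied
```

Because each later wait is only registered once the earlier one completes, a tick delay that follows a trigger is measured from the moment the trigger fires, not from the original call.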
The way it's currently implemented, the one who calls this (possibly indirectly) is the argument of the `Operation`, which is always (but isn't required to be) a `CommunicationScheduler`, which has to implement `delayed()`. (They're all (I believe) either explicitly `CommunicationScheduler`s or else `BoardComponent`s, `SystemComponent`s, or `Drop`s, which qualify.) All of the `StaticOperation`s currently handle their delays in their `_schedule()` implementations by delegating to some `Operation` they schedule on a `CommunicationScheduler`.
If you're going to put the delay logic in `Operation.schedule_for()` and `StaticOperation.schedule()` itself, the first thing I'd try is to redefine `Operation` from
class Operation(Generic[T, V], ABC):
    @abstractmethod
    def _schedule_for(self, obj: T, *,                      # @UnusedVariable
                      after: Optional[DelayType] = None,    # @UnusedVariable
                      post_result: bool = True,             # @UnusedVariable
                      ) -> Delayed[V]:
        ...
to
class Operation(Generic[CS, V], ABC):
    @abstractmethod
    def _schedule_for(self, obj: CS, *,                     # @UnusedVariable
                      after: Optional[DelayType] = None,    # @UnusedVariable
                      post_result: bool = True,             # @UnusedVariable
                      ) -> Delayed[V]:
        ...
This would require that `obj` is a `CommunicationScheduler`, so you could call `obj.delayed()`.
You'd also want to make `StaticOperation` a `CommunicationScheduler`, probably by taking one as an `__init__()` parameter and stashing it away. Then, for a `StaticOperation` like `Drop.DispenseFrom`, you could just pass up the `well` argument (which is a `BoardComponent` and, therefore, a `CommunicationScheduler`).
> Remind me why we need after_wait() on Path.Step.

`after_wait` is a top-level function in `types.py`, see here

> The way it's currently implemented, the one who calls this (possibly indirectly) is the argument of the Operation,

What does "the argument of the Operation" mean here?

> All of the StaticOperations currently handle their delays in their _schedule() implementations by delegating to some Operation they schedule on a CommunicationScheduler.

Could you please elaborate on this? Looking at `Path.Start._schedule` we have:
def _schedule(self, *,
              after: Optional[DelayType] = None,
              post_result: bool = True,
              ) -> Delayed[Drop]:
    middle = self.middle_steps
    last = len(middle) - 1
    future = schedule(self.first_step.op, after=after,
                      post_result = post_result if last == -1 else True)
    for i,step in enumerate(middle):
        future = step._schedule_after(future, post_result=post_result, is_last = i==last)
    return future
Which is the "delegating to some Operation they schedule on a CommunicationScheduler" part?
What do you mean by

> You'd also want to make StaticOperation a CommunicationScheduler

Do you mean have `StaticOperation` extend `CommunicationScheduler`?

> for a StaticOperation like Drop.DispenseFrom, you could just pass up the well argument

Do you mean pass the `well` to `_schedule`?
> > Remind me why we need after_wait() on Path.Step.
>
> `after_wait` is a top-level function in `types.py`, see here

Yeah, that's going to make it a lot harder to implement. I'd recommend not doing that and just making sure that you have a `CommunicationScheduler` on hand to use.

> > The way it's currently implemented, the one who calls this (possibly indirectly) is the argument of the Operation,
>
> What does the argument of the Operation mean here?

The object that the `Operation` operates on. In particular, the value of the `obj` argument to `_schedule_for()`.

> > All of the StaticOperations currently handle their delays in their _schedule() implementations by delegating to some Operation they schedule on a CommunicationScheduler.
>
> Could you please elaborate this? Looking at `Path.Start._schedule` we have:
>
> def _schedule(self, *,
>               after: Optional[DelayType] = None,
>               post_result: bool = True,
>               ) -> Delayed[Drop]:
>     middle = self.middle_steps
>     last = len(middle) - 1
>     future = schedule(self.first_step.op, after=after,
>                       post_result = post_result if last == -1 else True)
>     for i,step in enumerate(middle):
>         future = step._schedule_after(future, post_result=post_result, is_last = i==last)
>     return future
>
> Which is the delegating to some Operation they schedule on a CommunicationScheduler part?

Whatever `self.first_step.op` delegates to. For example, for `TeleportInStep`, the `op` is a `Drop.TeleportInTo`, which delegates to `ExtractionPoint.TransferIn`. For `DispenseStep`, you have a `Drop.DispenseFrom`, which delegates to `Well.TransitionTo`. Both `ExtractionPoint` and `Well` (both `BoardComponent`s) are `CommunicationScheduler`s.
If you want to have `StaticOperation.schedule()` handle the `after` parameter (which seems right), you'll want to have `StaticOperation` have access to a `CommunicationScheduler`, so `TransferIn` could pass up its `ExtractionPoint`, and `DispenseFrom` could pass up its `Well`. For `Path.Start` it would presumably pass up the scheduler stored by `first_step.op`.
> What do you mean by?
>
> > You'd also want to make StaticOperation a CommunicationScheduler
>
> Do you mean have `StaticOperation` extend `CommunicationScheduler`?

That was my initial thought, but really, you just need to have it take one as an `__init__()` parameter and store it away.
class StaticOperation(Generic[V], ABC):
    scheduler: Final[CommunicationScheduler]

    def __init__(self, *, scheduler: CommunicationScheduler) -> None:
        self.scheduler = scheduler

> > for a StaticOperation like Drop.DispenseFrom, you could just pass up the well argument
>
> Do you mean pass the `well` to `_schedule`?

No, I mean that the `DispenseFrom.__init__()` would pass up the `Well` to `StaticOperation.__init__()`, e.g.,
def __init__(self, well: Well, *,
             reagent: Optional[Reagent] = None,
             empty_wrong_reagent: bool = False,
             after_reservation: Optional[Callback] = None,
             before_release: Optional[Callback] = None) -> None:
    super().__init__(scheduler=well)   # This is new
    self.well = well
    self.reagent = reagent
    self.empty_wrong_reagent = empty_wrong_reagent
    self.after_reservation = after_reservation
    self.before_release = before_release
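Putting the two snippets together, the arrangement might look roughly like the following runnable sketch. It is an illustration under assumptions, not the project's code: this `CommunicationScheduler` is a stub whose `delayed()` records the requested delay and calls the function immediately, `Well` and `DispenseFrom` are reduced to the bare minimum, and results are plain values rather than `Delayed` futures.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Final


class CommunicationScheduler:
    """Stub: records the requested delay, then runs the function right away."""
    def __init__(self, name: str) -> None:
        self.name = name
        self.log: list[Any] = []

    def delayed(self, function: Callable[[], Any], *, after: Any) -> Any:
        self.log.append(after)
        return function()


class StaticOperation(ABC):
    scheduler: Final[CommunicationScheduler]

    def __init__(self, *, scheduler: CommunicationScheduler) -> None:
        self.scheduler = scheduler

    def schedule(self, *, after: Any = None) -> Any:
        # The delay is handled here, once, so _schedule() no longer takes `after`.
        return self.scheduler.delayed(self._schedule, after=after)

    @abstractmethod
    def _schedule(self) -> Any:
        ...


class Well(CommunicationScheduler):
    """Stand-in for a board component that can schedule communication."""


class DispenseFrom(StaticOperation):
    def __init__(self, well: Well) -> None:
        super().__init__(scheduler=well)   # pass the well up as the scheduler
        self.well = well

    def _schedule(self) -> str:
        return f"drop from {self.well.name}"


well = Well("W1")
op = DispenseFrom(well)
result = op.schedule(after=3)
```

The subclass never mentions delays at all; it only has to hand its constructor argument up to the base class.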
###### Migrated from internal repository. Originally created by @EvanKirshenbaum on Jun 22, 2022 at 2:29 PM PDT.
I think I'm getting it now. Have a quick look at these snippets (w/o mypy) to see if I'm on the right track:
* Handling `after` in `schedule_for` - need help with handling it for `obj: Delayed[CS]`
* Handling `after` in `schedule`
* Passing scheduler up in `Path.Start`

I think now I have a thin slice in place for this issue. I'll work with mypy and see what else we are missing.
> I think I'm getting it now. Have a quick look at these snippets (w/o mypy) to see if I'm on the right track:
>
> * Handling `after` in `schedule_for` - need help with handling it for `obj: Delayed[CS]`
>
> if isinstance(obj, Delayed):
>     future = Postable[V]()
>     def schedule_and_post(x: CS) -> None:
>         f = self._schedule_for(x, post_result=post_result)
>         f.when_value(lambda val: future.post(val))
>     obj.when_value(schedule_and_post)
>     return future
> if after == NO_WAIT:
>     return self._schedule_for(obj, post_result=post_result)
> else:
>     return obj.delayed(lambda _: self._schedule_for(obj, post_result=post_result),
>                        after=after)

The only problem I see here is that if `obj` is a `Delayed`, you don't handle `after`. All you should need to do to handle that is to use `schedule_for()` inside of `schedule_and_post()` rather than `_schedule_for()`. You'll get around to this same function, which will see that `obj` is no longer a `Delayed` and proceed from there.
A couple of minor things:
* `schedule_and_post()` can just do
~~~py
self.schedule_for(...).post_to(future)
~~~
  I don't think I had added `post_to()` when I wrote the original code.
* I'd handle `NO_WAIT` in `delayed()` rather than special casing around every call. It's less error-prone (since you don't have to worry about your lambda matching what you would've done otherwise), and it should be almost as efficient. (If this was C++, it would be exactly as efficient.)

> * Handling `after` in `schedule`
>
> if after == NO_WAIT:
>     return self._schedule(post_result=post_result)
> else:
>     return self.scheduler.delayed(
>         lambda _: self._schedule(post_result=post_result),
>         after=after)

Same comment about `NO_WAIT`.

> * Passing scheduler up in `Path.Start`
>
> def __init__(self, start: Path.StartStep, middle: tuple[Path.MiddleStep,...]) -> None:
>     super().__init__(start.op.scheduler)
>     self.first_step = start
>     self.middle_steps = middle

That looks right, although `scheduler` is (correctly) a keyword argument.

> I think now I have a thin slice in place for this issue. I'll work with mypy and see what else we are missing.

I ran mypy on `types.py`, here are some findings:
1. It looks like we need to further change `Operation[CS, V]` to `Operation[CS, CS]` so that operations can be chained together. On this code:
class Operation(Generic[CS, V], ABC):
    ...
    def then(self, op: Union[Operation[V, V2], StaticOperation[V2],
                             Callable[[], Operation[V, V2]],
                             Callable[[], StaticOperation[V2]]], *,
             after: WaitCondition = NO_WAIT,
             ) -> Operation[CS, V2]:
mypy pointed out that:
src/mpam/types.py:711: error: Type argument "V" of "Operation" must be a subtype of "CommunicationScheduler"
2. A similar issue we have with `Tco`. For this code, mypy reports:
def then_schedule(self, op: Union[Operation[Tco,V], StaticOperation[V],
                                  Callable[[], Operation[Tco,V]],
                                  Callable[[], StaticOperation[V]]], *,
                  after: WaitCondition = NO_WAIT,
                  post_result: bool = True) -> Delayed[V]:
Type argument "Tco" of "Operation" must be a subtype of "CommunicationScheduler"
> I ran mypy on `types.py`, here are some findings:
>
> 1. It looks like we need to further change `Operation[CS, V]` to `Operation[CS, CS]` so that operations can be chained together. On this code:
>
> class Operation(Generic[CS, V], ABC):
>     ...
>     def then(self, op: Union[Operation[V, V2], StaticOperation[V2],
>                              Callable[[], Operation[V, V2]],
>                              Callable[[], StaticOperation[V2]]], *,
>              after: WaitCondition = NO_WAIT,
>              ) -> Operation[CS, V2]:
>
> mypy pointed out that:
>
> src/mpam/types.py:711: error: Type argument "V" of "Operation" must be a subtype of "CommunicationScheduler"

We certainly don't want to do that, since it's reasonably common to have an `Operation` that returns something that isn't chainable (typically `None`).
Sigh. This is one of those situations where MyPy should be able to figure out that it's safe, because if we actually managed to pass in an `Operation[V,V2]`, then `V` must be a `CommunicationScheduler`. (And it's one of those situations where C++ would've SFINAE'd the method out if it wasn't.)
I think the best thing to do here is to tell MyPy that you know what you're doing by adding a `type: ignore` comment:
def then(self, op: Union[Operation[V, V2], StaticOperation[V2],  # type: ignore [type-var]
                         Callable[[], Operation[V, V2]],
                         Callable[[], StaticOperation[V2]]], *,
         after: WaitCondition = NO_WAIT,
         ) -> Operation[CS, V2]:
I hate doing that, but there's only so much you can do with the type inference provided.

> 2. A similar issue we have with `Tco`. For this code, mypy reports:
>
> def then_schedule(self, op: Union[Operation[Tco,V], StaticOperation[V],
>                                   Callable[[], Operation[Tco,V]],
>                                   Callable[[], StaticOperation[V]]], *,
>                   after: WaitCondition = NO_WAIT,
>                   post_result: bool = True) -> Delayed[V]:
>
> Type argument "Tco" of "Operation" must be a subtype of "CommunicationScheduler"

I suspect that the same argument applies here.
Got it. I'm getting some unexpected errors once I start propagating the change from `ComputeOp(Operation[T,V])` to `ComputeOp(Operation[CS, V])`. I'm getting complaints about `V` needing to be of type `CS`:
src/mpam/types.py:741: error: Type argument "V" of "Operation" must be a subtype of "CommunicationScheduler"
src/mpam/types.py:755: error: Incompatible return value type (got "Operation[CS, V2]", expected "Operation[V, V2]")
src/mpam/types.py:755: error: Value of type variable "CS" of "ComputeOp" cannot be "V"
src/mpam/types.py:775: error: Incompatible return value type (got "Operation[V, V2]", expected "Operation[CS, V2]")
src/mpam/types.py:1206: error: Value of type variable "CS" of "ComputeOp" cannot be "V"
Could you please have a look at `types.py`?
As we discussed in today's meeting, this looks like a real issue (i.e., not one we can just tell MyPy to ignore). It may be that requiring `Operation` to take a `CommunicationScheduler` is too strict. All we need is the ability to get our hands on one inside of `schedule_for()`.
There are a couple of ways we might be able to do this. The most general is probably to give `Operation` an abstract method that can map from the object to a scheduler. Then you can either have each `Operation` subclass define it or, probably better, add an intermediate `CSOperation` (or some better name) class that does require a `CommunicationScheduler` and defines that method to just return its argument.
`ComputeOp` will still be tricky, because you have to remember that `Operation`s can be scheduled more than once, so you can't simply stash away the first op's scheduled argument. This will take some playing.
OK, I added `CSOperation` and there are no mypy issues on `types.py`. Have a look. If this looks good, I'll propagate the change, replacing `Operation` with `CSOperation` as the class to extend by operations.
Tell me more about `ComputeOp`: what is its purpose and how exactly is it used?
The way you did that completely ignores the `after` parameter in `Operation.schedule_for()` and repeats the logic in `CSOperation.schedule_for()`. What I was suggesting was something more like
class Operation(Generic[T, V], ABC):
    ...
    def schedule_for(self, obj: Union[T, Delayed[T]], *,
                     after: WaitCondition = NO_WAIT,
                     post_result: bool = True,
                     ) -> Delayed[V]:
        if isinstance(obj, Delayed):
            future = Postable[V]()
            def schedule_and_post(x: T) -> None:
                self.schedule_for(
                    x, after=after, post_result=post_result).post_to(future)
            obj.when_value(schedule_and_post)
            return future
        def cb() -> Delayed[V]:
            return self._schedule_for(obj, post_result=post_result)
        return self.after_delay(after, cb, obj = obj)

    @abstractmethod
    def after_delay(self, after: WaitCondition,
                    fn: Callable[[], Delayed[V]],
                    *, obj: T) -> Delayed[V]:
        ...

class CSOperation(Operation[CS, V]):
    def after_delay(self, after: WaitCondition,
                    fn: Callable[[], Delayed[V]],
                    *, obj: CS) -> Delayed[V]:
        return obj.delayed(fn, after=after)
Then in other, non-`CSOperation` subclasses, you just have to define `after_delay()` (possibly ignoring `obj` in favor of some other means of getting a `CommunicationScheduler`).
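To see how the two kinds of subclass would differ in practice, here is a stripped-down, runnable sketch of the same hook. It makes several simplifying assumptions: the stub `CommunicationScheduler.delayed()` records the delay and runs the function immediately, results are plain values instead of `Delayed` futures, the `Delayed` input case is omitted, and `Pad`, `TurnOn`, and `FnOperation` are hypothetical classes invented for the example.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Generic, TypeVar

T = TypeVar("T")
V = TypeVar("V")


class CommunicationScheduler:
    """Stub scheduler: records the requested delay, then runs fn immediately."""
    def __init__(self) -> None:
        self.delays: list[Any] = []

    def delayed(self, fn: Callable[[], Any], *, after: Any) -> Any:
        self.delays.append(after)
        return fn()


CS = TypeVar("CS", bound=CommunicationScheduler)


class Operation(Generic[T, V], ABC):
    def schedule_for(self, obj: T, *, after: Any = 0) -> V:
        # The base class owns the delay; subclasses only say how to realize it.
        return self.after_delay(after, lambda: self._schedule_for(obj), obj=obj)

    @abstractmethod
    def _schedule_for(self, obj: T) -> V:
        ...

    @abstractmethod
    def after_delay(self, after: Any, fn: Callable[[], V], *, obj: T) -> V:
        ...


class CSOperation(Operation[CS, V]):
    """Operations on schedulers: the target object itself provides the delay."""
    def after_delay(self, after: Any, fn: Callable[[], V], *, obj: CS) -> V:
        return obj.delayed(fn, after=after)


class Pad(CommunicationScheduler):
    pass


class TurnOn(CSOperation[Pad, str]):
    def _schedule_for(self, obj: Pad) -> str:
        return "on"


class FnOperation(Operation[T, V]):
    """Hypothetical non-CS operation: ignores obj and uses a stored scheduler."""
    def __init__(self, function: Callable[[T], V], *,
                 scheduler: CommunicationScheduler) -> None:
        self.function = function
        self.scheduler = scheduler

    def _schedule_for(self, obj: T) -> V:
        return self.function(obj)

    def after_delay(self, after: Any, fn: Callable[[], V], *, obj: T) -> V:
        return self.scheduler.delayed(fn, after=after)


pad, sched = Pad(), CommunicationScheduler()
print(TurnOn().schedule_for(pad, after=2))
print(FnOperation(lambda n: n * 2, scheduler=sched).schedule_for(21, after=5))
```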
`ComputeOp` is simply an adapter that takes an arbitrary function `Callable[[T], Delayed[V]]` and turns it into an `Operation[T,V]`. It's there to support `Operation.then_compute()` and `.then_call()`, but looking at the code, it appears that it doesn't actually get used that way anymore. Where it does get used is in `paths.py` in `CallStep`, `CallAndWaitStep`, and `BarrierStep`.
Got it. Ran mypy on all files and fixed most of the easy errors. There are a few dozen errors left. A few comments:
1. We removed `after` from `_schedule` and `_schedule_for`. How do we want to deal with `after` being used inside these methods? Should I just strip the code as if `after` was never an argument or do we want some special treatment? For example, should this:
class Drop:
    class AppearAt(StaticOperation['Drop']):
        ...
        def _schedule(self, *,
                      post_result: bool = True,
                      ) -> Delayed[Drop]:
            ...
            pad.schedule(Pad.TurnOn, after=after) \
                .then_call(make_drop)
            return future
just become this:
            pad.schedule(Pad.TurnOn).then_call(make_drop)
            return future
or this:
class Path:
    class End(CSOperation[Drop, None]):
        ...
        def _schedule_for(self, obj: Drop, *,
                          post_result: bool = True,
                          ) -> Delayed[None]:
            if after is None:
                future = Delayed.complete(obj)
            else:
                postable = Postable[Drop]()
                assert isinstance(obj.pad, BoardComponent)
                obj.pad.board.before_tick(lambda: postable.post(obj), delta=after)
                future = postable
            ...
just become this:
        def _schedule_for(self, obj: Drop, *,
                          post_result: bool = True,
                          ) -> Delayed[None]:
            future = Delayed.complete(obj)
            ...
2. In the Path classes we have two ways of specifying the `after` argument. For example:
class DispenseStep(StartStep):
    def __init__(self, well: Well, *,
                 after: WaitCondition,
                 ...
class WalkStep(MiddleStep):
    def __init__(self, direction: Dir, steps: int, allow_unsafe: bool,
                 after: WaitCondition) -> None:
To be consistent, which one do you prefer? Also, should `after` be at the end of the required args, but before the ones with default values?
3. Related to (2), in which cases do we want to specify the default `NO_WAIT` value? It seems that we might want the default value for the helper functions but not the subclass constructors, right?
def to_pad(cls, target: Union[Pad, XYCoord, tuple[int, int]],
           *,
           row_first: bool = True,
           allow_unsafe: bool = False,
           after: WaitCondition = NO_WAIT)
...
class ToColStep(MiddleStep):
    def __init__(self, col: int, allow_unsafe: bool,
                 after: WaitCondition) -> None:
I think once these are taken care of, we can start playing with the harder case of `ComputeOp`.
> Got it. Ran mypy on all files and fixed most of the easy errors. There are a few dozen errors left. A few comments:
>
> 1. We removed `after` from `_schedule` and `_schedule_for`. How do we want to deal with `after` being used inside these methods? Should I just strip the code as if `after` was never an argument or do we want some special treatment?

I was thinking that we'd just ignore it. By the time `_schedule_for()` is called, the `after` has already been taken into account, so anything we would have delayed for should happen now.

> 2. In the Path classes we have two ways of specifying the `after` argument. For example:
>
> class DispenseStep(StartStep):
>     def __init__(self, well: Well, *,
>                  after: WaitCondition,
>                  ...
> class WalkStep(MiddleStep):
>     def __init__(self, direction: Dir, steps: int, allow_unsafe: bool,
>                  after: WaitCondition) -> None:
>
> To be consistent, which one do you prefer? Also, should `after` be at the end of the required args, but before the ones with default values?

I can't deny the evidence, but I have no idea why I declared `WalkStep.__init__()` that way. `after` is definitely the sort of parameter that should be a keyword arg, even if it's required. `allow_unsafe`, too. And arguably `steps`. If it's a keyword arg, it doesn't matter where it sits in the list.
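As a small illustration of required keyword-only parameters, the sketch below uses a simplified, hypothetical `WalkStep` (plain `str` and `int` types instead of `Dir` and `WaitCondition`). Everything after the bare `*` must be passed by name, and because none of them has a default, all are still required.

```python
class WalkStep:
    """Simplified stand-in: every parameter after `*` is keyword-only and required."""
    def __init__(self, direction: str, *,
                 steps: int,
                 allow_unsafe: bool,
                 after: int) -> None:
        self.direction = direction
        self.steps = steps
        self.allow_unsafe = allow_unsafe
        self.after = after


# Keyword arguments can be given in any order.
step = WalkStep("NORTH", after=3, allow_unsafe=False, steps=2)

# Passing them positionally is rejected at call time.
try:
    WalkStep("NORTH", 2, False, 3)  # type: ignore[misc]
    positional_ok = True
except TypeError:
    positional_ok = False
```

This keeps call sites self-describing without forcing a default such as `NO_WAIT` onto internal constructors.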
> 3. Related to (2), in which cases do we want to specify the default `NO_WAIT` value? It seems that we might want the default value for the helper functions but not the subclass constructions, right?

You definitely want defaults for the methods, since things like `to_pad()` are user visible. I was seeing the step subclasses as purely internal and only called by local code which would always be passing the values along anyway, so I didn't (and probably wouldn't) bother.
I had an attempt at fixing `CombinedOperation` and `ComputeOp`. I require a communication scheduler to be passed in the constructor. I updated all the `then_*` methods as well as any uses in `paths.py`. There are no mypy errors in `types.py` but three in `paths.py`:
src/mpam/paths.py:583: error: Argument 1 to "__init__" of "MiddleStep" has incompatible type "ComputeOp[CS, Drop, Drop]"; expected "CSOperation[Drop, Drop]"
src/mpam/paths.py:592: error: Argument 1 to "__init__" of "MiddleStep" has incompatible type "ComputeOp[CS, Drop, Drop]"; expected "CSOperation[Drop, Drop]"
src/mpam/paths.py:604: error: Argument 1 to "__init__" of "MiddleStep" has incompatible type "Operation[Drop, Drop]"; expected "CSOperation[Drop, Drop]"
Before I get to that, I have some reservations about some of your design choices. It's quite possible, of course, that I'm missing something, in which case, feel free to argue for them.
First of all, you have CombinedOperation
parameterized by CS
, but it doesn't appear that CS
is being used anywhere. The scheduler
(which isn't declared as an attribute, by the way) only needs to be a CommunicationScheduler
. There doesn't seem to be any need to keep track of which subclass it is.
But, in any case, there doesn't seem to be any need for it. first
is an Operation
, and by construction it therefore supports after_delay()
, so unless I'm missing something, you should be able to just delegate to it:
def after_delay(self,
after: WaitCondition,
fn: Callable[[], V2],
*, obj: T) -> Delayed[V2]:
return self.first.delayed(after, fn, obj=obj)
I guess that this gets rid of my other problem: the addition of scheduler
parameters to Operation.then()
, .then_compute()
, .then_call()
, and .then_process()
. These are all functions that I'd expect to be used by protocol writers, and unless it's absolutely necessary, CommunicationScheduler
s are an implementation detail that they shouldn't have to understand. But if you get rid of the scheduler
from CombinedOperation.__init__()
, then .then()
doesn't need one in order to create the CombinedOperation
, and none of the others need to pass it along.
ComputeOp
also has a few problems. As with CombinedOperation
, I don't see any reason that it needs to be parameterized by CS
. scheduler
can just be a CommunicationScheduler
. Unlike CombinedOperation
, it looks as though it's likely that it will need to be specified, but I would strongly recommend making it a keyword parameter to __init__()
.
As for your actual question, the complaint is that you're passing in a ComputeOp
, but MiddleStep.__init__()
requires a CSOperation
, and ComputeOp
is an Operation
, but not a CSOperation
. This is true, but my question is: "Why does MiddleStep
need CSOperation
rather than simply an Operation
?" The use is
def _schedule_after(self, future: Delayed[Drop], *,
                    is_last: bool, post_result: bool) -> Delayed[Drop]:
    return future.then_schedule(self.op, after=self.after,
                                post_result=post_result if is_last else True)
and Delayed.then_schedule()
is perfectly happy with an Operation
.
self.first.delayed
does not work. delayed
is a method in CommunicationScheduler
, not Operation
. Moreover, after_delay
is abstract in Operation
and implemented in CSOperation
. If we make CombinedOperation
require CSOperation
, then all the then*
methods have to be moved into CSOperation
- this sounds like something we already tried in the past.
Regarding the then*
methods, only then()
uses CombinedOperation
, the others use ComputeOp
. So, if we successfully remove CS
from CombinedOperation
, only then()
will benefit. The others will still require a CS
, as ComputeOp
requires it.
The MiddleStep
change seems fine.
self.first.delayed
does not work. delayed
is a method in CommunicationScheduler
, not Operation
. Moreover, after_delay
is abstract in Operation
and implemented in CSOperation
.
Sorry, self.first.after_delay()
, not self.first.delayed()
.
If we make
CombinedOperation
require CSOperation
, then all the then*
methods have to be moved into CSOperation
- this sounds like something we already tried in the past.
Regarding the
then*
methods, only then()
uses CombinedOperation
, the others use ComputeOp
. So, if we successfully remove CS
from CombinedOperation
, only then()
will benefit. The others will still require a CS
, as ComputeOp
requires it.
I see your point, sort of. Technically, only then_compute()
uses CombinedOperation
. then_call()
and then_process()
just delegate to it. And then_compute()
delegates to then()
, which, as we've established, doesn't need the scheduler. So it would seem to be perfectly fine for none of these to take one and to just let ComputeOp
fail if its after_delay()
is called.
This won't work for the uses of ComputeOp
in paths
, since MiddleStep._schedule_after()
uses Delayed.then_schedule()
, which delegates to Operation.schedule_for()
, but in this case, we know that the incoming object is a Drop
, so it would suffice for CallStep
and the like to use a subclass of ComputeOp
along the lines of
class DropComputeOp(ComputeOp[Drop, Drop]):
    def after_delay(self,
                    after: WaitCondition,
                    fn: Callable[[], Drop],
                    *, obj: Drop) -> Delayed[Drop]:
        return obj.delayed(fn, after=after)
I removed the CS
type argument from CombinedOperation
and updated after_delay
to use first.after_delay
, but I'm running into errors with the return type:
src/mpam/types.py:903: error: Incompatible return value type (got "Delayed[V]", expected "Delayed[V2]")
src/mpam/types.py:903: error: Argument 2 to "after_delay" of "Operation" has incompatible type "Callable[[], V2]"; expected "Callable[[], V]"
The rest of the changes worked well.
The problem here is that Operation.after_delay()
is declared as
@abstractmethod
def after_delay(self,
                after: WaitCondition,
                fn: Callable[[], V],
                *, obj: T) -> Delayed[V]:
    ...
This ties the return type of the function to the type produced by the operation. In CombinedOperation
, the function returns a V2
, but first.after_delay()
expects a V
.
The solution to this is relatively straightforward. There's no real reason why after_delay()
should care about the returned type. It just takes a function and delays it, returning the value in a future. So if we re-define it to take a new type variable not tied to the class:
@abstractmethod
def after_delay(self,
                after: WaitCondition,
                fn: Callable[[], V2],
                *, obj: T) -> Delayed[V2]:
    ...
it should all work. Note that in this case, for CombinedOperation
, which does use V2
, you'll need to change this to a different type variable (e.g., a V3
) so it doesn't get captured.
Don't look at me like that. I didn't design the way Python does generic classes and methods.
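To make the type-variable point concrete, here is a minimal, self-contained sketch. Everything in it is a stand-in rather than the real mpam code: Delayed is a trivial value holder, WaitCondition is replaced by a float, ImmediateOp is a hypothetical toy subclass, and the parameter order of CombinedOperation is assumed. It only shows that a method-level V2 on the base class, and a V3 on the class that already uses V2, lets the delegation type-check.

```python
from abc import ABC, abstractmethod
from typing import Callable, Generic, TypeVar

T = TypeVar("T")
V = TypeVar("V")
V2 = TypeVar("V2")
V3 = TypeVar("V3")


class Delayed(Generic[V]):
    """Hypothetical stand-in for the real future type; just holds a value."""
    def __init__(self, value: V) -> None:
        self.value = value


class Operation(Generic[T, V], ABC):
    @abstractmethod
    def after_delay(self, after: float,
                    fn: Callable[[], V2],
                    *, obj: T) -> Delayed[V2]:
        # V2 is bound per call, not per class, so fn may return any type.
        ...


class ImmediateOp(Operation[T, V]):
    """Toy operation (assumption): ignores the delay and runs fn right away."""
    def after_delay(self, after: float,
                    fn: Callable[[], V2],
                    *, obj: T) -> Delayed[V2]:
        return Delayed(fn())


class CombinedOperation(Operation[T, V2], Generic[T, V, V2]):
    # V2 is already a class parameter here, so the method uses V3 instead.
    def __init__(self, first: Operation[T, V]) -> None:
        self.first = first

    def after_delay(self, after: float,
                    fn: Callable[[], V3],
                    *, obj: T) -> Delayed[V3]:
        # Delegate to the first operation; the result type follows fn.
        return self.first.after_delay(after, fn, obj=obj)


combined: CombinedOperation[str, int, float] = CombinedOperation(ImmediateOp[str, int]())
result = combined.after_delay(0.0, lambda: [1, 2], obj="drop")
```

Here result holds a list even though the combined operation is declared to produce a float, which is the point: after_delay() no longer cares what the delayed function returns.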
There are also a couple of problems in your definition of Operation.schedule_for()
:
def schedule_for(self, obj: Union[T, Delayed[T]], *,
                 after: WaitCondition = NO_WAIT,
                 post_result: bool = True,
                 ) -> Delayed[V]:
    if isinstance(obj, Delayed):
        future = Postable[V]()
        def schedule_and_post(x: T) -> None:
            self.schedule_for(
                x, after=after, post_result=post_result).post_to(future)
        obj.when_value(schedule_and_post)
        return future
    def cb():
        return self._schedule_for(obj, post_result=post_result)
    return self.after_delay(after, cb, obj=obj)
cb()
has no type annotations, so MyPy ignores it. If you add the return type
def cb() -> Delayed[V]:
    return self._schedule_for(obj, post_result=post_result)
you'll get two errors.
The first is a MyPy annoyance. When you hit the definition of cb()
, MyPy knows that obj
is a T
, but within the function, it reverts back to being its declared type of Union[T, Delayed[T]]
. (I suspect that this may have to do with the fact that you could go on to say nonlocal obj
.) To get around this, I've gotten in the habit of doing the following:
real_obj = obj
def cb() -> Delayed[V]:
    return self._schedule_for(real_obj, post_result=post_result)
real_obj
is definitely a T
, so there's no problem.
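The same idiom in isolation, as a small runnable sketch with no mpam types involved. The function name and the int/list types are made up for illustration. After the isinstance() check, the checker treats obj as an int in the enclosing function, but that narrowing may not carry into a nested function, so the value is copied into a fresh local first.

```python
from typing import Callable, List, Union


def make_doubler(obj: Union[int, List[int]]) -> Callable[[], int]:
    if isinstance(obj, list):
        total = sum(obj)
        return lambda: 2 * total

    # At this point obj has been narrowed to int, but a nested function
    # that closes over obj may see the declared Union type again.
    real_obj = obj  # a new local whose inferred type is simply int

    def cb() -> int:
        return 2 * real_obj

    return cb
```

Calling make_doubler(4)() goes through the real_obj path, and make_doubler([1, 2])() goes through the list branch.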
The second problem is a bit trickier. In
return self.after_delay(after, cb, obj=obj)
after_delay()
(after the change from the last comment) takes a Callable[[], V2]
and returns a Delayed[V2]
. You're passing in cb()
, which returns a Delayed[V]
, so the result of the after_delay()
call will be a Delayed[Delayed[V]]
rather than the Delayed[V]
that schedule_for()
is declared to return.
The simple solution would be to use a second future, something like:
real_obj = obj
future = Postable[V]()
def cb() -> None:
    self._schedule_for(real_obj, post_result=post_result).post_to(future)
self.after_delay(after, cb, obj=obj)
return future
There are probably more efficient ways to do it, but they likely make the interfaces uglier (especially with Python's lack of overloading or ability to check Callable
return types.)
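For what it's worth, here is a toy model of the second-future approach, to show why it avoids the nested Delayed[Delayed[V]]. The Postable class below is a hypothetical minimal version written for this sketch (the real one surely does more), and start_work() is a made-up stand-in for _schedule_for(). The callback returns None and forwards the inner future's value to the outer one with post_to().

```python
from typing import Callable, Generic, List, Optional, TypeVar

V = TypeVar("V")


class Postable(Generic[V]):
    """Hypothetical minimal future: callbacks run once a value is posted."""
    def __init__(self) -> None:
        self._has_value = False
        self._value: Optional[V] = None
        self._callbacks: List[Callable[[V], None]] = []

    def post(self, value: V) -> None:
        self._has_value = True
        self._value = value
        for fn in self._callbacks:
            fn(value)
        self._callbacks.clear()

    def when_value(self, fn: Callable[[V], None]) -> None:
        if self._has_value:
            fn(self._value)  # type: ignore[arg-type]
        else:
            self._callbacks.append(fn)

    def post_to(self, other: "Postable[V]") -> None:
        # Forward this future's value to another future when it arrives.
        self.when_value(other.post)


pending: List[Callable[[], None]] = []


def start_work() -> Postable[int]:
    """Stand-in for _schedule_for(): returns a future that is filled in later."""
    inner: Postable[int] = Postable()
    pending.append(lambda: inner.post(42))
    return inner


outer: Postable[int] = Postable()


def cb() -> None:
    # Returns nothing; the result travels through the outer future instead.
    start_work().post_to(outer)


cb()
results: List[int] = []
outer.when_value(results.append)
for job in pending:
    job()
```

The caller only ever sees outer, which is a plain future of the final value, so nothing has to unwrap a future of a future.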
I did the V2
and V3
replacement for after_delay
but I get some errors with DropComputeOp
. I tried ignore[override]
but it did not help:
class DropComputeOp(ComputeOp[Drop, Drop]):
    def after_delay(self,
                    after: WaitCondition,
                    fn: Callable[[], Drop],
                    *, obj: Drop) -> Delayed[Drop]:  # type: ignore[override]
        return obj.delayed(fn, after=after)
src/mpam/drop.py:1081: error: Argument 2 of "after_delay" is incompatible with supertype "ComputeOp"; supertype defines the argument type as "Callable[[], V2]"
src/mpam/drop.py:1081: note: This violates the Liskov substitution principle
src/mpam/drop.py:1081: note: See https://mypy.readthedocs.io/en/stable/common_issues.html#incompatible-overrides
src/mpam/drop.py:1081: error: Return type "Delayed[Drop]" of "after_delay" incompatible with return type "Delayed[V2]" in supertype "ComputeOp"
src/mpam/drop.py:1081: error: Argument 2 of "after_delay" is incompatible with supertype "Operation"; supertype defines the argument type as "Callable[[], V2]"
src/mpam/drop.py:1081: error: Return type "Delayed[Drop]" of "after_delay" incompatible with return type "Delayed[V2]" in supertype "Operation"
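One possible way around these errors, sketched under assumptions: once the supertype's after_delay() is generic in V2, an override that pins fn to Callable[[], Drop] is narrower than what it replaces, which is what the Liskov complaint is about. Keeping the method-level type variable in the subclass signature should keep the override compatible. The classes below are stand-ins, WaitCondition is replaced by a float, and this assumes Drop.delayed() is itself generic in the function's result type, which the thread does not confirm.

```python
from abc import ABC, abstractmethod
from typing import Callable, Generic, TypeVar

T = TypeVar("T")
V = TypeVar("V")
V2 = TypeVar("V2")


class Delayed(Generic[V]):
    """Hypothetical stand-in for the real future type."""
    def __init__(self, value: V) -> None:
        self.value = value


class Drop:
    """Stand-in; delayed() is assumed to be generic in fn's result type."""
    def delayed(self, fn: Callable[[], V2], *, after: float) -> Delayed[V2]:
        return Delayed(fn())


class ComputeOp(Generic[T, V], ABC):
    @abstractmethod
    def after_delay(self, after: float,
                    fn: Callable[[], V2],
                    *, obj: T) -> Delayed[V2]:
        ...


class DropComputeOp(ComputeOp[Drop, Drop]):
    # Same generic signature as the supertype, with only obj specialized.
    def after_delay(self, after: float,
                    fn: Callable[[], V2],
                    *, obj: Drop) -> Delayed[V2]:
        return obj.delayed(fn, after=after)


d = Drop()
out = DropComputeOp().after_delay(0.0, lambda: d, obj=d)
```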
Fixed schedule_for
type annotations and mypy issues for cb()
Besides the fact that this shouldn't be an issue (because teleports into the same EP should serialize), the problem here is that while paths allow things like wait_for() and reach() to be added as middle steps, there's no way to get a start step to wait.
The simplest change would be to add an on_future to teleport_in and its siblings and have that stored in StartStep and used by Path.Start._schedule (which could call back to its schedule() if it had a non-None on_future).
An arguably cleaner redesign would be to punt on_future entirely and generalize after to take a sequence of WaitableTypes (or a single one).
- This would allow a Delayed or a Trigger (including a Barrier) to be used, in addition to a Time or a Ticks value.
- By making it allow a sequence and peeling off one layer at a time in schedule() or schedule_for(), you can require several conditions, in sequence, which will allow this single keyword argument to encapsulate both on_future and (the old) after, which could co-occur.
What we really want in this case is to not have to worry about playing with a Delayed[None] and posting to it. We want a simple trigger. Unfortunately, the Trigger class is defined such that if you wait() for a Trigger that has already fired, you don't proceed immediately, since Triggers can be reset(), which means that you'd have a race condition if you used it. The clean solution would be something like
where SingleFireTrigger is a subclass of Trigger that keeps track of whether it's been fire()ed and invokes any callbacks immediately on wait() if it has.
Combining the two ideas, it can be simplified to
Now all of the paths can be scheduled together, but they will serialize themselves.
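The SingleFireTrigger behavior described above (remember that it has fired, and run callbacks immediately on a later wait()) might look roughly like the sketch below. The Trigger base class here is a hypothetical stand-in written for this example; the real class's wait() and fire() signatures may differ, and reset() is left out.

```python
from typing import Callable, List


class Trigger:
    """Hypothetical stand-in for the real Trigger: wait() always queues."""
    def __init__(self) -> None:
        self._waiting: List[Callable[[], None]] = []

    def wait(self, fn: Callable[[], None]) -> None:
        self._waiting.append(fn)

    def fire(self) -> None:
        # Run and clear everything that is currently waiting.
        waiting, self._waiting = self._waiting, []
        for fn in waiting:
            fn()


class SingleFireTrigger(Trigger):
    def __init__(self) -> None:
        super().__init__()
        self.fired = False

    def wait(self, fn: Callable[[], None]) -> None:
        # If the trigger has already fired, don't queue: run immediately.
        if self.fired:
            fn()
        else:
            super().wait(fn)

    def fire(self) -> None:
        # Only the first fire() has any effect.
        if not self.fired:
            self.fired = True
            super().fire()


log: List[str] = []
t = SingleFireTrigger()
t.wait(lambda: log.append("early"))
t.fire()
t.wait(lambda: log.append("late"))
```

With a plain Trigger the "late" callback would sit in the queue until some later fire(); with the single-fire version it runs as soon as it is registered, which is what removes the race.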
Originally posted by @EvanKirshenbaum in https://github.com/HPInc/HP-Digital-Microfluidics/issues/87 [comment by @EvanKirshenbaum on Apr 26, 2022 at 10:25 AM PDT]
Migrated from internal repository. Originally created by @EvanKirshenbaum on Apr 26, 2022 at 10:26 AM PDT. Closed on Oct 26, 2022 at 3:29 PM PDT.