Open EvanKirshenbaum opened 8 months ago
I added more log messages and I'm trying to understand (1) How does a drop movement get scheduled? I understand that it moves first to the destination row and then to the destination column (or vice-versa), but how does it actually get scheduled to do that? (2) From the log messages, it looks like most of the Pads get scheduled twice. Why is that?
> python tools/joey.py test --no-display --initial-delay 0s --log-level=debug
1116|Timer Thread|engine.py:371:run|DEBUG|Timer Thread started
1118|Clock Thread|engine.py:517:run|DEBUG|Clock Thread started
1118|MainThread|types.py:604:schedule_for|DEBUG|obj:ExtractionPoint[XYCoord(13,15)]|after:None
1118|MainThread|types.py:604:schedule_for|DEBUG|obj:<devices.dummy_pipettor.DummyPipettor object at 0x7f010863fe50>|after:None
1119|MainThread|pipettor.py:253:not_idle|INFO|Dummy Pipettor not idle
1119|Dummy Pipettor Thread|types.py:3544:run|DEBUG|queue len:1|before_task:None|after_task:None
1119|MainThread|types.py:604:schedule_for|DEBUG|obj:<mpam.types.Delayed object at 0x7f01086906a0>|after:None
1119|Dummy Pipettor Thread|types.py:3550:run|DEBUG|func:<function TransferSchedule._schedule.<locals>.run_it at 0x7f0108680790>
1119|MainThread|types.py:604:schedule_for|DEBUG|obj:<mpam.types.Delayed object at 0x7f0108690220>|after:None
4384|Dummy Pipettor Thread|dummy_pipettor.py:106:perform|INFO|Aspirating 1.0 μl of R1 from reagent block.
5886|Dummy Pipettor Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,15)|after:None
5958|Device Communication Thread|engine.py:239:run|DEBUG|Device Communication Thread started
5969|Dummy Pipettor Thread|dummy_pipettor.py:116:perform|INFO|Dispensing 1.0 μl of R1 to ExtractionPoint[XYCoord(13,15)].
5970|Dummy Pipettor Thread|drop.py:658:_schedule_for|DEBUG|direction:Dir.SOUTH|streps:15
6060|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,14)|after:None
6060|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,15)|after:None
6160|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,13)|after:None
6160|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,14)|after:None
6260|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,12)|after:None
6261|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,13)|after:None
6362|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,11)|after:None
6362|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,12)|after:None
6464|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,10)|after:None
6464|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,11)|after:None
6565|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,9)|after:None
6565|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,10)|after:None
6666|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,8)|after:None
6666|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,9)|after:None
6767|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,7)|after:None
6767|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,8)|after:None
6868|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,6)|after:None
6868|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,7)|after:None
6968|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,5)|after:None
6969|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,6)|after:None
7070|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,4)|after:None
7071|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,5)|after:None
7171|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,3)|after:None
7171|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,4)|after:None
7272|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,2)|after:None
7273|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,3)|after:None
7372|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,1)|after:None
7373|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,2)|after:None
7474|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,0)|after:None
7475|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,1)|after:None
7477|Clock Thread|drop.py:658:_schedule_for|DEBUG|direction:Dir.WEST|streps:13
7574|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(12,0)|after:None
7575|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(13,0)|after:None
7676|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(11,0)|after:None
7676|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(12,0)|after:None
7776|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(10,0)|after:None
7777|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(11,0)|after:None
7878|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(9,0)|after:None
7878|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(10,0)|after:None
7978|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(8,0)|after:None
7979|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(9,0)|after:None
8080|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(7,0)|after:None
8080|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(8,0)|after:None
8181|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(6,0)|after:None
8181|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(7,0)|after:None
8223|Dummy Pipettor Thread|pipettor.py:249:idle|INFO|Dummy Pipettor idle
8281|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(5,0)|after:None
8282|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(6,0)|after:None
8381|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(4,0)|after:None
8382|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(5,0)|after:None
8482|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(3,0)|after:None
8483|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(4,0)|after:None
8583|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(2,0)|after:None
8584|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(3,0)|after:None
8684|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(1,0)|after:None
8685|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(2,0)|after:None
8784|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(0,0)|after:None
8785|Clock Thread|types.py:604:schedule_for|DEBUG|obj:Pad(1,0)|after:None
The second question is the easiest to answer: Each pad gets scheduled twice because moving a drop involves turning on the pad you want to move to and turning off the pad where you currently are. In particular, in MotionOp._schedule_for() (in drop.py), lines 689-90:
next_pad.schedule(Pad.TurnOn, post_result=False)
last_pad.schedule(Pad.TurnOff, post_result=False)
And that's more or less the endpoint of the answer to your first question:
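To make the double scheduling concrete, here is a toy model of a straight-line walk. It is not the mpam code: `walk` and the coordinate convention are illustrative assumptions, and the only thing taken from the thread is that each step turns the next pad on and the last pad off.

```python
from collections import Counter

def walk(start, direction, steps):
    """Return the (pad, action) requests made while walking a drop."""
    dx, dy = direction
    requests = []
    last = start
    for _ in range(steps):
        nxt = (last[0] + dx, last[1] + dy)
        requests.append((nxt, "TurnOn"))    # pull the drop onto the next pad
        requests.append((last, "TurnOff"))  # release the pad it was on
        last = nxt
    return requests

SOUTH = (0, -1)  # assumption: y decreases going south, as in the log above
requests = walk((13, 15), SOUTH, 15)
counts = Counter(pad for pad, _ in requests)
print(counts[(13, 15)], counts[(13, 14)], counts[(13, 0)])  # 1 2 1
```

Every interior pad is requested twice (on, then off one step later). The two end pads appear once within this leg; in the log they presumably pick up their second request from the dispense before the walk and from the WEST leg after it.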
Please help me understand this from the docs:
class mpam.types.OpScheduler
Bases: Generic[mpam.types.CS] A mixin base class giving a class the ability to schedule Operation[CS, V]s. Typically, this will be class CS itself, as in: class Drop(OpScheduler['Drop']): ... There are several Operations (e.g., WaitAt, Reach, WaitFor), that are defined for all OpSchedulers. Parameters CS – A subclass of OpScheduler
So, CS is of type CommunicationScheduler, right? So, Operation[CS, V] transforms CS into V? So, OpScheduler[Drop] is Operation[CS, Drop]? Who is "this" in the following sentence: "typically, this will be class CS itself"? I guess OpScheduler schedules an operation, but what happens behind the scenes? How is it used? What is the role of a CommunicationScheduler?
Also, what is the intention behind this code in paths.py:
def schedule_paths(cls, paths: Iterable[Schedulable], *,
                   system: System,
                   on_future: Optional[Delayed[Any]] = None,
                   after: Optional[DelayType] = None,
                   ) -> Sequence[Union[Delayed[Drop], Delayed[None]]]:
    def scheduled(path: Schedulable) -> Union[Delayed[Drop], Delayed[None]]:
        if isinstance(path, Path.Start) or isinstance(path, Path.Full):
            return path.schedule(on_future=on_future, after=after)
        else:
            return path[1].schedule_for(path[0], after=after)
    # 1. What is the intention of this if/else branch?
    # 2. What does the scheduled function return?
    with system.batched():
        return [scheduled(p) for p in paths]
Yes, yes, no, and Drop.
CS stands for any CommunicationScheduler (which is a Protocol, so any class that provides correctly-typed schedule_communication() and delayed() methods counts). In this case, the CS in question is Drop, which also (by subtyping) is an OpScheduler[Drop].
Operation[CS,V] does, indeed, transform a CS into a V. In this case, an Operation[Drop,Drop] transforms a Drop into a Drop. (They may or may not be the same Drop.)
OpScheduler[Drop] is not, however, an Operation[CS,Drop]. It can be used to schedule one, assuming that CS is Drop. That is, it can schedule an Operation[Drop,Drop] (or, indeed, any Operation[CS,V]).
OpScheduler. Basically, what it's trying to say is that if an object is an OpScheduler[CS], it's usually also going to be a CS. If it's an OpScheduler[Drop], it's probably a Drop. If it's an OpScheduler[BinaryComponent], it's probably a BinaryComponent. There are reasons you might have subclasses that don't obey this pattern, but by and large you can expect to see the pattern
class Foo(OpScheduler['Foo']):
    ...
OpScheduler.schedule just turns around and calls schedule_for on the operation. This lets you write
pad.schedule(Pad.SetState(s))
rather than
op = Pad.SetState(s)
op.schedule_for(pad)
schedule also allows you to pass in a function that returns an Operation rather than the Operation itself, which means you can more easily delay the actual identification/creation of the Operation until it is actually going to be scheduled. Grepping through the code, it doesn't look as though I use that functionality anymore.
OpScheduler provides a place to put Operations that any of them can use, e.g., WaitAt, WaitFor, and Reach.
The role of a CommunicationScheduler is to provide a way to say "Call this function, optionally after a Time or Ticks delay", taking into account batching in the enclosing system.
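The relationship between the three names can be sketched with a stripped-down model. This is an assumption-laden toy, not mpam's implementation: the real `schedule()` returns a `Delayed` and goes through a `CommunicationScheduler`, whereas here the operation simply runs immediately, and `set_state` is a made-up stand-in for something like `Pad.SetState`.

```python
from typing import Callable, Generic, TypeVar, Union

CS = TypeVar("CS")
V = TypeVar("V")

class Operation(Generic[CS, V]):
    """Toy operation: turns a CS into a V (immediately, not via a Delayed)."""
    def __init__(self, fn: Callable[[CS], V]) -> None:
        self._fn = fn

    def schedule_for(self, obj: CS) -> V:
        return self._fn(obj)

class OpScheduler(Generic[CS]):
    """Toy mixin: schedule() just delegates to the operation's schedule_for()."""
    def schedule(self, op: Union[Operation[CS, V], Callable[[], Operation[CS, V]]]) -> V:
        if not isinstance(op, Operation):
            op = op()  # a factory: the Operation is only created now
        return op.schedule_for(self)  # type: ignore[arg-type]

class Pad(OpScheduler["Pad"]):  # the usual pattern: Pad is its own CS
    def __init__(self) -> None:
        self.on = False

def set_state(value: bool) -> Operation[Pad, Pad]:
    """Hypothetical helper producing an Operation[Pad, Pad]."""
    def apply(pad: Pad) -> Pad:
        pad.on = value
        return pad
    return Operation(apply)

pad = Pad()
same = pad.schedule(set_state(True))    # pass the Operation itself
was_on = pad.on
pad.schedule(lambda: set_state(False))  # or pass a function that returns one
```

Here `Pad` is an `OpScheduler[Pad]` and a `Pad`, but it is not an `Operation`; it is only the thing operations are scheduled on.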
Also, what is the intention behind this code in paths.py:
First off, note that Schedulable is defined as
Schedulable = Union['Path.Start', 'Path.Full',
                    tuple[Union[Drop, Delayed[Drop]],
                          Union['Path.Middle', 'Path.End']]]
That is, it's either a Path.Start or Path.Full (in which case, we can call schedule on them without specifying a Drop) or it's a Path.Middle or Path.End, combined with the Drop (or Delayed[Drop]) we will use as the argument to calling schedule_for on it.
If it's a Start or a Full, we just schedule it. Otherwise, it's a tuple containing a Middle or an End, and we call schedule_for on that, passing in the correct argument.
scheduled returns a Delayed[Drop] if we're dealing with a Start or Middle and a Delayed[None] if we're dealing with a Full or an End (since the last two typically result in a step that removes the Drop from the board, e.g., by absorbing it into a well or removing it via an extraction point).
Since the overall result of schedule_paths is a sequence of these two types, we can just collect the return values into a list using a comprehension.
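The dispatch described above can be mimicked with placeholder classes. Everything here is a stand-in: the four classes are empty shells rather than the real `Path` types, drops are plain strings, and the return values are just labels naming the type the real call would produce.

```python
from typing import Tuple, Union

class Start:
    def schedule(self) -> str:
        return "Delayed[Drop]"   # a drop exists at the end of the path

class Full:
    def schedule(self) -> str:
        return "Delayed[None]"   # the drop has left the board

class Middle:
    def schedule_for(self, drop: str) -> str:
        return "Delayed[Drop]"

class End:
    def schedule_for(self, drop: str) -> str:
        return "Delayed[None]"

Schedulable = Union[Start, Full, Tuple[str, Union[Middle, End]]]

def scheduled(path: Schedulable) -> str:
    if isinstance(path, (Start, Full)):
        return path.schedule()       # these create their own drop
    drop, rest = path                # otherwise: (drop, Middle-or-End)
    return rest.schedule_for(drop)   # these need a drop to act on

paths = [Start(), ("drop-1", End()), Full(), ("drop-2", Middle())]
results = [scheduled(p) for p in paths]
print(results)
```

The list comprehension works for the same reason as in `schedule_paths`: both branches return one of the two result types, so the results can be collected uniformly.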
I'm trying to do the following:
1. Dispense R1 w/ pipette and move it to (10, 15)
2. Dispense R2 w/ pipette and move it to (11, 15)
3. Dispense R3 w/ pipette and move it to (12, 15)
I did this:
Path.run_paths([
    Path.teleport_into(ep, reagent=Reagent('R1')).to_pad((10, 15)),
    Path.teleport_into(ep, reagent=Reagent('R2')).to_pad((11, 15)),
    Path.teleport_into(ep, reagent=Reagent('R3')).to_pad((12, 15))
], system=system)
But it looks like all the drops get dispensed at once, get combined and moved together. How can I sequence them?
Oh, my. That looks like a bug (and I think that I know what's likely happening). To my mind, your explanation is correct. (Or, rather, I'd expect the first drop to be dispensed and start walking and then have the second start dispensed, perhaps before the first had finished walking.)
(Also, I should note that the paths will not finish. The second drop will refuse to get to the pad next to the first one, because doing so would cause them to mix. If you want to mix, you have to be explicit.)
For now, if you want the paths to be sequential, you'll have to put an explicit dependency in, something like
path1 = Path.teleport_into(ep, reagent=Reagent('R1')).to_pad(10,15)
path2 = Path.teleport_into(ep, reagent=Reagent('R2')).to_pad(11,15)
path3 = Path.teleport_into(ep, reagent=Reagent('R3')).to_pad(12,15)
f1: Delayed[Drop] = path1.schedule()
f2: Delayed[Drop] = path2.schedule(on_future=f1)
f3: Delayed[Drop] = path3.schedule(on_future=f2)
That will delay the teleports until after the prior path has finished. If you want better overlap, you'll want to do something like
gate2 = Delayed[None]()
gate3 = Delayed[None]()
path1 = Path.teleport_into(ep, reagent=Reagent('R1')).walk(Dir.W).then_process(lambda _: gate2.post(None)).to_pad(10,15)
path2 = Path.teleport_into(ep, reagent=Reagent('R2')).walk(Dir.W).then_process(lambda _: gate3.post(None)).to_pad(11,15)
path3 = Path.teleport_into(ep, reagent=Reagent('R3')).to_pad(12,15)
path1.schedule()
path2.schedule(on_future=gate2)
path3.schedule(on_future=gate3)
What's happening here is
Yes, this is ugly. (More thoughts in next comment and the actual likely problem in the following one, so that I can use them to create new issues.)
Besides the fact that this shouldn't be an issue (because teleports into the same EP should serialize), the problem here is that while paths allow things like wait_for() and reach() to be added as middle steps, there's no way to get a start step to wait.
The simplest change would be to add an on_future to teleport_in and its siblings and have that stored in StartStep and used by Path.Start._schedule (which could call back to its schedule() if it had a non-None on_future).
An arguably cleaner redesign would be to punt on_future entirely and generalize after to take a sequence of WaitableTypes (or a single one).
This would allow a Delayed or a Trigger (including a Barrier) to be used, in addition to a Time or a Ticks value.
When calling schedule() or schedule_for(), you can require several conditions, in sequence, which will allow this single keyword argument to encapsulate both on_future and (the old) after, which could co-occur.
What we really want in this case is to not have to worry about playing with a Delayed[None] and posting to it. We want a simple trigger. Unfortunately, the Trigger class is defined such that if you wait() for a Trigger that has already fired, you don't proceed immediately, since Triggers can be reset(), which means that you'd have a race condition if you used it. The clean solution would be something like
gate2 = SingleFireTrigger()
gate3 = SingleFireTrigger()
path1 = Path.teleport_into(ep, reagent=Reagent('R1')).walk(Dir.W).then_fire(gate2).to_pad(10,15)
path2 = Path.teleport_into(ep, reagent=Reagent('R2')).walk(Dir.W).then_fire(gate3).to_pad(11,15)
path3 = Path.teleport_into(ep, reagent=Reagent('R3')).to_pad(12,15)
path1.schedule()
path2.schedule(on_future=gate2)
path3.schedule(on_future=gate3)
where SingleFireTrigger is a subclass of Trigger that keeps track of whether it's been fire()ed and invokes any callbacks immediately on wait() if it has.
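A rough sketch of that idea follows. The `Trigger` base class below is a minimal stand-in written for this example, not mpam's `Trigger` (whose actual signatures are not shown in this thread), and it ignores locking, which a real implementation used across threads would need.

```python
from typing import Callable, List

class Trigger:
    """Toy trigger: callbacks registered with wait() run on the next fire()."""
    def __init__(self) -> None:
        self._waiting: List[Callable[[], None]] = []

    def wait(self, callback: Callable[[], None]) -> None:
        self._waiting.append(callback)

    def fire(self) -> None:
        waiting, self._waiting = self._waiting, []
        for callback in waiting:
            callback()

class SingleFireTrigger(Trigger):
    """Remembers that it has fired, so late waiters proceed immediately."""
    def __init__(self) -> None:
        super().__init__()
        self.fired = False

    def wait(self, callback: Callable[[], None]) -> None:
        if self.fired:
            callback()              # already fired: do not block the waiter
        else:
            super().wait(callback)

    def fire(self) -> None:
        self.fired = True
        super().fire()

events = []
plain = Trigger()
plain.fire()
plain.wait(lambda: events.append("plain"))  # missed the fire, stays pending
gate = SingleFireTrigger()
gate.fire()
gate.wait(lambda: events.append("gate"))    # runs right away
print(events)  # ['gate']
```

The plain trigger shows the race mentioned above: a waiter that arrives after the fire is left waiting, while the single-fire variant lets it through.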
Combining the two ideas, it can be simplified to
gate2 = SingleFireTrigger()
gate3 = SingleFireTrigger()
path1 = Path.teleport_into(ep, reagent=Reagent('R1')).walk(Dir.W).then_fire(gate2).to_pad(10,15)
path2 = Path.teleport_into(ep, reagent=Reagent('R2'), after=gate2).walk(Dir.W).then_fire(gate3).to_pad(11,15)
path3 = Path.teleport_into(ep, reagent=Reagent('R3'), after=gate3).to_pad(12,15)
Path.run_paths([path1, path2, path3])
Now all of the paths can be scheduled together, but they will serialize themselves.
Now, as to what's really going on here...
No, wait! It can't be what I thought it was, because you're asking for different reagents.
My initial thought was that what's happening is that the pipettor scheduling is being over-aggressive in its optimization. I try to be smart about noticing that the same reagent is being requested multiple times (e.g., for different extraction points or wells) and combining them into the same trip, as long as the actual pipettor hasn't yet been told to fetch any.
If that was the case, then the problem was that something needed to happen in ExtractionPoint.transfer_in() to make sure that the pipettor didn't do that for multiple requests to the same EP. We don't reserve the pad when the request is made, because the pipettor could take a long time, and we don't want to block drops from using those pads until the drop is ready, so we'd need another mechanism. (And since it's reasonable to ask for serialized drops of the same reagent to walk different paths, this should be put in place in any case.)
But if you're using different reagents, the transfers should definitely serialize (and indeed, this is what happens in the CombSynth part of the pcr tool: multiple reagents are requested to the same EP, and they not only serialize, but the second one would wait (in ExtractionPoint.prepare_for_add()) until the pad was clear).
So, I really don't know what's going on here.
I'm going to make an issue for this. Do you have a commit that demonstrates it?
How does one use on_tick, after_tick, and before_tick? What would be an example use case for each?
Also, how does System._channel work? What is a Batch as opposed to Engine?
The ClockThread maintains three queues of requests that are processed on each clock tick.
Functions on the pre_tick_queue (registered by before_tick()) are used to decide what to schedule on this tick.
These functions return None to signal that they are complete or a number of Ticks (typically 1*tick) to reschedule themselves on the pre_tick_queue for a future tick.
[...] Iterator[Optional[Ticks]].
on_tick() or after_tick() calls during the processing of this queue will take place in the current tick.
pre_tick() calls during the processing of the queue will take place relative to the next tick.
Functions on the on_tick_queue (registered by on_tick()) are used to communicate with the actual devices. This queue is kept separate so that all of these functions, including those registered during the pre-tick queue processing, can be passed to the DevCommThread atomically, so they all get processed together to prevent devices from being partially updated.
[...] Board) that implements update_state(). The actual function just sets things up so that when update_state() is called, all of the functions that requested changes get the changes implemented at the same time.
[...] DevCommThread), a set of their non-None return values (the updatable components) have update_state() called.
Functions on the post_tick_queue (registered by after_tick()) are used to clean up from actions that took place earlier in the tick.
[...] Delayed objects or updating drop positions.
[...] pre_tick_queue. Functions return Optional[Ticks], so they can be rescheduled, although I think this happens much less often (if at all). When processing this queue, all before_-, on_-, and after_tick() calls refer to the next tick.
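The three-queue cycle can be approximated in a single-threaded toy. This is a simplified sketch under several assumptions: `ToyClock` and `ToyBoard` are invented names, ticks are plain integers, there is no separate device thread, and the finer rules about which tick a nested registration lands in are not modelled.

```python
class ToyBoard:
    """Stand-in for an updatable component with update_state()."""
    def __init__(self):
        self.pending = []       # (pad, on?) changes requested this tick
        self.pads_on = {0}
        self.updates = 0

    def update_state(self):
        for pad, on in self.pending:
            (self.pads_on.add if on else self.pads_on.discard)(pad)
        self.pending.clear()
        self.updates += 1

class ToyClock:
    def __init__(self):
        self.tick = 0
        self.pre, self.on, self.post = [], [], []

    def before_tick(self, fn, delay=0):
        self.pre.append((self.tick + delay, fn))

    def on_tick(self, fn):
        self.on.append(fn)

    def after_tick(self, fn):
        self.post.append(fn)

    def run_tick(self):
        due = [fn for when, fn in self.pre if when <= self.tick]
        self.pre = [(when, fn) for when, fn in self.pre if when > self.tick]
        for fn in due:                  # 1. decide what happens this tick
            again = fn()
            if again is not None:       #    a tick count means "call me again"
                self.pre.append((self.tick + again, fn))
        on, self.on = self.on, []
        updatables = {u for u in (fn() for fn in on) if u is not None}
        for u in updatables:            # 2. apply all device changes together
            u.update_state()
        post, self.post = self.post, []
        for fn in post:                 # 3. clean up after the tick
            fn()
        self.tick += 1

board, clock = ToyBoard(), ToyClock()
position, moved = [0], []

def step():
    here, there = position[0], position[0] + 1
    def turn_on():
        board.pending.append((there, True))
        return board
    def turn_off():
        board.pending.append((here, False))
        return board
    clock.on_tick(turn_on)
    clock.on_tick(turn_off)
    clock.after_tick(lambda: moved.append(there))  # bookkeeping after the move
    position[0] = there
    return 1 if there < 3 else None    # reschedule until pad 3 is reached

clock.before_tick(step)
for _ in range(4):
    clock.run_tick()
print(board.pads_on, board.updates, moved)
```

Both pad changes for a step are requested during the same tick and applied by a single `update_state()` call, which is the point of collecting the updatable components into a set.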
Also, how does System._channel work? What is a Batch as opposed to Engine?
Systems are associated with an Engine, which implements the three worker threads (ticking clock, wall clock, and device communication). When you schedule a function via a System, using methods like on_tick(), before_tick(), or call_after(), it will (eventually) be forwarded to the Engine and stuck on one of its threads' queues.
Sometimes you want to schedule several functions, but you want to make sure they happen before the same tick (e.g., turning one pad off and the next one on) or after deltas relative to the same current wall-clock time. To enable this, if you say
with system.batched():
    self.before_tick(...)
    self.before_tick(...)
a Batch will be created (chained onto any existing _batch), set as the System's _batch, and entered. With this in place, when _channel() is called inside of, e.g., before_tick(), it will return the Batch rather than the Engine, and the request will be added to the Batch's queue. When the with block exits, the Batch's queues will be transferred to its parent Batch (so these blocks can be nested) or, if there is no parent, to the System's Engine.
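The hand-off between Batch and Engine can be sketched as follows. The classes are toys invented for this example (a single queue, requests as plain strings, no threads, no error handling), so they only illustrate the routing, not the real `System`, `Batch`, or `Engine`.

```python
from contextlib import contextmanager

class ToyEngine:
    def __init__(self):
        self.queue = []

    def before_tick(self, request):
        self.queue.append(request)

class ToyBatch:
    def __init__(self, parent):
        self.parent = parent    # enclosing batch, or None
        self.queue = []

    def before_tick(self, request):
        self.queue.append(request)

class ToySystem:
    def __init__(self):
        self.engine = ToyEngine()
        self._batch = None

    def _channel(self):
        # Inside a batched() block, requests go to the batch, not the engine.
        return self._batch if self._batch is not None else self.engine

    def before_tick(self, request):
        self._channel().before_tick(request)

    @contextmanager
    def batched(self):
        batch = ToyBatch(parent=self._batch)
        self._batch = batch
        yield batch
        self._batch = batch.parent
        target = batch.parent if batch.parent is not None else self.engine
        target.queue.extend(batch.queue)    # hand everything over at once

system = ToySystem()
with system.batched():
    system.before_tick("turn off pad A")
    with system.batched():                  # nested: flushes to the outer batch
        system.before_tick("turn on pad B")
    held_back = len(system.engine.queue)    # nothing has reached the engine yet
delivered = list(system.engine.queue)
print(held_back, delivered)
```

Nothing reaches the engine until the outermost block exits, so both requests arrive together and can be processed for the same tick.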
On directory structure:
Could you please explain how the TimerThread works?
Yeah, that is a bit tricky. Essentially, ...
It's used to schedule things to happen in the future, via call_at() (and call_after()). Each of these things (a TimerRequest) contains a Timestamp (a wall-clock time), a TimerFunc that either returns None or a Time or Timestamp indicating when it should be repeated, and an indication of whether the request is a daemon. If it is, the TimerThread is allowed to exit before the function is called.
To accomplish this, the TimerThread makes use of a MyTimer (a subclass of threading.Timer) that keeps track of the earliest function to be called and a priority queue (implemented as a heap) of Entrys, which are essentially TimerRequests ordered by time, for things that should happen later.
The TimerThread's run() method sleeps on a Condition that's signalled (via wake_up()) when the MyTimer fires or when new requests are added via call_at().
When the timer fires, it calls its function. If the return value indicates that it should be run again, it is pushed onto the priority queue. In any case, the timer is removed before waking up the thread.
When requests come in via call_at(), if there is a timer and the requested time is earlier than the timer's target time, the timer is canceled and its task is pushed onto the queue. In any case the new request is pushed and the thread is woken up.
When the thread wakes up and (1) there is no timer and (2) queue is not empty, it grabs the current time and pops off the first Entry in the queue. If its desired_time has passed, it calls its func and, if necessary, enqueues it to happen in the future. Otherwise, it creates a MyTimer with that first entry and starts it. If it processed the entry, it goes around again to check the next entry (against a new current time).
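The queue-handling part of this can be sketched without any threads. The toy below deliberately leaves out the thread, the Condition, and the MyTimer, uses caller-supplied numbers instead of wall-clock Timestamps, and assumes a repeating function returns a positive delay; `ToyTimerQueue` and its methods are made-up names.

```python
import heapq
import itertools

class ToyTimerQueue:
    def __init__(self):
        self._heap = []
        self._order = itertools.count()  # tie-breaker so functions are never compared

    def call_at(self, when, func):
        heapq.heappush(self._heap, (when, next(self._order), func))

    def run_due(self, now):
        """Run every entry whose time has passed; return the next wake-up time."""
        while self._heap and self._heap[0][0] <= now:
            _, _, func = heapq.heappop(self._heap)
            repeat_after = func()
            if repeat_after is not None:     # the function asked to be repeated
                self.call_at(now + repeat_after, func)
        return self._heap[0][0] if self._heap else None

calls = []

def once():
    calls.append("once")                     # returns None: do not repeat

delays = iter([2.0, None])

def twice():
    calls.append("beat")
    return next(delays)                      # repeat once, 2.0 later

queue = ToyTimerQueue()
queue.call_at(5.0, once)
queue.call_at(1.0, twice)
first = queue.run_due(now=1.0)
second = queue.run_due(now=3.0)
third = queue.run_due(now=5.0)
print(calls, first, second, third)
```

The value returned by `run_due` plays the role of the time the real thread would hand to a timer before going back to sleep.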
Not an actual issue. Just storing notes on how various components work.
When I ran my test task, I get this log:
Two questions about line 4489:
Why is the Pipettor in its own thread now? The line before shows the pipettor in the MainThread. From this line on it seems to be in its own thread for the remainder of the program.
Pipettors are slow devices. If you look at the code for DummyPipettor, you'll see a number of attributes that model the time that a device takes to do things like moving, picking up a tip, dipping down to the board, etc. When transferring to the board, the sequence specified in perform() is
So, it should take about 3.26 sec before printing "Aspirating". (Note that this is way faster than an actual robot.) If you don't want to wait that long, you can call DummyPipettor.speed_up(factor) to cut all of those times down by a constant factor. Take a look at pcr.py for examples of how to use it. There's a --pipettor-speed argument defined in add_device_specific_common_args and used in PCRTask.setup(). (I typically run with a speed-up factor of 2.)
The pipetting system assumes that (1) any given pipettor can (more or less) only be filling one request at a time, (2) it will do so asynchronously to the board, and (3) the board may make other requests on it while it's fulfilling its current request. To handle this, it uses an AsyncFunctionSerializer (from mpam.types) whenever the state goes from idle (no outstanding requests) to not-idle (at least one outstanding request). The "not idle" message is printed by the code that spawns the thread, in this case in the main thread. The rest of the actual implementation takes place in the serializer's thread. When it's done and sees that it has an empty queue, it prints "idle", removes itself, and dies.
Could you please expand on how the pipettor becomes its own thread? How does it spawn?
The pipettor doesn't actually become its own thread. The "Dummy Pipettor Thread" is actually the _AFS_Thread spawned and held by the AsyncFunctionSerializer held by the TransferSchedule (in mpam.pipettor) held by the Pipettor base class of DummyPipettor. When the Pipettor.Supply operation is scheduled, it calls add() to add the transfer request to the schedule (line 304). This winds up calling (line 179) enqueue() on the serializer. If there is no running _AFS_Thread (mpam.types, line 3648), one is created (line 3648) and started (line 3658). After the call, if the request queue is empty (line 3547), the thread removes itself (line 3550) and returns (line 3555), letting the thread be garbage collected.
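The spawn-on-demand behaviour can be illustrated with a small serializer. This is a sketch of the general pattern only: `ToySerializer` is a hypothetical class, not the AsyncFunctionSerializer from mpam.types, and `join()` exists purely so the example can wait for the work to finish.

```python
import threading
from collections import deque

class ToySerializer:
    """Runs enqueued functions one at a time on a worker thread that
    exists only while there is work to do."""
    def __init__(self, name):
        self.name = name
        self._lock = threading.Lock()
        self._queue = deque()
        self._thread = None

    def enqueue(self, fn):
        with self._lock:
            self._queue.append(fn)
            if self._thread is None:         # idle -> not idle: spawn a worker
                self._thread = threading.Thread(target=self._run, name=self.name)
                self._thread.start()

    def _run(self):
        while True:
            with self._lock:
                if not self._queue:          # nothing left: go idle and exit
                    self._thread = None
                    return
                fn = self._queue.popleft()
            fn()                             # run the request outside the lock

    def join(self):
        """Demo helper: wait for the current worker, if any, to finish."""
        with self._lock:
            thread = self._thread
        if thread is not None:
            thread.join()

seen = []
serializer = ToySerializer("Dummy Pipettor Thread")
serializer.enqueue(lambda: seen.append(threading.current_thread().name))
serializer.enqueue(lambda: seen.append(threading.current_thread().name))
serializer.join()
print(seen)
```

The caller's thread only enqueues; the requests themselves report the worker's name, which matches the way the log switches from MainThread to "Dummy Pipettor Thread" once the first request is queued.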
Migrated from internal repository. Originally created by Rares Vernica on Apr 18, 2022 at 8:09 PM PDT.