HPInc / HP-Digital-Microfluidics

HP Digital Microfluidics Software Platform and Libraries
MIT License
3 stars 1 forks source link

Describe Opentrons protocol #197

Open EvanKirshenbaum opened 9 months ago

EvanKirshenbaum commented 9 months ago

As requested by Rares Vernica and Mark Huber, I'm going to sketch out the protocol between Thylacine and the Opentrons OT2 robot. Rather than do this as a Word document, I'm going to just describe it in comments to this issue.

Migrated from internal repository. Originally created by @EvanKirshenbaum on Aug 26, 2022 at 11:38 AM PDT.
EvanKirshenbaum commented 9 months ago

The interface to the rest of the MPAM

The Opentrons robot is reified by an instance of OT2, which is a subclass of Pipettor, and the point of the API is to implement Pipettor.perform():

    def perform(self, transfer: Transfer) -> None:
        ...

where

class Transfer:
    def __init__(self, reagent: Reagent, xfer_dir: XferDir, *, is_product: bool = False) -> None:
         ...

and an XferDir is one of FILL or EMPTY. The Transfer also contains

    targets: list[XferTarget]

which specifies where the Reagent should be taken from or delivered to, how much should transfer, and what Postable should get a value when the operation is complete (which might take multiple trips and may be unable to fully complete the request). An XferTarget is defined as

class XferTarget(ABC):
    def __init__(self, target: PipettingTarget, volume: Volume,
                 *,
                 future: Postable[Liquid],
                 allow_merge: bool,
                 on_unknown: ErrorHandler,
                 on_insufficient: ErrorHandler,
                 ) -> None:
        ...

with subclasses for FillTarget (with an optional MixResult) and EmptyTarget (with an optional Postable[ProductLocation]).

This is all sitting under a fair bit of mechanism implemented by the Pipettor's TransferSchedule, which keeps track of pending Transfers and calls perform() asynchronously when Pipettor is idle and there are outstanding requests. When a new request is seen, it ensures that there is a Transfer for the Reagent and creates a XferTarget to add to the Transfers targets. This allows multiple requests for the same reagent to be combined into a single Transfer, potentially allowing, e.g., a single aspiration from a source well and dispensing into multiple Wells and/or ExtractionPoints.

Requests are added to the schedule by means of the Pipettor.Extract and Pipettor.Supply operations. These operations are typically the result of (directly or indirectly) calling Well.add(), Well.remove(), ExtractionPoint.transfer_out(), and ExtractionPoint.transfer_int().

Migrated from internal repository. Originally created by @EvanKirshenbaum on Aug 26, 2022 at 12:30 PM PDT.
EvanKirshenbaum commented 9 months ago

OT2 components

The OT2 architecture is split into two parts, one which runs on the robot itself and one that runs locally to the MPAM process.

Local OT2 components

Within the MPAM process, the OT2 consists of the following components

Robot-side components

The components that run on the robot are uploaded (by the ProtocolManager) when OT2.join_system() is called. Because the robot runs a different (much older) version of Python, these components are not specified under the src directory, but rather at top level in a special opentrons directory.

The principal components are

Migrated from internal repository. Originally created by @EvanKirshenbaum on Aug 26, 2022 at 5:38 PM PDT.
EvanKirshenbaum commented 9 months ago

opentrons.Listener runs as a background thread. To implement the handoff from OT2.perform(), it contains

OT2.perform() then becomes simply

    def perform(self, transfer:Transfer) -> None:
        listener = self.listener
        with listener.lock:
            while listener.pending_transfer is not None:
                listener.ready_for_transfer.wait()
            listener.pending_transfer = transfer
            listener.have_transfer.notify()
        # Now we wait until it's done
        with listener.lock:
            while listener.pending_transfer is not None:
                listener.ready_for_transfer.wait()

(Recall that this is called asynchronously. It returns when the transfer has completed.)

Within Listener.ready(), which handles the request for next task from the robot, the corresponding code (somewhat simplified) is run:

    async def ready(self, request: Request) -> Response:
        ...
        with self.lock:
            self.pending_transfer = None
            self.ready_for_transfer.notify()
        # Now we wait until we have one
        with self.lock:
            while self.system_running and self.pending_transfer is None:
                self.have_transfer.wait()
            ...
        ...

system_running is used to tell the Listener that the protocol has finished and it should tell the robot to shut down.

Migrated from internal repository. Originally created by @EvanKirshenbaum on Aug 26, 2022 at 5:54 PM PDT.
EvanKirshenbaum commented 9 months ago

From the robot's point of view, the protocol is performed by opentrons_support.Robot.loop(). Stripped down:

class Robot:
   def loop(self) -> None:
        dirs = {"fill": Direction.FILL, "empty": Direction.EMPTY}
        well_map = self.board.well_map
        while True:
            call_params = self.prepare_call()
            if self.last_product_well is not None:
                call_params["product_well"] = str(self.last_product_well)
            resp = self.http_post("ready", json = call_params)

            body = resp.json()
            cmd = body["command"]
            if cmd == "exit":
                break
            d = dirs[cmd]
            r = body["reagent"]
            well_specs = body["targets"]

            wells = [(well_map[ws["well"]], ws["volume"]) for ws in well_specs]
            transfers = self.plan(d, r, wells)

            on_board = {ws[0] for ws in wells}
            self.do_transfers(transfers, r, on_board=on_board)

Each time around the loop, the robot posts to the "ready" path. If the prior request was to remove a product, a "product_well" parameter is included to specify where it wound up. This call will return when there's something to do.

The response to the "ready" call will contain a "command", which will be one of "fill", "empty", or "exit". If it is "exit", the loop terminates and the protocol shuts down. Otherwise, the response will include a "reagent" and a "targets" list, each element of which contains a "well" and a "volume". The robot then calls plan(), which results in a sequence of Transfers, each of which contains an opentrons_support.Pipettor (i.e., which pipettor to use) and a sequence of XferOps, each of which encapsulates a direction (i.e., aspirate or dispense), a volume, and a target (i.e., which well on which labware).

do_transfers():

   def do_transfers(self, transfers: Sequence[Transfer], reagent: str, *, 
                     on_board: set[Well]) -> None:
        p: Optional[Pipettor] = None
        for t in transfers:
            if p is not t.pipettor:
                if p is not None :
                    p.release_tip()
                p = t.pipettor
                p.get_tip_for(reagent)
            for op in t.ops:
                p.process(op, on_board=on_board)
        if p is not None:
            p.release_tip()

iterates through the Transfers. For each, it obtains the appropriate tip for the reagent (if this is the first Transfer or if it uses a different Pipettor from the last Transfer, in which case it returns or discards the tip in use), then it performs the Transfer by having the current Pipettor process() each of the XferOps. Finally, it returns or discards the tip in use (if any).

The rest of the protocol takes place within process():

    def process(self, op: XferOp[Well], *, on_board: set[Well]) -> None: 
        w = op.target
        v = op.volume
        p = self.pipette
        pause_around = w in on_board
        if pause_around:
            self.wait_for_clearance(w, v)
        if isinstance(op, AspirateOp):
            p.aspirate(v, w)
        else:
            p.dispense(v, w)
        if pause_around:
            self.signal_clear(w, v)

    def wait_for_clearance(self, well: Well, volume: float) -> None:
        self.pipette.move_to(well.top(1))
        self.robot.call_waiting(well, volume)

    def signal_clear(self, well: Well, volume: float) -> None:
        self.pipette.move_to(well.top(1))
        self.robot.call_finished(well, volume)

First it checks (pause_around) whether the target well is on the MPAM Board or not. If it is, it first moves (in wait_for_clearance()) to 1 mm above the well and calls call_waiting() on the Robot. This makes a call on the "waiting" HTTP path, passing in the "well" name and the "volume" (as a number of microliters). This call returns when it is safe to proceed to aspirate() or dispense(). After the liquid transfer is done, the Pipettor calls signal_clear(), which moves back up to 1 mm above the well and calls signal_clear() on the Robot. This makes a call on the "finished" HTTP path, passing in the "well" name, the "volume", to indicate that it is now safe to undo whatever reservations were made by the "waiting" call.

There is also a "message" path, which can be used to communicate a desire to print a string passed as the "message" parameter, and an "exit" path, which signals that the Robot has finished running the looping protocol.

Migrated from internal repository. Originally created by @EvanKirshenbaum on Aug 27, 2022 at 3:56 PM PDT.
EvanKirshenbaum commented 9 months ago

On the MPAM side, the paths are handled by the Listener thread. Note that by the protocol described above, only "message" and "exit" can actually race the others.

The guts of the "ready" handler were described above [comment by @EvanKirshenbaum on Aug 26, 2022 at 5:54 PM PDT]. What was simplified out there was the handling of any "product_well" parameter and the packaging up of the command, reagent, and targets expected by the robot.

Within the "waiting" handler, prepare_for_remove() or prepare_for_add() is called on the PipettingTarget involved (i.e., the Well or ExtractionPoint). In the case of an ExtractionPoint target, it will call ensure_drop() (if necessary) and reserve_pads(), waiting for the returned future to get a value.

Within the "finished" handler, pipettor_removed() or pipettor_added() is called on the PipettingTarget involved). This will update the representation of the contents of a Well or the Blobs (and, thereby, Drops) at or near an ExtractionPoint. In the case of an ExtractionPoint, this will finally call unreserve_pads(), but not wait.

Within the "message" handler, a munged version of the message is printed (probably should be logged).

Within the "exit" handler, the Listener notes that it is no longer running and raises an aiohttp.web_server.GracefulExit exception, which shuts down the web server and, I believe, closes the connection immediately.

Migrated from internal repository. Originally created by @EvanKirshenbaum on Aug 27, 2022 at 4:16 PM PDT.