PyLabRobot / pylabrobot

interactive & hardware agnostic SDK for lab automation
https://docs.pylabrobot.org
MIT License
178 stars 63 forks source link

LiquidHandler._run_async_in_thread is broken #303

Open jrast opened 6 days ago

jrast commented 6 days ago

The implementation of LiquidHandler._run_async_in_thread is not working if the called method actually performs some async stuff.

MWE:

# %%
from dummy_backend import DummyBackend
from pylabrobot.liquid_handling import LiquidHandler
from pylabrobot.resources import (
    TIP_CAR_480_A00,
    STARDeck,
    standard_volume_tip_no_filter,
)

deck = STARDeck()
deck.assign_child_resource(TIP_CAR_480_A00("tip-carrier"), rails=5)

backend = DummyBackend()
lh = LiquidHandler(backend, deck)

await lh.setup()

With the following dummy_backend.py

import asyncio
from typing import List, Union

from pylabrobot.liquid_handling.backends.backend import (
    LiquidHandlerBackend,
)
from pylabrobot.liquid_handling.standard import (
    Aspiration,
    AspirationContainer,
    AspirationPlate,
    Dispense,
    DispenseContainer,
    DispensePlate,
    Drop,
    DropTipRack,
    Move,
    Pickup,
    PickupTipRack,
)
from pylabrobot.resources import Resource

class DummyBackend(LiquidHandlerBackend):
    """The MLPrep backend."""

    def __init__(self):
        super().__init__()
        self._future = None

    # --- Liquid Handling Methods, Required by PLR ---

    async def setup(self):
        async def some_background_task():
            await asyncio.sleep(1)

        self._task = asyncio.create_task(some_background_task())
        await super().setup()

    async def stop(self):
        super().stop()

    async def assigned_resource_callback(self, resource: Resource):
        """Called when a new resource was assigned to the robot.

        This callback will also be called immediately after the setup method has been called for any
        resources that were assigned to the robot before it was set up. The first resource will always
        be the deck itself.

        Args:
          resource: The resource that was assigned to the robot.
        """
        await super().assigned_resource_callback(resource)
        print(f"Assigned resource: {resource.name}")
        await self._task

    async def unassigned_resource_callback(self, name: str):
        """Called when a resource is unassigned from the robot.

        Args:
          resource: The name of the resource that was unassigned from the robot.
        """
        await super().unassigned_resource_callback(name)
        print(f"Unassigned resource: {name}")

    @property
    def num_channels(self) -> int:
        """The number of channels that the robot has."""
        return 2

    async def pick_up_tips(self, ops: List[Pickup], use_channels: List[int]):
        """Pick up tips from the specified resource."""
        raise NotImplementedError()

    async def drop_tips(self, ops: List[Drop], use_channels: List[int]):
        """Drop tips from the specified resource."""
        raise NotImplementedError()

    async def aspirate(self, ops: List[Aspiration], use_channels: List[int]):
        """Aspirate liquid from the specified resource using pip."""
        raise NotImplementedError()

    async def dispense(self, ops: List[Dispense], use_channels: List[int]):
        """Dispense liquid from the specified resource using pip."""
        raise NotImplementedError()

    async def pick_up_tips96(self, pickup: PickupTipRack):
        """Pick up tips from the specified resource using CoRe 96."""
        raise NotImplementedError()

    async def drop_tips96(self, drop: DropTipRack):
        """Drop tips to the specified resource using CoRe 96."""
        raise NotImplementedError()

    async def aspirate96(self, aspiration: Union[AspirationPlate, AspirationContainer]):
        """Aspirate from all wells in 96 well plate."""
        raise NotImplementedError()

    async def dispense96(self, dispense: Union[DispensePlate, DispenseContainer]):
        """Dispense to all wells in 96 well plate."""
        raise NotImplementedError()

    async def move_resource(self, move: Move):
        """Move a resource to a new location."""
        raise NotImplementedError()

    async def prepare_for_manual_channel_operation(self, channel: int):
        """Prepare the robot for manual operation."""
        raise NotImplementedError()

    async def move_channel_x(self, channel: int, x: float):
        """Move the specified channel to the specified x coordinate."""
        raise NotImplementedError()

    async def move_channel_y(self, channel: int, y: float):
        """Move the specified channel to the specified y coordinate."""
        raise NotImplementedError()

    async def move_channel_z(self, channel: int, z: float):
        """Move the specified channel to the specified z coordinate."""
        raise NotImplementedError()

Running this code will lead to the following exception (only copied the relevant part):

Exception in thread Thread-3 (callback):
Traceback (most recent call last):
  File "C:\Python312\Lib\threading.py", line 1052, in _bootstrap_inner
    self.run()
  File "d:\...\.venv\Lib\site-packages\ipykernel\ipkernel.py", line 766, in run_closure
    _threading_Thread_run(self)
  File "C:\Python312\Lib\threading.py", line 989, in run
    self._target(*self._args, **self._kwargs)
  File "D:\...\pylabrobot\liquid_handling\liquid_handler.py", line 218, in callback
    loop.run_until_complete(func(*args, **kwargs))
  File "C:\Python312\Lib\asyncio\base_events.py", line 664, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "d:\....\dummy_backend.py", line 51, in assigned_resource_callback
    await self._future
RuntimeError: Task <Task pending name='Task-6' coro=<DummyBackend.assigned_resource_callback() running at [d:\...\dummy_backend.py:51](file:///D:/PREP_VM/PLR-Demo/dummy_backend.py:51)> cb=[_run_until_complete_cb() at [C:\Python312\Lib\asyncio\base_events.py:180](file:///C:/Python312/Lib/asyncio/base_events.py:180)]> got Future <Future pending> attached to a different loop
jrast commented 6 days ago

Note: I replaced the self._future above with a task which is running for 1sec. The future was never resolved, so the example was kind of flawed (would block because the future is never resolved)

jrast commented 6 days ago

I tried to replace the _run_async_in_thread function code with something like this:

    loop = asyncio.get_event_loop()
    future = asyncio.run_coroutine_threadsafe(func(*args, **kwargs), loop)
    # Note: Regardless of how we wait for the future, the event loop will be blocked.

This assumes that a event loop is running, which is a sane expectation because most of the LH methods are async. While this works, func is not finished when _run_async_in_thread returns. This makes the behaviour unpredictable. Can we continue execution without any issues? For example there might still be some configuration commands waiting to be sent to the device when we actually start the execution of aspirations / dispensations.

rickwierenga commented 6 days ago

while I will debug this, some suggestions that might potentially be a faster solution:

again, will debug this, but wanted to share these ideas in case it allows you to move forward without getting stuck on this approach

jrast commented 6 days ago
  • might it be possible to queue commands and send them just-in-time when eg aspirate is called?

I strongly dislike this idea. It's to easy to forget to send the queued commands in one of the liquid handling methods. And sending the queued commands if backend level methods are called would also be impossible (or would require a complicated implementation)

At the moment I actually do something like command queueing and run the queued operations once the deck layout is final. I can live with this as currently the decklayout cannot change throughout the run.

rickwierenga commented 6 days ago

depends on the use case. Some features only require a number when it's necessary. (eg tip definitions in PLR are only sent using TT when needed for a command like TP, not preemptively based on what is assigned on the deck)

but if it's truly a queue of things to send, yes, that is not good. A powerful feature of PLR is to be able to change the deck at runtime, and we should make it easy for all backends to support this.

jrast commented 5 days ago

Ok, this is off-topic for this issue but:

eg tip definitions in PLR are only sent using TT when needed for a command like TP, not preemptively based on what is assigned on the deck

I did not look at the implementation, but if the tip definition is written each time a tip is picked up, some overhead is generated each time. Performance wise this would be improved if the tip definition is only written when the deck is updated. While this sounds like nitpicking, if the current strategy is followed, more and more configuration commands will be executed before each step (tip-pickup, aspirate, dispense, ...) and the execution time will increase.

rickwierenga commented 5 days ago

it is checked if the definition for a tip with the exact characteristics have already been written. The "key"/"hash" into the Tip->tt idx if you will is the tip definition itself.

jrast commented 5 days ago

Another idea, which does not solve the reported bug or the queuing problem: Why not just make assigned_resource_callback and unassigned_resource_callback of the backend sync methods instead of async ones?

rickwierenga commented 5 days ago

I introduced this pattern for Opentrons (oh boy) but yes I could adopt a similar JIT approach there.

Since assigning resources is purely declarative there is no reason to have any part be async.

rickwierenga commented 5 days ago

I am not a fan of going through an unrelated queue of commands the next best time an async method is called. It seems to me we can just send parts of the "queue" when a related function is called (eg pick up tip on a tip rack). Preferably there would be no queue, we just look at the resource (tip rack in the example) and handle it appropriately. For static things, the backend has access to the deck.

But perhaps my imagination is lacking here