HPInc / HP-Digital-Microfluidics

HP Digital Microfluidics Software Platform and Libraries
MIT License

Add a task thread to collapse long-running future chains #56

Open EvanKirshenbaum opened 9 months ago

EvanKirshenbaum commented 9 months ago

While debugging the macro language implementation, it became clear that the current Delayed implementation leads to very deep stacks: when a value is posted, all of the actions waiting on it are run immediately (and sequentially), nested inside the call that posted it.
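To make the stack-depth problem concrete, here is a toy sketch. `ToyDelayed` is a hypothetical stand-in, not the project's Delayed class; it assumes only that `post()` calls each waiting callback synchronously, as described above. Measuring the stack inside the final callback shows that every link in a `transformed()` chain adds a fixed number of frames, so a long enough chain will eventually hit Python's recursion limit.

```python
import inspect
from typing import Callable, Generic, List, Optional, TypeVar

T = TypeVar("T")
V = TypeVar("V")


class ToyDelayed(Generic[T]):
    """Hypothetical stand-in for Delayed: post() runs waiting callbacks inline."""

    def __init__(self) -> None:
        self._posted = False
        self._value: Optional[T] = None
        self._callbacks: List[Callable[[T], None]] = []

    def when_value(self, fn: Callable[[T], None]) -> None:
        if self._posted:
            fn(self._value)  # type: ignore[arg-type]
        else:
            self._callbacks.append(fn)

    def post(self, val: T) -> None:
        self._posted, self._value = True, val
        for fn in self._callbacks:
            fn(val)  # nested inside this post() call, on the posting thread

    def transformed(self, fn: Callable[[T], V]) -> "ToyDelayed[V]":
        future: ToyDelayed[V] = ToyDelayed()
        self.when_value(lambda val: future.post(fn(val)))
        return future


def stack_depth() -> int:
    """Count the Python frames on the current thread's stack."""
    depth, frame = 0, inspect.currentframe()
    while frame is not None:
        depth, frame = depth + 1, frame.f_back
    return depth


def depth_at_end_of_chain(links: int) -> int:
    head: ToyDelayed[int] = ToyDelayed()
    tail = head
    for _ in range(links):
        tail = tail.transformed(lambda x: x + 1)
    seen: List[int] = []
    tail.when_value(lambda _: seen.append(stack_depth()))
    head.post(0)
    return seen[0]


# Each extra link adds one post() frame and one lambda frame to the stack.
print(depth_at_end_of_chain(50) - depth_at_end_of_chain(10))  # 80
```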

I honestly don't know whether this is a problem, but I suspect it will become one when I add loops to the language or if somebody tries to write recursive macros. There's already an informal rule of "If you're going to do something that takes a long time, do it in a separate thread," but if you get this wrong, the whole system will block, and the thread that blocks probably won't be the one you expect (because you don't know which thread is going to post the value that triggers you).

What I'm thinking of is having a separate thread (or several) that monitor a run queue of tasks, each comprising a function and an argument (or sequence of arguments) to pass to the function. The task thread(s) simply pull off tasks and execute them. (It's quite possible that Python already has something very much like this.)
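A minimal sketch of such a run queue, assuming each task is a callable plus positional arguments. `TaskRunner`, `submit`, `join`, and `shutdown` are hypothetical names. The standard library's `queue.Queue` does its own locking, so any thread can enqueue. (Python does ship something close to this: `concurrent.futures.ThreadPoolExecutor`, whose `submit(fn, *args)` queues a call for a pool of worker threads.)

```python
import queue
import threading
import traceback
from typing import Any, Callable, List, Optional, Tuple

Job = Optional[Tuple[Callable[..., Any], Tuple[Any, ...]]]  # None is a shutdown sentinel


class TaskRunner:
    """Hypothetical run queue served by one or more task threads."""

    def __init__(self, n_threads: int = 1) -> None:
        self._tasks: "queue.Queue[Job]" = queue.Queue()  # Queue handles its own locking
        self._threads = [threading.Thread(target=self._loop, daemon=True) for _ in range(n_threads)]
        for t in self._threads:
            t.start()

    def submit(self, fn: Callable[..., Any], *args: Any) -> None:
        """Enqueue fn(*args); safe to call from any thread, including a task thread."""
        self._tasks.put((fn, args))

    def _loop(self) -> None:
        while True:
            job = self._tasks.get()
            try:
                if job is None:
                    return  # shutdown sentinel
                fn, args = job
                fn(*args)
            except Exception:
                traceback.print_exc()  # one failing task shouldn't kill the thread
            finally:
                self._tasks.task_done()

    def join(self) -> None:
        """Wait until every submitted task, including ones submitted by tasks, has run."""
        self._tasks.join()

    def shutdown(self) -> None:
        for _ in self._threads:
            self._tasks.put(None)
        for t in self._threads:
            t.join()


runner = TaskRunner(n_threads=2)
results: List[int] = []
for i in range(5):
    runner.submit(results.append, i)
runner.submit(lambda: runner.submit(results.append, 99))  # a task that enqueues another task
runner.join()
print(sorted(results))  # [0, 1, 2, 3, 4, 99]
runner.shutdown()
```

Because a task's nested `submit()` happens before that task is marked done, `join()` also waits for work that tasks enqueue themselves.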

The main change would be that the current Delayed.chain() and .transformed() get renamed to something like .chain_immediate() and .transformed_immediate(). These would be used for things that are known to be very lightweight. The implementations of .chain() and .transformed() would then go from

    def chain(self, fn: Callable[[T], Delayed[V]]) -> Delayed[V]:
        future = Delayed[V]()
        self.when_value(lambda val: fn(val).post_to(future))
        return future

to something more like

    def chain(self, fn: Callable[[T], Delayed[V]]) -> Delayed[V]:
        task = ChainedTask[T,V](fn)
        self.when_value(task)
        return task.future

with

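    from __future__ import annotations
    from abc import ABC, abstractmethod
    from queue import Queue
    from typing import Any, Callable, Final, Generic, TypeVar

    # Delayed is the existing future class; run_queue is a hypothetical queue
    # drained by the task thread(s).
    T = TypeVar("T")
    V = TypeVar("V")
    run_queue: Queue[tuple[Task[Any, Any], Any]] = Queue()

    class Task(Generic[T, V], ABC):
        future: Final[Delayed[V]]
        def __init__(self) -> None:
            self.future = Delayed[V]()
        def __call__(self, val: T) -> None:
            # enqueue the task and value instead of running it on the posting thread
            run_queue.put((self, val))
        @abstractmethod
        def run(self, val: T) -> None: ...

    class ChainedTask(Task[T, V]):
        func: Final[Callable[[T], Delayed[V]]]
        def __init__(self, func: Callable[[T], Delayed[V]]) -> None:
            super().__init__()
            self.func = func
        def run(self, val: T) -> None:
            self.func(val).post_to(self.future)

    class TransformTask(Task[T, V]):
        func: Final[Callable[[T], V]]
        def __init__(self, func: Callable[[T], V]) -> None:
            super().__init__()
            self.func = func
        def run(self, val: T) -> None:
            self.future.post(self.func(val))
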
Migrated from internal repository. Originally created by @EvanKirshenbaum on Oct 29, 2021 at 1:06 PM PDT.
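Putting the pieces together, here is a self-contained sketch of queued chaining. `QueuedDelayed`, `TASK_THREAD`, and `already_posted` are hypothetical; a single-worker `ThreadPoolExecutor` stands in for the task thread, and the nested `run` and `enqueue` functions play the roles of `ChainedTask.run` and `ChainedTask.__call__`. Because each link only enqueues the next step, the posting thread returns at once and the task thread's stack stays shallow, so a 5,000-link chain completes even though the same chain run inline would exceed Python's default recursion limit of 1000.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Generic, List, Optional, TypeVar

T = TypeVar("T")
V = TypeVar("V")

# Hypothetical single task thread; the executor keeps its own locked work queue.
TASK_THREAD = ThreadPoolExecutor(max_workers=1)


class QueuedDelayed(Generic[T]):
    """Hypothetical, stripped-down stand-in for Delayed."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._posted = threading.Event()
        self._value: Optional[T] = None
        self._callbacks: List[Callable[[T], None]] = []

    def when_value(self, fn: Callable[[T], None]) -> None:
        with self._lock:
            if not self._posted.is_set():
                self._callbacks.append(fn)
                return
        fn(self._value)  # type: ignore[arg-type]

    def post(self, val: T) -> None:
        with self._lock:
            self._value = val
            self._posted.set()
            callbacks, self._callbacks = self._callbacks, []
        for fn in callbacks:
            fn(val)

    def post_to(self, other: "QueuedDelayed[T]") -> None:
        self.when_value(other.post)  # cheap, so it stays immediate

    def chain(self, fn: Callable[[T], "QueuedDelayed[V]"]) -> "QueuedDelayed[V]":
        future: QueuedDelayed[V] = QueuedDelayed()

        def run(val: T) -> None:  # the work ChainedTask.run would do
            fn(val).post_to(future)

        def enqueue(val: T) -> None:  # what ChainedTask.__call__ would do
            TASK_THREAD.submit(run, val)

        self.when_value(enqueue)
        return future

    def wait(self, timeout: Optional[float] = None) -> Optional[T]:
        self._posted.wait(timeout)
        return self._value


def already_posted(x: int) -> QueuedDelayed[int]:
    d: QueuedDelayed[int] = QueuedDelayed()
    d.post(x)
    return d


head: QueuedDelayed[int] = QueuedDelayed()
tail = head
for _ in range(5000):  # run inline, this many links would exceed the default recursion limit
    tail = tail.chain(lambda x: already_posted(x + 1))
head.post(0)
print(tail.wait(timeout=10))  # 5000
```

One caveat of this sketch: an exception raised inside a submitted task is captured in the `concurrent.futures.Future` that `submit()` returns, which is discarded here, so errors would pass silently unless they are routed somewhere explicitly.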
EvanKirshenbaum commented 9 months ago

I'm not sure whether there should also be a separate form of .when_value() that enqueues the callback rather than calling it directly. Probably. If so, it might make more sense to leave .chain() and .transformed() untouched and have them just use that form. (And maybe just have an extra argument to say which behavior you want.)

Most of the other methods for Delayed (e.g., then_trigger or post_to) are known to be cheap, and so would work with the "immediate" variant.
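One way to fold both behaviors into `.when_value()` is an extra keyword argument. In this sketch the `immediate` parameter, its default of queuing, the `SketchDelayed` class, and the shared `TASK_POOL` are all hypothetical. `.transformed()` keeps its shape and just passes the flag through, while `.post_to()` always uses the immediate form because it is known to be cheap.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Generic, List, Optional, TypeVar

T = TypeVar("T")
V = TypeVar("V")

# Hypothetical shared pool standing in for the proposed task thread(s).
TASK_POOL = ThreadPoolExecutor(max_workers=2)


class SketchDelayed(Generic[T]):
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._posted = threading.Event()
        self._value: Optional[T] = None
        self._callbacks: List[Callable[[T], None]] = []

    def when_value(self, fn: Callable[[T], None], *, immediate: bool = False) -> None:
        """Call fn with the value inline if immediate, otherwise as a queued task."""

        def enqueue(val: T) -> None:
            TASK_POOL.submit(fn, val)

        callback = fn if immediate else enqueue
        with self._lock:
            if not self._posted.is_set():
                self._callbacks.append(callback)
                return
        callback(self._value)  # type: ignore[arg-type]

    def post(self, val: T) -> None:
        with self._lock:
            self._value = val
            self._posted.set()
            callbacks, self._callbacks = self._callbacks, []
        for callback in callbacks:
            callback(val)

    def post_to(self, other: "SketchDelayed[T]") -> None:
        self.when_value(other.post, immediate=True)  # cheap: no need to queue

    def transformed(self, fn: Callable[[T], V], *, immediate: bool = False) -> "SketchDelayed[V]":
        future: SketchDelayed[V] = SketchDelayed()
        self.when_value(lambda val: future.post(fn(val)), immediate=immediate)
        return future

    def wait(self, timeout: Optional[float] = None) -> Optional[T]:
        self._posted.wait(timeout)
        return self._value


poster = threading.current_thread()
src: SketchDelayed[int] = SketchDelayed()
doubled = src.transformed(lambda x: x * 2)  # queued by default
inline = src.transformed(lambda x: threading.current_thread() is poster, immediate=True)
queued = src.transformed(lambda x: threading.current_thread() is not poster)
src.post(21)
print(doubled.wait(5), inline.wait(5), queued.wait(5))  # 42 True True
```

The two boolean results confirm where each callback ran: the immediate one on the thread that called `post()`, the queued one on a pool thread.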

I'm not sure how many threads to use. We'll need a lock on the queue in any case, since tasks will be enqueued from various threads. One notion I'm toying with is having each executor thread record the serial number of the task it's executing. Then we could have an extra monitor thread that wakes up every so often, and if no executor thread has made progress (i.e., changed its serial number), it spawns a new executor thread. When an executor other than the first sees that the queue is empty, it sets an "I can be killed" flag, and if the monitor thread sees that flag, the thread is killed.
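A rough sketch of the self-scaling scheme, under several assumptions. `ElasticPool`, `Executor`, `check_interval`, and `max_executors` are hypothetical names; "progress" here means an executor has started a new task since the monitor's last check, and the monitor only spawns when tasks are actually waiting. One substitution: Python has no way to kill a thread from outside, so instead of killing an idle executor, the monitor sets a `retire` event and the executor exits on its own at its next loop check. `queue.Queue` supplies the lock on the queue.

```python
import queue
import threading
import time
import traceback
from typing import Any, Callable, List, Tuple

Job = Tuple[Callable[..., Any], Tuple[Any, ...]]


class Executor:
    """One worker thread; `serial` counts the tasks it has started."""

    def __init__(self, pool: "ElasticPool", is_first: bool) -> None:
        self.pool, self.is_first = pool, is_first
        self.serial = 0
        self.seen_serial = 0  # the serial the monitor saw at its last check
        self.idle = False  # the "I can be killed" flag; never set by the first executor
        self.retire = threading.Event()
        threading.Thread(target=self._loop, daemon=True).start()

    def _loop(self) -> None:
        while not self.retire.is_set():
            try:
                fn, args = self.pool.tasks.get(timeout=0.05)
            except queue.Empty:
                self.idle = not self.is_first
                continue
            self.idle = False
            self.serial += 1
            try:
                fn(*args)
            except Exception:
                traceback.print_exc()  # keep the executor alive after a failing task
            finally:
                self.pool.tasks.task_done()


class ElasticPool:
    def __init__(self, check_interval: float = 0.1, max_executors: int = 8) -> None:
        self.tasks: "queue.Queue[Job]" = queue.Queue()  # Queue does its own locking
        self.check_interval, self.max_executors = check_interval, max_executors
        self.executors: List[Executor] = [Executor(self, is_first=True)]
        threading.Thread(target=self._monitor, daemon=True).start()

    def submit(self, fn: Callable[..., Any], *args: Any) -> None:
        self.tasks.put((fn, args))

    def _monitor(self) -> None:
        while True:
            time.sleep(self.check_interval)
            progressed = False
            for e in self.executors:
                if e.serial != e.seen_serial:
                    progressed, e.seen_serial = True, e.serial
            for e in [e for e in self.executors if e.idle]:
                e.retire.set()  # it finishes any task in hand, then exits
                self.executors.remove(e)
            stuck = not progressed and not self.tasks.empty()
            if stuck and len(self.executors) < self.max_executors:
                self.executors.append(Executor(self, is_first=False))


pool = ElasticPool(check_interval=0.05)
release, quick_done = threading.Event(), threading.Event()
pool.submit(release.wait)  # occupies the first executor until released
pool.submit(quick_done.set)  # would be stuck behind it with a single executor
print(quick_done.wait(timeout=2))  # True: the monitor spawned another executor
release.set()
```

A retired executor never abandons a task it has already dequeued; the `retire` event is only checked between tasks, so the worst case is that it runs one more task before exiting.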

Migrated from internal repository. Originally created by @EvanKirshenbaum on Oct 29, 2021 at 1:12 PM PDT.