HPInc / HP-Digital-Microfluidics

HP Digital Microfluidics Software Platform and Libraries
MIT License

Rework implementation of `Delayed` and `Postable` to be fairer #289

Open EvanKirshenbaum opened 10 months ago

EvanKirshenbaum commented 10 months ago

The current implementation of Postable.post() is

    def post(self, val: Tcontra) -> None:
        assert not self.has_value
        self._val = (True, val)
        lock = self._maybe_lock
        if lock is not None:
            with lock:
                for fn in self._callbacks:
                    fn(val)
                del self._callbacks

The fn(val) call results in very deep stacks, especially when there's a long chain of futures (as will happen during DML compilations), and because the callbacks run in a loop, the second callback function won't be processed until all of the (possibly indirect) consequences of the first are done. This is both inefficient and unfair. Also, because the for loop presumably holds onto the list while it's iterating, the functions (and any objects they hold onto) can't be garbage collected until the entire list has been processed.
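The stack-depth problem can be seen with a minimal stand-in (not the real Postable, just a sketch of the same synchronous-callback pattern): each post() runs its callbacks before returning, so a chain of N futures nests roughly 2N extra frames before the first post() completes.

```python
import sys

class TinyFuture:
    """Minimal stand-in for a future with synchronous callbacks."""
    def __init__(self):
        self._callbacks = []

    def when_value(self, fn):
        self._callbacks.append(fn)

    def post(self, val):
        for fn in self._callbacks:
            fn(val)  # synchronous call: the stack deepens at each link

def frame_depth():
    # Count the Python frames currently on the stack.
    depth, f = 0, sys._getframe()
    while f is not None:
        depth, f = depth + 1, f.f_back
    return depth

def chain_depth(n):
    # Build a chain of n futures, each relaying its value to the next,
    # and record the stack depth observed at the end of the chain.
    observed = []
    head = TinyFuture()
    cur = head
    for _ in range(n):
        nxt = TinyFuture()
        cur.when_value(lambda v, nxt=nxt: nxt.post(v))
        cur = nxt
    cur.when_value(lambda v: observed.append(frame_depth()))
    head.post(0)
    return observed[0]
```

Each link in the chain contributes a post() frame plus a callback frame, so the observed depth grows linearly with the length of the chain.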

What I'm proposing is to replace that loop with one or more background threads sharing an agenda. When a value is posted and we have callbacks, we add those callbacks as (value, function) pairs to the agenda. The background threads pull individual pairs off the agenda and process them. I think that the standard queue.Queue should suffice (although it would be nice to have a lock-free implementation). I've worked with ChatGPT4 to come up with a worker pool implementation that looks as though it should work well here: it automatically increases and decreases the number of threads based on how long it's taking workers to process tasks.
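A minimal sketch of the agenda idea, assuming a fixed number of worker threads (the pool described above also grows and shrinks based on task latency; that scaling logic is omitted here, and the class and method names are illustrative, not the real API):

```python
import queue
import threading

class WorkerPool:
    """Fixed-size pool of threads draining a shared agenda of
    (value, callback) pairs."""

    def __init__(self, n_workers: int = 2) -> None:
        self._agenda: queue.Queue = queue.Queue()
        for _ in range(n_workers):
            threading.Thread(target=self._work, daemon=True).start()

    def add_task(self, task) -> None:
        # task is a (value, callback) pair posted by a Postable
        self._agenda.put(task)

    def _work(self) -> None:
        while True:
            val, fn = self._agenda.get()
            try:
                fn(val)  # run one callback, on a worker's shallow stack
            finally:
                self._agenda.task_done()

    def join(self) -> None:
        # Block until every queued task has been processed.
        self._agenda.join()
```

Because each callback runs as its own task, a long chain of futures never deepens any one thread's stack, and callbacks from different posts interleave fairly instead of one post's consequences monopolizing the thread.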

Even lower-hanging fruit are the optimizations that can be made to _CompleteDelayed[T], which is what Delayed.complete(val) returns. Since it asserts its value at construction time, in Delayed.when_value():

    def when_value(self, fn: Callable[[Tco], Any]) -> Delayed[Tco]:
        v = self._val
        just_run: bool = v[0]
        if not just_run:
            with self._lock:
                v = self._val
                just_run = v[0]
                if not just_run:
                    self._callbacks.append(fn)
        if just_run:
            fn(v[1])
        return self

we will always decide to just_run, but we still have to make the check. _CompleteDelayed can skip the check. But more importantly, there are a lot of methods that merely create a Callable that's passed to when_value(), and several of these involve creating Postables. For example:

    def transformed(self, fn: Callable[[Tco], V]) -> Delayed[V]:
        future = Postable[V]()
        def post_transformed(val: Any) -> None:
            future.post(fn(val))
        self.when_value(post_transformed)
        return future

In _CompleteDelayed, this could simply be

    def transformed(self, fn: Callable[[T], V]) -> _CompleteDelayed[V]:
        return Delayed.complete(fn(self._val[1]))

eliminating the need to create and post to a Postable and returning the more efficient _CompleteDelayed.

Migrated from internal repository. Originally created by @EvanKirshenbaum on Jul 11, 2023 at 12:32 PM PDT.
EvanKirshenbaum commented 10 months ago

This issue was referenced by the following commit before migration:

EvanKirshenbaum commented 10 months ago

I added the optimizations to _CompleteDelayed. It doesn't seem to have broken anything, and my gut feeling from watching the combinatorial synthesis demo run is that it may well have helped a fair bit.

Migrated from internal repository. Originally created by @EvanKirshenbaum on Jul 11, 2023 at 12:56 PM PDT.
EvanKirshenbaum commented 10 months ago

Implementing Postable.post() in terms of a new WorkerPool class, along the lines of what I worked out with ChatGPT4, was surprisingly easy. The new implementation is

    def post(self, val: Tcontra) -> None:
        assert not self.has_value
        self._val = (True, val)
        lock = self._maybe_lock
        if lock is not None:
            with lock:
                wp = self._work_pool()
                for fn in self._callbacks:
                    wp.add_task((val, fn))
                del self._callbacks

The only change is that if there's anything to do, we grab the class's WorkerPool (creating it the first time) and add the (value, function) pairs as tasks. It works just fine with the combinatorial synthesis demo as well as with interactive commands.
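One way to sketch the "grab the class's WorkerPool, creating it the first time" step behind the self._work_pool() call: the double-checked locking below is an assumption about how the lazy creation might be done, and WorkerPool here is only a placeholder, not the real class.

```python
import threading

class WorkerPool:
    """Placeholder stand-in; the real WorkerPool drains (value, fn)
    pairs on background threads."""
    def add_task(self, task) -> None:
        pass

class Postable:
    _pool = None                      # shared by the whole class
    _pool_lock = threading.Lock()

    @classmethod
    def _work_pool(cls) -> WorkerPool:
        if cls._pool is None:
            with cls._pool_lock:
                if cls._pool is None:  # re-check now that we hold the lock
                    cls._pool = WorkerPool()
        return cls._pool
```

A class-level pool means every Postable shares one agenda and one set of worker threads, rather than spinning up threads per future.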

I'm going to want to do some more serious stress testing, though, to determine whether it's actually a win, as well as to ensure that the flexing to create and remove threads based on load actually works. (The combinatorial synthesis code doesn't appear to stress it enough.) So, while I merged the _CompleteDelayed changes into master, I'm going to leave these changes in the issue.289 branch for now.

Migrated from internal repository. Originally created by @EvanKirshenbaum on Jul 11, 2023 at 3:33 PM PDT.
EvanKirshenbaum commented 10 months ago

And after running the combinatorial synthesis demo long enough, we appear to get a deadlock (but no worker threads spawn beyond the first two). So, I'm definitely not merging it in yet.

Migrated from internal repository. Originally created by @EvanKirshenbaum on Jul 11, 2023 at 3:39 PM PDT.