Open dmadisetti opened 6 months ago
marimo could work on the execution graph in parallel, since the dependencies are already resolved. I'm sure this has been considered, but I couldn't find an issue for it.
Yes, thanks for opening an issue. This is something I'd like to experiment with in the coming weeks; it's just too cool to not try it ...!
I came across an async batching pattern today and thought of this discussion: https://stackoverflow.com/questions/68137200/python-async-coroutines-in-batches
The referenced library is very small, and probably just worth copying: https://marimo.app/l/cmn6g0
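For reference, the batching pattern from that Stack Overflow thread boils down to bounding `asyncio.gather` with a semaphore. A minimal self-contained sketch (the `work` coroutine is just a stand-in for running a cell):

```python
import asyncio

async def work(i):
    # Stand-in for a cell's execution
    await asyncio.sleep(0.01)
    return i * i

async def run_batched(limit=2):
    # Bound concurrency with a semaphore, as in the linked
    # Stack Overflow pattern: at most `limit` coroutines run at once
    sem = asyncio.Semaphore(limit)

    async def bounded(i):
        async with sem:
            return await work(i)

    return await asyncio.gather(*[bounded(i) for i in range(5)])

results = asyncio.run(run_batched())
print(results)  # [0, 1, 4, 9, 16]
```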
I think my understanding of the runtime is still a little naive, but would something like this work?
dataflow
def topological_sort_by_batch(
    graph: DirectedGraph, cell_ids: Collection[CellId_t]
) -> list[list[CellId_t]]:
    """Sort `cell_ids` into topological layers for batched execution."""
    parents, children = induced_subgraph(graph, cell_ids)
    roots = [cid for cid in cell_ids if not parents[cid]]
    sorted_cell_batch: list[list[CellId_t]] = []
    # BFS with explicit layers: a cell joins a layer only once
    # all of its parents have been processed
    while roots:
        sorted_cell_batch.append(roots)
        next_roots: list[CellId_t] = []
        for pid in roots:
            for child in children[pid]:
                parents[child].remove(pid)
                if not parents[child]:
                    next_roots.append(child)
        roots = next_roots
    # Any cell with parents left over was never released: a cycle
    if any(parents[cid] for cid in cell_ids):
        raise ValueError("cycle detected among cells")
    return sorted_cell_batch
...
async def run_cell_async(
    self, cell_id: CellId_t, kwargs: dict[str, Any]
) -> tuple[Any, dict[str, Any]]:
    # ...
    execute_batched_async = semaphore_wrap(execute_cell_async)
    for cids in topological_sort_by_batch(graph, ancestor_ids):
        # TODO: Clone globals or explicitly shared refs? See #1142
        await asyncio.gather(*[
            execute_batched_async(graph.cells[cid], glbls)
            for cid in cids
        ])
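For anyone without the marimo internals handy, here is a self-contained toy version of the layering step, using a plain dict of child edges in place of `DirectedGraph`/`induced_subgraph` (the names and graph shape here are assumptions for illustration only):

```python
def toy_layers(children):
    # children: cell id -> list of downstream cell ids
    parents = {cid: set() for cid in children}
    for cid, kids in children.items():
        for k in kids:
            parents[k].add(cid)
    # Roots have no parents; peel off one layer at a time
    batch = [cid for cid in children if not parents[cid]]
    out = []
    while batch:
        out.append(batch)
        nxt = []
        for cid in batch:
            for k in children[cid]:
                parents[k].discard(cid)
                if not parents[k]:
                    nxt.append(k)
        batch = nxt
    if any(parents[cid] for cid in children):
        raise ValueError("cycle detected")
    return out

# Diamond: a feeds b and c, which both feed d
g = {"a": ["b", "c"], "b": ["d"], "c": ["d"], "d": []}
print(toy_layers(g))  # [['a'], ['b', 'c'], ['d']]
```

Cells inside one layer are mutually independent, so each layer is safe to hand to `asyncio.gather` (or a pool) as a batch.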
Very cool. Thanks for sharing!
Because the event loop is single-threaded, the example code would provide concurrency but not parallelism (and only if multiple cells used top-level await, which I'm not sure is a common pattern).
What we really want is parallelism. To get it we'd need either threads (which yield parallelism only when cells drop the GIL, e.g. by dropping into NumPy or PyTorch, both of which release the GIL as soon as possible) or processes (no GIL contention, but globals would need to be cloned).
Still, the general idea would be the same: sort into layers and execute.
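A minimal sketch of the thread-based variant of "sort into layers and execute", assuming a hypothetical `run_cell` callable: each layer is submitted to a pool and joined before the next layer starts (parallelism only materializes when cells release the GIL):

```python
from concurrent.futures import ThreadPoolExecutor

def run_layers(layers, run_cell):
    # Execute each topological layer in a thread pool; cells within a
    # layer are independent, so they can run side by side
    results = {}
    with ThreadPoolExecutor() as pool:
        for layer in layers:
            futures = {cid: pool.submit(run_cell, cid) for cid in layer}
            for cid, fut in futures.items():
                # Joining here is the barrier between layers
                results[cid] = fut.result()
    return results

# Toy usage: "running" a cell just uppercases its id
out = run_layers([["a"], ["b", "c"]], lambda cid: cid.upper())
print(out)  # {'a': 'A', 'b': 'B', 'c': 'C'}
```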
Interestingly, ipyparallel recommends building and managing your own DAG: https://ipyparallel.readthedocs.io/en/latest/reference/dag_dependencies.html
I think marimo could just build upon this as opposed to trying to manage communication itself.
A "marimo cloud" option could be to deploy to GCE or something with a click of a button. Easy distributed compute alone would put it ahead of the competition.
> I think marimo could just build upon this as opposed to trying to manage communication itself.
I'll take a look, thanks for the pointer.
> A "marimo cloud" option could be to deploy to GCE or something with a click of a button. Easy distributed compute alone would put it ahead of the competition.
This would be pretty amazing. Been noodling on ideas like this for a bit. We should jam over another video call soon ...
Description
I started doing more intense compute work in marimo and noticed that the kernel is locked while a cell is executing.
Suggested solution
marimo could work on the execution graph in parallel, since the dependencies are already resolved. I'm sure this has been considered, but I couldn't find an issue for it.
I do think the dependency sidebar could be a bit more useful even without parallel execution: color which cells are queued to run, which have already run, and which are currently running.
Alternative
The functional style required for reactivity means that, in theory, marimo could be extended to support data streaming, map-reduce, or distributed computation with little effort on the user's part.
But maybe that's a cloud or enterprise solution :)
Additional context
No response