DAGWorks-Inc / burr

Build applications that make decisions (chatbots, agents, simulations, etc...). Monitor, persist, and execute on your own infrastructure.
https://burr.dagworks.io
BSD 3-Clause Clear License
740 stars, 41 forks

Parallel map-style actions #52

Open elijahbenizzy opened 4 months ago

elijahbenizzy commented 4 months ago

Overview

We should be able to launch many actions in parallel. Walking the state machine in parallel is tricky, so this issue proposes the following:

  1. A MultiAction that performs a map operation over some state field
  2. The MultiAction spawns sub-actions and delegates work to them
  3. Each sub-action has access to a portion of that state field, plus everything else it needs from state
  4. The sub-actions then write back to state
  5. The user specifies how to reduce the results
  6. This is done via a configurable executor (asyncio, threading, etc.), defaulting to multithreading for sync actions and asyncio for async actions
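
A minimal sketch of steps 1 to 6, assuming state is a plain dict rather than Burr's `State`. `map_reduce_action` and its parameters are hypothetical names, not an existing Burr API. Each sub-action receives one element of the mapped field plus the whole state, a user-supplied reducer decides what gets written back, and the executor is pluggable, with a thread pool as the default.

```python
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Optional


def map_reduce_action(
    state: Dict[str, Any],
    map_field: str,
    sub_action: Callable[[Any, Dict[str, Any]], Any],
    reduce: Callable[[List[Any]], Any],
    output_field: str,
    executor: Optional[Executor] = None,
) -> Dict[str, Any]:
    """Hypothetical MultiAction: map sub_action over state[map_field], then reduce."""
    # Default to a thread pool (the proposal's default for sync actions).
    pool = executor if executor is not None else ThreadPoolExecutor()
    try:
        # Each sub-action gets one element of the mapped field plus the full state.
        futures = [pool.submit(sub_action, item, state) for item in state[map_field]]
        results = [f.result() for f in futures]  # collected in input order
    finally:
        if executor is None:
            pool.shutdown()  # only shut down a pool we created ourselves
    # The user-supplied reducer decides how results are written back (as a new dict).
    return {**state, output_field: reduce(results)}


if __name__ == "__main__":
    state = {"comments": ["lgtm", "typo on line 3", "why?"], "tone": "polite"}
    new_state = map_reduce_action(
        state,
        map_field="comments",
        sub_action=lambda comment, s: f"[{s['tone']}] reply to: {comment}",
        reduce=list,
        output_field="replies",
    )
    print(new_state["replies"])
```

Passing an explicit `executor` lets the caller share one pool across nested maps, which is one way to read the "same executor all the way down" requirement.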

Use-cases

  1. PR bot -- determine how to respond to each comment, and also to each file
  2. Firing off requests to multiple LLMs at once to find the best/quickest response
  3. Optimizing over multiple ML algorithms at once
  4. Web scraper -- send out many requests at once

Requirements:

  1. Recursive, all the way down: sub-actions, sub-sub-actions, etc. should all use the same executor
  2. Each individual update (from an action completing) is treated as a state update
  3. Idempotency is encouraged, if not built in (for instance, the tasks to run could be determined from the input fields minus the output fields already written)
  4. Configurable parallelism (max rate, etc.)
  5. Hooks and everything else are respected as expected
  6. Quick-exit mode (take the first result, then cancel the others) and join-all mode (wait for all of them)
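
A sketch of requirements 2 through 6 using only `concurrent.futures`, again with a plain dict standing in for state and assuming task inputs are hashable. `run_parallel`, `pending_tasks`, the `mode` strings, and the `on_update` hook are hypothetical. Inputs whose outputs are already recorded are skipped (idempotency), `max_workers` bounds parallelism, every completed task produces its own state snapshot, and quick-exit keeps only the first finished result(s).

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, as_completed, wait
from typing import Any, Callable, Dict, List, Optional


def pending_tasks(state: Dict[str, Any], inputs: str, outputs: str) -> List[Any]:
    """Idempotency: skip inputs that already have an output recorded in state."""
    done = state.get(outputs, {})
    return [x for x in state[inputs] if x not in done]


def run_parallel(
    state: Dict[str, Any],
    inputs: str,
    outputs: str,
    fn: Callable[[Any], Any],
    mode: str = "join_all",  # or "quick_exit"
    max_workers: int = 4,  # configurable parallelism
    on_update: Optional[Callable[[Dict[str, Any]], None]] = None,  # hook
) -> Dict[str, Any]:
    tasks = pending_tasks(state, inputs, outputs)
    results = dict(state.get(outputs, {}))  # copy; never mutate the caller's state
    pool = ThreadPoolExecutor(max_workers=max_workers)
    futures = {pool.submit(fn, t): t for t in tasks}
    try:
        if mode == "quick_exit":
            finished, _ = wait(futures, return_when=FIRST_COMPLETED)
        else:
            finished = as_completed(futures)
        for f in finished:
            results[futures[f]] = f.result()
            # Each completed task yields its own new state snapshot.
            state = {**state, outputs: dict(results)}
            if on_update is not None:
                on_update(state)
    finally:
        # Cancel tasks that have not started; running threads cannot be interrupted,
        # so don't block on them (their results are simply discarded).
        pool.shutdown(wait=False, cancel_futures=True)
    return state
```

Note that `Future.cancel()` cannot stop a thread that is already running, so a real quick-exit mode would likely also need cooperative cancellation inside the sub-actions. `cancel_futures` requires Python 3.9+.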

Design Qs:

  1. Should the actions that can be launched from an action all be the same? Or should it be possible to launch two different kinds?
  2. How should action failures be handled? Should we allow some? Do retries? Or leave failure handling to the actions themselves and not deal with it at all?
  3. API -- class-based? Function-based?
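
One possible answer to question 2, sketched with hypothetical helpers (`with_retries`, `map_tolerating_failures`) that are not part of Burr: retry each sub-action a fixed number of times, collect the items that still fail instead of raising immediately, and fail the whole map only if more than `max_failures` items failed.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Tuple


def with_retries(fn: Callable[[Any], Any], retries: int) -> Callable[[Any], Any]:
    """Wrap a sub-action so a failure is retried up to `retries` more times."""
    def wrapped(item: Any) -> Any:
        for attempt in range(retries + 1):
            try:
                return fn(item)
            except Exception:
                if attempt == retries:
                    raise  # out of retries: surface the last error
        return None  # only reached if retries < 0
    return wrapped


def map_tolerating_failures(
    fn: Callable[[Any], Any],
    items: List[Any],
    retries: int = 2,
    max_failures: int = 0,
) -> Tuple[List[Any], Dict[Any, Exception]]:
    """Run fn over items in parallel; tolerate up to max_failures items failing."""
    safe_fn = with_retries(fn, retries)
    successes: List[Any] = []
    failures: Dict[Any, Exception] = {}
    with ThreadPoolExecutor() as pool:
        submitted = [(item, pool.submit(safe_fn, item)) for item in items]
        for item, future in submitted:
            try:
                successes.append(future.result())
            except Exception as exc:
                failures[item] = exc
    if len(failures) > max_failures:
        raise RuntimeError(f"{len(failures)} sub-actions failed (allowed: {max_failures})")
    return successes, failures
```

The alternative in the question, letting each action handle its own failures, corresponds to `retries=0, max_failures=len(items)` here: the framework just reports what failed.
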

elijahbenizzy commented 4 months ago

This is very likely dependent on #33 -- we'll need to layer/apply updates to state cleanly. We can probably get around that, but it is worth considering.
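
As an illustration of what "layering" updates could mean, here is a sketch with the standard library's `collections.ChainMap`: each sub-action's writes become a layer over the untouched base state, and later layers win on conflicting keys. `layer_updates` is a hypothetical helper and assumes last-writer-wins is acceptable; it doesn't reflect any particular design from #33.

```python
from collections import ChainMap
from typing import Any, Dict, List


def layer_updates(base: Dict[str, Any], updates: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Stack each sub-action's writes as a layer over the base state.

    ChainMap returns a key from the first mapping that contains it, so the
    updates are reversed to make later updates win. `base` is never mutated.
    """
    layered = ChainMap(*reversed(updates), base)
    return dict(layered)
```

With parallel sub-actions, "later" means "completed later", which is nondeterministic; a real design would probably need an explicit conflict policy (e.g. appending to a list field) for keys written by more than one sub-action.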

skrawcz commented 4 months ago

What I would do today is just do it inside the action:

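from concurrent.futures import ThreadPoolExecutor, wait
from typing import Tuple
from burr.core import State, action

def process_item(item):  # hypothetical per-item work
    return item

@action(reads=["items"], writes=["results"])  # field names are illustrative
def my_parallel_action(state: State) -> Tuple[dict, State]:
    # do the parallelization myself with threading/asyncio/etc.
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(process_item, item) for item in state["items"]]
        # wait for first (return_when=FIRST_COMPLETED), or wait for all (the default)
        wait(futures)
    # do state update here / handle any failures etc.
    result = {"results": [f.result() for f in futures]}
    return result, state.update(**result)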

So the question, if we provide framework support for something like this, is: what are we removing, simplifying, or enhancing?
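
For async actions, the same do-it-yourself approach might look like the following sketch, using `asyncio.gather` to wait for all and `asyncio.wait(..., return_when=FIRST_COMPLETED)` to take the first result and cancel the rest. `fake_llm` is a hypothetical stand-in for a model call, in the spirit of the "multiple LLMs at once" use case.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def gather_all(fn: Callable[[Any], Awaitable[Any]], items: List[Any]) -> List[Any]:
    # "Wait for all": results come back in input order.
    return list(await asyncio.gather(*(fn(item) for item in items)))


async def first_completed(fn: Callable[[Any], Awaitable[Any]], items: List[Any]) -> Any:
    # "Wait for first": take the earliest result and cancel the rest.
    tasks = [asyncio.create_task(fn(item)) for item in items]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)  # let cancellations settle
    return next(iter(done)).result()


async def fake_llm(delay: float) -> str:
    # Hypothetical model call: just sleeps for `delay` seconds.
    await asyncio.sleep(delay)
    return f"answer after {delay}s"


async def main() -> Dict[str, Any]:
    delays = [0.2, 0.01, 0.1]
    return {
        "all": await gather_all(fake_llm, delays),
        "fastest": await first_completed(fake_llm, delays),
    }


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Unlike threads, asyncio tasks can actually be cancelled mid-flight, which makes the quick-exit mode cheaper to implement for async actions.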

elijahbenizzy commented 4 months ago

> So the question, if we provide framework support for something like this, is: what are we removing, simplifying, or enhancing?

It's not a requirement (yet). But the enhancement would be visibility, telemetry, and checkpointing for long-running jobs.

Furthermore, combining this with the ability to build actions recursively really simplifies things.

Say you're building a PR bot such as Ellipsis. You'll want to respond to each comment in parallel; otherwise it'll take too long. Each comment could be a multi-step process. This, plus composable actions, makes that feasible. Again, it's not necessary (there are multiple ways to model it).
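
A sketch of that PR-bot shape in plain `asyncio`: each comment gets its own small multi-step pipeline (standing in for a composed sub-application), and the pipelines run concurrently. `run_steps`, `classify`, `draft_reply`, and `respond_to_all` are hypothetical; a real bot would call an LLM or the GitHub API in the steps.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Step = Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]


async def run_steps(steps: List[Step], state: Dict[str, Any]) -> Dict[str, Any]:
    # A tiny sequential "sub-application": each step returns an updated copy of state.
    for step in steps:
        state = await step(state)
    return state


async def classify(state: Dict[str, Any]) -> Dict[str, Any]:
    kind = "question" if state["comment"].endswith("?") else "request"
    return {**state, "kind": kind}


async def draft_reply(state: Dict[str, Any]) -> Dict[str, Any]:
    await asyncio.sleep(0.01)  # stands in for a slow model call
    return {**state, "reply": f"Re ({state['kind']}): {state['comment']}"}


async def respond_to_all(comments: List[str]) -> List[str]:
    # Fan out: one multi-step pipeline per comment, all running concurrently.
    pipelines = [run_steps([classify, draft_reply], {"comment": c}) for c in comments]
    finished = await asyncio.gather(*pipelines)
    return [s["reply"] for s in finished]


if __name__ == "__main__":
    print(asyncio.run(respond_to_all(["Why a lock here?", "Please add a test"])))
```

Total latency is roughly that of the slowest comment's pipeline rather than the sum of all of them.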

elijahbenizzy commented 1 month ago

Plan of action:

Use-case (sample, for scrapegraph):

elijahbenizzy commented 1 month ago

Design for phase (1):

@action(...)
def spawning_action(..., __context: AppContext):
    app = (
        ApplicationBuilder()...
        .with_spawning_parent(
            __context.app_id,
            __context.sequence_id)
        .with_tracker(__context.tracker)
        .build()
    )
    ... = app.run(...)
    return ...
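
To illustrate why the child application needs the parent's `app_id` and `sequence_id`, here is a self-contained mock, not Burr code: `MiniApp`, `Tracker`, and `run_children_in_parallel` are hypothetical. Children share the parent's tracker and record the exact (app_id, sequence_id) step that spawned them, so a UI could reconstruct the parent/child tree even when children run on parallel threads.

```python
import uuid
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

Parent = Optional[Tuple[str, int]]  # (parent app_id, parent sequence_id)


@dataclass
class Tracker:
    """Hypothetical tracker: records each app run and the step that spawned it."""
    events: List[Tuple[str, Parent]] = field(default_factory=list)

    def on_start(self, app_id: str, parent: Parent) -> None:
        self.events.append((app_id, parent))


@dataclass
class MiniApp:
    """Hypothetical stand-in for an app built with a tracker and a spawning parent."""
    tracker: Tracker
    parent: Parent = None
    app_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    sequence_id: int = 0

    def spawn_child(self) -> "MiniApp":
        # Mirrors with_spawning_parent(app_id, sequence_id) + with_tracker(tracker).
        return MiniApp(tracker=self.tracker, parent=(self.app_id, self.sequence_id))

    def run(self) -> None:
        self.tracker.on_start(self.app_id, self.parent)
        self.sequence_id += 1  # one "step" executed


def run_children_in_parallel(parent: MiniApp, n: int) -> List[MiniApp]:
    children = [parent.spawn_child() for _ in range(n)]
    with ThreadPoolExecutor() as pool:
        list(pool.map(MiniApp.run, children))  # list() surfaces any exceptions
    return children
```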