balena-io-modules / mahler

A automated task composer and HTN based planner for building autonomous system agents.
Apache License 2.0
7 stars 1 forks source link

Add Parallel task support. #28

Closed pipex closed 1 year ago

pipex commented 1 year ago

This PR adds support for parallelism in method tasks on Mahler.

While expanding a method during plan search, the planner will default to expanding methods in parallel branches. Before adding the result of the method back to the plan, the evaluation will check for conflicts on parallel branches. This is done by inspecting the list of changes, calculated as a JSON patch, and looking for intersecting changes between any branches. If a conflict is found, then the planner will re-do the expansion of the method in sequence.

For example, given the following code

const inc = Task.of({
    condition: (state: number, { target }) => state < target,
    effect: (state: number) => state + 1,
    action: async (state: number) => state + 1,
    description: '+1',
});

const byTwo = Task.of({
    condition: (state: number, { target }) => target - state > 1,
    method: (_: number, { target }) => [inc({ target }), inc({ target })],
    description: '+2',
});

const planner = Planner.of<number>({
    tasks: [byTwo, inc],
});

planner.findPlan(0, 3);

Calling byTwo in parallel results in a conflict, as we are modifying the same part of the state. Thus the planner treats the method as a sequential operation, resulting in the following plan

graph TD
    start(( ))
    start -.- d0{ }
    d0 -.- 21ac729[["+2"]]
    21ac729 -.- dc2da69("+1")
    dc2da69 -.- 19e83b4("+1")
    19e83b4 -.- d1{ }
    d1 -.- c1bb3cb[["+2"]]
    c1bb3cb -.- c1bb3cb-err[ ]
    c1bb3cb-err:::error
    d1 -.- c9c70c6("+1")
    c9c70c6 -.- stop(( ))
    stop:::finish
    classDef finish stroke:#000,fill:#000
    start:::selected
    start --> dc2da69
    dc2da69:::selected
    dc2da69 --> 19e83b4
    19e83b4:::selected
    19e83b4 --> c9c70c6
    c9c70c6:::selected
    c9c70c6 --> stop
    classDef error stroke:#f00
    classDef selected stroke:#0f0

Now let's take the following definition.

type Counters = { [k: string]: number };

const byOne = Task.of({
    path: '/:counter',
    condition: (state: Counters, ctx) => ctx.get(state) < ctx.target,
    effect: (state: Counters, ctx) => ctx.set(state, ctx.get(state) + 1),
    description: ({ counter }) => `${counter} + 1`,
});

const incMany = Task.of({
    condition: (state: Counters, ctx) =>
        Object.keys(state).filter((k) => ctx.target[k] - state[k] > 0).length >
        1,
    method: (state: Counters, ctx) =>
        Object.keys(state)
            .filter((k) => ctx.target[k] - state[k] > 0)
            .map((k) => byOne({ counter: k, target: ctx.target[k] })),
    description: `incMany`,
});

const planner = Planner.of({
    tasks: [incMany, byOne],
    config: { trace },
});

planner.findPlan({ a: 0, b: 0 }, { a: 3, b: 2 });

Counters a and b can be safely increased in parallel, which results in the following expansion

graph TD
        start(( ))
        start -.- d0{ }
        d0 -.- 043511f[["incMany"]]
        043511f -.- fa1322e("a + 1")
        043511f -.- 799e0e8("b + 1")
        fa1322e -.- j17bab65
        799e0e8 -.- j17bab65
        j17bab65(( ))
        j17bab65 -.- d1{ }
        d1 -.- 32e60ae[["incMany"]]
        32e60ae -.- 7411235("a + 1")
        32e60ae -.- 45cbc50("b + 1")
        7411235 -.- j0270df9
        45cbc50 -.- j0270df9
        j0270df9(( ))
        j0270df9 -.- d2{ }
        d2 -.- 8c39be5[["incMany"]]
        8c39be5 -.- 8c39be5-err[ ]
        8c39be5-err:::error
        d2 -.- 2627161("a + 1")
        2627161 -.- stop(( ))
        stop:::finish
        classDef finish stroke:#000,fill:#000
        start:::selected
        start --> fj17bab65(( ))
        fj17bab65:::selected
        fj17bab65 --> fa1322e
        fa1322e:::selected
        fj17bab65 --> 799e0e8
        799e0e8:::selected
        j17bab65(( ))
        fa1322e --> j17bab65
        799e0e8 --> j17bab65
        j17bab65:::selected
        j17bab65 --> fj0270df9(( ))
        fj0270df9:::selected
        fj0270df9 --> 7411235
        7411235:::selected
        fj0270df9 --> 45cbc50
        45cbc50:::selected
        j0270df9(( ))
        7411235 --> j0270df9
        45cbc50 --> j0270df9
        j0270df9:::selected
        j0270df9 --> 2627161
        2627161:::selected
        2627161 --> stop
        classDef error stroke:#f00
        classDef selected stroke:#0f0

This PR also updates utilities for testing that plans correspond to an expected outcome. For instance, for the latter example above, we may want to test that the resulting plan corresponds with our expectations. On testing, we could do the following

// On top of the test file
import { plan, branch, fork, stringify } from 'maher/testing';

// On the test suite
const result = planner.findPlan({ a: 0, b: 0 }, { a: 3, b: 2 });
expect(stringify(result)).to.deep.equal(
    plan()
        .fork(branch('a + 1'), branch('b + 1'))
        .fork(branch('a + 1'), branch('b + 1'))
        .action('a + 1')
        .end(),
);

Where

plan()
    .fork(branch('a + 1'), branch('b + 1'))
    .fork(branch('a + 1'), branch('b + 1'))
    .action('a + 1')
.end()

Creates a string representation of a plan with two forks, each fork with two branches updating values a and b respectively

The string representation will return a string like the following, that should make it easier to compare the plans visually

+ ~ - a + 1
  ~ - b + 1
+ ~ - a + 1
  ~ - b + 1
- a + 1

Where - indicates an action, + indicates a fork and ~ indicates a branch of the fork

Note that despite the amount of changes, this is an initial PR to enable parallelism. Conflict detection and backtracking when parallelism fails is still rudimentary, and this is still heavily lacking documentation

Change-type: minor

pipex commented 1 year ago

I self-certify!