cue-lang / cue

The home of the CUE language! Validate and define text-based and dynamic configuration
https://cuelang.org
Apache License 2.0

cmd/cue: support for controlling parallelism in command tasks #709

Open: cueckoo opened this issue 3 years ago

cueckoo commented 3 years ago

Originally opened by @myitcv in https://github.com/cuelang/cue/issues/709

cue version: cue version 0.3.0-beta.1 darwin/amd64

I am using a custom cue command to start a large number (100s) of independent tasks using exec.Run. There are too many tasks to run them all in parallel, so I am trying to find a way to limit how many run at a time.

My current attempt involves grouping tasks, and making each group depend on the previous group completing. However, adding the $after dependencies makes the cue command very slow.

Here's a simplified example of what I am trying:

test_tool.cue

package test

import (
    "tool/exec"
)

command: test: {
    let numTasks = 40
    let parallelism = 10

    _tasks: [ for i, _ in numTasks * [0] {"task-\(i)"}]

    // Number of task groups; each group has at most parallelism tasks
    _numGroups: __div(len(_tasks)+parallelism-1, parallelism)

    // Group tasks into _numGroups groups (task i goes into group i mod _numGroups)
    _taskGroups: [
        for tg in {
            for i, t in _tasks {"\(__mod(i, _numGroups))": "\(t)": t}
        } {tg},
    ]

    for i, tg in _taskGroups
    for k, t in tg {
        let shellCommand = """
echo $(date +%s): group:\( i ) in-group:\( k ) task:\( t ) starting
/bin/sleep 1
echo $(date +%s): group:\( i ) in-group:\( k ) task:\( t ) finished
"""

        task: "\( i )": "\( k )": exec.Run & {
            // Run each group serially
            if i > 0 {
                $after: task["\( i-1 )"]
            }
            cmd: ["sh", "-e", "-c", shellCommand]
        }
    }
}

This should start four groups of tasks serially, with the tasks in each group running in parallel. I would expect the command to take about four seconds, but it takes over a minute on my computer and causes high CPU usage. time cue test shows:

real    1m12.176s
user    1m37.900s
sys     0m3.872s

If I comment out the $after: task["\( i-1 )"], time cue test shows:

real    0m1.340s
user    0m0.667s
sys     0m0.494s

Is there a reason adding these dependencies is so expensive? Is there a better way to control how many tasks run at once?

Originally posted by @svend in https://github.com/cuelang/cue/discussions/640

cueckoo commented 3 years ago

Original reply by @myitcv in https://github.com/cuelang/cue/issues/709#issuecomment-771712539

Quoting myself from https://github.com/cuelang/cue/discussions/640#discussioncomment-331122:

I've just hit a similar use case where I need to effectively hold a mutex whilst running a specific task (one that writes to part of a file). A number of instances of that task are created, much like above, using a comprehension based on a file.Glob.

How about a solution that uses the equivalent of a length-n buffered channel? I'm not sure what that would look like in terms of a declaration (a tool/sync builtin of sorts?), but a task would effectively be able to depend on such a limiter.

Initialising the value of such a limiter (via a builtin) would take a positive integer value. If we were to support the injection of special values like os, we might extend that to include the CUE equivalent of Go's runtime.NumCPU().

cueckoo commented 3 years ago

Original reply by @myitcv in https://github.com/cuelang/cue/issues/709#issuecomment-772808008

Noting a discussion from this afternoon regarding this:

command: x: exec.Run & {
    $semaphore: s1: 3
    cmd: "........"
}

where a $semaphore field on a command is essentially of type [string]: int, allowing a command to depend on multiple semaphores.
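A minimal Go sketch of how a task runner might honour such a [string]: int field, under two assumptions that the discussion does not settle. First, the int is read as the capacity of the named semaphore, so every command naming s1 must agree on 3. Second, a command that names several semaphores holds all of them while it runs. The semaphores type and its get and run methods are hypothetical. Acquiring names in sorted order is standard lock ordering: two commands that share semaphores cannot deadlock by taking them in opposite orders.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
	"time"
)

// semaphores lazily creates one buffered-channel semaphore per name.
// ASSUMPTION: the int in `$semaphore: s1: 3` is the capacity of s1.
type semaphores struct {
	mu     sync.Mutex
	byName map[string]chan struct{}
}

func (s *semaphores) get(name string, capacity int) (chan struct{}, error) {
	if capacity < 1 {
		return nil, fmt.Errorf("semaphore %q: capacity must be positive, got %d", name, capacity)
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.byName == nil {
		s.byName = make(map[string]chan struct{})
	}
	ch, ok := s.byName[name]
	if !ok {
		ch = make(chan struct{}, capacity)
		s.byName[name] = ch
	} else if cap(ch) != capacity {
		return nil, fmt.Errorf("semaphore %q: conflicting capacities %d and %d", name, cap(ch), capacity)
	}
	return ch, nil
}

// run executes task while holding every semaphore named in sems,
// acquiring them in sorted name order and releasing them all afterwards.
func (s *semaphores) run(sems map[string]int, task func()) error {
	names := make([]string, 0, len(sems))
	for name := range sems {
		names = append(names, name)
	}
	sort.Strings(names)

	held := make([]chan struct{}, 0, len(names))
	defer func() {
		for _, ch := range held {
			<-ch // release, also on the error path
		}
	}()
	for _, name := range names {
		ch, err := s.get(name, sems[name])
		if err != nil {
			return err
		}
		ch <- struct{}{} // acquire; blocks while the semaphore is full
		held = append(held, ch)
	}
	task()
	return nil
}

func main() {
	var (
		s       semaphores
		wg      sync.WaitGroup
		mu      sync.Mutex
		active  int
		highest int
	)
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Plays the role of `$semaphore: s1: 3` on each command.
			err := s.run(map[string]int{"s1": 3}, func() {
				mu.Lock()
				active++
				if active > highest {
					highest = active
				}
				mu.Unlock()
				time.Sleep(10 * time.Millisecond)
				mu.Lock()
				active--
				mu.Unlock()
			})
			if err != nil {
				fmt.Println(err)
			}
		}()
	}
	wg.Wait()
	fmt.Printf("peak concurrency on s1: %d\n", highest)
}
```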