sourcegraph / conc

Better structured concurrency for go
https://about.sourcegraph.com/blog/building-conc-better-structured-concurrency-for-go
MIT License
9.39k stars 322 forks source link

Proposal: concurrency with 1.23 iterator #137

Open amikai opened 4 months ago

amikai commented 4 months ago

Proposal

I propose the four functions

package iter
// unorder
func SeqMap[In, Out any](iter.Seq[In], func(In) Out) iter.Seq[Out]
func Seq2Map[In1, In2, Out1, Out2 any](iter.Seq2[In1, In2], func(In1, In2) (Out1, Out2)) iter.Seq2[Out1, Out2]

package stream
// order
func SeqMap[In, Out any](iter.Seq[In], func(In) Out) iter.Seq[Out]
func Seq2Map[In1, In2, Out1, Out2 any](iter.Seq2[In1, In2], func(In1, In2) (Out1, Out2)) iter.Seq2[Out1, Out2]

SeqMap under conc iter:

for v := range iter.SeqMap(slices.Values([]int{1, 2, 3, 4, 5, 6, 7}), func(x int) int { return x * 2 }) {
        if v == 25 {
            break
        }
    fmt.Println(x)
}
// Output is not order:
// 4
// 1
// 9
// 16
// 36

SeqMap2 under conc iter: similar to iter.SeqMap but returns two values, allowing it to be used for error handling.

// we assume that seq2WithError produce following
// 1, nil
// 2, nil
// 3, errors.New("123")
// 4, nil
// 5, nil
var seq2WithError iter.Seq2[int, error] = ...

for v := range iter.SeqMap2(seq2WithError, func(x int, err error) int { return x * 2, err }) {
        if err != nil {
            break
        }
        fmt.Println(x)
}
// Output is not order:
// 4
// 1
// 16

SeqMap under conc stream: similar to iter.SeqMap, but in order way.

for v := range iter.SeqMap(slices.Values([]int{1, 2, 3, 4, 5, 6, 7}), func(x int) int { return x * 2 }) {
        if v == 25 {
            break
        }
    fmt.Println(x)
}
// Output is ordered:
// 1
// 4
// 9
// 16

SeqMap2 under conc conc: similar to stream.SeqMap but returns two values, allowing it to be used for error handling.

// we assume that seq2WithError produce following
// 1, nil
// 2, nil
// 3, errors.New("123")
// 4, nil
// 5, nil
var seq2WithError iter.Seq2[int, error] = ...

for v := range stream.SeqMap2(seq2WithError, func(x int, err error) int { return x * 2, err }) {
        if err != nil {
            break
        }
        fmt.Println(x)
}
// Output is ordered:
// 1
// 4

Some thought

I believe the best aspects of a concurrent iterator are its ability to calculate lazily and its resource-saving capability. Functions like iter.Map and iter.ForEach will exhaust all elements, but with an iterator, when the output iterator stops, the input iterator should also cease retrieving elements and stop all goroutines.

amikai commented 4 months ago

The part I find more troublesome when implementing is that letting the user decide when to stop the loop means developer have to have to implement the complex mechanism to close the channel at receiver side.

I think we also need to consider how to use it with the context. However, I have no idea how to do that at the moment.

camdencheek commented 4 months ago

Hello @amikai! I've been toying with ideas in this direction on my end as well, so I'm glad to see others think this is an exciting possibility. Thank you for writing up this proposal.

When users break the loop, the running goroutines should be canceled to avoid wasting resources.

This one is a quite tricky since it requires conc to know how to cancel running goroutines. The standard way to do this is to have a cancellable context, but that means that conc must either be given a cancel() func, or it must own the context that child goroutines respect.

One way around it is just to leave cancellation to the caller. I personally like this because although it is a little harder to use, it's more predictable IMO and leaves control fully in the hands of the user.

ids := slices.Values([]int{1,2,3,4,5,6})
func fetchSquare(ctx context.Context, id int) {
    ...
}

ctx, cancel := context.WithCancel(ctx)

for v := range iter.SeqMap(ints, func(x int) int { return fetchSquare(ctx, x) }) {
    if v == 25 {
        cancel()
        break
    }
    fmt.Println(v)
}

letting the user decide when to stop the loop means developer have to have to implement the complex mechanism to close the channel at receiver side.

I think we can design this so that the consumer doesn't have to worry about it. Because of the design of the iter package, control will be yielded back to conc when the user breaks the loop (the yield func returns false), which gives us the opportunity to clean up goroutines and propagate panics.

amikai commented 4 months ago

This is a good idea, but I want to ask if it might cause a goroutine leak if the user does not call cancel and the for loop has already been broken.

Or in another situation, the context is already canceled but there is no break in for loop.

Perhaps we can pass the context by changing the method to iter.SeqMapCtx(ctx, xxx), or by creating a struct with WithContext like Pool.WithContext(ctx). This way, users have the choice to cancel themselves, but after a break, iter.SeqMap will definitely cancel (we can derive a cancellable context during implementation).

This will make it easier for users because they won’t need to worry about forgetting to cancel it. But I think this will cause problems for developers, including increased implementation difficulty and how to handle panic situations.

// function declaration
func SeqMapCtx[In, Out any](context.Context, iter.Seq[In], func(context.Context, In) Out ) iter.Seq[Out] {...}

ids := slices.Values([]int{1,2,3,4,5,6})
func fetchSquare(ctx context.Context, id int) int {
    ...
}

ctx := context.Background()

// SeqMapCtx will derived the context with cancel
for v := range iter.SeqMapCtx(ctx, fetchSquare}) {
    if v == 25 {
        break // when yield return false, cancel the context and clean up goroutines
    }
    fmt.Println(v)
}

Summary of my opinion

This is my opinion. I hope it helps.

NOTE: I haven't PoC yet. I'm not sure if it can be implemented.