tejzpr / ordered-concurrently

Ordered-concurrently a library for concurrent processing with ordered output in Go. Process work concurrently and returns output in a channel in the order of input. It is useful in concurrently processing items in a queue, and get output in the order provided by the queue.
BSD 3-Clause "New" or "Revised" License
38 stars 8 forks source link

Add back pressure #8

Open stangelandcl opened 1 year ago

stangelandcl commented 1 year ago

Description The library doesn't provide back pressure. A slow job at the head of the queue can allow an infinite number of outputs to accumulate in the heap waiting for the oldest job to finish.

To Reproduce

inputChan := make(chan orderedconcurrently.WorkFunction)
outChan := orderedconcurrently.Process(context.Background(),
inputChan, &orderedconcurrently.Options{PoolSize: 10, OutChannelBuffer: 10})
go func() {
      n := 0
      for out := range outChan {
          n += out.Value.(int)
          fmt.Println("output count", n)
      }
}()

for i := 0; i < 3; i++ {
        inputChan <- &Runner{}
}

inputChan <- &Slow{}

for i := 0; i < 500*1000*1000; i++ {
      inputChan <- &Runner{}
      if (i+1)%(1000*1000) == 0 {
          fmt.Println("input count", i+1)
      }
}

close(inputChan)
type WorkFunction interface {
    Run(ctx context.Context) interface{}
}

type Runner struct{}

func (r *Runner) Run(ctx context.Context) interface{} {
    return 1
}

type Slow struct{}

func (r *Slow) Run(ctx context.Context) interface{} {
    time.Sleep(60 * time.Second)
    return 1
}

Output

output count 1
output count 2
output count 3
input count 1000000
input count 2000000
input count 3000000
^Csignal: interrupt

Desired Results If it supported back pressure then the test would take 9 inputs then wait one minute for the slow job at the head of the queue to complete then start consuming inputs again.

Suggested change The heap allows an unbounded number of jobs to pile up. A queue that adds items as they are started, has a limit, a way to flag a job as complete or not, signal the returning function to wakeup and check if the head item in the queue has its "completed" flag set yet will result in back pressure.