gammazero / workerpool

Concurrency limiting goroutine pool
MIT License
1.33k stars 138 forks source link

Pause a workerpool through keyboard #42

Open Numenorean opened 3 years ago

Numenorean commented 3 years ago

Hello dear developer, I'm looking for a way to pause a workerpool with pressing some key("P" as an example) and resume work with the same key. And I want to completely stop workerpooI with another key(maybe with "S" key). I have construction as an your example

gammazero commented 3 years ago

I am not entirely certain what you are asking for. Detecting user input, or whatever other signal that indicates workers should be paused, must be done by your application. Upon detecting the "P" key pressed, you application should call Pause. Upon detecting the "S" key, you application should call Stop or StopWait

Numenorean commented 3 years ago

I am not entirely certain what you are asking for. Detecting user input, or whatever other signal that indicates workers should be paused, must be done by your application. Upon detecting the "P" key pressed, you application should call Pause. Upon detecting the "S" key, you application should call Stop or StopWait

i'm really don't know how to do it, i have tried this, but it have not work for me

       if err := keyboard.Open(); err != nil {
        panic(err)
    }
    defer func() {
        _ = keyboard.Close()
    }()
    ctx, cancel := context.WithCancel(context.Background())
    for i, str := range resources.data {
                char, _, _ := keyboard.GetKey()
            if char == 112 {
            wp.Pause(ctx)
        } else if char == 115 {
            cancel()
        }
        str := str
        i := i
        wp.Submit(func() {
                    doSomeWork()
                }
gammazero commented 3 years ago

Couple things that could be a problem:

  1. If you pause more than once, you are trying to use a context that is already canceled. You need a new context.
  2. The "s" key is being used to resume work (canceling the contest). Perhaps you meant for "s" to stop workers?

Here is a complete working example of the above, with a few changes:

package main

import (
    "context"
    "fmt"

    "github.com/eiannone/keyboard"
    "github.com/gammazero/workerpool"
)

var data = []string{"a", "b", "c", "d", "e", "f", "g"}

func main() {
    wp := workerpool.New(5)

    if err := keyboard.Open(); err != nil {
        panic(err)
    }

    var cancel context.CancelFunc
    defer func() {
        keyboard.Close()
        if cancel != nil {
            cancel()
        }
    }()

    fmt.Println("Press p to pause/resume, s to stop, any other key to do work")
    for _, str := range data {
        char, _, err := keyboard.GetKey()
        if err != nil {
            panic(err)
        }
        switch char {
        case 'p':
            if cancel == nil {
                // pause work
                var ctx context.Context
                ctx, cancel = context.WithCancel(context.Background())
                wp.Pause(ctx)
                fmt.Println("--- paused ---")
            } else {
                // already paused, so resume work
                fmt.Println("--- resuming ---")
                cancel()
                cancel = nil
            }
        case 's':
            wp.StopWait()
            fmt.Println("--- stopped ---")
            return
        }
        str := str
        wp.Submit(func() {
            doSomeWork(str)
        })
    }
    wp.StopWait()
    fmt.Println("no more data")
}

func doSomeWork(s string) {
    fmt.Println("doWork", s)
}
Numenorean commented 3 years ago

Couple things that could be a problem:

  1. If you pause more than once, you are trying to use a context that is already canceled. You need a new context.
  2. The "s" key is being used to resume work (canceling the contest). Perhaps you meant for "s" to stop workers?

Here is a complete working example of the above, with a few changes:

package main

import (
  "context"
  "fmt"

  "github.com/eiannone/keyboard"
  "github.com/gammazero/workerpool"
)

var data = []string{"a", "b", "c", "d", "e", "f", "g"}

func main() {
  wp := workerpool.New(5)

  if err := keyboard.Open(); err != nil {
      panic(err)
  }

  var cancel context.CancelFunc
  defer func() {
      keyboard.Close()
      if cancel != nil {
          cancel()
      }
  }()

  fmt.Println("Press p to pause/resume, s to stop, any other key to do work")
  for _, str := range data {
      char, _, err := keyboard.GetKey()
      if err != nil {
          panic(err)
      }
      switch char {
      case 'p':
          if cancel == nil {
              // pause work
              var ctx context.Context
              ctx, cancel = context.WithCancel(context.Background())
              wp.Pause(ctx)
              fmt.Println("--- paused ---")
          } else {
              // already paused, so resume work
              fmt.Println("--- resuming ---")
              cancel()
              cancel = nil
          }
      case 's':
          wp.StopWait()
          fmt.Println("--- stopped ---")
          return
      }
      str := str
      wp.Submit(func() {
          doSomeWork(str)
      })
  }
  wp.StopWait()
  fmt.Println("no more data")
}

func doSomeWork(s string) {
  fmt.Println("doWork", s)
}

thanks, but it isn't what i want. I have been trying to do system like that:

Numenorean commented 3 years ago

@gammazero could u help me? i have a problem with pausing workerpool. I've solved the problem with pressing keyboard key to pause, but it doesn't work correctly. When i try to pause workerpool with queue ~6k, it waits for complete all tasks. is there any way to limit tasks queue? Smth like limiting queue to workers num, so i will be able to just wait for all workers are finished

gammazero commented 3 years ago

When i try to pause workerpool with queue ~6k, it waits for complete all tasks

The pause feature was designed to work by pausing at the point after executing all previously submitted tasks. One reason for this was to avoid doing any additional checks for pause signal in the critical path. I see that for your use case, you would prefer that workerpool pauses without consuming additional queued items. I will evaluate a minimal approach this week.

is there any way to limit tasks queue?

The purpose of workerpool is to accept tasks without ever blocking. If the task queue size is limited, then workerpool will need to block (I do not think discarding tasks is appropriate). My thought on providing this is to have a new flavor of workerpool that is created using a new constructor: workerpool.NewBlocking(maxWorkers, queueSize int). I will also be able to look into this and provide an experimental version this week.

For your use case(s), do you see one or both of these features as useful?

Numenorean commented 3 years ago

@gammazero thanks. just for me first feature would be more useful. But now i realise one big problem, when i call pause method it blocks main goroutine because it waits for complete all tasks, so i cannot resume working(function that pause pool doesn't call anyway). Here is example, i believe there is an any solution:

package main

import (
    "context"
    "fmt"
    "os"
    "time"

    "github.com/Numenorean/keypresses"
    "github.com/gammazero/workerpool"
)

var cancel context.CancelFunc

func checkPauseOrStop(wp *workerpool.WorkerPool) {
    if keypresses.IsKeyPressedGlobal(0x50, false) {
        if cancel == nil {
            // pause work
            var ctx context.Context
            ctx, cancel = context.WithCancel(context.Background())
            wp.Pause(ctx)
            fmt.Println("--- paused ---")
        } else {
            // already paused, so resume work
            fmt.Println("--- resuming ---")
            cancel()
            cancel = nil
        }
    } else if keypresses.IsKeyPressedGlobal(0x53, false) {
        wp.StopWait()
        fmt.Println("--- stopped ---")
        os.Exit(1)
    }
}

func main() {
    wp := workerpool.New(5)

    fmt.Println("Press p to pause/resume, s to stop, any other key to do work")
    for i := 0; i < 0xf4240; i++ {
        checkPauseOrStop(wp)
        fmt.Println("Queue:", wp.WaitingQueueSize())
        wp.Submit(func() {
            doSomeWork(i)
        })
    }
    wp.StopWait()
    fmt.Println("no more data")
}

func doSomeWork(s int) {
    fmt.Println("Work:", s)
    time.Sleep(5 * time.Second)
}
Numenorean commented 3 years ago

@gammazero i've posted an issue above this post, is there anyway to resolve that?

gammazero commented 3 years ago

@Numenorean The way to solve that is for workerpool to implement the pause differently. As I stated above:

The pause feature was designed to work by pausing at the point after executing all previously submitted tasks.

Since you have a large number of tasks already submitted, and each task takes 5 seconds to complete, you will be waiting for a very long time for a call to Pause to return.

If you want the pause to take effect immediately then a different pause implementation is needed; one that does not wait for previously submitted tasks to finish. I will publish an experimental example soon.

Numenorean commented 3 years ago

@Numenorean The way to solve that is for workerpool to implement the pause differently. As I stated above:

The pause feature was designed to work by pausing at the point after executing all previously submitted tasks.

Since you have a large number of tasks already submitted, and each task takes 5 seconds to complete, you will be waiting for a very long time for a call to Pause to return.

If you want the pause to take effect immediately then a different pause implementation is needed; one that does not wait for previously submitted tasks to finish. I will publish an experimental example soon.

any news?

Numenorean commented 3 years ago

Hey man, any news?

Numenorean commented 3 years ago

any news?

gammazero commented 3 years ago

I do have a couple implementations that I can publish in a branch. Here are some variations. Do you need any other than 1?

  1. Pause immediately (before running more queued tasks) and continue to queue submitted tasks
  2. Pause immediately and block new tasks from being submitted
  3. Pause after running all tasks that were already submitted, and continue queuing new tasks
  4. Pause after running all tasks that were already submitted, and block any new tasks

3 and 4 might better be called SyncPause. I am considering that blocking submission of new tasks (2 and 4) could be a parameter to pause. Thoughts?

Numenorean commented 3 years ago

continue to queue submitted tasks

Sorry for not replying right away. I don't quite understand if I have, for example, 1 million tasks, after pause -> waiting for all the tasks to be queued -> resume, will I be able to pause it one more time? And what about memory usage, after all the million tasks will be queued?

Jbaukens commented 2 years ago

Hello @gammazero hope u are doing well. Did you eventually published somewhere in a branch option 1 of your latest comment ?

Cheers

gammazero commented 2 years ago

@Jbaukens, @Numenorean https://github.com/gammazero/workerpool/tree/pause-immediate