contribsys / faktory

Language-agnostic persistent background job server
https://contribsys.com/faktory/

Stop the manager from using os.Exit when sigterm happens #391

Closed jose-zenledger closed 2 years ago

jose-zenledger commented 2 years ago

Are you using an old version? No
Have you checked the changelogs to see if your issue has been fixed in a later version? Yes

https://github.com/contribsys/faktory/blob/master/Changes.md
https://github.com/contribsys/faktory/blob/master/Pro-Changes.md
https://github.com/contribsys/faktory/blob/master/Ent-Changes.md

IMO, if the server running the worker is basic and has no other processes running (an HTTP server, other kinds of worker routines), then it makes sense to put mgr.Run() at the end of your main function and let the faktory package listen for the signals and exit the app. However, if you do have other processes that need to shut down gracefully, that approach doesn't work: if you listen for signals manually in your app, it's difficult to stop the manager and avoid its os.Exit(0) call.

Take the following code, which doesn't run any other processes yet but is structured this way to prepare for adding an HTTP server:

package main

import (
    "context"
    "fmt"
    worker "github.com/contribsys/faktory_worker_go"
    "github.com/go-pg/pg/v9"
    "github.com/zenledger-io/zenledger-go/configuration"
    "github.com/zenledger-io/zenledger-go/db"
    "github.com/zenledger-io/zenledger-go/log"
    "github.com/zenledger-io/zenledger-go/workerfunc"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func fini() {
    time.Sleep(5 * time.Second) // give time for routines to cancel
}

func main() {
    defer fini()

    ctx := log.ContextWithUUID(context.Background(), configuration.UUID)
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    log.Start(ctx)

    logger := log.ContextLogger(ctx)
    defer logger.Monitor()

    faktoryURL := configuration.GetFaktoryURL(ctx)
    if err := os.Setenv("FAKTORY_PROVIDER", "FAKTORY_URL"); err != nil {
        logger.Errorf("error setting FAKTORY_PROVIDER env: %v", err)
        return
    }
    if err := os.Setenv("FAKTORY_URL", faktoryURL); err != nil {
        logger.Errorf("error setting FAKTORY_URL env: %v", err)
        return
    }

    dbParams := configuration.GetDBParameters(ctx)
    qh := db.NewHandler(&pg.Options{
        User:     dbParams.Username,
        Password: dbParams.Password,
        Database: dbParams.Database,
        Addr:     fmt.Sprintf("%v:%v", dbParams.Host, dbParams.Port),
    })
    if err := qh.Start(ctx); err != nil {
        logger.Errorf("error starting query handler: %v", err)
        return
    }

    mgr := worker.NewManager()
    mgr.Register("tax_calculation", workerfunc.Create(qh))
    mgr.Concurrency = configuration.FaktoryWorkerConcurrency
    mgr.ProcessStrictPriorityQueues("critical", "default", "bulk")

    go func() {
        defer func() {
            if err := recover(); err != nil {
                logger.Printf("%v", err)
            }
        }()

        logger.Printf("starting faktory worker")
        // Start processing jobs
        mgr.Run()
    }()

    go func() {
        // SIGSTOP cannot be caught by a process, so registering it has no
        // effect; only SIGTERM and SIGINT are handled here.
        stop := make(chan os.Signal, 2)
        signal.Notify(stop, syscall.SIGTERM, syscall.SIGINT)

        for {
            select {
            case <-ctx.Done():
                return
            case <-stop:
                mgr.Quiet()
                cancel()
            }
        }
    }()
    <-ctx.Done()

    logger.Printf("stopping server")
}

mgr.Quiet() gets called as soon as the signal comes in, but by then it's too late: the faktory package still calls terminate and exits internally. If I call mgr.Terminate(false) in that same spot, I get a panic because terminate gets called twice and isn't guarded by a mutex, so the already-closed channel gets closed again. The only way I can think of to stop this is to add a LifecycleEventHandler that blocks indefinitely so terminate can never finish, but that is hacky and prevents the manager from stopping gracefully.
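To make the double-close panic concrete, here is a minimal sketch of the kind of guard that would avoid it. The manager type, its state field, and its done channel are hypothetical stand-ins for illustration; they are not the actual faktory_worker_go types, and the real fix may look different.

```go
package main

import (
	"fmt"
	"sync"
)

// manager is a hypothetical stand-in for a worker manager.
// It is NOT the faktory_worker_go Manager type.
type manager struct {
	mu    sync.Mutex
	state string        // "", "quiet", or "terminate"
	done  chan struct{} // closed exactly once, on the first Terminate
}

func newManager() *manager {
	return &manager{done: make(chan struct{})}
}

// Quiet moves the manager to the quiet state. It reports false if the
// manager is already quiet or terminating.
func (m *manager) Quiet() bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.state != "" {
		return false
	}
	m.state = "quiet"
	return true
}

// Terminate closes done on the first call only. Later calls, including
// calls from other goroutines, are no-ops and report false.
func (m *manager) Terminate() bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.state == "terminate" {
		return false
	}
	m.state = "terminate"
	close(m.done)
	return true
}

func main() {
	m := newManager()

	// Two goroutines race to terminate, as happens when both the
	// application and the library react to the same signal.
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.Terminate()
		}()
	}
	wg.Wait()

	<-m.done
	fmt.Println("terminated once, no panic")
}
```

Without the state check under the mutex, the second close(m.done) would panic with "close of closed channel", which matches the behaviour described above.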

Ideally there would be a RunWithContext(ctx context.Context) function that shuts down the manager when the context finishes, or at least a way to turn off the signal listening. I also propose using a mutex to guard Quiet and Terminate so that the state can be checked properly and we avoid a panic when terminate is called again from a different goroutine.

I'm happy to open a PR, just let me know.

mperham commented 2 years ago

Hi Jose, I’d love a PR. More Context support is always welcome.

On Mon, Jan 31, 2022 at 10:09 jose-zenledger wrote:

  • Which Faktory package and version? contribsys/faktory:1.0.1
  • Which Faktory worker package and version? github.com/contribsys/faktory_worker_go v1.5.0


jose-zenledger commented 2 years ago

@mperham I created a PR and tested it locally with our app, and it works. I'll add tests to the PR if you think it looks good.

https://github.com/contribsys/faktory_worker_go/pull/58/files