acaloiaro / neoq

Queue-agnostic background job library for Go, with a pleasant API and powerful features.
MIT License
270 stars 4 forks source link

redesign HandlerCtxVars so it doesn't need to include a pgx.Tx #35

Closed github-actions[bot] closed 1 year ago

github-actions[bot] commented 1 year ago

The deadline is the amount of time that can be spent executing the handler's Func

When a deadline is exceeded, the job fails and enters its retry phase.

The default concurrency is based on the number of (v)CPUs on the machine running Neoq.

Queues that have reached capacity cause Enqueue() to block until the queue is below capacity.

https://github.com/acaloiaro/neoq/blob/a9f2f6b4f46c1df8ad2be60de85c2f84a1753aad/handler/handler.go#L44


package handler

import (
    "context"
    "errors"
    "fmt"
    "runtime"
    "time"

    "github.com/acaloiaro/neoq/jobs"
)

const (
    // DefaultHandlerDeadline is the deadline New assigns to a Handler when no
    // Deadline option was supplied (i.e. the handler's Deadline is still zero).
    DefaultHandlerDeadline = 30 * time.Second
)

// contextKey is the unexported type of the key under which handler variables
// are stored in a context.Context (see CtxVarsKey).
type contextKey struct{}

var (
    // CtxVarsKey is the context key that WithContext uses to store CtxVars and
    // that JobFromContext uses to read them back.
    CtxVarsKey           contextKey
    // ErrContextHasNoJob is returned by JobFromContext when the context does not
    // carry a CtxVars value.
    ErrContextHasNoJob   = errors.New("context has no Job")
    // ErrNoHandlerForQueue reports that no handler exists for a queue. It is not
    // used within this file.
    ErrNoHandlerForQueue = errors.New("no handler for queue")
    // ErrNoProcessorForQueue reports that no processor is configured for a queue.
    // TODO: this error lives in this package only to avoid a cyclic import with
    // the neoq package.
    ErrNoProcessorForQueue = errors.New("no processor configured for queue")
)

// Func is a function that Handlers execute for every Job on a queue.
// It is invoked by Exec with a context; returning a non-nil error marks the
// execution as failed.
type Func func(ctx context.Context) error

// Handler handles jobs on a queue.
// Construct one with New so that Concurrency and Deadline receive defaults.
type Handler struct {
    // Handle is the function executed by Exec for each job.
    Handle        Func
    // Concurrency is set by the Concurrency option; New derives a default from
    // runtime.NumCPU() when it is left at zero.
    Concurrency   int
    // Deadline is the maximum time Exec waits for Handle to finish; New sets it
    // to DefaultHandlerDeadline when it is left at zero.
    Deadline      time.Duration
    // QueueCapacity is set by the MaxQueueCapacity option. It is not read within
    // this file.
    QueueCapacity int64
}

// CtxVars are variables passed to every Handler context.
// WithContext stores a CtxVars value in a context under CtxVarsKey.
type CtxVars struct {
    // Job is the job made available to the handler; JobFromContext returns it.
    Job *jobs.Job
    // Tx is a bit hacky: it contains a pgx.Tx for PgBackend, but because this is
    // the handler package and not all neoq users should get pgx as a transitive
    // dependency, Tx is stored as any and coerced to a pgx.Tx inside the
    // postgres backend.
    // TODO: redesign HandlerCtxVars so it doesn't need to include a pgx.Tx.
    Tx any
}

// Option is a function that sets optional configuration on a Handler.
// Options are applied by Handler.WithOptions and by New.
type Option func(w *Handler)

// WithOptions applies each of the given options to the handler, in the order
// they were passed. Later options overwrite fields set by earlier ones.
func (h *Handler) WithOptions(opts ...Option) {
    for i := range opts {
        apply := opts[i]
        apply(h)
    }
}

// Deadline returns an Option that sets a handler's per-job time deadline to d.
// The deadline is the amount of time that can be spent executing the handler's
// Func. When a deadline is exceeded, the job is failed and enters its retry
// phase.
func Deadline(d time.Duration) Option {
    setDeadline := func(target *Handler) {
        target.Deadline = d
    }

    return setDeadline
}

// Concurrency returns an Option that sets a handler's Concurrency to c, so that
// Neoq handlers process jobs concurrently.
// When no concurrency is configured (the field is left at zero), New chooses a
// default derived from the number of (v)CPUs on the machine running Neoq.
func Concurrency(c int) Option {
    setConcurrency := func(target *Handler) {
        target.Concurrency = c
    }

    return setConcurrency
}

// MaxQueueCapacity returns an Option that sets a handler's QueueCapacity,
// configuring the handler to enforce a maximum capacity on the queues it handles.
// Queues that have reached capacity cause Enqueue() to block until the queue is
// below capacity.
func MaxQueueCapacity(capacity int64) Option {
    setCapacity := func(target *Handler) {
        target.QueueCapacity = capacity
    }

    return setCapacity
}

// New creates a new queue handler that executes f, configured by opts.
//
// Options are applied in order. Afterwards, defaults fill any field still at
// its zero value:
//   - Concurrency defaults to one fewer than the number of CPUs, but never
//     less than 1, so that a single-CPU machine still processes jobs.
//   - Deadline defaults to DefaultHandlerDeadline.
func New(f Func, opts ...Option) (h Handler) {
    h = Handler{
        Handle: f,
    }

    h.WithOptions(opts...)

    // Default to running one fewer worker than CPUs. runtime.NumCPU() is 1 on
    // single-CPU machines, which would otherwise yield a concurrency of zero.
    if h.Concurrency == 0 {
        workers := runtime.NumCPU() - 1
        if workers < 1 {
            workers = 1
        }
        h.Concurrency = workers
    }

    // Always set a job deadline if none is set.
    if h.Deadline == 0 {
        h.Deadline = DefaultHandlerDeadline
    }

    return h
}

// WithContext returns a copy of ctx that carries v (the job and transaction)
// under CtxVarsKey, so that it can later be read back with JobFromContext.
func WithContext(ctx context.Context, v CtxVars) context.Context {
    withVars := context.WithValue(ctx, CtxVarsKey, v)

    return withVars
}

// Exec executes handler.Handle with a concrete time deadline and reports the
// outcome.
//
// The handler runs in its own goroutine and receives a context that expires
// handler.Deadline from now, so a handler that honours its context can stop
// early. Exec returns as soon as the handler finishes or that context is done,
// whichever happens first:
//   - the handler returned nil: nil
//   - the handler returned an error: that error wrapped as "job failed to process"
//   - the deadline elapsed: an error wrapping context.DeadlineExceeded
//   - the parent ctx was canceled: context.Canceled, unwrapped
//
// Exec cannot forcibly stop a handler that ignores its context; such a handler
// keeps running after Exec returns, and its result is discarded.
func Exec(ctx context.Context, handler Handler) (err error) {
    deadlineCtx, cancel := context.WithDeadline(ctx, time.Now().Add(handler.Deadline))
    defer cancel()

    // The channel is buffered so the handler goroutine can always deliver its
    // result and exit, even when Exec has already returned because the deadline
    // passed. An unbuffered completion signal would leak the goroutine.
    errCh := make(chan error, 1)
    go func(ctx context.Context) {
        errCh <- handler.Handle(ctx)
    }(deadlineCtx) // pass the deadline-bound context so the handler can observe it

    select {
    case err = <-errCh:
        if err != nil {
            err = fmt.Errorf("job failed to process: %w", err)
        }

    case <-deadlineCtx.Done():
        ctxErr := deadlineCtx.Err()
        switch {
        case errors.Is(ctxErr, context.DeadlineExceeded):
            err = fmt.Errorf("job exceeded its %s deadline: %w", handler.Deadline, ctxErr)
        case errors.Is(ctxErr, context.Canceled):
            err = ctxErr
        default:
            err = fmt.Errorf("job failed to process: %w", ctxErr)
        }
    }

    return err
}

// JobFromContext returns the job stored in ctx by WithContext.
// If ctx carries no CtxVars value, it returns ErrContextHasNoJob.
func JobFromContext(ctx context.Context) (*jobs.Job, error) {
    vars, found := ctx.Value(CtxVarsKey).(CtxVars)
    if !found {
        return nil, ErrContextHasNoJob
    }

    return vars.Job, nil
}