hibiken / asynq

Simple, reliable, and efficient distributed task queue in Go

[FEATURE REQUEST] Dynamically Changing Concurrency Config Value? #852

Open windowshopr opened 3 months ago

windowshopr commented 3 months ago

Is your feature request related to a problem? Please describe. I would LOVE to be able to create a separate handler function that recalculates the "Concurrency" value based on the worker servers that connect/disconnect over the uptime of the server (maybe this already exists somehow and I just haven't found it yet). For example, if I have a POTENTIAL pool of, say, 100 workers across 10 different machines, it doesn't make much sense to leave the value at the default of 10, as this would severely limit the worker pool; setting a fixed maximum of 100 is a naive approach as well. What would be cool is to let worker servers connect/disconnect whenever they want (say, when a new machine is purchased and needs to be added without stopping the server, changing the Concurrency, and restarting it), and to have a function that updates the Concurrency value in real time based on the number of threads now available to the scheduler!

Describe the solution you'd like I haven't tested this code yet, but conceivably something like this could work:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "os"
    "strconv"
    "time"

    "github.com/hibiken/asynq"
)

// Define the task type
const TypeSendEmail = "email:send"

// Define the task payload
type EmailPayload struct {
    To      string
    Subject string
    Body    string
}

// Define the task function
func sendEmail(ctx context.Context, task *asynq.Task) error {
    var payload EmailPayload
    if err := json.Unmarshal(task.Payload(), &payload); err != nil {
        return fmt.Errorf("failed to unpack payload: %v", err)
    }

    // Send the email here
    fmt.Printf("Sending email to %s with subject: %s\n", payload.To, payload.Subject)
    time.Sleep(5 * time.Second) // Simulating email sending
    fmt.Println("Email sent successfully!")

    return nil
}

func main() {
    // Create a new Asynq server
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: "localhost:6379"},
        asynq.Config{
            Concurrency: getInitialConcurrency(),
            Queues: map[string]int{
                "critical": 6,
                "default":  3,
                "low":      1,
            },
        },
    )

    // Create a new ServeMux and register the email task function
    mux := asynq.NewServeMux()
    mux.HandleFunc(TypeSendEmail, sendEmail)

    // Start a background goroutine to update the Concurrency value
    go updateConcurrency(srv)

    // Run the server
    if err := srv.Run(mux); err != nil {
        log.Fatalf("could not run server: %v", err)
    }
}

// getInitialConcurrency retrieves the initial number of available worker nodes from the configuration
func getInitialConcurrency() int {
    // In a real-world scenario, you would use a configuration management system
    // or a service discovery mechanism to retrieve the initial number of available workers
    numWorkersStr := os.Getenv("NUM_WORKERS")
    if numWorkersStr == "" {
        return 10 // Default to 10 workers if not set
    }

    numWorkers, err := strconv.Atoi(numWorkersStr)
    if err != nil {
        log.Fatalf("failed to parse NUM_WORKERS: %v", err)
    }

    return numWorkers
}

// updateConcurrency periodically checks for changes in the worker pool and updates the Concurrency value
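// NOTE: srv.Config() and srv.UpdateConfig() below are the proposed API additions
// this feature request is asking for; asynq.Server does not expose them today.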
func updateConcurrency(srv *asynq.Server) {
    ticker := time.NewTicker(1 * time.Minute) // Check for changes every minute
    defer ticker.Stop()

    for range ticker.C {
        numWorkers, err := getNumWorkers()
        if err != nil {
            log.Printf("failed to get number of workers: %v", err)
            continue
        }

        if numWorkers != srv.Config().Concurrency {
            log.Printf("Updating Concurrency from %d to %d", srv.Config().Concurrency, numWorkers)
            srv.UpdateConfig(asynq.Config{
                Concurrency: numWorkers,
                Queues:      srv.Config().Queues,
            })
        }
    }
}

// getNumWorkers retrieves the number of available worker nodes from the configuration
func getNumWorkers() (int, error) {
    // In a real-world scenario, you would use a configuration management system
    // or a service discovery mechanism to retrieve the number of available workers
    numWorkersStr := os.Getenv("NUM_WORKERS")
    if numWorkersStr == "" {
        return 10, nil // Default to 10 workers if not set
    }

    numWorkers, err := strconv.Atoi(numWorkersStr)
    if err != nil {
        return 0, fmt.Errorf("failed to parse NUM_WORKERS: %v", err)
    }

    return numWorkers, nil
}

This sets the initial number of workers from the .env file and then starts a separate goroutine to update the Concurrency config value from that same .env value. In practice, there would be a "getAvailableCPUCount" method instead of reading from a .env file. The only question, without having tested it yet, is whether the Concurrency value can actually be updated in real time so the newly connected workers get used. I plan on testing this by monitoring the GUI once I have my multiple machines set up; I just thought I'd post it here in case something is already in the works, and in case it might be of use to others.
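As a rough sketch of what that might look like (the getAvailableCPUCount name and the drop-in signature are just placeholders), something like this could be swapped in for getNumWorkers, with "runtime" added to the imports above:

// getAvailableCPUCount is a hypothetical drop-in for getNumWorkers above:
// rather than reading NUM_WORKERS from the environment, it derives a
// per-machine value from the number of logical CPUs on this host.
func getAvailableCPUCount() (int, error) {
    return runtime.NumCPU(), nil // runtime.NumCPU always reports at least 1
}

Of course, each asynq.Server's Concurrency only controls the worker goroutines of that one process, so a true cluster-wide count would have to come from some shared source (service discovery, Redis, etc.) rather than the local CPU count.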

Describe alternatives you've considered NONE

Additional context I will update this thread with my results if desired.

kamikazechaser commented 3 months ago

Related to #850