RichardKnop / machinery

Machinery is an asynchronous task queue/job queue based on distributed message passing.
Mozilla Public License 2.0
7.52k stars 914 forks source link

Help with Periodic Task Run on v1 #778

Closed cameronbarclift closed 9 months ago

cameronbarclift commented 2 years ago

I'm having issues running a periodic task. I'm setting up the machinery server with a redis backend, then sleeping, and I can submit individual task runs that get picked up by my worker just fine. But the periodic tasks don't ever submit to redis or run. Any ideas on what I'm doing wrong?

I'm on macOS, running machinery v1

// main builds the machinery server, registers the "add" task under the cron
// spec "* * * * *" with the name "add-task", sends one immediate run of the
// same signature, prints the registered task names, and then sleeps so the
// process stays alive.
func main() {
    server := initMachinery()

    // Signature for add(1, 1); both arguments are declared as int64.
    signature := &tasks.Signature{
        Name: "add",
        Args: []tasks.Arg{
            {
                Type:  "int64",
                Value: 1,
            },
            {
                Type:  "int64",
                Value: 1,
            },
        },
    }

    // Check the registration error right away, before doing anything else,
    // so an invalid spec is reported before any task is sent.
    if err := server.RegisterPeriodicTask("* * * * *", "add-task", signature); err != nil {
        log.Fatalln(err)
    }

    // Surface a failed one-off send instead of dropping the error; it is
    // logged rather than fatal so the periodic schedule keeps running.
    if _, err := server.SendTask(signature); err != nil {
        log.Println(err)
    }

    fmt.Println("Starting server")
    fmt.Println(server.GetRegisteredTaskNames())

    // Keep the process alive. The original time.sleep does not exist; the
    // exported function is time.Sleep.
    time.Sleep(1000 * time.Second)
}

// initMachinery creates a machinery server that uses the local Redis instance
// (database 0) as broker, lock and result backend, registers the "add" and
// "multiply" tasks on it, and returns it. Any setup error terminates the
// process via log.Fatalln.
func initMachinery() (server *machinery.Server) {
    // Only the non-zero settings are spelled out; every other Config field
    // keeps its zero value (nil / "" / 0 / false).
    settings := &config.Config{
        Broker:        "redis://localhost:6379/0",
        Lock:          "redis://localhost:6379/0",
        DefaultQueue:  "default",
        ResultBackend: "redis://localhost:6379/0",
    }

    var err error
    server, err = machinery.NewServer(settings)
    if err != nil {
        log.Fatalln(err)
    }

    // Map task names to the functions that implement them.
    handlers := map[string]interface{}{
        "add":      Add,
        "multiply": Multiply,
    }
    if err = server.RegisterTasks(handlers); err != nil {
        log.Fatalln(err)
    }

    return server
}

// Add sums all of its int64 arguments, prints the total to standard output,
// and returns the total with a nil error. With no arguments the total is 0.
func Add(args ...int64) (int64, error) {
    var total int64
    for i := 0; i < len(args); i++ {
        total += args[i]
    }
    fmt.Println(total)
    return total, nil
}
gurpreetg commented 1 year ago

err := server.RegisterPeriodicTask("* * * * *", "add-task", signature)

According to the docs, `err := server.RegisterPeriodicTask("* * * * *", "add-task", signature)` needs to be `server.RegisterPeriodicTask("* * * * *", "periodic-task", signature)`.

cameronbarclift commented 9 months ago

Very late, but closing this since I'm no longer working on this project. Thanks for answering, though.