hibiken / asynq

Simple, reliable, and efficient distributed task queue in Go
MIT License
9k stars 659 forks source link

How to mark a task as completed externally #862

Open ScribeSavant opened 2 months ago

ScribeSavant commented 2 months ago

Hello. How can I mark a task as completed externally? For example, I have an HTTP server that creates a job, and I want to stop that job through the same server. I tried the `Inspector.CancelProcessing()` option, but the job falls into the retry section and then starts again.

When I set MaxRetry to 0, the job goes to the archive section on restart and does not start again.

Here is my full code

Controller

// createEvm returns a handler that enqueues a new EVM task whose payload is
// the raw request body and responds with the enqueued task's info as JSON.
// Any failure is reported to the client with status 500 and the error text.
func (s *Server) createEvm() http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        payload, err := io.ReadAll(r.Body)
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            // http.Error does not end the handler; stop so nothing is enqueued.
            return
        }

        // Task options: no retries, and a deadline five hours from now.
        deadline := time.Now().Add(5 * time.Hour)
        task := workers.NewEvmTask(payload, asynq.MaxRetry(0), asynq.Deadline(deadline))

        info, err := s.workerServer.Enqueue(task)
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            // Stop so a success body is not appended to the error response.
            return
        }

        w.Header().Set("content-type", "application/json")
        json.NewEncoder(w).Encode(info)
    }
}

// stopTask returns a handler that decodes a CancelTaskPayload from the request
// body and asks the inspector to cancel processing of the job it names. On
// success it responds with {"stopped": true}; any failure is reported with
// status 500 and the error text.
func (s *Server) stopTask() http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        // Decode into a value rather than through a nil pointer: a JSON "null"
        // body would otherwise leave the pointer nil and panic on JobID below.
        var payload CancelTaskPayload
        if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            // http.Error does not end the handler; stop before using payload.
            return
        }

        if err := s.workerServer.Inspector.CancelProcessing(payload.JobID); err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
            // Stop so "stopped": true is not reported after a failure.
            return
        }

        w.Header().Set("content-type", "application/json")
        json.NewEncoder(w).Encode(map[string]bool{"stopped": true})
    }
}

Evm Worker

package workers

import (
    "context"
    "fmt"
    "time"

    "github.com/hibiken/asynq"
)

// EvmTaskPayload describes a task payload carrying a single user identifier.
// NOTE(review): nothing in the visible code decodes into this type; the
// handlers pass the raw request body through as the payload — confirm usage.
type EvmTaskPayload struct {
    // UserID is encoded under the JSON key "userId".
    UserID int `json:"userId"`
}

// EvmJob is the stateless receiver for the EVM task processor.
type EvmJob struct{}

// NewEvmTask builds an asynq task of type "evm:task" carrying payload as-is,
// with any supplied options attached.
func NewEvmTask(payload []byte, opts ...asynq.Option) *asynq.Task {
    const typename = "evm:task"
    task := asynq.NewTask(typename, payload, opts...)
    return task
}

// Processor runs until ctx is done, printing "Still working" once per
// three-second interval, then prints "Stopped" and returns nil. The task
// argument is not inspected.
//
// The wait between iterations selects on ctx.Done() as well as the ticker, so
// cancellation is noticed immediately instead of after the remainder of a
// blocking three-second sleep.
func (e *EvmJob) Processor(ctx context.Context, t *asynq.Task) error {
    ticker := time.NewTicker(3 * time.Second)
    defer ticker.Stop()

    for {
        // A context that is already done must not print another progress line.
        if ctx.Err() != nil {
            fmt.Println("Stopped")
            return nil
        }

        fmt.Println("Still working")

        // Wake on whichever comes first; the check at the top of the loop
        // handles the cancelled case.
        select {
        case <-ctx.Done():
        case <-ticker.C:
        }
    }
}

Worker server

package workers

import (
    "context"
    "log"

    "github.com/hibiken/asynq"
)

// WorkerServer bundles an asynq server, serve mux, client and inspector in a
// single value by embedding all four, so each is reachable both through its
// field name (e.g. Inspector) and through its promoted methods.
//
// NOTE(review): embedding several asynq types promotes every method of each;
// a method name shared by more than one of them would be ambiguous on
// WorkerServer and would have to be called through the explicit field —
// verify against the asynq API before relying on promoted calls.
type WorkerServer struct {
    *asynq.Server
    *asynq.ServeMux
    *asynq.Client
    *asynq.Inspector
}

// CreateWorkerServer constructs a WorkerServer whose server, client and
// inspector all point at Redis on localhost:6379, with server concurrency set
// to 1000, and registers the task routes on a fresh mux before returning it.
func CreateWorkerServer() *WorkerServer {
    // One connection option value is reused for all three asynq components.
    redisOpt := asynq.RedisClientOpt{Addr: "localhost:6379"}

    ws := &WorkerServer{
        Server:    asynq.NewServer(redisOpt, asynq.Config{Concurrency: 1000}),
        ServeMux:  asynq.NewServeMux(),
        Client:    asynq.NewClient(redisOpt),
        Inspector: asynq.NewInspector(redisOpt),
    }
    ws.routes()

    return ws
}

// routes registers the handler for "evm:task" on the embedded mux. Every
// invocation runs the processor on its own fresh EvmJob value.
func (w *WorkerServer) routes() {
    handleEvm := func(ctx context.Context, task *asynq.Task) error {
        var job EvmJob
        return job.Processor(ctx, task)
    }
    w.ServeMux.HandleFunc("evm:task", handleEvm)
}

// StopJob asks the inspector to cancel processing of the task identified by
// jobId and then deletes all archived tasks in the "default" queue. Both calls
// are always attempted; a failure of either is logged rather than silently
// dropped, since this method has no return value to carry it.
//
// NOTE(review): the delete is queue-wide, not limited to jobId, and runs
// right after the cancel request — confirm that removing every archived task
// in "default" is intended, and whether the cancelled task is archived by then.
func (w *WorkerServer) StopJob(jobId string) {
    if err := w.Inspector.CancelProcessing(jobId); err != nil {
        log.Printf("cancel processing of task %q: %v", jobId, err)
    }
    if _, err := w.Inspector.DeleteAllArchivedTasks("default"); err != nil {
        log.Printf("delete archived tasks in queue %q: %v", "default", err)
    }
}

In this example, when the task is stopped it falls into the retry section, but since MaxRetry is 0 it is automatically archived. The problem is that if the server (or the entire program) is restarted, the jobs fall into the archive in the same way.

thucnq commented 1 month ago

Maybe you can try the `DeleteTask` method.