hibiken / asynq

Simple, reliable, and efficient distributed task queue in Go
MIT License
9.64k stars 696 forks source link

[FEATURE REQUEST] Ability add workers dynamically on a server #250

Open sujit-baniya opened 3 years ago

sujit-baniya commented 3 years ago

Currently whenever the task is queued, it creates queue dynamically. For a server to allow to consume the queue, server needs to be stopped, add queue and queue handlers on Mux manually and then start the server.

In my view, creating the workers dynamically is important feature (like in RabbitMQ) without manually intervening server operation.

hibiken commented 3 years ago

@sujit-baniya thank you for creating this issue!

I think this is a commonly requested feature. I'll come up with a design/proposal and update this thread again 👍

sujit-baniya commented 2 years ago

@hibiken Not sure if it's correct way of adding handler dynamically, this is working for me so far


// Checks server state and returns an error if pre-condition is not met.
// Otherwise it sets the server state to active.
func (srv *Server) start() error {
    srv.state.mu.Lock()
    defer srv.state.mu.Unlock()
    switch srv.state.value {
    case srvStateActive:
        return fmt.Errorf("asynq: the server is already running")
    case srvStateStopped:
        return fmt.Errorf("asynq: the server is in the stopped state. Waiting for shutdown.")
    case srvStateClosed:
        return ErrServerClosed
    }
    srv.state.value = srvStateActive
    return nil
}

func (srv *Server) AddHandler(handler *ServeMux) {
    srv.handler = handler
}

func (srv *Server) AddQueueHandler(queue string, handler func(ctx context.Context, t *Task) error) {
    srv.handler.HandleFunc(queue, handler)
}

func (srv *Server) AddQueues(queues map[string]int) {
    srv.queues = queues
    for queue := range srv.queues {
        srv.forwarder.queues = append(srv.forwarder.queues, queue)
        srv.recoverer.queues = append(srv.recoverer.queues, queue)
    }

    srv.heartbeater.queues = srv.queues
    srv.processor.queueConfig = srv.queues
    ques, orderedQueues := prepareQueues(srv.processor.queueConfig, srv.strictPriority)
    srv.processor.queueConfig = ques
    srv.processor.orderedQueues = orderedQueues
}

func (srv *Server) AddQueue(queue string, prio ...int) {
    priority := 0
    if len(prio) > 0 {
        priority = prio[0]
    }
    srv.queues[queue] = priority
    srv.heartbeater.queues = srv.queues
    srv.forwarder.queues = append(srv.forwarder.queues, queue)
    srv.processor.queueConfig[queue] = priority
    queues, orderedQueues := prepareQueues(srv.processor.queueConfig, srv.strictPriority)
    srv.processor.queueConfig = queues
    srv.processor.orderedQueues = orderedQueues
    srv.recoverer.queues = append(srv.recoverer.queues, queue)
}

func (srv *Server) RemoveQueue(queue string) {
    var qName []string
    delete(srv.queues, queue)
    for queue := range srv.queues {
        qName = append(qName, queue)
    }
    srv.heartbeater.queues = srv.queues
    srv.forwarder.queues = qName
    srv.processor.queueConfig = srv.queues
    queues, orderedQueues := prepareQueues(srv.processor.queueConfig, srv.strictPriority)
    srv.processor.queueConfig = queues
    srv.processor.orderedQueues = orderedQueues
    srv.recoverer.queues = qName
}

func (srv *Server) HasQueue(queueName string) bool {
    for _, que := range srv.forwarder.queues {
        if que != queueName {
            return true
        }
    }
    return false
}

func (srv *Server) Tune(concurrency int) {
    srv.concurrency = concurrency
    srv.heartbeater.concurrency = concurrency
    srv.processor.sema = make(chan struct{}, concurrency)
}

func (srv *Server) IsRunning() bool {
    return srv.state.value == srvStateActive
}

func (srv *Server) IsStopped() bool {
    return srv.state.value == srvStateStopped
}

func (srv *Server) IsClosed() bool {
    return srv.state.value == srvStateClosed
}
realmicro commented 2 years ago

@sujit-baniya Can you list the completed code? Thanks

sujit-baniya commented 2 years ago

@realmicro The full code for server.go

https://gist.github.com/sujit-baniya/4d7145c1adb2f97d80955fb31858716e

ghstahl commented 2 years ago

Another approach, related to the following https://github.com/hibiken/asynq/issues/439, is to dynamically shutdown the whole server and restart it. The theory is that the server must be in a certain state and the simplest thing for me to do was shut it down and bring it back up with the new settings.

You have to manage the go routine that the asynq server runs in, but that is not that much code.

A scenario with a single app hosting 2 asynq servers: A first go routine hosts an entire asynq server with all its settings (queues from config 1, what handlers from config 1) A second go routine hosts an entire asynq server with all its settings (queues from config 2, what handlers from config 2)

Again, it was much simpler to just shut the thing down and restart a given go routine with new settings.

archit-harness commented 2 years ago

@sujit-baniya @hibiken were we able to solve this, that we can request dequeue from any given queue name?

sujit-baniya commented 2 years ago

@archit-harness I've been using https://gist.github.com/sujit-baniya/4d7145c1adb2f97d80955fb31858716e to solve this. It's working fine as per my need. @hibiken might consider working on it later as per his priority

archit-harness commented 2 years ago

Thanks @sujit-baniya for such quick response, i see there are many differences in new server.go file and your file which you have shared, but I think the main difference is added new functions above right? https://github.com/hibiken/asynq/issues/250#issuecomment-1088649898

So basically now our workers can add listening to queues and remove it.

Few Questions -

  1. Did you see any performance issues with it?
  2. How many queues are you trying with it? The reason is my use-case is to create queues for each user which will be max 50K, Do you see any issues with it?

For first point, what we tried to benchmark is, suppose we create 1k queues having 10 items each and our server has 1k queues to consumer from vs all items in single queue and consumer reading from it, the latter has much better performance.

archit-harness commented 2 years ago

But for our use-case we cannot club everything into single queue, as we want to allow the worker to run only X number of tasks for each user.

archit-harness commented 2 years ago

@sujit-baniya @hibiken did you get chance to look at my question?

sujit-baniya commented 2 years ago

@archit-harness I've not done any benchmarks for my use-case. But I'm using this changes on asynq in on of my products: https://oarkflow.com and working well so far. Multiple Servers and Queues are created on demand and destroyed as required.

but I think the main difference is added new functions above right? 1) Added new functions 2) Change in API as well for Server to handle the "Handler"

gebv commented 1 year ago

@hibiken Hi! We need functionality related to dynamic queues with a quality of service (qos) of 1 for each queue. Is there a branch where this work is being carried out? I would like to join in the implementation or contribute in any other way to this feature.