RichardKnop / machinery

Machinery is an asynchronous task queue/job queue based on distributed message passing.
Mozilla Public License 2.0
7.51k stars 915 forks source link

Keep reloading configuration doesn't work with v1.4.1 #309

Open RainbowMango opened 6 years ago

RainbowMango commented 6 years ago

@RichardKnop After worker launched, then i start a go routine to send task every 5 seconds. I set broker with Redis sentinel addr, that doesn't work. So, I tried to update broker's URL, E.g "redis://x.x.x.x:6379", but new configuration cannot take effect.

The error log like "Set state pending error: ERR unknown command 'SET'."

I have fast go through the code, the config.NewFromYaml() indeed reload the configuration(cnf = newCnf). But the new configuration not be active by broker.

RainbowMango commented 6 years ago

@RichardKnop

RainbowMango commented 6 years ago

Someone asked a similar question. https://github.com/RichardKnop/machinery/issues/200

I checked the code carefully. Update Redis broker's URL can't take affect automatically. When worker need connect redis, just get one from pool. broker.pool will not rebuild even all connection become invalid in the pool.

// open returns or creates instance of Redis connection
func (b *Broker) open() redis.Conn {
    if b.pool == nil {
        b.pool = b.NewPool(b.socketPath, b.host, b.password, b.db, b.GetConfig().Redis)
    }
    if b.redsync == nil {
        var pools = []redsync.Pool{b.pool}
        b.redsync = redsync.New(pools)
    }
    return b.pool.Get()
}

@RichardKnop Can you confirm about this?

So, can we set broker.pool with nil when we found connect failed with redis and reload conf?

RainbowMango commented 6 years ago

Bad news. After Redis failover from master to slave the connection cannot be re-build. I get the following errors:"Set state pending error: READONLY You can't write against a read only slave."

I used https://github.com/go-redis/redis that can switch connection to new master.

RainbowMango commented 6 years ago

I need to update broker's URL runtime, I tried following steps:

  1. worker.Quit()
  2. Update server's config server.config.Broker = newURL; as well as server.config.ResultBackend = newURL.
  3. server.SetBroker(broker) //the broker comes from BrokerFactory(server.GetConfig())
  4. server.SetBackend(backend) //the backend comes from BrokerFactory(server.GetConfig())
  5. server.GetBroker().SetRegisteredTaskNames(server.GetRegisteredTaskNames())

I really works...but, i wonder if there is a easy way for doing this? @RichardKnop

RainbowMango commented 6 years ago

@RichardKnop I'm trying to fix this issue. And i think the 'host'、'password'、'db' should be removed. Since the three info could get from broker.cnf, seems it breaks some design patterns. Even though configuration changed outside, the there info never be updated.

` // Broker represents a Redis broker type Broker struct { common.Broker common.RedisConnector host string password string db int pool redis.Pool stopReceivingChan chan int stopDelayedChan chan int processingWG sync.WaitGroup // use wait group to make sure task processing completes on interrupt signal receivingWG sync.WaitGroup delayedWG sync.WaitGroup // If set, path to a socket file overrides hostname socketPath string redsync redsync.Redsync }

// New creates new Broker instance func New(cnf *config.Config, host, password, socketPath string, db int) iface.Broker { b := &Broker{Broker: common.NewBroker(cnf)} b.host = host b.db = db b.password = password b.socketPath = socketPath

return b

} `

eldad87 commented 5 years ago

@RainbowMango, is there any updated with your issue?

RainbowMango commented 5 years ago

@eldad87 What's your problem? For my project, I restarted worker for workaround.