adjust / rmq

Message queue system written in Go and backed by Redis
MIT License
1.57k stars 206 forks source link

Adding locks to make stopping queues and connections atomic #105

Closed fmstephe closed 3 years ago

fmstephe commented 3 years ago

Because the connection has a heartbeat checker running in a background goroutine, the method connection.StopAllConsuming() may be called concurrently at any time.

It is possible for some methods to write data to redis relating to connections/queues which have already shut down.

These methods are connection.OpenQueue(), queue.StartConsuming() and queue.AddConsumer()

For this we add some concurrency controls to ensure that once a connection has stopped all consuming these methods will not write data to redis but return an ErrorConsumingStopped error instead.

In the redisConnection implementation we use a bool to inidcate that consuming has stopped and a mutex to make the state change atomic. This does mean that calls to OpenQueue() will take the lock for the duration of a network call to Redis. This means that calls to StopAllConsuming() can be delayed for as long as it takes these network calls to complete.

The redisQueue implementation uses the consumingStopped channel to indicate whether the queue has been stopped. We also use the nil status of deliveryChan to determine whether consuming has started. Both of these state changes are protected by a single mutex. Again the methods queue.StartConsuming() and queue.AddConsumer() will hold the mutex for the duration of a network call to redis, which can delay the shutdown process.

fmstephe commented 3 years ago

@psampaz there are a number of additional commits to clean up the tests. Please have another look.

fmstephe commented 3 years ago

@psampaz the unit tests should work reliably now, even with a slower redis connection. Please take another look.

psampaz commented 3 years ago

confirmed