adjust / rmq

Message queue system written in Go and backed by Redis
MIT License

Add stopper channel to stop consuming on a consumer #7

Closed wellle closed 4 years ago

wellle commented 9 years ago

Inspired by #5

@ympons: I took the mechanics from your fork, but implemented it slightly differently:

Instead of adding a new interface, I just added a return value stopper chan bool to the Queue.AddConsumer function. At any later time you can then stop consuming on this consumer by sending a value to the stopper channel:

name, stopper := queue.AddConsumer("task consumer", taskConsumer)
stopper <- true // stopper is a chan bool; from now on taskConsumer.Consume() won't be called anymore
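
To make the mechanics concrete, here is a minimal, self-contained sketch of the stopper pattern. It does not use rmq itself: toyQueue, Publish and recorder are hypothetical stand-ins backed by an in-memory channel, and only the shape of AddConsumer (returning a name and a chan bool) is taken from the description above.

```go
package main

import "fmt"

// Consumer is a stand-in for the consumer type in this thread; only the
// Consume method is assumed here.
type Consumer interface {
	Consume(delivery string)
}

// toyQueue is a hypothetical in-memory queue, not rmq's Redis-backed one.
type toyQueue struct {
	deliveries chan string
}

func newToyQueue() *toyQueue {
	return &toyQueue{deliveries: make(chan string, 16)}
}

// Publish buffers a delivery for whichever consumer picks it up next.
func (q *toyQueue) Publish(delivery string) {
	q.deliveries <- delivery
}

// AddConsumer starts a consuming goroutine and returns the consumer name
// plus a stopper channel. Sending any value on stopper ends the goroutine.
func (q *toyQueue) AddConsumer(tag string, consumer Consumer) (string, chan bool) {
	stopper := make(chan bool) // unbuffered: a send returns once the loop has received it
	go func() {
		for {
			select {
			case <-stopper:
				return
			case delivery := <-q.deliveries:
				consumer.Consume(delivery)
			}
		}
	}()
	return tag, stopper
}

// recorder forwards every consumed delivery to a channel so callers can observe it.
type recorder struct {
	seen chan string
}

func (r *recorder) Consume(delivery string) { r.seen <- delivery }

func main() {
	queue := newToyQueue()
	taskConsumer := &recorder{seen: make(chan string, 16)}

	name, stopper := queue.AddConsumer("task consumer", taskConsumer)
	queue.Publish("first")
	fmt.Println(name, "consumed", <-taskConsumer.seen)

	stopper <- true // from here on Consume is not called anymore
	queue.Publish("second")
	fmt.Println("left in queue:", len(queue.deliveries))
}
```

Because the stopper channel is unbuffered, the send only returns after the consuming loop has received the value, so anything published afterwards stays in the queue.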

Please let me know if this works for you!

ympons commented 9 years ago

@wellle Thanks so much for working on this!

This works great in some of my scenarios. By the way, we're using Connection.CollectStats to receive rmq.Stats, so it would be great to remove the consumer from the bookkeeping data structure once we stop consuming on it. Perhaps:

func (queue *redisQueue) consumerConsume(consumer Consumer, name string) {
    defer queue.RemoveConsumer(name)
    ...
}
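
As a rough illustration of why the defer helps, the sketch below keeps a consumer registry that a stats collector could read and removes the entry when the consuming goroutine exits. statsQueue, addConsumer, removeConsumer and activeConsumers are hypothetical names for this example only; they are not rmq's internals.

```go
package main

import (
	"fmt"
	"sync"
)

// statsQueue is a hypothetical queue that only tracks which consumers are active.
type statsQueue struct {
	mu        sync.Mutex
	consumers map[string]bool // bookkeeping a stats collector would read
	wg        sync.WaitGroup
}

func newStatsQueue() *statsQueue {
	return &statsQueue{consumers: make(map[string]bool)}
}

// addConsumer registers the consumer and starts its goroutine.
func (q *statsQueue) addConsumer(name string) chan bool {
	q.mu.Lock()
	q.consumers[name] = true
	q.mu.Unlock()

	stopper := make(chan bool)
	q.wg.Add(1)
	go q.consumerConsume(name, stopper)
	return stopper
}

func (q *statsQueue) removeConsumer(name string) {
	q.mu.Lock()
	delete(q.consumers, name)
	q.mu.Unlock()
}

// consumerConsume blocks until stopped. Deferred calls run in reverse order,
// so the consumer is removed from the bookkeeping before wg.Done fires.
func (q *statsQueue) consumerConsume(name string, stopper chan bool) {
	defer q.wg.Done()
	defer q.removeConsumer(name)
	<-stopper // a real loop would also consume deliveries here
}

func (q *statsQueue) activeConsumers() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.consumers)
}

func main() {
	queue := newStatsQueue()
	stopper := queue.addConsumer("task consumer")
	fmt.Println("active before stop:", queue.activeConsumers())

	stopper <- true
	queue.wg.Wait() // wait for the deferred cleanup to finish
	fmt.Println("active after stop:", queue.activeConsumers())
}
```

The defer ties cleanup to the goroutine's exit, so every way of leaving the consume loop updates the bookkeeping.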

I have another case where the consumer must consume only one delivery and then stop. I implemented this by adding a method to the consumer, consumer.ConsumeOneTime():

func (c *Consumer) ConsumeOneTime() bool {
    return true // true: if you want to consume only one delivery
}

I use it in the queue.consumerConsume method like this

Maybe a way to integrate this case without changing the API is through a ConsumerOneTime interface. What do you think about it?
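
One possible shape for such an optional interface is sketched below, assuming the consume loop checks for it with a type assertion. The Consumer interface, the consumeAll helper and both consumer types are illustrative assumptions, not code from rmq or from the fork.

```go
package main

import "fmt"

// Consumer is the regular interface every consumer implements (assumed shape).
type Consumer interface {
	Consume(delivery string)
}

// ConsumerOneTime is the optional extension: consumers that also implement
// ConsumeOneTime can ask to be stopped after a single delivery.
type ConsumerOneTime interface {
	Consumer
	ConsumeOneTime() bool
}

// consumeAll is a hypothetical consume loop. It returns how many deliveries
// were handed to the consumer before the loop ended.
func consumeAll(consumer Consumer, deliveries []string) int {
	count := 0
	for _, delivery := range deliveries {
		consumer.Consume(delivery)
		count++
		// The type assertion leaves plain consumers untouched.
		if once, ok := consumer.(ConsumerOneTime); ok && once.ConsumeOneTime() {
			break
		}
	}
	return count
}

// plainConsumer only implements Consumer.
type plainConsumer struct{}

func (plainConsumer) Consume(delivery string) {}

// singleShotConsumer opts in to one-time consumption.
type singleShotConsumer struct{}

func (singleShotConsumer) Consume(delivery string) {}
func (singleShotConsumer) ConsumeOneTime() bool    { return true }

func main() {
	deliveries := []string{"a", "b", "c"}
	fmt.Println("plain consumer handled:", consumeAll(plainConsumer{}, deliveries))
	fmt.Println("one-time consumer handled:", consumeAll(singleShotConsumer{}, deliveries))
}
```

Existing consumers keep compiling unchanged with this approach, since the extra method is only looked up at runtime.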

Thanks again!

wellle commented 9 years ago

@ympons: Yeah I saw the one time code. What is it for? Why do you have a consumer to only consume a single delivery?

ympons commented 9 years ago

@wellle In our solution we have some map/tile generators that receive messages from our microservice, which creates or reuses the consumers needed for the task. Some of these consumers belong to a special queue where they must consume only one delivery and then stop.

wellle commented 9 years ago

Interesting. We could add a function like AddLimitedConsumer:

queue.AddLimitedConsumer("task consumer", taskConsumer, limit)

And it would only consume limit deliveries.
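
A minimal sketch of how such a limit could behave, using an in-memory channel instead of Redis. Everything here (limitedQueue, Publish, collector, and the exact AddLimitedConsumer signature) is a hypothetical illustration of the idea, not an existing rmq API.

```go
package main

import (
	"fmt"
	"sync"
)

// Consumer is an assumed minimal consumer interface.
type Consumer interface {
	Consume(delivery string)
}

// limitedQueue is a hypothetical in-memory queue used only for this sketch.
type limitedQueue struct {
	deliveries chan string
	wg         sync.WaitGroup
}

func newLimitedQueue() *limitedQueue {
	return &limitedQueue{deliveries: make(chan string, 16)}
}

func (q *limitedQueue) Publish(delivery string) {
	q.deliveries <- delivery
}

// AddLimitedConsumer consumes exactly limit deliveries and then stops.
// Note: it blocks waiting if fewer than limit deliveries ever arrive.
func (q *limitedQueue) AddLimitedConsumer(tag string, consumer Consumer, limit int) {
	q.wg.Add(1)
	go func() {
		defer q.wg.Done()
		for i := 0; i < limit; i++ {
			consumer.Consume(<-q.deliveries)
		}
	}()
}

// collector remembers what it consumed; read it only after the consumer finished.
type collector struct {
	got []string
}

func (c *collector) Consume(delivery string) { c.got = append(c.got, delivery) }

func main() {
	queue := newLimitedQueue()
	for _, d := range []string{"a", "b", "c", "d", "e"} {
		queue.Publish(d)
	}

	taskConsumer := &collector{}
	queue.AddLimitedConsumer("task consumer", taskConsumer, 2)
	queue.wg.Wait() // wait until the limit has been reached

	fmt.Println("consumed:", taskConsumer.got)
	fmt.Println("left in queue:", len(queue.deliveries))
}
```

With limit set to 1 this covers the consume-one-delivery-and-stop case without a separate interface.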

Also good point on defer queue.RemoveConsumer(name) :+1:

ympons commented 9 years ago

@wellle Yeah I like the idea!

kuangchanglang commented 7 years ago

Seems no one is maintaining this project.

wellle commented 4 years ago

I believe this has been done in #33.