adjust / rmq

Message queue system written in Go and backed by Redis
MIT License
1.57k stars 206 forks

Question on Queue prefetchLimit #85

Closed nehaboob closed 4 years ago

nehaboob commented 4 years ago

I have a question on the implementation of prefetchLimit. Suppose there are enough ready deliveries. Then:

  1. does the queue/connection consume in batches of prefetchLimit, waiting for the consumer to finish a batch before pulling the next one from the ready deliveries?
  2. or does the queue/connection continuously fill the delivery channel as soon as a slot is available?

It seems that prefetchLimit is implemented as 2. So even with prefetchLimit 1, the connection has 2 unacked deliveries: one being consumed and one in the delivery channel. Is there any workaround for this? I want to use rmq for long-running CPU tasks (of varied length) and only want to pull from the ready queue when consumption of one delivery has finished. Maybe support for a batch prefetchLimit?

wellle commented 4 years ago

@nehaboob: You're right, it's implemented as 2. So once you start consuming the first delivery, the next one will be unacked and put into deliveryChan, waiting to be consumed. There are currently no plans to support your use case, and it also doesn't seem easy to implement AFAICT: it would require a dependency between the prefetch logic and the actual consumers, which doesn't seem nice to have.

What's the problem with prefetching the second delivery early?

nehaboob commented 4 years ago

@wellle thanks for getting back. I am trying to use rmq for long-running CPU tasks, and a batch of those should finish before I move to the next step in processing. E.g. there are 5 tasks taking 30, 10, 2, 2, 2 units of time to finish, and I add 2 consumers, so the tasks are queued as [30, 10] and [2, 2, 2], which is not very efficient. I want to fetch a delivery only when a consumer is done. Maybe support for prefetchLimit 0?

wellle commented 4 years ago

Understood. The problem with prefetch limit 0 is that then nothing would get fetched at all, at least in the current implementation. And making it work as you describe doesn't seem straightforward. I guess we could consider keeping track of how many deliveries are currently being consumed, but I probably won't have time to look into that in the near future.

nehaboob commented 4 years ago

@wellle Do you think that if we change the batchSize function as below, it will work for my use case?

func (queue *redisQueue) batchSize() int {
    prefetchLimit := queue.prefetchLimit - queue.UnackedCount()

    if readyCount := queue.ReadyCount(); readyCount < prefetchLimit {
        return readyCount
    }
    return prefetchLimit
}

wellle commented 4 years ago

Hmm, I guess that might work. Would you be willing to give it a try?

nehaboob commented 4 years ago

Sure, I will check and report back :)

nehaboob commented 4 years ago

@wellle I changed the batchSize() function and it works fine for my use case. Should I create a pull request for this change, or keep using my local fork?

wellle commented 4 years ago

Yes please open a PR 👍

nehaboob commented 4 years ago

@wellle Done, Thanks :)

nehaboob commented 4 years ago

@wellle I am also getting this new error since yesterday when getting the package:

go get github.com/adjust/rmq
package github.com/go-redis/redis/v7: cannot find package "github.com/go-redis/redis/v7" in any of:
    /usr/local/go/src/github.com/go-redis/redis/v7 (from $GOROOT)
    /go/src/github.com/go-redis/redis/v7 (from $GOPATH)

narqo commented 4 years ago

Could you share what version of Go you use? How can we reproduce it? This looks like part of a Docker build; could you share the details?

nehaboob commented 4 years ago

@narqo Please ignore this. I was using the Docker image golang:1.14.2-alpine3.11, but the error is not occurring anymore. Must have been some interim issue.