chilts / mongodb-queue

Message queues which uses MongoDB.
209 stars 92 forks source link

get() method doesn't follow the FIFO #10

Closed cresttechhead closed 7 years ago

cresttechhead commented 8 years ago

Hi,

While going through the documentation, I added add and get methods in different setTimeInterval functions to check if the get method is following FIFO.

I found that get() method does not strictly follow the FIFO. Can you please shed some light on this issue?

Thanks, CK

chilts commented 8 years ago

Hi CK,

Can you show me your example code and why you don't think it follows the FIFO. In general, get() should return the oldest message, but in practice it might not always do this - for example an earlier failed message will be returned to the queue later.

Thanks, Andy

cresttechhead commented 8 years ago

Thanks Andy for your response.

Please find below the example: ==================== Add items to queue start here ============= setInterval(function(){ queue.add('Testing', function(err, id) { console.log("item with id = "+id+" inserted in queue for item type = "+itemType ); }) ,1000); ==================== Add items to queue end here =============

====function to get and ack the message from queue ===== function dequeItem () {

queue.get(function(err, msg) {
    if (err) console.log ("error occurred");
    else {
        queue.ack(msg.ack, function(err, id) {
            // this message has now been removed from the queue 
            console.log("msg removed with id ="+id);
        });
    }
});

}

===== code to call the dequeItem ====== setInterval(function(){dequeItem()},2000); setInterval(function(){dequeItem()},2000); setInterval(function(){dequeItem()},2000);

I looked at the mongo db queue and found that after fetching 10-15 docs in sequence there are 3-4 docs that are not fetched by get() and moreover these docs are not failed as the 'tries' and 'visible' fields are not set on these docs.

Please let me know if you need more information.

Thanks, CK

chilts commented 8 years ago

Interesting. Thanks for that. I can take a look tomorrow. :)

cresttechhead commented 8 years ago

That would be of great help Andy :)

cresttechhead commented 8 years ago

Hi Andy,

Were you able to look into this issue?

Thanks, CK

cresttechhead commented 8 years ago

Any updates on this @chilts ?

chilts commented 8 years ago

Sorry @cresttechhead .. I haven't been able to look yet. Am currently involved with a lot of non-geek life at the moment which is sucking my time (but also has priority). I'll try and look as soon as I can, but I can't promise anything. Apologies.

acSpock commented 7 years ago

I'm running into the same issue - For instance, the first two items on the queue get acked after the first time I do a get and processes both. When in reality it should be doing them one at a time at every interval

chilts commented 7 years ago

The example program above isn't complete, so I've written a complete one to test what you're seeing. The complete program is here:

var mongodb = require('mongodb')
var mongoDbQueue = require('.')

var con = 'mongodb://localhost:27017/test'

mongodb.MongoClient.connect(con, function(err, db) {
  var queue = mongoDbQueue(db, 'my-queue')

  console.log("connected")

  // --- enqueue ---
  function enqueue() {
    queue.add('Testing', function(err, id) {
      console.log("added=" + id)
    })
  }

  // --- dequeue ---
  function dequeItem() {
    queue.get(function(err, msg) {
      if (err) {
        console.log ("error occurred")
        return
      }

      if ( !msg ) {
        console.log ("no msg")
        return
      }

      console.log('msg=' + msg.id)
      queue.ack(msg.ack, function(err, id) {
        if ( err ) {
          console.log('err:', err)
          return
        }
        console.log('acked=' + id)
      })
    })
  }

  // --- enqueue ---
  setInterval(enqueue, 1000)

  // keep checking for messages
  setInterval(dequeItem, 1100)
  setInterval(dequeItem, 1300)
  setInterval(dequeItem, 1700)
})

This doesn't show the same problems as you have found. Are any of you able to change the above program to show the same problems you're seeing?

Here's some example output:

connected
added=58f14dc48a34905c17571dc4
msg=58f14d8951029648172fa612
acked=58f14d8951029648172fa612
msg=58f14dc48a34905c17571dc4
acked=58f14dc48a34905c17571dc4
no msg
added=58f14dc58a34905c17571dc5
msg=58f14dc58a34905c17571dc5
acked=58f14dc58a34905c17571dc5
no msg
added=58f14dc68a34905c17571dc6
msg=58f14dc68a34905c17571dc6
acked=58f14dc68a34905c17571dc6
no msg
no msg
added=58f14dc78a34905c17571dc7
msg=58f14dc78a34905c17571dc7
acked=58f14dc78a34905c17571dc7
...etc...

Once this has been running for quite a while and assuming I press C-c after a message has been dequeued, there are no items left on the queue.

> db.getCollection('test').count()
0

I'm going to close this issue but please re-open with either an adjustment to my program above, or a complete program which replicates the issue found. Thanks.

chilts commented 7 years ago

Further, if I use the same program but instead add 10 messages per second, I can still see the same order of messages coming back, which I think applies more to the original issue:

$ node issue-10.js 
connected
added=58f14fb8191c24cd185496e9
added=58f14fb9191c24cd185496ea
added=58f14fb9191c24cd185496eb
added=58f14fb9191c24cd185496ec
added=58f14fb9191c24cd185496ed
added=58f14fb9191c24cd185496ee
added=58f14fb9191c24cd185496ef
added=58f14fb9191c24cd185496f0
added=58f14fb9191c24cd185496f1
added=58f14fb9191c24cd185496f2
msg=58f14fb8191c24cd185496e9
acked=58f14fb8191c24cd185496e9
added=58f14fb9191c24cd185496f3
added=58f14fba191c24cd185496f4
msg=58f14fb9191c24cd185496ea
acked=58f14fb9191c24cd185496ea
added=58f14fba191c24cd185496f5
added=58f14fba191c24cd185496f6
added=58f14fba191c24cd185496f7
added=58f14fba191c24cd185496f8
msg=58f14fb9191c24cd185496eb
acked=58f14fb9191c24cd185496eb
added=58f14fba191c24cd185496f9
added=58f14fba191c24cd185496fa
added=58f14fba191c24cd185496fb
added=58f14fba191c24cd185496fc
added=58f14fba191c24cd185496fd
msg=58f14fb9191c24cd185496ec
acked=58f14fb9191c24cd185496ec
added=58f14fbb191c24cd185496fe
added=58f14fbb191c24cd185496ff
added=58f14fbb191c24cd18549700
added=58f14fbb191c24cd18549701
msg=58f14fb9191c24cd185496ed
acked=58f14fb9191c24cd185496ed
added=58f14fbb191c24cd18549702
added=58f14fbb191c24cd18549703
added=58f14fbb191c24cd18549704
added=58f14fbb191c24cd18549705
added=58f14fbb191c24cd18549706
added=58f14fbb191c24cd18549707
added=58f14fbc191c24cd18549708
msg=58f14fb9191c24cd185496ee
acked=58f14fb9191c24cd185496ee
added=58f14fbc191c24cd18549709
msg=58f14fb9191c24cd185496ef
acked=58f14fb9191c24cd185496ef
added=58f14fbc191c24cd1854970a
added=58f14fbc191c24cd1854970b
added=58f14fbc191c24cd1854970c
added=58f14fbc191c24cd1854970d
added=58f14fbc191c24cd1854970e
msg=58f14fb9191c24cd185496f0
acked=58f14fb9191c24cd185496f0
added=58f14fbc191c24cd1854970f
added=58f14fbc191c24cd18549710
added=58f14fbc191c24cd18549711
added=58f14fbd191c24cd18549712
added=58f14fbd191c24cd18549713
msg=58f14fb9191c24cd185496f1
acked=58f14fb9191c24cd185496f1
added=58f14fbd191c24cd18549714
added=58f14fbd191c24cd18549715
added=58f14fbd191c24cd18549716
added=58f14fbd191c24cd18549717
added=58f14fbd191c24cd18549718
added=58f14fbd191c24cd18549719
added=58f14fbd191c24cd1854971a
msg=58f14fb9191c24cd185496f2
acked=58f14fb9191c24cd185496f2
added=58f14fbe191c24cd1854971b
msg=58f14fb9191c24cd185496f3
acked=58f14fb9191c24cd185496f3
added=58f14fbe191c24cd1854971c
added=58f14fbe191c24cd1854971d
added=58f14fbe191c24cd1854971e
msg=58f14fba191c24cd185496f4
acked=58f14fba191c24cd185496f4
added=58f14fbe191c24cd1854971f
added=58f14fbe191c24cd18549720
added=58f14fbe191c24cd18549721
added=58f14fbe191c24cd18549722
added=58f14fbe191c24cd18549723
added=58f14fbe191c24cd18549724
added=58f14fbf191c24cd18549725