alex-petrenko / faster-fifo

Faster alternative to Python's multiprocessing.Queue (IPC FIFO queue)
MIT License

Put never completes when a list of MongoDB documents is put in the queue #21

Closed barathbheeman closed 3 years ago

barathbheeman commented 3 years ago

I have a piece of code that fetches a bunch of MongoDB documents with pymongo and appends them to a list so that I can put them into the Queue as a 'batch'. When I call q.put(batch), it stalls indefinitely. But if I put the individual docs (which are basically dictionaries), it works fine.

```python
from pymongo import MongoClient
import faster_fifo

q = faster_fifo.Queue()
c = MongoClient()
cursor = c['db']['collection'].find({})

batch = []
for doc in cursor:
    # if I call q.put(doc) here instead, it works
    batch.append(doc)
    # put a list of 100 docs
    if len(batch) == 100:
        print('--- putting ---')
        q.put(batch)  # this line never completes
        print('---- put done -----')  # code never gets here
        batch = []  # start the next batch
```

The above code works fine with mp.Queue() and mp.Manager().Queue(). I wanted to use this library to get some extra performance out of it.

OS: Ubuntu 18.04 (Docker image), Python: 3.6.9

alex-petrenko commented 3 years ago

Hi, sorry for the delay.

This queue implementation is based on a circular buffer data structure. Because of how it is implemented, the size of the buffer must be known when the queue is created. You don't specify the size in the constructor, so the default size of 200 kilobytes is used. This is sufficient for most applications, but in your case you keep adding items to the queue without reading from it, so sooner or later the buffer will be full. Once it is full, a blocking put() has to wait for a reader to free up space, and since nothing reads from the queue in your example, the call appears to hang.
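To illustrate why the call blocks instead of failing, here is a minimal single-process sketch of a queue whose capacity is measured in bytes. It is not faster-fifo's actual implementation (faster-fifo's buffer is shared between processes); the class `ToyByteBoundedQueue` and its methods are hypothetical. The point it shows is that a blocking `put()` waits on a condition until enough space is freed, so with no consumer and no timeout it waits forever.

```python
import pickle
import threading
from queue import Full


class ToyByteBoundedQueue:
    """Hypothetical illustration of a queue bounded by total bytes, like a
    fixed-size circular buffer. Not faster-fifo's real code."""

    def __init__(self, max_size_bytes):
        self.max_size_bytes = max_size_bytes
        self.used = 0          # bytes currently stored
        self.items = []        # serialized messages, oldest first
        self.cond = threading.Condition()

    def put(self, obj, timeout=None):
        data = pickle.dumps(obj)
        if len(data) > self.max_size_bytes:
            # A message larger than the whole buffer can never fit.
            raise ValueError("message larger than the buffer")
        with self.cond:
            # Wait until a reader frees enough space. With timeout=None this
            # waits forever, which is exactly the "put never completes" symptom.
            has_space = self.cond.wait_for(
                lambda: self.used + len(data) <= self.max_size_bytes, timeout
            )
            if not has_space:
                raise Full
            self.items.append(data)
            self.used += len(data)

    def get(self):
        # Kept short: assumes the queue is not empty.
        with self.cond:
            data = self.items.pop(0)
            self.used -= len(data)
            self.cond.notify_all()  # wake producers waiting for space
        return pickle.loads(data)


q = ToyByteBoundedQueue(max_size_bytes=1000)
q.put(b"x" * 600)  # fits
try:
    # No reader ever calls get(), so without a timeout this would block forever.
    q.put(b"y" * 600, timeout=0.1)
except Full:
    print("buffer full")
```

Calling `q.get()` before the second `put()` frees the first message's bytes, after which the second message fits.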

The solution is to pass a larger circular buffer size (in bytes) to the constructor, so that the maximum amount of data you want to hold in the queue at once will fit.

You can also use the non-blocking q.put_nowait(), which raises an exception whenever the queue is full, or call put() with a timeout, such as q.put(obj, timeout=0.1).
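Both options turn a silent hang into something the producer can react to. The sketch below uses the standard library's `queue.Queue` as a stand-in because it exposes the same `put(obj, timeout=...)` and `put_nowait()` methods and raises `queue.Full`. It assumes faster-fifo also signals a full queue with `queue.Full`, so verify that against the version you use; `put_or_drop` is a hypothetical helper name.

```python
import queue


def put_or_drop(q, obj, timeout=0.1):
    """Hypothetical helper: try to enqueue `obj`, waiting at most `timeout`
    seconds. Returns False instead of hanging if the queue stays full."""
    try:
        q.put(obj, timeout=timeout)
        return True
    except queue.Full:
        return False


# Stand-in: a stdlib queue that holds a single item, so the second put finds it full.
q = queue.Queue(maxsize=1)
print(put_or_drop(q, ["batch-1"]))  # True
print(put_or_drop(q, ["batch-2"]))  # False after ~0.1 s instead of blocking forever

# Non-blocking variant: put_nowait() raises queue.Full immediately.
try:
    q.put_nowait(["batch-3"])
except queue.Full:
    print("queue full, consumer is behind")
```

What to do on `False` (retry, drop, or write the batch elsewhere) depends on whether losing a batch is acceptable for your pipeline.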

I think it makes sense to alter the default max size and timeouts slightly so that the default settings are useful in more contexts. I will do this in the next update.

github-actions[bot] commented 3 years ago

This issue is stale because it has been open for 30 days with no activity.

github-actions[bot] commented 3 years ago

This issue was closed because it has been inactive for 14 days since being marked as stale.