Open kc41 opened 4 years ago
Thanks for the question.
I'm not sure if raising an exception is good for this case: it just means that every q.put() should be wrapped in try/except, because literally every call may raise RuntimeError. It looks very annoying.
What about notifying q._sync_not_full in q.close()? We may get RuntimeError for closing as usual if q._check_closing() is called after _sync_not_full.wait() in sync_q.put().
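To make the idea concrete, here is a minimal sketch using a toy bounded queue built on threading.Condition. It is not janus' actual code: the class ClosableQueue, its attributes, and the demo() helper are hypothetical names used only for illustration. close() wakes every blocked producer, and put() re-checks the closing flag right after wait() returns.

```python
import threading
import time


class ClosableQueue:
    """Toy bounded queue (hypothetical, not the janus implementation)."""

    def __init__(self, maxsize):
        self._items = []
        self._maxsize = maxsize
        self._closing = False
        self._not_full = threading.Condition(threading.Lock())

    def _check_closing(self):
        if self._closing:
            raise RuntimeError("queue is closing")

    def put(self, item):
        with self._not_full:
            self._check_closing()
            while len(self._items) >= self._maxsize:
                self._not_full.wait()
                # Re-check after waking up: close() may be what woke us.
                self._check_closing()
            self._items.append(item)

    def close(self):
        with self._not_full:
            self._closing = True
            # Wake every producer blocked in put() so it can fail fast.
            self._not_full.notify_all()


def demo():
    """Block a producer on a full queue, close the queue, report the result."""
    q = ClosableQueue(1)
    errors = []

    def producer():
        try:
            q.put(1)
            q.put(2)  # blocks: the queue already holds one item
        except RuntimeError as exc:
            errors.append(exc)

    t = threading.Thread(target=producer)
    t.start()
    time.sleep(0.1)  # give the producer time to block in the second put()
    q.close()
    t.join(timeout=5)
    return t.is_alive(), errors


if __name__ == "__main__":
    print(demo())
```

With this shape the blocked producer gets a RuntimeError instead of waiting forever. Callers that never close the queue while producers are active see no change in behaviour.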
Fixed by #267
@asvetlov How could #267 fix this?
Ooops. Sorry, you are right. Hard day for me.
Please feel free to propose a pull request.
I also found a related problem.
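import asyncio
import os
import time
from threading import Thread
import janus

def threaded(sync_q):
    print("before")
    sync_q.put(1)
    sync_q.put(2)  # blocks until the first item is consumed
    print("after")

async def main():
    loop = asyncio.get_running_loop()
    queue = janus.Queue(1)
    Thread(target=threaded, args=[queue.sync_q]).start()
    # Occupy every worker of the default executor.
    for _ in range(min(32, (os.cpu_count() or 1) + 4)):
        loop.run_in_executor(None, time.sleep, 1)
    await queue.async_q.get()
    queue.close()

asyncio.run(main())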
In this example, we fill the default executor with the maximum number of callbacks, which causes get() to not notify the thread immediately - quite a common state for a highly loaded application. The close() call cancels the scheduled notification, causing the thread's second put() to never complete - "after" will never be printed. However, if we remove the queue closing, everything is fine.
This behavior is caused by the _notify_sync_not_full() change in b77ca59. Meanwhile _notify_sync_not_empty() has no such semantics (why?). Either both methods should add futures to _pending, or both shouldn't, because otherwise this distinction doesn't make sense.
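The underlying mechanism can be reproduced without janus. This is a minimal sketch that uses a plain concurrent.futures.ThreadPoolExecutor with a single worker as a stand-in for the saturated default executor; demo() and the event names are hypothetical. A callback that is still queued behind busy workers can be cancelled, and then it never runs.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def demo():
    notified = threading.Event()  # stands in for "the waiting thread was notified"
    release = threading.Event()   # keeps the only worker busy until set

    with ThreadPoolExecutor(max_workers=1) as executor:
        # Saturate the single worker so the next callback stays queued.
        executor.submit(release.wait)
        pending = executor.submit(notified.set)  # the scheduled "notification"
        # Cancelling a future that has not started yet succeeds...
        cancelled = pending.cancel()
        release.set()
    # ...and the callback is skipped even after the worker becomes free.
    return cancelled, notified.is_set()


if __name__ == "__main__":
    print(demo())
```

So if close() cancels a notification future that is still waiting for a free executor worker, the thread blocked in put() has nothing left to wake it up.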
Hi! I found some potentially unexpected behaviour of queue closing. If the producer thread blocks on an attempt to sync put to the queue and we close the queue in another thread (control), the producer thread will wait forever. I suppose that the expected behaviour should be a RuntimeError in the sync put() method on queue closing. What do you think about it?
Here is code to reproduce this situation: