Closed travnick closed 4 months ago
Yes you're right, let me look into it!
It looks fixed, but not in every case:
evt = self._thread_helper.event()
self._write_queue.put((PAUSE, evt))
evt.wait()
Additionally, in my opinion, the whole write_queue should be aborted on Writer.run() exit (and, of course, no further tasks should be accepted into the queue in that state). Without that, the AsyncCursor returned by execute_sql will wait forever (in AsyncCursor._wait()).
You’re correct. Let me look into it; it may be that we just get rid of pause and unpause.
Removing pause/unpause does not fix the other "waiting forever" issue, nor the execution of outdated queries. stop() feels like it should mean: stop execution and abort any scheduled task (a query, here).
I think this may address the issue, thanks for the suggestion about aborting everything. Now when the queue is stopped, we'll drain the queue and unblock any waiters (queries will get a ShutdownException, pause/unpause will just unblock). 519a84e57aa9ff994ddbf1b605b700426d8b3d79
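For readers following along, the drain-and-unblock idea described above could look roughly like the sketch below. This is not the code from the referenced commit: PendingResult is a hypothetical stand-in for AsyncCursor, drain() is an illustrative helper name, and ShutdownException is defined locally only so the snippet is self-contained.

```python
import queue
import threading


class ShutdownException(Exception):
    """Raised to a waiter whose query was dropped at shutdown."""


class PendingResult:
    """Hypothetical stand-in for AsyncCursor: a result the caller blocks on."""

    def __init__(self):
        self._event = threading.Event()
        self._value = None
        self._error = None

    def set_result(self, value):
        self._value = value
        self._event.set()

    def set_error(self, error):
        self._error = error
        self._event.set()

    def wait(self, timeout=None):
        # Block until the writer delivers a value or an error.
        if not self._event.wait(timeout):
            raise TimeoutError("no result yet")
        if self._error is not None:
            raise self._error
        return self._value


def drain(write_queue):
    """Empty the queue, failing every pending result so no caller hangs."""
    dropped = 0
    while True:
        try:
            item = write_queue.get_nowait()
        except queue.Empty:
            return dropped
        item.set_error(ShutdownException("writer stopped"))
        dropped += 1
```

The point of the design is that every queued item carries its own way to be woken up, so stopping the writer can release waiters instead of leaving them blocked.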
As far as I can see, there are more issues left when the Writer thread is stopped:
- _write_queue access: it is not coupled with the Writer running/stopped state, so putting anything there after the Writer thread is stopped leads to thread starvation while waiting for the AsyncCursor result,
- calling unpause or pause when the Writer thread is stopped leads to starvation of the calling thread.
In my opinion, write_queue and _is_stopped belong to the Writer state itself. Managing them outside the Writer breaks the single responsibility principle and is quite error-prone, because it introduces multiple places to manage them instead of having only one inside the Writer.
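To illustrate the suggestion of keeping the queue and the stopped flag inside the Writer, here is a minimal sketch. It is an assumption about how such a class could be shaped, not the project's actual Writer: the submit() and stop() method names, the WriterStopped exception, and the None sentinel are all hypothetical.

```python
import queue
import threading


class WriterStopped(Exception):
    """Hypothetical error raised when work is submitted to a stopped writer."""


class Writer:
    def __init__(self):
        self._queue = queue.Queue()
        self._lock = threading.Lock()
        self._stopped = False
        self.processed = []

    def submit(self, task):
        # Check-and-put under one lock so a task cannot slip in after stop().
        with self._lock:
            if self._stopped:
                raise WriterStopped("writer is not running")
            self._queue.put(task)

    def stop(self):
        with self._lock:
            self._stopped = True
            # The sentinel wakes run() if it is blocked in get().
            self._queue.put(None)

    def run(self):
        while True:
            task = self._queue.get()
            if task is None:
                break
            self.processed.append(task)
```

Because the flag and the queue are only touched through the Writer, a caller that submits after stop() gets an immediate error rather than waiting on a result that will never arrive.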
Sorry for not providing all review results at once, but it's not easy to analyse each case without knowing the project internals and with threads on board.
Thank you, I’ll take a look. I’m grateful for the feedback and appreciate you taking the time. This particular module was just thrown together, and it clearly had some issues.
documentation says that:
But these methods do not block:
Queue.put only waits for free space in the queue, so it will block only if the queue is full.
I guess a Condition object or an Event will do the job here.
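A small standalone sketch of that difference, using only the standard library: put() on an unbounded queue returns immediately, while Event.wait() actually blocks until another thread signals. The "PAUSE" string and the consumer function are illustrative only and are not taken from the module under discussion.

```python
import queue
import threading

q = queue.Queue()          # maxsize=0 means unbounded, so put() never waits
q.put("PAUSE")             # returns immediately; nobody has consumed it yet
still_queued = q.qsize()   # the item is only enqueued, not handled

handled = threading.Event()


def consumer():
    q.get()
    handled.set()          # signal the producer that the item was handled


worker = threading.Thread(target=consumer)
worker.start()
was_handled = handled.wait(timeout=2)  # blocks until set() or the timeout
worker.join()
```

Passing a timeout to wait() is a defensive choice here, so the caller cannot hang forever if the consumer thread has already exited.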