Closed hassanselim0 closed 3 months ago
Each Event should take self._loop and not self.loop as it used to do in older versions (like 0.2.x)
Looks like the changes were made in https://github.com/faust-streaming/mode/commit/96d4f72dab3ea3035d91c19db8f8fe9ee5d9d8e4 and originally used asyncio.Event
objects until they got their own dedicated class. The change you're talking about was made by me in https://github.com/faust-streaming/mode/commit/cd40e6f23523be71fffee8ebf9ae9082e6ed5a4e to address https://github.com/faust-streaming/mode/issues/38 so that thread loops would shut down properly. It's been a while so I'll need to test your changes further.
loop.run_in_executor exists for similar reasons since there should only be one event loop per process, otherwise things get messy. I haven't used Faust with Django before and it's a bit in deprecated territory in my opinion. I've been wanting to move the project more towards FastAPI since it properly integrates with asynchronous Python patterns where Django creates more potentially blocking code that could mess up event loop timing.
By the way, do you only need these threads for creating Kafka topics? Because if so, I recommend using aiokafka
to create the topics manually. Even though it's not officially supported yet, it has worked fine for me so far.
Sorry for the late reply, I slept and took a long holiday after writing this issue and related PR. I'm happy that the fix has been merged.
I haven't used Faust with Django before and it's a bit in deprecated territory in my opinion. I've been wanting to move the project more towards FastAPI since it properly integrates with asynchronous Python patterns where Django creates more potentially blocking code that could mess up event loop timing.
Yeah I've been stumbling onto hard-to-reproduce issues with consumers getting stuck and I believe it has to do with django blocking code (despite all db access being correctly wrapped in sync_to_async
which I believe places blocking code onto a thread pool, which should be "fine"). So yeah, leaving away django is something I hope to do in future projects, but it's such a big headache to rebuild all the niceties I got used to into the FastAPI world.
Checklist
master
branch of Mode.Steps to reproduce
--workers=1 --threads=2
)consumer = faust.App(…)
Expected behavior
No exceptions happen
Actual behavior
A Runtime Error is raised, no current event loop in thread. The problem basically lies here: https://github.com/faust-streaming/mode/blob/0.4.0/mode/services.py#L599 (and the three other similar methods). Each
Event
should takeself._loop
and notself.loop
as it used to do in older versions (like0.2.x
), because creating a Faust App doesn't necessarily mean I'll be running a consumer in this process (in my case I'm using it to create topics for the django app to use), so there might not be an event loop on the current thread, and in that caseself._loop
will beNone
(because I'm not passing a loop for the faust app constructor, and that arg gets passed all the way down to thatEvent
) and that's "fine" because I'm not going to run the mode service or await on it or anything (and if I do, then theEvent
has aloop
property that will rightfully try andget_event_loop
, so it will still work).Full traceback
Versions