Closed cleomart closed 1 year ago
Interesting idea, never thought about something like this. There's a feature in faust
called app.crash
, I don't generally recommend it, but it should be interesting for your use-case. I just tested this without issues:
import faust
app = faust.App("my-app",
broker="localhost:9092",
broker_heartbeat_interval=30,
broker_session_timeout=360,
broker_request_timeout=360,
broker_max_poll_records=1,
topic_disable_leader=True,
broker_max_poll_interval=3600)
topic = app.topic("test-stop", value_serializer="raw")
@app.agent(topic)
async def process_messages(stream):
async for message in stream.items():
await app.crash(reason=Exception("Shutdown signal received on stop topic"))
if __name__ == "__main__":
app.main()
There's probably a more graceful way of shutting things down, but calling app.crash
guarantees a shutdown.
@wbarnha Can you please elaborate on why you do not recommend using app.crash? Any potential pitfalls when using app.crash method?
After running a few tests, for your use case, it seems harmless. My main concern was a consumer service gets stopped or killed too quickly and the Kafka broker wouldn't be able to gracefully coordinate the consumer exiting. But it seems that's not true at all, so app.crash()
should work fine in your case.
import faust
app = faust.App("my-app",
broker="localhost:9092",
broker_heartbeat_interval=30,
broker_session_timeout=360,
broker_request_timeout=360,
broker_max_poll_records=1,
topic_disable_leader=True,
broker_max_poll_interval=3600)
topic = app.topic("test-stop", value_serializer="raw")
@app.agent(topic)
async def process_messages(stream):
async for message in stream.items():
break
await app.stop()
app.loop.stop()
if __name__ == "__main__":
app.main()
Gave it another crack at using app.stop()
so we can assume all services get gracefully terminated.
import faust app = faust.App("my-app", broker="localhost:9092", broker_heartbeat_interval=30, broker_session_timeout=360, broker_request_timeout=360, broker_max_poll_records=1, topic_disable_leader=True, broker_max_poll_interval=3600) topic = app.topic("test-stop", value_serializer="raw") @app.agent(topic) async def process_messages(stream): async for message in stream.items(): break await app.stop() app.loop.stop() if __name__ == "__main__": app.main()
Gave it another crack at using
app.stop()
so we can assume all services get gracefully terminated.
When I tried this , I am getting another error:
`[2023-05-18 19:16:15,650] [7033] [INFO] [^--CacheBackend]: Stopping...
[2023-05-18 19:16:15,652] [7033] [ERROR] [^Worker]: Error: RuntimeError('Event loop stopped before Future completed.',) Traceback (most recent call last): File "/opt/conda/envs/index_launcher/lib/python3.6/site-packages/mode/worker.py", line 273, in execute_from_commandline self.loop.run_until_complete(self._starting_fut) File "/opt/conda/envs/index_launcher/lib/python3.6/asyncio/base_events.py", line 482, in run_until_complete raise RuntimeError('Event loop stopped before Future completed.') RuntimeError: Event loop stopped before Future completed.
[2023-05-18 19:16:15,654] [7033] [INFO] [^Worker]: Gathering service tasks... [2023-05-18 19:16:15,654] [7033] [INFO] [^Worker]: Gathering all futures... [2023-05-18 19:16:17,662] [7033] [INFO] [^Worker]: Closing event loop ` and the script python returns an exit code of 1. Is this the expected behavior? If the app was shutdown gracefully, I expect no errors, and the exit code would be 0.
Checklist
master
branch of Faust.Steps to reproduce
I would like to create a on-demand kafka streaming job using faust that will get terminated once a certain message is received. Here is my code. For simplicity, I terminate the faust when a message is received.
Expected behavior
I expect the faust app to be terminated gracefully and the python script would exit and the job will be completed.
Actual behavior and Full traceback
When the faust app is stopped, I get this error and the job is stuck in this infinite loop.
Versions