airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0
2.2k stars 106 forks source link

Feature: subscribers should be resilient to segmentation faults #1059

Closed lucsorel closed 1 week ago

lucsorel commented 7 months ago

Thank you for FastStream, I really enjoy the use of pydantic here :smiley:

Is your feature request related to a problem? Please describe.

Segmentation faults can happen in the process of handling a message, when involving a library causing the segmentation fault (I don't think it is possible to cause a segmentation fault with native Python code).

When a segmentation fault occurs in the FastStream application which consumes the message, the application stops and is not restarted (that is for faststream[rabbit]==0.3.6; for 0.2.5, the application was hanging defunct). The message is not processed nor redirected to a dead letter queue, for example (in the case of a RabbitMQ cluster).

Describe the solution you'd like

I suggest that the message causing the segmentation fault does not stop the application, which would react the same way as if the message had raised an error/exception: the message is rejected and the subscriber keeps on consuming the next message.

Feature code example I published this project to demonstrate how a segmentation fault stops the subscriber application: https://github.com/lucsorel/sigseg-faststream.

Describe alternatives you've considered

Being resilient to segmentation faults might involve handling each message in a sub-process for the main process to be resilient to segmentation faults.

davorrunje commented 7 months ago

@lucsorel thanx for the kind words :)

We cannot stop the application from crashing in case of a segmentation fault, but we could restart the worker when it happens. You could also run any code that can cause segmentation faults in an isolated process. That way you would not crash the FastStream worker and would get better control on handling this kind of errors.

@Lancetnik what do you think?

Lancetnik commented 7 months ago

Thank you @lucsorel for the Issue and such detail problem representation!

But I am confusing a bit with such request: seems like creates a thread to proccess any message is an overkill to protect from segfault. Also, in all my time working with python (6 years), I have never encountered such an error in production (only with pytests). Also, I don't know about any other frameworks behavior in such cases (FastAPI, Django, etc).

Probably, adding healthcheck to application to make it fault-tolerant is a much Python-way decision

lucsorel commented 7 months ago

Thank you @davorrunje and @Lancetnik for your answers.

I agree that segmentation faults occur rarely in Python. But they can happen when using Python wrappers around compiled code.

A healthcheck or any form of restart would help restarting the application, but it would not discard the message as when an exception is raised while handling it. Therefore, the application could keep on crashing on the same message again and again, as the application keeps on restarting. Do you see what I mean?

Being resilient to segmentation faults would indeed imply that the message is handled in a sub-process. But each message does not need the creation of its own process, processes can be pooled and reused for the sake of performance. I am just elaborating on the idea, I don't want to be perceived as being pushy :smiley:

In the meantime, I agree that the quick solution is to handle the process and fault management myself within FastStream.

davorrunje commented 7 months ago

Isolating every message processing in an isolated process would introduce a performance hit due to interprocess communication costs. That would be unjustified for most users IMO.

We should monitor works and restart them if they crash, that clearly is an improvement that has no downside in terms of performance.

So, the best solution IMO is to leave process isolation to applications because this is the best place to trade performance for robustness if you need it.

lucsorel commented 7 months ago

:pray: @davorrunje for your answer.

I understand your views in process isolation and performance. Restarting failed workers would be great, I don't know how hard that would be to implement. Thank you for the time you dedicate to this project.

lucsorel commented 7 months ago

To follow up on the resilience topic, I would like to suggest some improvement of the workers when the FastStream application is launched in the multiprocessing mode (as in faststream package.module:app --workers 3).

Workers are sub-processes; they can die out for different reasons (out-of-memory killed by the operating system, for example) but they are currently not restarted. As you mentioned restarting the application when it crashes, I thought that I could follow up on the previous discussion (let me know if I should create another issue; or rename this one).

To illustrate the problem, let's start the consumers of the sample project I created (https://github.com/lucsorel/sigseg-faststream):

# configure a test rabbit-mq cluster
docker run --rm -d -p 5672:5672 --name sigseg-mq rabbitmq:3.12-management
docker exec sigseg-mq rabbitmqadmin declare exchange name=sigseg.x type=direct
docker exec sigseg-mq rabbitmqadmin declare queue name=sigseg.q durable=true
docker exec sigseg-mq rabbitmqadmin declare binding source=sigseg.x destination=sigseg.q destination_type=queue

# spawn 3 workers
poetry run faststream run sigseg_faststream.consumer:consume --workers 3
2023-12-18 16:34:45,332 INFO     - Started parent process [7866]
2023-12-18 16:34:45,338 INFO     - Started child process [7938]
2023-12-18 16:34:45,350 INFO     - Started child process [7939]
2023-12-18 16:34:45,357 INFO     - Started child process [7940]

The ps -fax displays the processes and their filiation (we can see the parent process id and the 3 sub-process ids):

7585 pts/1  Ss  0:00  | ... \_ /usr/bin/zsh -i
7866 pts/1  S+  0:01  | ...     \_ /home/lsorelgiffo/Documents/repos/sigseg-faststream/.venv/bin/python /home/lsorelgiffo/Documents/repos/sigseg-faststream/.venv/bin/faststream run sigseg_faststream.consumer:consume --workers 3
7937 pts/1  S+  0:00  | ...         \_ /home/lsorelgiffo/Documents/repos/sigseg-faststream/.venv/bin/python -c from multiprocessing.resource_tracker import main;main(3)
7938 pts/1  Sl+ 0:00  | ...         \_ /home/lsorelgiffo/Documents/repos/sigseg-faststream/.venv/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=4, pipe_handle=6) --multiprocessing-fork
7939 pts/1  Sl+ 0:00  | ...         \_ /home/lsorelgiffo/Documents/repos/sigseg-faststream/.venv/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=4, pipe_handle=8) --multiprocessing-fork
7940 pts/1  Sl+ 0:00  | ...         \_ /home/lsorelgiffo/Documents/repos/sigseg-faststream/.venv/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=4, pipe_handle=10) --multiprocessing-fork

In another terminal, if I kill a sub-process (to simulate an out-of-memory kill by the OS, for example), I can see that the subprocess remain zombie (defunct) and is not restarted by the parent:

# gracefull kill (not kill -9) the 1st worker
kill 7938

ps -fax
7585 pts/1  Ss   0:00  | ... \_ /usr/bin/zsh -i
7866 pts/1  S+   0:01  | ...     \_ /home/lsorelgiffo/Documents/repos/sigseg-faststream/.venv/bin/python /home/lsorelgiffo/Documents/repos/sigseg-faststream/.venv/bin/faststream run sigseg_faststream.consumer:consume --workers 3
7937 pts/1  S+   0:00  | ...         \_ /home/lsorelgiffo/Documents/repos/sigseg-faststream/.venv/bin/python -c from multiprocessing.resource_tracker import main;main(3)
7938 pts/1  Z+   0:00  | ...         \_ [python] <defunct>
7939 pts/1  Sl+  0:00  | ...         \_ /home/lsorelgiffo/Documents/repos/sigseg-faststream/.venv/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=4, pipe_handle=8) --multiprocessing-fork
7940 pts/1  Sl+  0:00  | ...         \_ /home/lsorelgiffo/Documents/repos/sigseg-faststream/.venv/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=4, pipe_handle=10) --multiprocessing-fork

If I kill the other two workers, the parent application process still runs without any worker:

kill 7939 7940

ps -fax
7585 pts/1  Ss  0:00  | ... \_ /usr/bin/zsh -i
7866 pts/1  S+  0:01  | ...     \_ /home/lsorelgiffo/Documents/repos/sigseg-faststream/.venv/bin/python /home/lsorelgiffo/Documents/repos/sigseg-faststream/.venv/bin/faststream run sigseg_faststream.consumer:consume --workers 3
7937 pts/1  S+  0:00  | ...         \_ /home/lsorelgiffo/Documents/repos/sigseg-faststream/.venv/bin/python -c from multiprocessing.resource_tracker import main;main(3)
7938 pts/1  Z+  0:00  | ...         \_ [python] <defunct>
7939 pts/1  Z+  0:01  | ...         \_ [python] <defunct>
7940 pts/1  Z+  0:00  | ...         \_ [python] <defunct>

I saw that the workers are spawned in here (https://github.com/airtai/faststream/blob/main/faststream/cli/supervisors/multiprocess.py#L45-L51) and that their life cycle happens here (https://github.com/airtai/faststream/blob/main/faststream/cli/supervisors/basereload.py#L85-L90). Would you consider a PR that respawns a worker when it is killed?

Lancetnik commented 7 months ago

@lucsorel, workers respawning sounds great, I'll happy to merge your PR, thanks!

davorrunje commented 7 months ago

@lucsorel that's a great idea, thanx :)

Lancetnik commented 1 week ago

I think, we can close this Issue. Thanks for @gostilovichd

lucsorel commented 1 week ago

:pray: thank you @gostilovichd for addressing the issue.