nats-io / nats.py

Python3 client for NATS
https://nats-io.github.io/nats.py/
Apache License 2.0

_read_loop() in nats/aio/client.py can become a non-yielding infinite loop (nats-py-2.3.1.tar.gz, ssl) #490

Open 3figs opened 10 months ago

3figs commented 10 months ago

What version were you using?

nats-py-2.3.1.tar.gz

What environment was the server running in?

Alpine Linux 3.18.2

Is this defect reproducible?

Yes. It happens in about 1 out of 4 certificate changes: after nats-server restarts, the client can fall into an infinite loop.

Given the capability you are leveraging, describe your expectation?

Under no circumstances should _read_loop fall into a non-yielding infinite loop. When it does, it blocks everything.

Given the expectation, what is the defect you are observing?

Hi, our SSL NATS client can fall into an infinite loop when the certificates change (nats-server restarts). We eventually traced the loop to _read_loop() in nats/aio/client.py:

while True:
    try:
        should_bail = self.is_closed or self.is_reconnecting
        if should_bail or self._transport is None:
            break
        if self.is_connected and self._transport.at_eof():
            err = errors.UnexpectedEOF()
            await self._error_cb(err)
            await self._process_op_err(err)
            break
        b = await self._transport.read(DEFAULT_BUFFER_SIZE)  # <-----
        await self._ps.parse(b)
    # ... (except clauses omitted from this excerpt)

b is always empty (b'') and at_eof() is true. When this happens, all other asyncio tasks are blocked because this loop never yields to the event loop.
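One task can freeze the whole process because `await` only hands control back to the event loop when the awaited call actually suspends. The sketch below assumes the transport ends up calling asyncio.StreamReader.read() (the debug patch further down reads self._transport._io_reader, which suggests this). Once EOF has been fed and the buffer is empty, that call returns b"" immediately without suspending. `ticker` and `spin_on_eof` are hypothetical helpers written only for this illustration; they are not part of nats.py.

```python
import asyncio


async def ticker(counter: list[int]) -> None:
    # A well-behaved task that only needs an occasional turn on the event loop.
    while True:
        counter[0] += 1
        await asyncio.sleep(0)


async def spin_on_eof(iterations: int, yield_each_iter: bool) -> int:
    """Return how many times `ticker` ran while we looped on an EOF'd reader."""
    reader = asyncio.StreamReader()
    reader.feed_eof()  # peer closed: at_eof() is now True, buffer is empty
    ticks = [0]
    task = asyncio.create_task(ticker(ticks))
    await asyncio.sleep(0)  # give ticker its first turn
    before = ticks[0]
    for _ in range(iterations):
        b = await reader.read(1024)  # returns b"" at once, never suspends
        assert b == b""
        if yield_each_iter:
            await asyncio.sleep(0)  # explicitly hand control back to the loop
    during = ticks[0] - before
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return during


print(asyncio.run(spin_on_eof(10_000, yield_each_iter=False)))
print(asyncio.run(spin_on_eof(10_000, yield_each_iter=True)))
```

Without the explicit asyncio.sleep(0), the first run reports that ticker never got a turn during the loop. Adding a yield point lets other tasks run, but it only hides the real problem, which is that the loop never exits on EOF.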

We hit this issue when certificates change. After the certificates change, nats-server restarts first. The NATS client, still using the old certificates, should detect the failure and eventually reconnect with the new ones. Instead, it falls into an infinite loop and blocks other tasks from running. We see this in about 1 out of 4 attempts.

Has anyone else hit a similar issue? It looks like a bug to me: under no circumstances should _read_loop fall into a non-yielding infinite loop.

Thanks,
Steven

3figs commented 9 months ago

The certificate change is not essential. I can reproduce this easily: it can happen at any time the conditions below are met. I have also seen it once when only my app restarted (no nats-server restart).

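When self._status == CONNECTING, self._transport.at_eof() is true, and b = await self._transport.read(...) returns empty bytes, _read_loop falls into a non-yielding infinite loop. The EOF check in the loop only runs when self.is_connected is true, so it is skipped while CONNECTING, and each read keeps returning immediately. Once this happens, it blocks all other tasks from running.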

self._status can be CONNECTING during connect() or attempt_reconnect(), so whether the bug triggers depends on when transport.at_eof() becomes true. If EOF arrives while the client is in CONNECTING status and the read buffer is empty, the loop spins forever.
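To make the condition concrete, here is a simplified, hypothetical model of the loop, not nats.py code: `Status`, `read_loop`, `demo`, and `eof_check_any_status` are invented names, and an asyncio.StreamReader fed with EOF stands in for the transport. With the original-style guard, which checks at_eof() only while CONNECTED, a CONNECTING client keeps reading empty bytes until the demo's `max_iters` cap. Treating an empty read as EOF in any state stops it after a single read. That alternative guard is offered as an assumption about one possible fix, not as the upstream change.

```python
import asyncio
from enum import Enum


class Status(Enum):
    # Hypothetical subset of the client's connection states.
    CONNECTING = 1
    CONNECTED = 2


async def read_loop(reader: asyncio.StreamReader, status: Status,
                    eof_check_any_status: bool, max_iters: int = 10_000) -> int:
    """Count reads before the loop exits; max_iters caps the spin for the demo."""
    iters = 0
    while iters < max_iters:
        # Original-style guard: EOF is only honoured while CONNECTED.
        if status is Status.CONNECTED and reader.at_eof():
            break
        b = await reader.read(4096)  # b"" immediately once EOF is reached
        iters += 1
        # Alternative guard (assumption): an empty read means EOF in any state.
        if eof_check_any_status and not b:
            break
        # a real client would parse `b` here
    return iters


async def demo(status: Status, eof_check_any_status: bool) -> int:
    reader = asyncio.StreamReader()
    reader.feed_eof()  # simulate the server closing the connection
    return await read_loop(reader, status, eof_check_any_status)


for status in Status:
    for flag in (False, True):
        print(status.name, flag, asyncio.run(demo(status, flag)))
```

In this model, CONNECTED exits before reading at all, CONNECTING with the original-style guard hits the 10,000-read cap, and CONNECTING with the status-independent check exits after one read.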

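Below is the change I used to reproduce it. It simply forces the EOF state from buffer_updated() in asyncio/sslproto.py, and the client.py change adds debug logging to _read_loop. Remember to run "echo > /var/log/robot/set_eof" (and "echo > /var/log/robot/read_loop" for the logging) before restarting your app. Note that the diffs were taken as "diff <patched> <stock>", so the lines marked "-" are the ones the patch adds.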

# diff /mnt/datafs/new/sslproto.py /usr/lib/python3.11/asyncio/
--- /mnt/datafs/new/sslproto.py
+++ /usr/lib/python3.11/asyncio/sslproto.py
@@ -1,6 +1,5 @@
import collections
import enum
-import os
import warnings
try:
     import ssl
@@ -431,11 +430,6 @@
         return self._ssl_buffer_view

     def buffer_updated(self, nbytes):
-        if os.path.exists("/var/log/robot/set_eof"):
-            logger.info(f"------ set_eof buffer_updated state {self._state}")
-            self._app_state = AppProtocolState.STATE_EOF
-            keep_open = self._app_protocol.eof_received()
-            
         self._incoming.write(self._ssl_buffer_view[:nbytes])

         if self._state == SSLProtocolState.DO_HANDSHAKE:

# diff /mnt/datafs/new/client.py /usr/lib/python3.11/site-packages/nats/aio/
--- /mnt/datafs/new/client.py
+++ /usr/lib/python3.11/site-packages/nats/aio/client.py
@@ -19,7 +19,6 @@
import ipaddress
import json
import logging
-import os
import ssl
import time
import string
@@ -2067,10 +2066,6 @@

                 b = await self._transport.read(DEFAULT_BUFFER_SIZE)
                 await self._ps.parse(b)
-                if os.path.exists("/var/log/robot/read_loop"):
-                    io_reader = self._transport._io_reader
-                    _logger.info(f"---- read_loop parse {self._client_id} {io_reader._paused} status {self._status} state {self._ps.state} {b}")
-
             except errors.ProtocolError:
                 await self._process_op_err(errors.ProtocolError())
                 break