nats-io / nats.py

Python3 client for NATS
https://nats-io.github.io/nats.py/
Apache License 2.0
865 stars 177 forks source link

Client does not retry if TLS handshake fails #382

Open nosammai opened 1 year ago

nosammai commented 1 year ago

When the client receives an error during the TLS handshake, it looks like the retry logic doesn't kick in and the call to connect will never return. It's calling my error_cb properly, but then I never see it attempt to connect again, even though I have retries enabled in the settings.

2022-11-09 19:02:11,187 nats_util        ERROR    NATS Error:
Traceback (most recent call last):
  File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/nats/aio/client.py", line 469, in connect
    await self._process_connect_init()
  File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/nats/aio/client.py", line 1892, in _process_connect_init
    await self._transport.connect_tls(
  File "/opt/rh/rh-python38/root/usr/local/lib/python3.8/site-packages/nats/aio/transport.py", line 155, in connect_tls
    transport = await asyncio.wait_for(
  File "/opt/rh/rh-python38/root/usr/lib64/python3.8/asyncio/tasks.py", line 494, in wait_for
    return fut.result()
  File "/opt/rh/rh-python38/root/usr/lib64/python3.8/asyncio/base_events.py", line 1200, in start_tls
    await waiter
ConnectionResetError
charbonnierg commented 1 year ago

That's weird, ConnectionResetError is definitely a subsclass of OSError which is catched by the _process_connect_init method.

I wrote this small reproduction script:

import asyncio
from nats import NATS

class ReproNATS(NATS):
    def _process_connect_init(self) -> None:
        raise ConnectionResetError

async def error_callback(err: Exception) -> None:
    print(f"Encountered an error: {repr(err)}")

async def main() -> None:
    nc = ReproNATS()
    await asyncio.wait_for(nc.connect(error_cb=error_callback), timeout=10)

if __name__ == "__main__":
    asyncio.run(main())

And did not manage to reproduce the issue: when I run the script, client tries to reconnect.

> python repro.py 
Encountered an error: ConnectionResetError()
Encountered an error: ConnectionResetError()
Encountered an error: ConnectionResetError()
Encountered an error: ConnectionResetError()
Encountered an error: ConnectionResetError()

It seems that your example is using a custom error callback, are you sure that it works as expected ? (default callback logs error using nats: encountered error: ... )

nosammai commented 1 year ago

My error callback is just logging the error:

    async def on_error(e):
        LOGGER.exception(f"NATS Error: {e}")

The error seems to specifically be happening during the TLS upgrade - is there any chance that there's some state that is not reset properly compared to when the error happens right away or during the initial connect?

charbonnierg commented 1 year ago

Just a guess, but did you try setting self._transport to None in Client._close() method ? (at the end). I took a quick look and it seems that this attribute is never set back to None after being assigned the first time ?

bvanelli commented 1 year ago

Hello @charbonnierg, the self._transport is not reset because Client manages the reconnection by itself, using the self._status flag:

https://github.com/nats-io/nats.py/blob/0443f8a3ca322ea1c19c2e50053e58add9870fa7/nats/aio/client.py#L1348-L1358

Then when it gets closed it can reset itself to allow for reconnects:

https://github.com/nats-io/nats.py/blob/0443f8a3ca322ea1c19c2e50053e58add9870fa7/nats/aio/client.py#L625-L629

The error is handled here:

https://github.com/nats-io/nats.py/blob/0443f8a3ca322ea1c19c2e50053e58add9870fa7/nats/aio/client.py#L479-L490

The only way for the error to be then raised is that it ran out of reconnect attempts. Could it be the case @nosammai that the allow_reconnect is set to False or None?

nosammai commented 1 year ago

It seems to reconnect in other cases just fine - if i turn off the nats server or start up my app without the server running it will continuously reconnect with no issues.

charbonnierg commented 1 year ago

Hello @charbonnierg, the self._transport is not reset because Client manages the reconnection by itself, using the self._status flag:

https://github.com/nats-io/nats.py/blob/0443f8a3ca322ea1c19c2e50053e58add9870fa7/nats/aio/client.py#L1348-L1358

Then when it gets closed it can reset itself to allow for reconnects:

https://github.com/nats-io/nats.py/blob/0443f8a3ca322ea1c19c2e50053e58add9870fa7/nats/aio/client.py#L625-L629

The error is handled here:

https://github.com/nats-io/nats.py/blob/0443f8a3ca322ea1c19c2e50053e58add9870fa7/nats/aio/client.py#L479-L490

The only way for the error to be then raised is that it ran out of reconnect attempts. Could it be the case @nosammai that the allow_reconnect is set to False or None?

I did not want to expand because I was not so sure, but I think I have a good idea of how the client is implemented.

What I was thinking is that self._transport is assigned in only two lines in the codebase:

                if not self._transport:
                    if s.uri.scheme in ("ws", "wss"):
                        self._transport = WebSocketTransport()
                    else:
                        # use TcpTransport as a fallback
                        self._transport = TcpTransport()

Moreover, self._transport is never resetted to None (even though it's closed).

So the statement if not self._transport will always be False after first connect.

I have almost 0 knowledge about asyncio and TLS, but I was thinking that maybe this could explain the behaviour observed by @nosammai (waiting forever ?)

The TLS transport always recreates a new asyncio.StreamWriter each time connect method is called, so it should not matter if the same transport instance is reused, as long as connect is called first.

But if connect_tls was called without calling connect first, then the closed transport could be used which made me doubt this line which is the line raising error in traceback.

I think that connect_tls is never called before connect, but better be safe than sorry :)

charbonnierg commented 1 year ago

BTW, can you not run your client with some HTTP server and expose an endpoint to list running asyncio tasks ? Something like this server.py file:

import asyncio
import io
from nats import NATS
from fastapi import FastAPI

app = FastAPI()
nc = NATS()

@app.on_event("startup")
async def start_client() -> None:
    asyncio.create_task(nc.connect())

@app.get("/tasks")
async def get_asyncio_tasks():
    all_tasks = asyncio.all_tasks()
    tasks_infos: List[Any] = []
    for task in all_tasks:
        memio = io.StringIO()
        task.print_stack(file=memio)
        memio.seek(0)
        tasks_infos.append(
            {
                "name": task.get_name(),
                "coro": task.get_coro().__qualname__,
                "stack": memio.read().splitlines(False),
            }
        )
    return {"count": len(all_tasks), "tasks": tasks_infos}

Run the app using uvicorn:

uvicorn server:app

Then you can query which asyncio tasks are running:

curl -s http://localhost:8000/tasks | jq .tasks[].coro

In case client is connected the output should be:

# NATS
"Client._read_loop"
"Client._ping_interval"
"Client._flusher"
# Uvicorn
"LifespanOn.main"
"Server.serve"
"RequestResponseCycle.run_asgi"

Example of a detailed output:

    {
      "name": "Task-10",
      "coro": "Client._flusher",
      "stack": [
        "Stack for <Task pending name='Task-10' coro=<Client._flusher() running at ~/repro-nats/.venv/lib/python3.8/site-packages/nats/aio/client.py:1981> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f60abcb9ac0>()]>> (most recent call last):",
        "  File \"~/repro-nats/.venv/lib/python3.8/site-packages/nats/aio/client.py\", line 1981, in _flusher",
        "    future: asyncio.Future = await self._flush_queue.get()"
      ]
    }

I helped us debug problems several times. I'm sure there are metter methods though, but being able to list running tasks at any time is really valuable when debugging code with NATS client involved