nats-io / nats.py

Python3 client for NATS
https://nats-io.github.io/nats.py/
Apache License 2.0

Question about race to connect and subscribe #377

Open mriedem opened 1 year ago

mriedem commented 1 year ago

Hi, we're using version 2.1.7 of the library, and I'm wondering about a race/usage issue we might be having. I'm trying to sort out whether it's a bug in how we're using the library or something that could be improved in the library itself.

We create the client connection with max_reconnect_attempts=-1 (i.e. retry forever), e.g.:

```python
await self.nc.connect(
    nats_bus,
    tls=ssl_ctx,
    token=self.config.nats_token,
    ping_interval=self.config.nats_ping_interval,
    max_reconnect_attempts=self.config.nats_reconnect_attempts,
    **get_nats_callbacks('RuntimeInnerLoop', nats_bus)
)
```
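The `get_nats_callbacks` helper isn't shown in the issue. A minimal hypothetical version, assuming it simply returns the lifecycle callback keyword arguments that nats.py's `connect()` accepts (`error_cb`, `disconnected_cb`, `reconnected_cb`, `closed_cb`), might look like this; the name and the log format are illustrative only:

```python
import logging

logger = logging.getLogger(__name__)


def get_nats_callbacks(name, servers):
    """Hypothetical reconstruction: build the lifecycle callbacks for
    connect(), each tagged with `name` so log lines show which component
    owns the connection."""

    async def error_cb(e):
        logger.error("%s: NATS error on %s: %r", name, servers, e)

    async def disconnected_cb():
        logger.warning("%s: disconnected from %s", name, servers)

    async def reconnected_cb():
        logger.info("%s: reconnected to %s", name, servers)

    async def closed_cb():
        logger.error("%s: connection to %s closed", name, servers)

    # Keys match the keyword arguments connect() expects, so the dict can be
    # splatted straight into the call with **.
    return {
        "error_cb": error_cb,
        "disconnected_cb": disconnected_cb,
        "reconnected_cb": reconnected_cb,
        "closed_cb": closed_cb,
    }
```

It is used exactly as in the snippet above: `await self.nc.connect(nats_bus, **get_nats_callbacks('RuntimeInnerLoop', nats_bus))`.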

Then we do some other setup and eventually create some subscriptions:

```python
self.subs = [
    await self.nc.subscribe(f"commands.cancel.{self.backend}.*", "cancel", cancel_job),
    await self.nc.subscribe(f"jobs.{self.backend}.*", "workers", run_job),
    await self.nc.subscribe(
        f"sw-aot-calibrations-service.update.{self.backend}",
        "calibration",
        update_current_calibration)
]
```
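The three `subscribe()` calls differ only in subject, queue group and handler. A sketch that keeps them in one table and rejects a plain (non-`async`) handler up front, assuming nats.py's positional order `subscribe(subject, queue, cb)` and its requirement that subscription callbacks be coroutine functions; `build_subscription_specs` is a hypothetical name:

```python
import inspect


def build_subscription_specs(backend, cancel_job, run_job, update_current_calibration):
    """Collect (subject, queue, callback) triples for the three subscriptions.

    Raises TypeError early if a handler is a plain function, since the
    client is assumed to require coroutine (async def) callbacks.
    """
    specs = [
        (f"commands.cancel.{backend}.*", "cancel", cancel_job),
        (f"jobs.{backend}.*", "workers", run_job),
        (f"sw-aot-calibrations-service.update.{backend}", "calibration",
         update_current_calibration),
    ]
    for subject, _queue, cb in specs:
        if not inspect.iscoroutinefunction(cb):
            raise TypeError(f"callback for {subject!r} must be an async def function")
    return specs
```

Inside the async setup code, `self.subs = [await self.nc.subscribe(s, q, cb) for s, q, cb in specs]` would then replace the literal list.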

What we noticed once is that when we had a bad cert, the NATS connection failed and the error callback was called, but our app kept running with an essentially dead client.
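One way to keep an app from running on with a dead client is to turn `closed_cb` into something the main task can wait on. The sketch below is stdlib-only and hypothetical (`ConnectionWatch` and `run_until_closed` are not part of nats.py); it assumes the client invokes `closed_cb` once it has given up on the connection:

```python
import asyncio


class ConnectionWatch:
    """Hypothetical helper: turns NATS lifecycle callbacks into an event the
    main task can await. Create it inside a running event loop."""

    def __init__(self):
        self.closed = asyncio.Event()
        self.last_error = None

    async def error_cb(self, e):
        # Remember the most recent error (e.g. a certificate failure).
        self.last_error = e

    async def closed_cb(self):
        # The client has given up on the connection: wake up any waiters.
        self.closed.set()


async def run_until_closed(watch, work):
    """Run the `work` coroutine until it finishes or the connection closes.

    Returns the last recorded error (or a generic ConnectionError) if the
    connection closed first, otherwise None. Exceptions from `work` propagate.
    """
    work_task = asyncio.create_task(work)
    closed_task = asyncio.create_task(watch.closed.wait())
    done, pending = await asyncio.wait(
        {work_task, closed_task}, return_when=asyncio.FIRST_COMPLETED
    )
    # Stop whichever side is still running and let the cancellation settle.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    if closed_task in done:
        return watch.last_error or ConnectionError("NATS connection closed")
    work_task.result()  # re-raise if the work itself failed
    return None
```

The bound methods would be passed to `connect()` as `error_cb=watch.error_cb, closed_cb=watch.closed_cb`, and the caller can exit non-zero when `run_until_closed()` returns an error, so that a process supervisor restarts the app instead of leaving it idle.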

Another app, which publishes the messages we subscribe to, hits "no responders" errors, apparently because our subscriptions were never made.

What I'm trying to figure out is why the subscribe() calls were allowed when the client is known not to be connected. Is the assumption that it will eventually connect, or that the code using the NATS client instance will detect the problem and crash and clean up (or try to reconnect)?
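If the goal is for subscribing to fail loudly on a client that isn't connected, that check can live in the application. A minimal sketch, assuming only that the client exposes `is_connected` and an async `subscribe(subject, queue, cb)` as nats.py's `Client` does; `subscribe_strict` and `NotConnectedError` are hypothetical names:

```python
class NotConnectedError(RuntimeError):
    """Hypothetical error: subscribing on a client that isn't connected."""


async def subscribe_strict(nc, subject, queue, cb):
    """Subscribe only if the client currently reports it is connected.

    Raising here makes a dead client fail at startup instead of silently
    handing back a subscription that may never reach the server.
    """
    if not nc.is_connected:
        raise NotConnectedError(
            f"cannot subscribe to {subject!r}: NATS client is not connected"
        )
    return await nc.subscribe(subject, queue, cb)
```

Note the trade-off: this also refuses to subscribe while the client is briefly reconnecting, so it may be stricter than you want for transient disconnects.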

For now, our solution will probably be to set max_reconnect_attempts to something saner, or leave it at the default so that the library tries to connect to the server every 2 seconds, up to 60 times, and fails if it can't. All of that would happen before we try to create subscriptions.
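The retry budget described above is simple arithmetic: one attempt every 2 seconds, up to 60 times, means the client spends roughly 60 × 2 = 120 seconds waiting before it gives up, while -1 never gives up. A rough sketch, assuming a single server and ignoring time spent inside each attempt (TLS handshakes, connect timeouts); `reconnect_budget_seconds` is a hypothetical helper, and `reconnect_time_wait` is nats.py's name for the delay between attempts:

```python
import math


def reconnect_budget_seconds(max_reconnect_attempts, reconnect_time_wait):
    """Rough lower bound on how long the client keeps retrying.

    Counts only the waits between attempts; a negative attempt count
    means "retry forever", so the budget is unbounded.
    """
    if max_reconnect_attempts < 0:
        return math.inf
    return max_reconnect_attempts * reconnect_time_wait
```

With the defaults, `reconnect_budget_seconds(60, 2)` gives 120, so a bounded setting surfaces a bad cert within a couple of minutes instead of never.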

I'm mainly curious as to why the subscribe() calls pass even though the client isn't successfully connected.

ljluestc commented 2 months ago
```python
import asyncio
from types import SimpleNamespace

from nats.aio.client import Client as NATS


async def connect_to_nats(nats_bus, ssl_ctx, config):
    nc = NATS()

    async def error_cb(e):
        print("Error:", e)

    async def closed_cb():
        print("Connection closed")

    async def disconnected_cb():
        print("Disconnected")

    async def reconnected_cb():
        print("Reconnected")

    await nc.connect(
        nats_bus,
        tls=ssl_ctx,
        token=config.nats_token,
        ping_interval=config.nats_ping_interval,
        max_reconnect_attempts=config.nats_reconnect_attempts,
        error_cb=error_cb,
        closed_cb=closed_cb,
        disconnected_cb=disconnected_cb,
        reconnected_cb=reconnected_cb,
    )
    return nc


async def create_subscriptions(nc, backend, cancel_job, run_job, update_current_calibration):
    if nc.is_connected:
        subs = [
            await nc.subscribe(f"commands.cancel.{backend}.*", "cancel", cancel_job),
            await nc.subscribe(f"jobs.{backend}.*", "workers", run_job),
            await nc.subscribe(f"sw-aot-calibrations-service.update.{backend}", "calibration", update_current_calibration),
        ]
        return subs
    else:
        print("NATS client is not connected. Cannot create subscriptions.")
        return []


async def main():
    nats_bus = "nats://localhost:4222"
    ssl_ctx = None  # Your SSL context here
    # connect_to_nats() reads config.nats_token etc., so use attribute
    # access (a namespace) rather than a plain dict.
    config = SimpleNamespace(
        nats_token="your_token",
        nats_ping_interval=120,
        nats_reconnect_attempts=-1,  # Infinite retries
    )

    nc = await connect_to_nats(nats_bus, ssl_ctx, config)
    if nc.is_connected:
        print("Connected to NATS")

        # Subscription callbacks must be coroutine functions (async def).
        async def cancel_job(msg):
            print(f"Received cancel job: {msg}")

        async def run_job(msg):
            print(f"Received run job: {msg}")

        async def update_current_calibration(msg):
            print(f"Received update calibration: {msg}")

        subs = await create_subscriptions(nc, "backend_name", cancel_job, run_job, update_current_calibration)
        if subs:
            print("Subscriptions created successfully")
        else:
            print("Failed to create subscriptions")
    else:
        print("Failed to connect to NATS")

    await nc.close()


if __name__ == '__main__':
    asyncio.run(main())
```