nats-io / nats.py

Python3 client for NATS
https://nats-io.github.io/nats.py/
Apache License 2.0

NATS client doesn't reconnect on NATS reboot (LB setup) #184

Closed: SimonVHB closed this issue 3 years ago.

SimonVHB commented 4 years ago

Hello,

I've encountered a problem: when a NATS cluster is restarted, the NATS Python client doesn't reconnect by itself. This is really annoying, because all of our services then have to be restarted.

Maybe it's because our NATS server uses TLS. On the subscriber I get the following output:

Error: <class 'nats.aio.errors.ErrStaleConnection'>

Our config looks like this:

# PID file shared with configuration reloader.
pid_file: "/var/run/nats/nats.pid"
listen: 0.0.0.0:4222

###############
#             #
# Monitoring  #
#             #
###############
http: 8222
server_name: $POD_NAME
#####################
#                   #
# TLS Configuration #
#                   #
#####################

tls {
  cert_file: /etc/nats-certs/clients/nats-client-tls/tls.crt
  key_file: /etc/nats-certs/clients/nats-client-tls/tls.key
}

###################################
#                                 #
# NATS Full Mesh Clustering Setup #
#                                 #
###################################
cluster {
  port: 6222

  routes = [
    nats://nats-0.nats.testing.svc:6222,nats://nats-1.nats.testing.svc:6222,nats://nats-2.nats.testing.svc:6222,
  ]
  cluster_advertise: $CLUSTER_ADVERTISE
  no_advertise: true

  connect_retries: 30
}

##################
#                #
# Authorization  #
#                #
##################
accounts: {
  ...
}

system_account: SYS

How to reproduce:

Cluster setup

The server runs on a k8s cluster using the existing Helm chart. The auth config is slightly tweaked (we just load the accounts.conf file into the ConfigMap), which results in the config file above. We use NKeys to authenticate.

nats:
  image: nats:2.1.7-alpine3.11
  pullPolicy: IfNotPresent

  externalAccess: false
  advertise: false

  serviceAccount: "nats-server"

  connectRetries: 30

  pingInterval:

  # Server settings.
  limits:
    maxConnections:
    maxSubscriptions:
    # maxControlLine: 512
    # maxPayload: 65536

    writeDeadline: "2s"
    maxPending:
    maxPings:
    lameDuckDuration:

  logging:
    debug: false
    trace: false
    logtime:
    connectErrorReports:
    reconnectErrorReports:

  tls:
    secret:
      name: nats-client-tls
    # ca: "ca.crt"
    cert: "tls.crt"
    key: "tls.key"

nameOverride: ""
imagePullSecrets: []

securityContext: null

affinity: {}

podAnnotations: {}

cluster:
  enabled: true
  replicas: 3
  noAdvertise: true

leafnodes:
  enabled: false

gateway:
  enabled: false
  name: 'default'

bootconfig:
  image: connecteverything/nats-boot-config:0.5.2
  pullPolicy: IfNotPresent

reloader:
  enabled: true
  image: connecteverything/nats-server-config-reloader:0.6.0
  pullPolicy: IfNotPresent

# Authentication setup
auth:
  enabled: true

  resolver:
    type: memory
    accountsFileName: accounts.conf

Simple subscriber using the nats.py client

import asyncio
import ssl
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers

SUBJECT = "test"
SEED = "/path/to/nkey/seed"
CLUSTER_ADDRESS = "nats.cluster.address:4222"

async def run(loop):

    async def message_handler(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        print("Received a message on '{subject} {reply}': {data}".format(
            subject=subject, reply=reply, data=data))

    async def error_cb(e):
        print("Error:", e)

    nc = NATS()
    context = ssl.create_default_context()
    await nc.connect(CLUSTER_ADDRESS, tls=context, nkeys_seed=SEED, error_cb=error_cb)

    await nc.subscribe(SUBJECT, cb=message_handler)

if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    loop.create_task(run(loop))
    loop.run_forever()

The problem

Output when sending a message on the subject:

Received a message on 'test ': test-message

Then during the restart we get this error:

Error: <class 'nats.aio.errors.ErrStaleConnection'>

After the restart, when I publish something on the same subject, the subscriber does not get any messages.

The server restart completes in under two minutes, which is within the NATS client's default reconnect window. Still, I have to restart all of our services to get them reconnected to the NATS server.
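For reference, how long the client keeps retrying and how quickly it notices a silent connection are controlled by options passed to `connect()`. The sketch below assumes the nats.py option names `max_reconnect_attempts`, `reconnect_time_wait`, `ping_interval` and `max_outstanding_pings`, and assumes the defaults of that era (60 attempts, 2 s wait, 120 s ping interval, 2 outstanding pings); check both against the version you run. The `ReconnectOptions` class and the stale-detection formula are hypothetical illustrations, not nats.py code. It only builds keyword arguments and does rough arithmetic, so it runs without a server.

```python
from dataclasses import dataclass, asdict


@dataclass
class ReconnectOptions:
    # Assumed nats.py defaults at the time of this issue; verify for your version.
    max_reconnect_attempts: int = 60   # assumed: -1 means "retry forever"
    reconnect_time_wait: float = 2.0   # seconds to wait between attempts
    ping_interval: float = 120.0       # seconds between client PINGs
    max_outstanding_pings: int = 2     # unanswered PINGs tolerated before "stale"

    def reconnect_window(self) -> float:
        """Rough time the client keeps retrying before giving up (inf if unlimited)."""
        if self.max_reconnect_attempts < 0:
            return float("inf")
        return self.max_reconnect_attempts * self.reconnect_time_wait

    def stale_detection_time(self) -> float:
        """Rough worst case (assumed model) before a silent connection is declared stale."""
        return (self.max_outstanding_pings + 1) * self.ping_interval


defaults = ReconnectOptions()
print(defaults.reconnect_window())  # 120.0 seconds, i.e. the "two minutes" above

# Hypothetical tuning for a deployment behind a load balancer:
# notice dead connections sooner and never stop retrying.
tuned = ReconnectOptions(max_reconnect_attempts=-1, ping_interval=20)
connect_kwargs = asdict(tuned)
# Then, for example: nc = await nats.connect(servers=[...], **connect_kwargs)
```

Lowering `ping_interval` trades a little extra traffic for faster detection of connections that a middlebox keeps open after the server behind it has gone away.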

Thanks in advance for the help!

SimonVHB commented 4 years ago

I've done some more testing. With the NATS Go client, this problem does not occur. As the subscriber I used the nats-sub example (https://github.com/nats-io/nats.go/blob/master/examples/nats-sub/main.go); the only thing I changed was the option to use an NKey seed as credentials.

wallyqs commented 3 years ago

Thanks for the report, @SimonVHB. I haven't been able to reproduce this locally, so I may need more info. For example, this script does reconnect:

import asyncio
import ssl
import nats
# import uvloop

# asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

SUBJECT = "test"
SEED = "./tests/nkeys/foo-user.nk"
# CLUSTER_ADDRESS = "nats.cluster.address:4222"
CLUSTER_ADDRESS = "nats://localhost:4222"

async def run(loop):

    # Called when the connection to the server is lost.
    async def disconnected_cb():
        print("Disconnected!")

    # Called once the client has reconnected to a server.
    async def reconnected_cb():
        print("Reconnected!")

    # Called on asynchronous errors, such as ErrStaleConnection.
    async def error_cb(e):
        print("Error:", e)

    # TLS context using the test CA and client certificate from this repo.
    context = ssl.create_default_context()
    context.load_verify_locations('tests/certs/ca.pem')
    context.load_cert_chain(
        certfile='tests/certs/client-cert.pem',
        keyfile='tests/certs/client-key.pem'
        )
    nc = await nats.connect(servers=[CLUSTER_ADDRESS],
                            tls=context,
                            nkeys_seed=SEED,
                            error_cb=error_cb,
                            disconnected_cb=disconnected_cb,
                            reconnected_cb=reconnected_cb,
                            )

    async def message_handler(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        print("Received a message on '{subject} {reply}': {data}".format(
            subject=subject, reply=reply, data=data))
        # Only answer requests; a plain publish has an empty reply subject.
        if msg.reply:
            await nc.publish(msg.reply, b'pong')
    await nc.subscribe(SUBJECT, cb=message_handler)

if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    loop.create_task(run(loop))
    loop.run_forever()

Using the test certificates in this repo, you can start the server like this:

nats-server -DV --tls --tlscert tests/certs/server-cert.pem --tlskey tests/certs/server-key.pem --tlsverify --tlscacert tests/certs/ca.pem --cluster_listen nats://127.0.0.1:6224  --config ./tests/nkeys/nkeys_server.conf

Then, on reconnection, you would see something like:

python3.8 examples/nkeys-reconnect.py 
Error: nats: 'Permissions Violation for Subscription to "test"'
Error: <class 'nats.aio.errors.ErrStaleConnection'>
Disconnected!
Error: [Errno 61] Connect call failed ('127.0.0.1', 4222)
An open stream object is being garbage collected; call "stream.close()" explicitly.
Error: nats: 'Permissions Violation for Subscription to "test"'
Reconnected!

SimonVHB commented 3 years ago

Just a small update: I've been doing some more testing. When I run the NATS server locally, the client reconnects as expected. On our k8s cluster, though, it doesn't.

I've also disabled TLS, so I can now say that TLS isn't the problem: with TLS disabled on our NATS cluster on k8s, the same problem still occurs.

What I did notice is that when the client disconnects locally, you can see it trying to reconnect, because it prints messages like these:

Error: [Errno 111] Connect call failed ('127.0.0.1', 4222)
Error: [Errno 111] Connect call failed ('127.0.0.1', 4222)
Error: [Errno 111] Connect call failed ('127.0.0.1', 4222)

When it tries to reconnect to our cluster, those messages don't appear. My current guess is that it has something to do with the NATS server running behind a load balancer on a DigitalOcean k8s cluster. I'll look into that later today or tomorrow.
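The load-balancer hypothesis can be illustrated with a small, self-contained asyncio sketch that does not use nats.py at all. The assumption is that the LB completes the TCP handshake even when no NATS backend is ready. In that case the client gets a connection that never delivers the server's INFO line, rather than the `Connect call failed` error seen locally. A local listener that accepts connections but never writes stands in for the LB. The names `silent_peer` and `probe` are hypothetical.

```python
import asyncio


async def silent_peer(reader, writer):
    # Stands in for a load balancer that accepts the TCP connection but has
    # no healthy backend: it never sends anything, and only closes its side
    # once the client hangs up.
    await reader.read()
    writer.close()
    await writer.wait_closed()


async def probe(host: str, port: int, timeout: float) -> str:
    """Open a TCP connection and wait for a first line (a NATS server sends INFO)."""
    try:
        reader, writer = await asyncio.open_connection(host, port)
    except OSError as exc:
        return f"connect failed: {exc.__class__.__name__}"
    try:
        await asyncio.wait_for(reader.readline(), timeout)
        return "got greeting"
    except asyncio.TimeoutError:
        return "connected, but no greeting before timeout"
    finally:
        writer.close()
        await writer.wait_closed()


async def main() -> list:
    server = await asyncio.start_server(silent_peer, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    async with server:
        behind_lb = await probe("127.0.0.1", port, timeout=0.5)
    # With the listener gone, the same port refuses connections, like the local case.
    direct = await probe("127.0.0.1", port, timeout=0.5)
    return [behind_lb, direct]


results = asyncio.run(main())
print(results)
```

If this is what happens, a client that waits for the INFO line without a timeout could stall instead of moving on to its next reconnect attempt, which would match the missing `Connect call failed` messages. This is only a hypothesis consistent with the symptoms, not a confirmed root cause.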

wallyqs commented 3 years ago

Thanks for the update, @SimonVHB. I can try it behind a DigitalOcean LB. By the way, does the problem also appear from within Kubernetes? Are you using a particular Python container base image, or just connecting from outside the K8S cluster?

SimonVHB commented 3 years ago

@wallyqs I just tested it: the reconnect does work from within the cluster! However, almost all of our services run in a different K8S cluster, in a custom container based on python:3.8-alpine3.11.

wallyqs commented 3 years ago

Apologies for the delay. I found the issue, and a client release with the hot-fix is now available here: https://github.com/nats-io/nats.py/releases/tag/v0.11.4
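Since every affected service has to pick up the new client, a quick check at startup can confirm that the installed version is at least v0.11.4. This is a minimal sketch. The PyPI distribution names it tries (`nats-py` for newer releases, `asyncio-nats-client` for older ones) are an assumption, and the helper names are hypothetical.

```python
from importlib import metadata


def parse_version(text: str) -> tuple:
    """Turn '0.11.4' into (0, 11, 4); drops non-numeric suffixes such as 'rc1'."""
    parts = []
    for piece in text.split(".")[:3]:
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        parts.append(int(digits or 0))
    while len(parts) < 3:
        parts.append(0)
    return tuple(parts)


def has_reconnect_fix(installed: str, fixed_in: str = "0.11.4") -> bool:
    """True if the installed client is at or past the hot-fix release."""
    return parse_version(installed) >= parse_version(fixed_in)


def installed_nats_version():
    # Assumed distribution names; returns None if neither is installed.
    for dist in ("nats-py", "asyncio-nats-client"):
        try:
            return metadata.version(dist)
        except metadata.PackageNotFoundError:
            continue
    return None


version = installed_nats_version()
if version is None:
    print("NATS client not installed")
else:
    print(version, "includes the hot-fix:", has_reconnect_fix(version))
```

Pinning the dependency (for example `>=0.11.4` in the requirements file) would make the check redundant for new builds. It remains useful for long-running containers built before the release.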