nats-io / nats.py

Python3 client for NATS
https://nats-io.github.io/nats.py/
Apache License 2.0

Closing the NATS connections blocks on something #52

Closed c0deaddict closed 6 years ago

c0deaddict commented 6 years ago

After closing the connection with close(), some tasks keep running for a few seconds. I expected 'await nc.close()' to wait until all tasks are cancelled and stopped.

Sample code:

import asyncio
import signal

from nats.aio.client import Client as NATS

NATS_SERVERS = ['nats://localhost:4222']

async def main():
    nc = NATS()
    await nc.connect(servers=NATS_SERVERS)
    print('Connected to NATS at {}'.format(nc.connected_url.netloc))

    async def signal_handler():
        if nc.is_closed:
            return

        print('Disconnecting from NATS...')
        await nc.close()

        pending = asyncio.Task.all_tasks()
        for task in pending:
            print('Task is still running: {} cancelled? {}'.format(task, task.cancelled())) 

    loop = asyncio.get_event_loop()
    for sig in ('SIGINT', 'SIGTERM'):
        loop.add_signal_handler(getattr(signal, sig),
                                lambda: asyncio.ensure_future(signal_handler()))

    # subscribe on messages

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    try:
        loop.run_forever()
    finally:
        print('Shutting down')
        loop.close()

This is the output I get; you can see that some tasks are still running.

Connected to NATS at localhost:4222
Disconnecting from NATS...
Task is still running: <Task pending coro=<main.<locals>.signal_handler() running at /usr/src/app/notifier/app.py:35>>, cancelled? False
Task is still running: <Task pending coro=<Client._ping_interval() running at /usr/local/lib/python3.6/site-packages/nats/aio/client.py:963> wait_for=<Future cancelled>>, cancelled? False
Task is still running: <Task pending coro=<Client._read_loop() running at /usr/local/lib/python3.6/site-packages/nats/aio/client.py:996> wait_for=<Future cancelled>>, cancelled? False
Task is still running: <Task cancelling coro=<Client._flusher() running at /usr/local/lib/python3.6/site-packages/nats/aio/client.py:944> wait_for=<Future finished result=None>>, cancelled? False

wallyqs commented 6 years ago

I found that relinquishing the CPU before closing gives the background tasks time to complete. I have a working fix in this PR: https://github.com/nats-io/asyncio-nats/pull/69

    async def signal_handler():
        if nc.is_closed:
            return

        print('Disconnecting from NATS...')
        await nc.close()
        await asyncio.sleep(0) # <--- allow tasks to complete in the background

        pending = asyncio.Task.all_tasks()
        for task in pending:
            print('Task is still running: {} cancelled? {}'.format(task, task.cancelled()))
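
For anyone wanting to see the effect in isolation, here is a minimal sketch that does not use NATS at all. It assumes a hypothetical background() coroutine standing in for a client task such as the ping or read loop. It only illustrates general asyncio behaviour: cancel() just requests cancellation, and the task finishes once the event loop gets a chance to run it again. It is not the code from the PR.

```python
import asyncio


async def background():
    # Hypothetical stand-in for a client background task (ping loop, read loop).
    while True:
        await asyncio.sleep(3600)


async def demo():
    task = asyncio.ensure_future(background())
    await asyncio.sleep(0)  # let the task start and block on its own sleep

    task.cancel()  # only requests cancellation; the task has not run yet
    done_before_yield = task.done()

    await asyncio.sleep(0)  # yield once so the loop can deliver CancelledError
    done_after_yield = task.done()

    return done_before_yield, done_after_yield, task.cancelled()


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    try:
        print(loop.run_until_complete(demo()))
    finally:
        loop.close()
```

Running this prints (False, True, True): the task is still pending right after cancel() and is finished and cancelled after a single yield. A single sleep(0) is enough here because the task waits directly on one future; a task with deeper nesting may need more than one loop iteration, so awaiting the cancelled tasks themselves is probably the more robust option where that is possible.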