nats-io / nats.py

Python3 client for NATS
https://nats-io.github.io/nats.py/
Apache License 2.0

Pending_size of subscription `_INBOX` is always growing #113

Closed: laishzh closed this issue 5 years ago

laishzh commented 5 years ago

I am using NATS in a long-running task. After a long period, the error "nats: slow consumer, messages dropped" is raised. I found that the problem occurs when messages are dequeued from the _INBOX subscription. The following demo reproduces the case: the pending_size of the _INBOX subscription keeps growing, while the pending_size of the `help` subscription stays at 0.

Demo:

import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers

async def run(loop):
    nc = NATS()

    await nc.connect("127.0.0.1:4222", loop=loop)

    async def help_request(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        print("Received a message on '{subject} {reply}': {data}".format(
            subject=subject, reply=reply, data=data))
        await nc.publish(reply, b'I can help')

    # Use queue named 'workers' for distributing requests
    # among subscribers.
    sid = await nc.subscribe("help", "workers", help_request)

    while True:
        # Send a request and expect a single response
        # and trigger timeout if not faster than 1 second.
        try:
            response = await nc.request("help", b'help me', timeout=1)
            print("Received response: {message}".format(
                message=response.data.decode()))
        except ErrTimeout:
            print("Request timed out")

        # Output all subscriptions' pending_size
        print("NC subs: {}".format(len(nc._subs)))
        for k, v in nc._subs.items():
            print("Key: {}, SubName: {}, pending_size: {}".format(k, v.subject, v.pending_size))

    # Remove interest in subscription.
    await nc.unsubscribe(sid)

    # Terminate connection to NATS.
    await nc.close()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.close()

Output:

NC subs: 2
Key: 1, SubName: help, pending_size: 0
Key: 2, SubName: _INBOX.sXm35gMyTu5v7u0Zmpi9h2.*, pending_size: 28450
Received a message on 'help _INBOX.sXm35gMyTu5v7u0Zmpi9h2.sXm35gMyTu5vlpVcmpi9h2': help me
Received response: I can help
NC subs: 2
Key: 1, SubName: help, pending_size: 0
Key: 2, SubName: _INBOX.sXm35gMyTu5v7u0Zmpi9h2.*, pending_size: 28460
Received a message on 'help _INBOX.sXm35gMyTu5v7u0Zmpi9h2.sXm35gMyTu5vUuVcmpi9h2': help me
Received response: I can help
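
In the output above, pending_size rises by 10 per request (28450 to 28460), which equals len(b'I can help'), the size of one reply. That pattern is consistent with a byte counter that is incremented when a reply is queued but never decremented when the reply is taken off the queue. The following is a minimal sketch of that bookkeeping under this assumption, not nats.py's actual code: `SketchSubscription`, `enqueue`, `dequeue_leaky`, `dequeue_balanced`, and the 100-byte limit are hypothetical names and values chosen for illustration.

```python
import asyncio


class SketchSubscription:
    """Hypothetical stand-in for a client subscription; not nats.py's class."""

    def __init__(self, pending_bytes_limit):
        self.pending_queue = asyncio.Queue()
        self.pending_size = 0  # bytes the client believes are still queued
        self.pending_bytes_limit = pending_bytes_limit  # assumed limit
        self.dropped = 0

    def enqueue(self, payload):
        # Assumed slow-consumer check: drop the message if accepting it
        # would push the counter past the limit.
        if self.pending_size + len(payload) > self.pending_bytes_limit:
            self.dropped += 1
            return False
        self.pending_size += len(payload)
        self.pending_queue.put_nowait(payload)
        return True

    async def dequeue_leaky(self):
        # Leaky pattern: the message leaves the queue, the counter is untouched.
        return await self.pending_queue.get()

    async def dequeue_balanced(self):
        # Balanced pattern: every dequeue subtracts what enqueue added.
        payload = await self.pending_queue.get()
        self.pending_size -= len(payload)
        return payload


async def simulate(balanced, requests=20, limit=100):
    sub = SketchSubscription(pending_bytes_limit=limit)
    for _ in range(requests):
        # Each accepted reply is consumed immediately, so the queue never backs up.
        if sub.enqueue(b'I can help'):
            if balanced:
                await sub.dequeue_balanced()
            else:
                await sub.dequeue_leaky()
    return sub.pending_size, sub.pending_queue.qsize(), sub.dropped


leaky = asyncio.run(simulate(balanced=False))
fixed = asyncio.run(simulate(balanced=True))
print("leaky    (pending_size, queued, dropped):", leaky)
print("balanced (pending_size, queued, dropped):", fixed)
```

With the leaky path the counter reaches the limit and later replies are dropped even though the queue itself is empty, which is how a "slow consumer" error could appear without any real backlog. With the balanced path the counter returns to 0 after every reply.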

brianshannan commented 5 years ago

Resolved by https://github.com/nats-io/nats.py/pull/114