nats-io / stan.py

Python Asyncio NATS Streaming Client
https://nats.io
Apache License 2.0
113 stars 23 forks source link

Example of a subscriber on an infinite loop #12

Closed yanpozka closed 6 years ago

yanpozka commented 6 years ago

Hi thanks for publishing a Python client for nats-streaming, results that asyncio is relative new for a lot of Python developers (like me) Is possible to add an example with a subscriber listening messages forever e.i. inside of an infinite loop or something equivalent on the asyncio way ? I want to have a service to listening updates from NATS.

Generic examples: https://gist.github.com/Integralist/6f34e23f71340a1a23e846cd2f64cf32 https://tutorialedge.net/python/concurrency/asyncio-event-loops-tutorial/

Thanks in advance!

wallyqs commented 6 years ago

Thanks for the feedback I will add an example that does exactly that. Sharing them first here below:

Example consumer receiving messages forever:

import asyncio
from nats.aio.client import Client as NATS
from stan.aio.client import Client as STAN

async def run(loop):
    nc = NATS()
    sc = STAN()

    # Start session with NATS Streaming cluster using
    # the established NATS connection.
    await nc.connect(io_loop=loop)
    await sc.connect("test-cluster", "client-123", nats=nc)

    # Example async subscriber
    async def cb(msg):
        print("Received a message (seq={}): {}".format(msg.seq, msg.data))

    # Subscribe to get all messages from the beginning.
    await sc.subscribe("greetings", start_at='first', cb=cb)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.run_forever()

Example producer sending messages forever:

import asyncio
from nats.aio.client import Client as NATS
from stan.aio.client import Client as STAN

async def run(loop):
    nc = NATS()
    sc = STAN()

    # First connect to NATS, then start session with NATS Streaming.
    await nc.connect(io_loop=loop)
    await sc.connect("test-cluster", "client-456", nats=nc)

    # Periodically send a message
    while True:
        await sc.publish("greetings", b'Hello World!')
        await asyncio.sleep(1, loop=loop)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.run_forever()
yanpozka commented 6 years ago

@wallyqs thanks a lot for your prompt response! I've tried your Example consumer receiving messages forever but it's not working, python client is trying to subscribe and unsubscribe a lot of times (probably coz the infinite loop), we can see the six tries of subscriptions on nats server logs:

2018/06/28 23:10:35.286569 [DBG] 172.18.0.1:43394 - cid:7 - Deferring actual UNSUB(_INBOX.lqc06jDlxe63OS3rz3jN8O): 1 max, 0 received
2018/06/28 23:10:35.286581 [TRC] 172.18.0.1:43394 - cid:7 - ->> [PUB _STAN.close.FUlqxXDk0C3F29nclnZ5Kh _INBOX.lqc06jDlxe63OS3rz3jN8O 21]
2018/06/28 23:10:35.286585 [TRC] 172.18.0.1:43394 - cid:7 - ->> MSG_PAYLOAD: [python-client-andry]
2018/06/28 23:10:35.286590 [TRC] 127.0.0.1:40514 - cid:2 - <<- [MSG _STAN.close.FUlqxXDk0C3F29nclnZ5Kh 7 _INBOX.lqc06jDlxe63OS3rz3jN8O 21]
2018/06/28 23:10:35.286657 [TRC] 127.0.0.1:40514 - cid:2 - ->> [PUB _INBOX.lqc06jDlxe63OS3rz3jN8O 0]
2018/06/28 23:10:35.286664 [TRC] 127.0.0.1:40514 - cid:2 - ->> MSG_PAYLOAD: []
2018/06/28 23:10:35.286672 [DBG] 127.0.0.1:40514 - cid:2 - Auto-unsubscribe limit of 1 reached for sid '6'
2018/06/28 23:10:35.286676 [TRC] 172.18.0.1:43394 - cid:7 - <<- [MSG _INBOX.lqc06jDlxe63OS3rz3jN8O 6 0]
2018/06/28 23:10:35.286681 [TRC] 172.18.0.1:43394 - cid:7 - <-> [DELSUB 6]
2018/06/28 23:10:35.286960 [DBG] 172.18.0.1:43394 - cid:7 - Client connection closed

Any idea of how to solve this? Thanks in advance

wallyqs commented 6 years ago

@yanpozka which versions of the asyncio-nats-client and asyncio-nats-streaming clients are you using? Sharing a gif below of how it works if using v0.7.0 version of the NATS client and latest asyncio-nats-streaming. Are you not receiving the messages in the consumer client?

asyncio-nats

yanpozka commented 6 years ago

@wallyqs I'm using the latest version of server and client, I found the problem I was closing the connections at the end of run() method, it's working :+1: Thank you very much for your help!

Olshansk commented 5 years ago

@wallyqs If the nats server were to restart while the client is running, how can the client become aware of that and reconnect?

It seems like there is already a heartbeat mechanism implement (https://github.com/nats-io/stan.py/blob/d5858b91501fc067ace49da6ea2bcde6ea1da7f5/stan/aio/client.py), but I believe I might be missing something to make use of it.