nats-io / stan.py

Python Asyncio NATS Streaming Client
https://nats.io
Apache License 2.0

Use async for #2

Open barrachri opened 6 years ago

barrachri commented 6 years ago

Hi, have you considered supporting something like async for?

Instead of

async def cb(msg):
    print("Received a message (seq={}): {}".format(msg.seq, msg.data))

# Receive messages starting at a specific sequence number
await sc.subscribe("foo", start_at="sequence", sequence=3, cb=cb)

something like this:

messages = await sc.subscribe("foo", start_at="sequence", sequence=3)
async for msg in messages:
    print("Received a message (seq={}): {}".format(msg.seq, msg.data))

wallyqs commented 6 years ago

I haven't looked into it, but I think that would be a nice addition to both the underlying asyncio-nats client and this one.
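
For reference, a callback-based subscribe can be adapted to async for by having the callback push into an asyncio.Queue and draining that queue from an async generator. This is only a minimal sketch, not the stan.py API: FakeClient and subscribe_iter are hypothetical names, and FakeClient merely imitates a client that takes a cb keyword so the example runs without a server.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for a callback-based client (not the stan.py API)."""

    def __init__(self):
        self._callbacks = {}

    async def subscribe(self, subject, cb):
        # Remember the callback registered for this subject.
        self._callbacks[subject] = cb

    async def publish(self, subject, data):
        # Deliver the payload straight to the registered callback.
        await self._callbacks[subject](data)


async def subscribe_iter(client, subject, **kwargs):
    """Subscribe with a callback and return an async iterator over messages."""
    queue = asyncio.Queue()

    async def cb(msg):
        # The client calls this for every message; hand it to the consumer.
        await queue.put(msg)

    await client.subscribe(subject, cb=cb, **kwargs)

    async def messages():
        while True:
            yield await queue.get()

    return messages()


async def main():
    client = FakeClient()
    messages = await subscribe_iter(client, "foo")
    for payload in (b"a", b"b", b"c"):
        await client.publish("foo", payload)

    received = []
    async for msg in messages:
        received.append(msg)
        if len(received) == 3:
            break  # the iterator never ends on its own, so stop explicitly
    return received


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The queue is unbounded here, so a slow consumer lets messages pile up in memory; passing a maxsize to asyncio.Queue would apply backpressure to the callback instead.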

barrachri commented 6 years ago

Something similar to aioredis, without using an async generator, would also be fine:

async def reader(ch):
    while (await ch.wait_message()):
        msg = await ch.get_json()
        print("Got Message:", msg)

Mostly because with the current asyncio-nats-streaming it is a little bit tricky to achieve the same thing:

async def scheduler():
    """Launch the task manager."""
    log.info("Loading subscribers....")
    for queue_name, funcs in _subscribers.items():
        # add a callback when you receive a message
        await streaming.subscribe(queue_name, on_message)

    # really, really ugly
    while True:
        await asyncio.sleep(1)
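
As a possible stopgap for the sleep loop above, the coroutine can wait on an asyncio.Event instead of polling once per second. This is a sketch under assumptions: the stop parameter is a hypothetical addition, and the subscription calls are left out so the example runs on its own.

```python
import asyncio


async def scheduler(stop):
    """Keep the task alive until `stop` is set, without a polling loop."""
    # Subscriptions would be registered here, as in the snippet above.
    await stop.wait()  # suspends until stop.set() is called
    return "stopped"


async def main():
    stop = asyncio.Event()
    task = asyncio.ensure_future(scheduler(stop))
    await asyncio.sleep(0)  # let scheduler start and block on the event
    assert not task.done()
    stop.set()  # e.g. triggered from a shutdown handler
    return await task


if __name__ == "__main__":
    print(asyncio.run(main()))
```

This keeps the callbacks running the same way the while True loop does, but the task wakes up only when shutdown is requested.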