nats-io / stan.py

Python Asyncio NATS Streaming Client
https://nats.io
Apache License 2.0

How to get all available messages and exit? #50

Closed chermed closed 3 years ago

chermed commented 3 years ago

I can't get all messages from a channel using STAN. I have more than 20000 messages in the queue, but each time I run my script I get only a small number of them: 1310, 784, 25, 456, etc.

The number of messages in the channel does not grow during the tests.

I need to increase the sleep time to get more messages. With a 20 second sleep I can get all 20000 messages, but why is that? Is there a way to receive all messages and then exit without tuning the sleep time to the number of messages?

Here is my code:

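total_messages = 0

async def cb(msg):
  # total_messages is rebound here, so it must be declared global
  # (use nonlocal instead if this code lives inside another function)
  global total_messages
  total_messages += 1
  print("Received a message (seq={}): {}".format(msg.seq, msg.data))

# await sc.subscribe("foo", start_at="first", cb=cb)   # tested also
await sc.subscribe("foo", deliver_all_available=True, cb=cb)
await asyncio.sleep(1)
print(total_messages)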

My use case: I have some results stored back to a channel, so I need to do a lookup over all messages. The problem is that a lot of messages are skipped, and if I increase the sleep time I add latency.

chermed commented 3 years ago

We can just await a Future instead of sleeping: the callback sets the Future's result once the messages have been received.
See the readme: https://github.com/nats-io/stan.py#basic-usage
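For anyone landing here later, this is roughly what the Future pattern looks like. It is a minimal sketch, not the library's code: `fake_subscribe` is a hypothetical stand-in for `sc.subscribe(...)` so the example runs without a NATS Streaming server, and it assumes the number of expected messages is known in advance. With a real client you would pass the same `cb` to `sc.subscribe("foo", deliver_all_available=True, cb=cb)`.

```python
import asyncio
from types import SimpleNamespace


async def fake_subscribe(cb, n_messages):
    """Hypothetical stand-in for sc.subscribe(): delivers messages in the background."""
    async def deliver():
        for seq in range(1, n_messages + 1):
            await asyncio.sleep(0)  # yield to the event loop between deliveries
            await cb(SimpleNamespace(seq=seq, data=b"payload"))
    return asyncio.ensure_future(deliver())


async def collect(expected, timeout=5.0):
    loop = asyncio.get_running_loop()
    done = loop.create_future()  # resolved by the callback, awaited below
    received = []

    async def cb(msg):
        received.append(msg.seq)
        # Signal completion once the expected number of messages has arrived.
        if len(received) >= expected and not done.done():
            done.set_result(None)

    task = await fake_subscribe(cb, expected)
    # Wait for the callback's signal instead of sleeping for a guessed duration;
    # the timeout only guards against waiting forever.
    await asyncio.wait_for(done, timeout)
    await task
    return received


result = asyncio.run(collect(20000))
print(len(result))
```

The point is that the script exits as soon as the last message arrives, so the wait scales with delivery time rather than with a hard-coded sleep. If the total count is not known up front, some other stop condition is needed (that part is outside this sketch).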