nats-io / nats.py

Python3 client for NATS
https://nats-io.github.io/nats.py/
Apache License 2.0
902 stars 193 forks source link

WORK_QUEUE policy not working #340

Open rduque1 opened 2 years ago

rduque1 commented 2 years ago

Hi, I am using nats-py==2.1.7. When I try to set up load balancing with the retention policy RetentionPolicy.WORK_QUEUE, the messages are still delivered to both workers.

I don't understand what I am doing wrong, but I can't get it to work so that each message is delivered to only one subscription. The messages are deleted from the stream, but both workers still receive them. I tried both with and without a callback, but it didn't change anything.

The code to test it is below.

import asyncio
import nats
from nats.errors import TimeoutError
from nats.aio.client import Client as NatsClient  # type: ignore
from nats.aio.client import Msg as NatsMsg
from nats.js.api import RetentionPolicy
from nats.js.api import ConsumerConfig
from nats.js.api import StreamInfo
from nats.js.api import StreamConfig
from nats.aio.subscription import Subscription

# The stream name and the service name; the service name doubles as the
# queue / deliver-group name when subscribing.
stream_name, service_name = "job-manager", "app-condition"
# Subject the test messages are published on: <stream>.<service>.demo.proces
full_sub = "%s.%s.demo.proces" % (stream_name, service_name)

async def msgcb(sub: Subscription, name: str):
    """Poll *sub* forever, acknowledging and printing every message.

    Each received message is acked, printed with *name* as a prefix, and
    followed by a two-second pause. When a ``TimeoutError`` is raised the
    loop pauses two seconds, prints "no msgs" and polls again.

    This coroutine never returns on its own.
    """
    while True:
        try:
            received = await sub.next_msg()
            await received.ack()
            text = received.data.decode()
            print("%s:" % name, text)
            await asyncio.sleep(2)
        except TimeoutError:
            # Nothing arrived in time: back off before the next poll.
            await asyncio.sleep(2)
            print("no msgs")

async def qsub_a(msg: NatsMsg):
    """Callback for worker A: print the payload, ack it, then pause 2 s."""
    payload = msg.data.decode()
    print("QSUB A:", payload)
    await msg.ack()
    # Simulate two seconds of work per message.
    await asyncio.sleep(2)

async def qsub_b(msg: NatsMsg):
    """Callback for worker B: print the payload, ack it, then pause 2 s."""
    payload = msg.data.decode()
    print("QSUB B:", payload)
    await msg.ack()
    # Simulate two seconds of work per message.
    await asyncio.sleep(2)

async def get_or_create_nats_queue_stream(strm_name: str, nc: NatsClient) -> StreamInfo:
    """Create the work-queue stream *strm_name*, or fetch it if it exists.

    The stream is configured with ``RetentionPolicy.WORK_QUEUE`` and
    captures the subject ``strm_name`` itself plus everything matching
    ``strm_name.>``.

    Args:
        strm_name: Name of the stream; also the root of its subjects.
        nc: Connected NATS client used to obtain the JetStream context.

    Returns:
        The ``StreamInfo`` of the newly created or the existing stream.

    Raises:
        Exception: Any failure of ``add_stream`` whose ``err_code`` is not
            10058 is re-raised unchanged instead of being swallowed.
    """
    # Create JetStream context
    js = nc.jetstream()
    strm_config = StreamConfig()
    strm_config.retention = RetentionPolicy.WORK_QUEUE
    strm_config.name = strm_name
    strm_config.subjects = ["%s" % strm_name, "%s.>" % strm_name]
    # NOTE(review): the unit nats-py expects for max_age is not visible
    # here -- confirm this value means what was intended.
    strm_config.max_age = 3600000000
    try:
        strm_info = await js.add_stream(config=strm_config)
    except Exception as e:
        # Not every exception carries ``err_code``, so read it defensively
        # rather than masking the real error with an AttributeError.
        if getattr(e, "err_code", None) != 10058:
            # Returning None here would break the StreamInfo contract and
            # only postpone the failure to the caller.
            raise
        # Error code 10058 is handled as "the stream is already there".
        strm_info = await js.stream_info(strm_name)
        print('Nats stream %s retrieved with subjects %s' % (strm_name, strm_info.config.subjects))
        return strm_info
    print('Nats stream %s created with subjects %s' % (strm_name, strm_info.config.subjects))
    return strm_info

async def get_or_create_nats_stream_sub(stream_name: str, service_name: str, nc: NatsClient, cb = None) -> Subscription:
    """Subscribe to ``<stream_name>.<service_name>.>`` as part of a queue group.

    *service_name* is used both as the queue argument of ``js.subscribe``
    and as the consumer's ``deliver_group``. Messages must be acked
    manually. When *cb* is given it is registered as the message callback;
    otherwise no callback is passed to ``js.subscribe``.

    Returns:
        The subscription returned by ``js.subscribe``.
    """
    subject = "%s.%s.>" % (stream_name, service_name)
    js = nc.jetstream()
    cs_config = ConsumerConfig()
    cs_config.deliver_group = service_name
    # Note: this is logged before the subscription is actually made.
    print('Nats stream queue subscription created on subject %s and queue name %s' % (subject, service_name))
    # Forward the callback only when one was supplied.
    options = {"config": cs_config, "manual_ack": True}
    if cb is not None:
        options = {"cb": cb, **options}
    return await js.subscribe(subject, service_name, **options)

async def keyboardinterrupt():
    """Sleep in one-second steps forever; ends only by cancellation or error."""
    interval = 1
    while True:
        await asyncio.sleep(interval)

async def main(publish: bool = False):
    """Run the work-queue demo against a NATS server on localhost.

    Ensures the stream exists, optionally publishes 100 test messages on
    ``full_sub``, attaches two callback subscriptions that share the same
    queue name, and then idles until the task is cancelled or interrupted.
    The connection is always closed on the way out.

    Args:
        publish: When true, publish the test messages before subscribing.
    """
    nc = await nats.connect("localhost")
    try:
        js = nc.jetstream()
        strm_info = await get_or_create_nats_queue_stream(stream_name, nc)
        print(strm_info)

        if publish:
            for i in range(100):
                ack = await js.publish(
                    full_sub, f"hello world: {i}".encode(), stream=stream_name
                )
                print(full_sub, ack)

        # Two subscriptions in the same queue group, one callback each.
        await get_or_create_nats_stream_sub(stream_name, service_name, nc, cb=qsub_a)
        await get_or_create_nats_stream_sub(stream_name, service_name, nc, cb=qsub_b)
        # Callback-free variant that was also tried: subscribe without
        # ``cb`` and poll each subscription with ``msgcb(sub, name)``.

        # Keep the coroutine alive so the callbacks can run.
        await keyboardinterrupt()
    finally:
        # Cleanup lives in ``finally`` so the connection is closed however
        # the wait above ends, cancellation on Ctrl-C included.
        await nc.close()

# Script entry point: run the demo and publish the test messages first.
if __name__ == "__main__":
    asyncio.run(main(publish=True))
movestill commented 1 year ago

I'm seeing the same issue with version 2.2.0. I'm running the NATS Docker image 2.9.10 as my server. I ran @rduque1's code and also my own adaptation of the demo code. I tried tweaking the consumer settings to delay redelivery beyond the duration of my random sleeps, but I still see several redeliveries.

import asyncio
from datetime import datetime
import nats
from nats.js.api import ConsumerConfig, RetentionPolicy
from random import random

async def main():
    """Publish 100 messages to a work-queue stream and consume them with two workers.

    Both workers subscribe to subject "logs" with the queue name "workers"
    and ack manually. Each one marks the message as in progress, sleeps to
    simulate work, acks synchronously, and reports messages that were
    delivered more than once. Runs until interrupted, then closes the
    connection.
    """
    async def closed_cb():
        print("Connection closed")

    # Workers of the deliver group that gets the load-balanced messages.
    async def qsub_a(msg):
        print(f"QSUB A {msg.metadata.timestamp}: {msg.data}")
        await msg.in_progress()
        # Variable-length work: between 0.5 and 1.5 seconds.
        await asyncio.sleep(0.5 + random())
        # Shorter variant that was also tried: await asyncio.sleep(random())
        await msg.ack_sync()
        if msg.metadata.num_delivered <= 1:
            return
        print(f"Delivered {msg.metadata.num_delivered} times")

    async def qsub_b(msg):
        print(f"QSUB B {msg.metadata.timestamp}: {msg.data}")
        await msg.in_progress()
        # Fixed-length work: two seconds.
        await asyncio.sleep(2)
        # Shorter variant that was also tried: await asyncio.sleep(random())
        await msg.ack_sync()
        if msg.metadata.num_delivered <= 1:
            return
        print(f"Delivered {msg.metadata.num_delivered} times")

    nc = await nats.connect("localhost", closed_cb=closed_cb)
    # Create JetStream context.
    js = nc.jetstream()
    await js.add_stream(name="work-stream", subjects=["logs"], retention=RetentionPolicy.WORK_QUEUE)

    for i in range(100):
        ack = await js.publish("logs", f"hello world: {i}".encode())
        print(f"{datetime.now()} {ack}")

    # One config object per subscription; B subscribes first, then A.
    worker_b_config = ConsumerConfig(ack_wait=120, flow_control=True, max_ack_pending=1)
    worker_a_config = ConsumerConfig(ack_wait=120, flow_control=True, max_ack_pending=1)
    await js.subscribe("logs", "workers", cb=qsub_b, manual_ack=True, config=worker_b_config)
    await js.subscribe("logs", "workers", cb=qsub_a, manual_ack=True, config=worker_a_config)

    try:
        # Idle so the callbacks can run.
        while True:
            await asyncio.sleep(2)
    except KeyboardInterrupt:
        pass
    finally:
        await nc.close()

# Script entry point: build the demo coroutine and run it to completion.
if __name__ == '__main__':
    demo = main()
    asyncio.run(demo)
movestill commented 1 year ago

I actually tried this again yesterday, and it worked as expected. I hadn't realized that the deliver group's configuration on the server was not updated when I changed the settings. Since I started with a fresh server this time, it picked up the settings above and the ack_wait setting was used.