nats-io / nats.py

Python3 client for NATS
https://nats-io.github.io/nats.py/
Apache License 2.0
902 stars 193 forks source link

JetStreamContext.subscribe() and JetStreamContext.pull_subscribe() ignore "durable_name" from ConsumerConfig when looking up for consumer info. #603

Open iamdbychkov opened 2 months ago

iamdbychkov commented 2 months ago

Observed behavior

When .subscribe() or .pull_subscribe() methods of JetStreamContext are provided with config parameter which has durable_name parameter set, those methods ignore it and use durable parameter of methods themselves. Which may results in following JS error if consumer does not exist and parameter name was not specified on ConsumerConfig, which is not required by the NATS server:

Traceback (most recent call last):
  File "/tmp/ttt.py", line 25, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.12/asyncio/runners.py", line 194, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/tmp/ttt.py", line 18, in main
    await js.pull_subscribe(
  File "/tmp/venv/lib/python3.12/site-packages/nats/js/client.py", line 454, in pull_subscribe
    await self._jsm.add_consumer(stream, config=config)
  File "/tmp/venv/lib/python3.12/site-packages/nats/js/manager.py", line 223, in add_consumer
    resp = await self._api_request(subject, req_data, timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/venv/lib/python3.12/site-packages/nats/js/manager.py", line 382, in _api_request
    raise APIError.from_error(resp['error'])
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/venv/lib/python3.12/site-packages/nats/js/errors.py", line 88, in from_error
    raise BadRequestError(**err)
nats.js.errors.BadRequestError: nats: BadRequestError: code=400 err_code=10017 description='consumer name in subject does not match durable name in request'

Expected behavior

My expectations for those methods were that they would use "durable_name" parameter from ConsumerConfig in order to create a consumer / lookup existing consumer.

I can see the logic of not using ConsumerConfig if the consumer is already created and thus passing durable to JetStreamContext methods so it is used to lookup for already created consumer.

But I believe this behaviour is unclear. If user provides config - config values should be used OR they should be overrided by .evolve() as it's done in other methods.

I think it's better to use provided config, because it opens the possibility to update consumer config automatically as it is requested in #503.

Server and client version

Server: docker image: nats:latest Client: nats-py==2.7.2

Host environment

docker run -d --name durable -p 4222:4222 nats -js

Steps to reproduce

import asyncio

import nats
from nats.js.api import ConsumerConfig, DeliverPolicy, AckPolicy

async def main():
    nc = await nats.connect()
    js = nc.jetstream()

    await js.add_stream(name='HELLO', subjects=['hello'])

    consumer_config = ConsumerConfig(
        durable_name='messages:result',
        deliver_policy=DeliverPolicy.LAST,
        ack_policy=AckPolicy.NONE,
    )
    await js.pull_subscribe(
        subject='hello',
        stream='HELLO',
        config=consumer_config,
    )

if __name__ == '__main__':
    asyncio.run(main())