airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0
2.4k stars 121 forks source link

Bug: NATS ConsumerConfig only used for Push subscribers #1598

Closed tim-hutchinson closed 2 months ago

tim-hutchinson commented 2 months ago

Describe the bug Provide a clear and concise description of the bug.

How to reproduce Include source code:

from faststream import FastStream

@broker.subscriber(
    config=ConsumerConfig(
        filter_subjects=["b.a", "b.b"],
    ),
    stream=JStream(
        "test-stream2",
        subjects=["b.*"],
    ),
    pull_sub=True
)
async def handler(msg):
    print(msg)

...

Expected behavior A pull based consumer should be created for NATS jetstream with the filter_subjects values.

Observed behavior The consumer is created but filter_subjects is empty

 nats consumer info
? Select a Stream test-stream2
? Select a Consumer pull-sub
Information for Consumer test-stream2 > pull-sub created 2024-07-15T13:17:37-04:00

Configuration:

                    Name: pull-sub
               Pull Mode: true
          Deliver Policy: All
              Ack Policy: Explicit
                Ack Wait: 30.00s
           Replay Policy: Instant
         Max Ack Pending: 1,000
       Max Waiting Pulls: 512

State:

  Last Delivered Message: Consumer sequence: 0 Stream sequence: 0
    Acknowledgment Floor: Consumer sequence: 0 Stream sequence: 0
        Outstanding Acks: 0 out of maximum 1,000
    Redelivered Messages: 0
    Unprocessed Messages: 0
           Waiting Pulls: 0 of maximum 512

For comparison, here's the same configuration output, but with pull_sub=False:

❯ nats consumer info
? Select a Stream test-stream2
? Select a Consumer push-sub
Information for Consumer test-stream2 > push-sub created 2024-07-15T13:18:19-04:00

Configuration:

            Durable Name: push-sub
        Delivery Subject: _INBOX.zyOnP02Ci3Gc8BtZILE7Wa
         Filter Subjects: b.a, b.b
          Deliver Policy: All
              Ack Policy: Explicit
                Ack Wait: 30.00s
           Replay Policy: Instant
         Max Ack Pending: 1,000
            Flow Control: false

State:

  Last Delivered Message: Consumer sequence: 0 Stream sequence: 0
    Acknowledgment Floor: Consumer sequence: 0 Stream sequence: 0
        Outstanding Acks: 0 out of maximum 1,000
    Redelivered Messages: 0
    Unprocessed Messages: 0
         Active Interest: Active

Environment

Running FastStream 0.5.14 with CPython 3.12.2 on Darwin

Additional context It looks like #1519 only added config passing into the push subscriber _create_subscription functions, not the pull ones, which only provide extra_options.

Lancetnik commented 2 months ago

Indeed, thank you a lot for such detail report