airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0
3.15k stars 161 forks source link

Bug: NATS subscriber retry seems not working #1888

Closed Atanusaha143 closed 1 week ago

Atanusaha143 commented 3 weeks ago

Describe the bug According to NATS ack it should retry, whenever an error occurs. But not getting the expected behavior.

How to reproduce

from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)

@broker.subscriber(subject="test", retry=True)
async def handle_test(data: dict):
    a = 5
    b = 0
    c = a / b

And/Or steps to reproduce the behavior:

  1. similar for any error cases
  2. NatsMessage.nack() is not helping

Expected behavior

If the retry flag is set to True, the message will be nacked and placed back in the queue each time an error occurs.

Observed behavior Even if the retry flag is set to True, the message is not retrying

Lancetnik commented 3 weeks ago

@Atanusaha143 thank you for the report! Anyway, I should warn you: retry option is deprecated and will be removed in 0.6.0 version to prior to ack_policy=AckPolicy.NackOnError

Lancetnik commented 3 weeks ago

You can take a look at draft - #1869

Atanusaha143 commented 3 weeks ago

@Lancetnik Thank you for the prompt reply and the heads-up. Are there alternative methods to implement a retry mechanism using Faststream?

Lancetnik commented 2 weeks ago

@Lancetnik Thank you for the prompt reply and the heads-up. Are there alternative methods to implement a retry mechanism using Faststream?

With Nast Core it can be implemented via re-publish only. retry should works fine with Nats JetStream now - it has acknowledgement policy and FS just calls nak to redeliver message on error if retry option setted. Anyway, retry logic strongly depends on real broker features, this reason I want to remove this option and replace it by much explicit ack_policy

Atanusaha143 commented 2 weeks ago

@Lancetnik The code provided in the description raises a ZeroDivisionError. Based on the logic, it should be re-queued on the subject named test, and the subscriber for the test subject should catch it and execute the handle_test function again. However, this expected behavior is not happening.

Lancetnik commented 2 weeks ago

@Lancetnik The code provided in the description raises a ZeroDivisionError. Based on the logic, it should be re-queued on the subject named test, and the subscriber for the test subject should catch it and execute the handle_test function again. However, this expected behavior is not happening.

It works only for Nats JS. Nats core has no "requeue" functional. You should setup a stream

Atanusaha143 commented 2 weeks ago
# publisher

from nats.aio.client import Client as NATS
from nats.js import JetStreamContext
from nats.js.api import StreamConfig

class TestPublisher:
    def __init__(self):
        self._conn = None

    @staticmethod
    async def _get_jetstream_conn() -> JetStreamContext:
        nats: NATS = NATS()
        await nats.connect("nats://localhost:4222")
        return nats.jetstream()

    @staticmethod
    async def _add_stream(conn: JetStreamContext, subjects: list[str], name: str = "test_stream") -> JetStreamContext:
        try:
            await conn.add_stream(StreamConfig(name=name, subjects=subjects))
            print("Stream created successfully.")
            return conn
        except Exception as e:
            print(f"Failed to create stream: {str(e)}")

    async def _setup_publisher(self, stream_name: str, subjects: list[str]) -> None:
        self._conn = await self._get_jetstream_conn()
        self._conn = await self._add_stream(conn=self._conn, subjects=subjects, name=stream_name)

    async def publish(self, stream_name: str, subjects: list[str], data: bytes) -> None:
        try:
            if self._conn is None:
                await self._setup_publisher(stream_name=stream_name, subjects=subjects)
            for subject in subjects:
                print(f"Published into Stream: {stream_name}")
                ack = await self._conn.publish(subject=subject, payload=data)
                print(f'Ack: stream={ack.stream}, sequence={ack.seq}')
        except Exception as e:
            print(f"Failed to publish to '{subjects}': {str(e)}")
# subscriber

from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)

@broker.subscriber(subject="test_subject", retry=True)
async def handle_test(data: str):
    print(data)
    a = 4
    b = 0
    c = a / b

I intentionally performed the division operation to test whether the retry mechanism is functioning properly.

await test_publisher.publish(stream_name="test_stream", subjects=["test_subject"], data=b"started testing")

@Lancetnik When calling the _testpublisher's publish method, the retry mechanism should trigger since it throws a ZeroDivisionError, right?

Lancetnik commented 2 weeks ago

@Atanusaha143 you should setup stream in subscriber, not in publisher to consume messages from Jetstream

Atanusaha143 commented 1 week ago

As a solution, pull consumer is required, and a pull consumer must have a stream