airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0

Bug: NATS unstable connections #1581

Open theobouwman opened 1 month ago

theobouwman commented 1 month ago

Describe the bug Our FastStream NATS implementation has logged many errors since we started using it:

Error in NATS: nats: 'Authentication Timeout'
Error in NATS: nats: connection closed
Error in NATS: Connection lost
Error setting key value: nats: no response from stream
Error in NATS: [Errno 104] Connection reset by peer

How to reproduce Our NATSService, used by our regular FastAPI app:

import asyncio
from enum import Enum
import logging
from opentelemetry import trace
from typing import List, TypeVar
from common.schemas.user import GetMeUserResponse
from faststream.nats import NatsBroker, StorageType, RetentionPolicy, JStream
from nats.js.api import KeyValueConfig
from nats.js.errors import KeyNotFoundError
from pydantic import AwareDatetime, RootModel
import sentry_sdk

from common.config import get_sabo_event_categories
from common.schemas.event import GetEventResponse
from common.schemas.feed import GetFeedItemResponse

from async_lru import alru_cache

T = TypeVar("T")

MOMO_WORKER_DEFAULT = JStream(
    name="momo-worker-default",
    retention=RetentionPolicy.WORK_QUEUE,
    max_age=60*60*1*3, # 3 hours
    declare=True,
    storage=StorageType.MEMORY,
    allow_direct=False,
)

class NATSBuckets(Enum):
    MOMO_ORGANISATION_EVENTS = "momo-organisation-events"
    MOMO_ORGANISATION_FEEDS = "momo-organisation-feeds"
    MOMO_ORGANISATION_MEMBERS = "momo-organisation-members"

class NATSService:
    def __init__(self, broker: NatsBroker):
        self.broker = broker

    async def init(self):
        """
        create KV stores for each bucket
        """
        try:
            tasks = []
            for bucket in NATSBuckets:
                tasks.append(asyncio.create_task(self.broker.stream.create_key_value(KeyValueConfig(
                    bucket=bucket.value,
                    ttl=60*60*24*7, # 1 week
                    storage=StorageType.MEMORY,
                ))))

            await asyncio.gather(*tasks)
        except Exception as e:
            logging.error(f"Error creating key value stores: {e}")

    @alru_cache(maxsize=16)
    async def _key_value(self, bucket):
        return await self.broker.stream.key_value(bucket)

    async def get_key_value(self, bucket: str, key: str):
        with trace.get_tracer_provider().get_tracer("nats").start_as_current_span(f"get key value") as span:
            span.set_attribute("bucket", bucket)
            span.set_attribute("key", key)

            try:
                result = await (await self._key_value(bucket)).get(key)
                return result
            except KeyNotFoundError as e:
                return None
            except Exception as e:
                logging.error(f"Error getting key value: {e}")
                sentry_sdk.capture_exception(e)
                return None

    async def set_key_value(self, bucket: str, key: str, value: bytes):
        with trace.get_tracer_provider().get_tracer("nats").start_as_current_span(f"set key value") as span:
            span.set_attribute("bucket", bucket)
            span.set_attribute("key", key)

            try:
                await (await self._key_value(bucket)).put(key, value)
            except Exception as e:
                logging.error(f"Error setting key value: {e}")
                sentry_sdk.capture_exception(e)

And here is our Dishka setup for the NATS Worker service, which handles NATS messages. The broker is used by the NATSService and the router by the FastAPI FastStream implementation:

class NATSProvider(Provider):
    def __init__(self, broker: NatsBroker):
        super().__init__(scope=Scope.APP)
        self.broker = broker

    @provide(scope=Scope.APP)
    async def nats_service(self) -> NATSService:
        client = await self.broker.connect()

        nats_service = NATSService(self.broker)
        await nats_service.init()

        return nats_service

def _make_nats_broker() -> Tuple[NatsBroker, NatsRouter]:
    async def error_cb(e):
        logging.error(f"Error in NATS: {e}")
        sentry_sdk.capture_exception(e)

    tracer_provider = trace.get_tracer_provider()

    _broker = NatsBroker(
        get_config().NATS_SERVER_URL,
        user_credentials="./nats-production-momo-api.creds",
        error_cb=error_cb,
        allow_reconnect=True,
        middlewares=(
            NatsTelemetryMiddleware(
                tracer_provider=tracer_provider,
            ),
        )
    )

    _router = NatsRouter(
        get_config().NATS_SERVER_URL,
        user_credentials="./nats-production-momo-api.creds",
        error_cb=error_cb,
        middlewares=(
            NatsTelemetryMiddleware(
                tracer_provider=tracer_provider,
            ),
        )
    )

    return _broker, _router

def make_container() -> Tuple[AsyncContainer, NatsBroker, NatsRouter]:
    nats_broker, nats_router = _make_nats_broker()
    container = make_async_container(
        DBProvider(),
        FastStreamProvider(),
        NATSProvider(broker=nats_broker),
        ServiceProvider(),
    )

    return container, nats_broker, nats_router

And lastly the Worker App:

import asyncio
import random
from typing import Annotated
from fastapi import FastAPI
from fastapi.responses import ORJSONResponse
from faststream.nats import DeliverPolicy, PullSub
from dishka.integrations.faststream import inject, FromDishka
from api.auth import firebase_initialization
from common.config import get_config
from common.exceptions.handlers import init_exception_handlers
from common.middleware import init_middleware
from common.observability import init_observability
from common.services.nats_service import MOMO_WORKER_DEFAULT
from common.services.user_service import UserService
from di import make_container
from dishka.integrations.faststream import setup_dishka as setup_faststream_ioc
from dishka.integrations.fastapi import setup_dishka as setup_fastapi_ioc

from common.schemas.worker import UserEmailSummarySendPayload

#
# DI container
#
container, _, nats_router = make_container()

#
# FastStream setup
#
setup_faststream_ioc(
    container,
    nats_router,
    finalize_container=False,
)

#
# FastAPI setup
#
_fastapi_app = FastAPI(
    title="momo-worker",
    lifespan=nats_router.lifespan_context,
    docs_url=get_config().swagger_url(),
    redoc_url=get_config().redoc_url(),
    debug=get_config().DEBUG,
    default_response_class=ORJSONResponse,
)

setup_fastapi_ioc(container, _fastapi_app)
init_observability("momo-worker", _fastapi_app)
init_exception_handlers(_fastapi_app)
init_middleware(_fastapi_app)
firebase_initialization()

# NOTE: subject: ENTITY.DOMAIN.ACTION
# e.g. user.email.summary.send
# e.g. organisation.feed.reset
# e.g. organisation.event.created
# e.g. organisation.event.deleted
# e.g. organisation.event.started
# e.g. organisation.event.ended
# e.g. organisation.member.added

#
# Handlers
#

@nats_router.subscriber(
    "scheduled.bimonthly.user.email.summary.send",
    durable="bimonthly-user-email-summary-send",
    stream=MOMO_WORKER_DEFAULT,
    max_workers=10,
    deliver_policy=DeliverPolicy.ALL,
    pull_sub=PullSub(batch_size=10),
)
@inject
async def handler(user_service: Annotated[UserService, FromDishka()]):
    users = await user_service.get_all_users()

    tasks = []

    for user in users:
        tasks.append(asyncio.create_task(nats_router.broker.publish(UserEmailSummarySendPayload(user_id=user.uid), "user.email.summary.send")))

    await asyncio.gather(*tasks)

@nats_router.subscriber(
    "user.email.summary.send",
    durable="user-email-summary-send",
    stream=MOMO_WORKER_DEFAULT,
    max_workers=10,
    deliver_policy=DeliverPolicy.ALL,
    pull_sub=PullSub(batch_size=10)
)
@inject
async def handler(msg: UserEmailSummarySendPayload, user_service: Annotated[UserService, FromDishka()]):
    print(f"Sending email to {msg.user_id}")
    try:
        user = await user_service.get_user(msg.user_id)
    except Exception as e:
        pass
    await asyncio.sleep(random.randint(1, 5))
    return msg

#
# Add routers
#
_fastapi_app.include_router(nats_router)

app = _fastapi_app

Expected behavior We expect our NATSService and FastStream app to keep a constant connection to the NATS server (hosted on Scaleway) without these errors, and to reconnect automatically, which is the default behaviour.

Observed behavior We observe connection errors. Does this suggest that the broker is not reconnecting?

Error in NATS: nats: 'Authentication Timeout'
Error in NATS: nats: connection closed
Error in NATS: Connection lost
Error setting key value: nats: no response from stream
Error in NATS: [Errno 104] Connection reset by peer

Environment

Running FastStream 0.5.13 with CPython 3.11.6 on Darwin

Additional context We have contacted Scaleway support. They stated that the resource limits are:
300Mb totals in stream/Kv
50 streams/Kv
50 consumers per stream

This is more than enough, and we are not exceeding it.

They also stated that we must be sure to handle automatic reconnection. But to our understanding this is the default behaviour.

So are we doing something wrong in our implementation? We do not get these errors all the time; they appear at irregular intervals.
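On the point that reconnection is the default: nats-py is understood to enable `allow_reconnect` by default, but with a cap of 60 attempts and a 2-second wait between them, after which the connection is closed for good. The sketch below collects the reconnect-related keyword arguments in one place. The argument names are those of nats-py's `connect()`; that `NatsBroker` in FastStream 0.5.13 forwards each of them unchanged is an assumption, and the helper names and values are hypothetical.

```python
def reconnect_options(max_attempts: int = -1) -> dict:
    """Reconnect-related keyword arguments, named after nats-py's connect().

    Values are illustrative, not recommendations from the library authors.
    """
    return {
        "allow_reconnect": True,
        # A non-positive value is understood to remove the attempt cap
        # (assumption: verify against your nats-py version).
        "max_reconnect_attempts": max_attempts,
        "reconnect_time_wait": 2,    # seconds between reconnect attempts
        "connect_timeout": 5,        # seconds allowed for each connect attempt
        "ping_interval": 20,         # seconds between client PINGs
        "max_outstanding_pings": 3,  # unanswered PINGs before the link is treated as stale
    }


def stale_detection_seconds(opts: dict) -> int:
    """Rough estimate of how long a silently dead connection can go unnoticed."""
    return opts["ping_interval"] * opts["max_outstanding_pings"]


if __name__ == "__main__":
    opts = reconnect_options()
    print(opts)
    print("stale link noticed after roughly", stale_detection_seconds(opts), "seconds")
```

If the assumption about forwarding holds, these would be passed as `NatsBroker(get_config().NATS_SERVER_URL, **reconnect_options(), ...)` alongside the existing `error_cb` and credentials.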

Lancetnik commented 1 month ago

nats-py provides connection recovery functionality, but it seems that recovery doesn't work if authentication fails. It also looks like authentication fails on a connection timeout too... I need to dig into nats-py to find the reason.
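To tell whether recovery actually happens, it can help to log connection state changes separately from errors. The sketch below builds the three state callbacks that nats-py's `connect()` accepts (`disconnected_cb`, `reconnected_cb`, `closed_cb`, each an async callable taking no arguments). The factory name `make_connection_callbacks` and the in-memory `events` list are hypothetical, and passing the callbacks through `NatsBroker` is an assumption.

```python
import asyncio
import logging

logger = logging.getLogger("nats.connection")


def make_connection_callbacks():
    """Return (callbacks, events): async state callbacks plus a list recording their order."""
    events: list[str] = []

    async def disconnected_cb() -> None:
        # Called when the client loses its connection and starts recovery.
        events.append("disconnected")
        logger.warning("NATS disconnected")

    async def reconnected_cb() -> None:
        # Called after a successful reconnect.
        events.append("reconnected")
        logger.info("NATS reconnected")

    async def closed_cb() -> None:
        # Called when the client gives up; no further reconnects will happen.
        events.append("closed")
        logger.error("NATS connection closed permanently")

    callbacks = {
        "disconnected_cb": disconnected_cb,
        "reconnected_cb": reconnected_cb,
        "closed_cb": closed_cb,
    }
    return callbacks, events


if __name__ == "__main__":
    cbs, seen = make_connection_callbacks()
    asyncio.run(cbs["disconnected_cb"]())
    asyncio.run(cbs["reconnected_cb"]())
    print(seen)
```

A "disconnected" entry followed by "closed" with no "reconnected" in between would indicate that the client stopped retrying rather than recovering.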

theobouwman commented 2 weeks ago

Authentication errors are no longer occurring. We use Cloud Run with min instances set to 1, so a started instance can run for days; a new container is only created when we push an update. However, we noticed that when we push no updates for one to three days, we get errors like 'connection closed'. Is there a setting to automatically reconnect after a certain period?
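Independently of any client-side reconnect setting, individual operations such as the KV `put` in `set_key_value` can be wrapped so that a transient failure is retried instead of only logged. This is a minimal sketch using only asyncio; `with_retries` and its parameters are hypothetical names, and the default `retry_on` tuple uses built-in exceptions as a stand-in. With nats-py, the exception classes from `nats.errors` that you want to retry would need to be passed explicitly.

```python
import asyncio
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def with_retries(
    op: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
) -> T:
    """Await op(), retrying with exponential backoff on the given exception types."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await op()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error to the caller
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")


if __name__ == "__main__":
    calls = {"n": 0}

    async def flaky_put() -> str:
        # Stand-in for a KV put that fails twice before succeeding.
        calls["n"] += 1
        if calls["n"] < 3:
            raise ConnectionError("connection closed")
        return "stored"

    print(asyncio.run(with_retries(flaky_put, base_delay=0.01)), calls["n"])
```

In `NATSService`, usage could look like `await with_retries(lambda: kv.put(key, value))`, where `kv` is the handle returned by `_key_value`. Note that `_key_value` is cached with `alru_cache`, so a handle obtained before a connection loss is reused afterwards; whether that handle stays valid after a reconnect is not established in this thread.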