reagento / dishka

Cute DI framework with agreeable API and everything you need
https://dishka.readthedocs.io
Apache License 2.0

how to attach FastStream NATS OpenTelemetry middleware #174

Closed (theobouwman closed 1 month ago)

theobouwman commented 3 months ago

I already have a NATSService that our FastAPI instance uses, and it works great. So far it is only used as a KV store for some caching. I want to add FastStream as a separate instance that needs to use the same shared code: services, repos, config, observability, etc.

The problem is: I initialise the broker in the NATSProvider, which is already used by other services. How can I attach that broker instance to the FastStream instance and register handlers on it?

Here is my provider:

class NATSProvider(Provider):
    @provide(scope=Scope.APP)
    async def nats_service(self) -> NATSService:
        async def error_cb(e):
            logging.error(f"Error in NATS: {e}")
            sentry_sdk.capture_exception(e)

        tracer_provider = trace.get_tracer_provider()

        _broker = NatsBroker(
            get_config().NATS_SERVER_URL,
            user_credentials="./secret.creds",
            error_cb=error_cb,
            middlewares=(
                NatsTelemetryMiddleware(
                    tracer_provider=tracer_provider,
                ),
            )
        )

        await _broker.connect()

        nats_service = NATSService(_broker)
        await nats_service.init()

        return nats_service

My container:


container = make_async_container(
    DBProvider(),
    NATSProvider(),
    ServiceProvider(),
)

Here is the observability init. This is shared code in the shared/ folder, used by both my FastAPI API and my FastStream worker:

def init_observability(
    service_name: str,
    app: FastAPI = None,
):
    resource = Resource.create({
        "service.name": service_name,
    })

    tracer_provider = TracerProvider(resource=resource)

    if get_config().ENVIRONMENT == 'development':
        OTLP_HTTP_ENDPOINT = get_config().OTLP_HTTP_ENDPOINT
        trace_exporter = OTLPSpanExporter(endpoint=OTLP_HTTP_ENDPOINT)
        tracer_provider.add_span_processor(BatchSpanProcessor(trace_exporter))
    else:
        tracer_provider.add_span_processor(SentrySpanProcessor())

    trace.set_tracer_provider(tracer_provider)
    set_global_textmap(SentryPropagator())

    if app:
        FastAPIInstrumentor.instrument_app(app, tracer_provider=tracer_provider)

    if get_config().ENVIRONMENT != 'development':
        sentry_sdk.init(
            dsn=get_config().SENTRY_DSN,
            environment=get_config().ENVIRONMENT,
            traces_sample_rate=0.4,
            profiles_sample_rate=0.4,
            enable_tracing=True,
            instrumenter="otel",
        )

And here is the FastStream app.py:


from di import container

from common.config import get_config
from common.schemas.worker import NewUserSignupPayload

app = FastStream(broker)

init_observability("momo-worker")
firebase_initialization()

setup_dishka(container, app)

@broker.subscriber(
    "user.email.summary.send",
    queue="worker",
    max_workers=40,
    deliver_policy=DeliverPolicy.ALL
)
@broker.publisher(
    "user.email.summary.success",
)
@inject
async def handler(
    msg: NewUserSignupPayload,
    logger: Logger,
    user_service: Annotated[UserService, FromDishka()],
    nats_service: Annotated[NATSService, FromDishka()],
):
    logger.info(msg)
    try:
        tasks = []

        for i in range(10):
            tasks.append(asyncio.create_task(nats_service.get_cache_key_organisation_event('test')))

        user = await user_service.get_user_by_email(msg.user_id)

        await asyncio.gather(*tasks)
    except Exception:
        # Log the failure instead of silently swallowing it.
        logger.exception("handler failed")

IvanKirpichnikov commented 3 months ago

Please describe your problem in more detail.
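
For readers following the question above, here is one possible direction, offered as a sketch rather than a confirmed answer. The idea is to build the broker once at module level, register handlers on that instance, and pass the same instance into the provider instead of constructing it there. Every class below (StandInBroker, StandInService, StandInProvider) is a hypothetical stand-in written with the standard library only. None of them is the real FastStream or dishka API, and the server URL is an assumed placeholder.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class StandInBroker:
    """Hypothetical stand-in for a message broker such as NatsBroker."""
    url: str
    middlewares: tuple = ()
    connected: bool = False
    handlers: dict = field(default_factory=dict)

    def subscriber(self, subject):
        # Decorator that records a handler for the given subject.
        def register(func):
            self.handlers[subject] = func
            return func
        return register

    async def connect(self):
        self.connected = True


class StandInService:
    """Hypothetical stand-in for NATSService: wraps an existing broker."""
    def __init__(self, broker):
        self.broker = broker


class StandInProvider:
    """Hypothetical stand-in for a DI provider that receives the broker
    from outside instead of constructing it."""
    def __init__(self, broker):
        self._broker = broker

    async def nats_service(self):
        # Connect only if nothing else has connected the broker yet.
        if not self._broker.connected:
            await self._broker.connect()
        return StandInService(self._broker)


# 1. Build the broker once, outside any provider (URL is a placeholder).
broker = StandInBroker("nats://localhost:4222", middlewares=("telemetry",))


# 2. Register handlers on that module-level instance.
@broker.subscriber("user.email.summary.send")
async def handler(msg):
    return f"handled {msg}"


# 3. Hand the same instance to the provider, so the service resolved
#    through DI wraps the broker that the handlers are attached to.
provider = StandInProvider(broker)


async def main():
    service = await provider.nats_service()
    return service.broker is broker
```

With this layout the application object and the DI container both refer to one broker, so middleware configured at construction time applies to the handlers as well. Whether the real libraries support this wiring in exactly this shape is not confirmed in this thread.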