funcmike / rabbitmq-nio

A Swift implementation of AMQP 0.9.1 protocol: decoder + encoder and non-blocking client
Apache License 2.0

Support distributed tracing #38

Open sliemeobn opened 1 year ago

sliemeobn commented 1 year ago

I added a tiny wrapper around this library to participate in distributed tracing. It works like a charm with Node.js services (communicating with Swift services through RabbitMQ).

Ideally, this would come out of the box. We'd need either some sort of config for it or maybe a middleware system.

Here is what I use:

struct AMQPHeaderPropagation: Extractor, Injector {
    typealias Carrier = Table?

    func extract(key: String, from carrier: Carrier) -> String? { carrier?[key]?.asString }

    func inject(_ value: String, forKey key: String, into carrier: inout Carrier) {
        carrier = carrier ?? Table()
        carrier![key] = .longString(value)
    }
}

private extension Field {
    var asString: String? {
        switch self {
        case let .longString(s): return s
        // NOTE: maybe support more data types
        default: return nil
        }
    }
}

extension ServiceContext {
    var asAqmpHeaders: Table? {
        var headers: Table?
        InstrumentationSystem.instrument.inject(self, into: &headers, using: AMQPHeaderPropagation())
        return headers
    }
}

extension AMQPResponse.Channel.Message.Delivery {
    var serviceContext: ServiceContext {
        var context = ServiceContext.topLevel
        InstrumentationSystem.instrument.extract(properties.headers, into: &context, using: AMQPHeaderPropagation())
        return context
    }
}

sliemeobn commented 1 year ago

By the way: while implementing it, I stumbled over this spec, which states that the traceparent must be a binary value.

All I can say is that I used strings and it works just fine with the OpenTelemetry package for amqplib. So that might be a bit of a sticky situation to consider.
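
For reference, the string form that text-based propagators typically write is the W3C Trace Context `traceparent` value: four dash-separated lowercase hex fields (version, trace-id, parent-id, flags). Assuming that is the format landing in the AMQP headers here, a minimal sketch of validating such a string could look like the following. `TraceParent` is a hypothetical helper for illustration, not part of rabbitmq-nio or swift-distributed-tracing, and it only handles the four-field layout of version `00`.

```swift
// Hypothetical helper (not a library type): parses a W3C `traceparent` string
// such as "00-<32 hex trace-id>-<16 hex parent-id>-<2 hex flags>".
struct TraceParent {
    let version: String
    let traceID: String
    let parentID: String
    let flags: String

    init?(_ value: String) {
        let parts = value.split(separator: "-", omittingEmptySubsequences: false).map(String.init)
        guard parts.count == 4,
              // Version: 2 hex digits; "ff" is reserved as invalid.
              TraceParent.isLowerHex(parts[0], length: 2), parts[0] != "ff",
              // Trace ID: 32 hex digits, must not be all zeros.
              TraceParent.isLowerHex(parts[1], length: 32), parts[1].contains(where: { $0 != "0" }),
              // Parent (span) ID: 16 hex digits, must not be all zeros.
              TraceParent.isLowerHex(parts[2], length: 16), parts[2].contains(where: { $0 != "0" }),
              // Flags: 2 hex digits.
              TraceParent.isLowerHex(parts[3], length: 2)
        else { return nil }

        version = parts[0]
        traceID = parts[1]
        parentID = parts[2]
        flags = parts[3]
    }

    // The least significant flag bit marks the trace as sampled.
    var isSampled: Bool { (UInt8(flags, radix: 16) ?? 0) & 1 == 1 }

    private static func isLowerHex(_ s: String, length: Int) -> Bool {
        s.count == length && s.allSatisfy { "0123456789abcdef".contains($0) }
    }
}
```

A check like this would only matter if the consumer wants to reject malformed headers itself; in the wrapper above, the configured instrument does the actual parsing.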

tweidema commented 3 months ago

I would like to second this suggestion. Having distributed tracing integrated would be very nice. @sliemeobn I would be grateful for a few pointers on how to integrate your code above with Swift tracing/swift-otel. I am new to both Swift and distributed tracing, but I have a working HTTP example from a Swift client (using AsyncHTTPClient with retrofitted distributed tracing built from https://github.com/bok-/devworld24-tracing-demo.git) to a Hummingbird server (which supports distributed tracing out of the box). Traces are sent to Tempo and visualized with Grafana.

sliemeobn commented 2 months ago

@tweidema it sounds like you have it all set up then, so I am not quite sure which part you are looking for pointers on.

I guess the only pieces missing are the actual spans. I use the following for now (not sure how OTel spec-compliant it is, though):

extension AMQPChannel {
    func publish(_ buffer: ByteBuffer, to exchange: String, with routingKey: String) async throws {
        try await withSpan("\(exchange) > \(routingKey) publish", ofKind: .producer) { span in
            span.attributes = [
                "messaging.system": "rabbitmq",
                "messaging.destination": .string(exchange),
                "messaging.rabbitmq.routing_key": .string(routingKey),
            ]

            try await basicPublish(
                from: buffer, exchange: exchange, routingKey: routingKey,
                properties: .init(
                    headers: ServiceContext.current?.asAqmpHeaders,
                    deliveryMode: .deliveryModePersistent,
                    expiration: "\(SharelogBrokerDefaults.DEFAULT_ACKNOWLEDGE_EXPIRATION)",
                    type: SharelogBrokerDefaults.MESSAGE_TYPE_PUBSUB
                )
            )
        }
    }
}

extension BrokerMessageReponder { // <- my type that handles messages
    @discardableResult
    func handleDelivery(_ delivery: AMQPResponse.Channel.Message.Delivery, from channel: AMQPChannel, logger: Logger) async -> HandleDeliveryResult {
        await withSpan("\(delivery.exchange) \(delivery.routingKey) process", context: delivery.serviceContext, ofKind: .consumer) { span in
            span.attributes = [
                "messaging.system": "rabbitmq",
                "messaging.destination": .string(delivery.exchange),
                "messaging.rabbitmq.routing_key": .string(delivery.routingKey),
            ]
            span.attributes["messaging.conversation_id"] = delivery.properties.correlationID

            // ... process message ...
        }
    }
}

tweidema commented 2 months ago

@sliemeobn Thank you very much. That helped me get the span headers passed back and forth. I am still missing a piece regarding how to set the span context on the receiving side from those headers. As a result I get two distinct traces, one from the client and one from the server, and they are not being seen as one trace. I guess it has something to do with the extension you have on AMQPResponse.Channel.Message.Delivery, but I can't figure out how to plug that in.

sliemeobn commented 2 months ago

In the snippet I posted above, you can see where the service context is stored in the headers on publish, here:

headers: ServiceContext.current?.asAqmpHeaders,

and on the receiving side, the span is initialized with the context extracted from the message here

await withSpan("\(delivery.exchange) \(delivery.routingKey) process", context: delivery.serviceContext, ofKind: .consumer) { span in

this should forward the traceId as part of the serviceContext
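
To make that flow concrete, here is a self-contained sketch of the same inject-on-publish / extract-on-receive round trip. It uses a plain `[String: String]` in place of the AMQP header `Table` and a hand-rolled context type in place of `ServiceContext`. Every name in it (`DemoContext`, `Headers`, `inject`, `extract`, the `x-demo-trace-id` key) is a stand-in for illustration, not the swift-distributed-tracing or rabbitmq-nio API.

```swift
// Stand-in for ServiceContext: carries only a trace ID.
struct DemoContext: Equatable {
    var traceID: String?
}

// Stand-in for the AMQP headers table.
typealias Headers = [String: String]

// Publisher side: write the current context into the outgoing headers.
func inject(_ context: DemoContext, into headers: inout Headers?) {
    guard let traceID = context.traceID else { return }
    headers = headers ?? [:]
    headers?["x-demo-trace-id"] = traceID
}

// Consumer side: rebuild a context from whatever headers arrived.
func extract(from headers: Headers?) -> DemoContext {
    DemoContext(traceID: headers?["x-demo-trace-id"])
}

// Publish: the active context is serialized into the message headers.
let publisherContext = DemoContext(traceID: "4bf92f3577b34da6a3ce929d0e0e4736")
var outgoing: Headers? = nil
inject(publisherContext, into: &outgoing)

// Receive: the consumer starts its span from the extracted context,
// so both sides share one trace ID.
let consumerContext = extract(from: outgoing)

// If the extracted context is never used (or no headers arrive), the
// consumer has no trace ID to continue and ends up with a separate trace.
let orphanContext = extract(from: nil)
```

In the real setup, the extracted context only takes effect if it is passed as the `context:` argument of `withSpan` on the consumer side; otherwise the consumer span has no parent and shows up as its own trace.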

tweidema commented 1 month ago

@sliemeobn Finally got around to looking at this again, and that was the piece I was missing. Have it working now! Thanks so much for your help.