SkyAPM / go2sky

Distributed tracing and monitor SDK in Go for Apache SkyWalking APM
https://skywalking.apache.org/
Apache License 2.0
448 stars 122 forks source link

Tracing event source and sink in knative eventing #134

Closed snowwolf007cn closed 2 years ago

snowwolf007cn commented 2 years ago

Is your feature request related to a problem? Please describe. I'm using Knative to build an event bus, and I want to trace the event source and sink. Because Knative only supports Zipkin and Jaeger, and the Skywalking Zipkin receiver is still experimental, I had to manually trace the event source and sink. I can parse sw8 headers in the event source, and put them into cloud events context attributes extension. the event source sends events to Knative eventing broker through broker ingress and stores events into an MQ service, and sink to the sink service with broker dispatcher and broker filter. after that, I can extract sw8 header values from cloud events context attributes extension. After that, I try to build an EntrySpan using CreateEntrySpan. the first param of the method is the type of context.Context. Because I can't control how brokers do tracing, and I can't make broker dispatcher propagate sw8 header in the request context. I need to build the context manually with info extracted from cloud events attribute extensions. I tried to

sc := &go2sky.SpanContext()
sc.decode(sw8String)
ctx := context.Background()
ctx = context.WithValue(ctx, key, sc)

I read the source code and found that the key is an interface of type go2sky.ctxKey, which is not a public type. So I can't build a context manually, because I can't generate the key for SpanceContext.

Describe the solution you'd like I think I need a method to generate the key of context value, and then I can't build the EntrySpan and ExistSpan manually.

wu-sheng commented 2 years ago

https://github.com/SkyAPM/go2sky#crossing-process

This doc provides you the way to inject context into context. Then you can use as needed. I don't think exposing other methods is a good idea. Because you will see in SkyWalking, there are more than one header key actually.

Of course, sw8 is required at least.

snowwolf007cn commented 2 years ago

@wu-sheng thanks for your reply. I have read the doc of the crossing process, but it doesn't help. So I need to clarify the scenario. I have a service named "SinkService" for example. It receives requests from another Knative component Broker Dispatcher. Because I don't have control of the source code of Broker Dispatcher, and Dispatcher only consumes messages from an MQ, such as Kafka, so I need to put sw8 header info into Kafka message as a string, and the Dispatcher will send it to SinkService in request body, not in request header. And r.Context() won't include any information about sw8, because there isn't any sw8 info included in the request context from Broker Dispatcher to SinkService. I can only parse the Skywalking tracing data from the request body, because it's stored in MQ message, and I need to inject these data manually into context, not extract from the request context, in code, which are two lines below

ctx := context.Background()
ctx = context.WithValue(ctx, key, sc)

the problem here is: I can't generate the key of context value, because the key is the type ofsky2go.ctxKey, it's a private type used internally in SDK.

wu-sheng commented 2 years ago

Kafka is a very common case, Java has its support clear. You don't need to put context in the message, usually you append new header into the Message's header. Kafka and Pulsar both support this.

I think you misunderstand the purpose of this API, it doesn't require you and system has to propagate the context we provided, it is just a container, like a map to host the data. You can read from it if you want, and set to any place you like, if and only if your Kafka doesn't support message header.

wu-sheng commented 2 years ago

Even in a very special case, you could create a context as a container and communication channel between your codes and Go2Sky. The reason of this is, what I have shown you, we don't want you to manipulate anything about setting keys and encoding values.

snowwolf007cn commented 2 years ago

Let's make the question simple and straight. Given I have an application, which composes of 4 parts:

  1. Service 'TestSource' write in Golang as an HTTP web service deployed as a Knative serving service, which receives a request with the body of 'application/json', the request body will be serialized and put into cloud event "data", and send a cloud event to Knative eventing broker
  2. A Knative eventing broker receive cloud event from Service 'TestSource'
  3. Service 'TestSink' write in Golang as an HTTP web service deployed as a Knative serving service, which receive cloud event and log the event data set in TestSource.
  4. Knative event trigger delivery events to Service 'TestSink'.

can you show me an example code on how to implement https://github.com/SkyAPM/go2sky#crossing-process in such a scenario, and link TestSink and TestSource together in tracing.

snowwolf007cn commented 2 years ago

After struggling for hours, the problem seems solved:

  1. create an exit span on the side of the event source and pass sw8 header value in the cloud event's context attribute extension:
    
    span, err := tracer.CreateExitSpan(ctx, bUrl.Path, bUrl.Host, func(headerKey, headerValue string) error {
    // Since cloud event context attribute extension can only receive alphanum string as attribute name, and skywalking 
        // will generator a header names 'sw8-correlation', we need to rename the attribute to 'sw80correlation'
        headerKey = strings.Replace(headerKey, "-", "0", -1)
    ce.SetExtension(headerKey, headerValue)
    return nil
    })

if err != nil { log.Panicf( "failed to create skywalking exit span, error:%v, context:%v, operation:%s, peer:%s", err, c.Request.Context(), bUrl.Path, bUrl.Host, ) } res := client.Send(ctx, ce) if cloudevents.IsUndelivered(res) { log.Panicf("failed to send cloudevent: %v", res) }

if !cloudevents.IsACK(res) { log.Panicf("failed to send cloudevent: %v", res) }

log.Println("eventing sent") span.End()

2. add middleware to cloud event HTTP client on the sink side to extract cloud event context attribute extension of sw8 and put sw8 headers in the request, and now you can create entry span from request context:
```golang
func skywalkingMiddleware(next nethttp.Handler) nethttp.Handler {
    return nethttp.HandlerFunc(func(w nethttp.ResponseWriter, r *nethttp.Request) {
                // cloud event context attribute extension will be sent to upstream as special headers prefix with "Ce-",
                // we could extract sw8 info as shown
        sw8Value := r.Header.Get("Ce-Sw8")
        sw8CorrelationValue := r.Header.Get("Ce-Sw80correlation")
        r.Header.Add("sw8", sw8Value)
        r.Header.Add("sw8-correlation", sw8CorrelationValue)

        span, ctx, err := tracer.CreateEntrySpan(r.Context(), r.URL.Path, func(headerKey string) (string, error) {
            return r.Header.Get(headerKey), nil
        })

        if err != nil {
            log.Printf("unable to create entry span: %s", err.Error())
            next.ServeHTTP(w, r)
            return
        }
        r = r.WithContext(ctx)
        next.ServeHTTP(w, r)

        span.End()
    })
}