redis / go-redis

Redis Go client
https://redis.uptrace.dev
BSD 2-Clause "Simplified" License

[otel] Redis not instrumenting in kafka consumer's processing #2812

Open nifrasinnovent opened 9 months ago

nifrasinnovent commented 9 months ago

I'm having trouble instrumenting Redis after receiving a message from a Kafka consumer. Mongo and the Kafka producer are instrumented correctly within the same consumer's processing, but Redis is not.

Expected Behavior

Redis should emit spans under the consumer's trace ID.

Current Behavior

Redis is not instrumented; the Kafka producer and Mongo are.

Possible Solution

This looks like a context issue, but I'm not sure.

Steps to Reproduce

```go
func redisInit(ctx context.Context) {
    RedisDB = redis.NewClient(&redis.Options{
        Addr:     viper.GetString("redis.host") + ":" + viper.GetString("redis.port"),
        Password: viper.GetString("redis.password"),
        DB:       0,
    })
    // Register the OpenTelemetry tracing hook on the client.
    if err := redisotel.InstrumentTracing(RedisDB); err != nil {
        panic(err)
    }

    // Verify connectivity before the client is used.
    result, err := RedisDB.Ping(ctx).Result()
    if err != nil {
        fmt.Println("Error pinging Redis server:", err)
        return
    }

    fmt.Println("Redis Ping result:", result)
}
```

```go
func (idc *KafkaController) ProcessMessage(topic string, msg *sarama.ConsumerMessage) <-chan bool {
    done := make(chan bool)
    propagators := propagation.TraceContext{}
    ctx := propagators.Extract(context.Background(), otelsarama.NewConsumerMessageCarrier(msg))
    propagators.Inject(ctx, otelsarama.NewConsumerMessageCarrier(msg))
    message := string(msg.Value)
    databases.RedisDB.Get(ctx, "1245DGF").Result()
    go func() {
        defer close(done)

        switch topics.KafkaConsumer(topic) {
        case topics.KAFKA_IOT_LIVE_VEHICLE_TOPIC:
            data := topics.KafkaIotLiveVehicleTopic{}
            if err := json.Unmarshal([]byte(message), &data); err != nil {
                zap.Error(ctx, "vehicle controller ProcessMessage", err)
                return
            }
            helper.PrintConsumerTime(ctx, "kafka_cg START: MERGED_DEVICE_VEHICLE_TOPIC: "+data.Data.ImeiNumber)
            start := time.Now()
            if data.Data.Iot {
                err := <-idc.VehicleService.AddVehicleIdToVehicle(ctx, &data.Data)
                if err != nil {
                    zap.Error(ctx, "AddVehicleIdToVehicle", err)
                    return
                }
            }
            elapsed := time.Since(start).String()
            helper.PrintConsumerTime(ctx, "TRACING_TIME UTCTIME: AddVehicleIdToVehicle took: "+elapsed)
            idc.IotDeviceServices.ProcessIotDeviceData(ctx, data.Data)
            helper.PrintConsumerTime(ctx, "TRACING_TIME UTCTIME: ProcessIotDeviceData took: "+elapsed)
            helper.PrintConsumerTime(ctx, "kafka_cg END: MERGED_DEVICE_VEHICLE_TOPIC"+data.Data.ImeiNumber)

        case topics.KAFKA_EVENTS:

        default:
            break
        }

        done <- true
    }()
    return done
}
```

Context (Environment)

Running on local Ubuntu machine

nifrasinnovent commented 9 months ago

func (th *tracingHook) ProcessHook(hook redis.ProcessHook) redis.ProcessHook {
    return func(ctx context.Context, cmd redis.Cmder) error {
        if !trace.SpanFromContext(ctx).IsRecording() {
            return hook(ctx, cmd)
        }
        ....
    }

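I found that this check is causing the issue. After extracting the context from NewConsumerMessageCarrier, trace.SpanFromContext(ctx) does not return a recording span, so the hook returns early without tracing the command. I don't understand the reason yet.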