tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License
3.75k stars, 526 forks

Zipkin (Operational tracing) integration #605

Open high-stakes opened 4 years ago

high-stakes commented 4 years ago

Hi

Just letting you know that I made a Zipkin integration for KafkaJS a while back: https://github.com/openzipkin/zipkin-js/tree/master/packages/zipkin-instrumentation-kafkajs

It was a bit problematic without interception support, are there any plans to add that? Would also be useful for various message formats such as protobuf or avro to convert the messages automatically.

Nevon commented 4 years ago

> It was a bit problematic without interception support, are there any plans to add that?

Can you describe what you mean by that?

> Would also be useful for various message formats such as protobuf or avro to convert the messages automatically.

Since we don't know how the messages are encoded, we can't really do that within the scope of the Kafka client. However, maybe it would be interesting to expose a pluggable interface of some kind for encoding and decoding message values and keys, so that libraries like @kafkajs/confluent-schema-registry could plug into the client so that userland code gets automatically decoded messages.

In your particular case though, why do you need access to the decoded messages? Don't you propagate some kind of trace id through headers, which aren't encoded anyway?

high-stakes commented 4 years ago

Interception points around message production and consumption, like in the Java client. I did not mean adding support for those formats specifically; it's just much easier to have a fixed interceptor API than to proxy the consumers and producers ourselves in a hacky manner, which opens up a lot of points of failure. The current compression codecs could also be added as interceptors in a more streamlined manner. One method for automatic encoding and decoding is passing the message format and type in headers. As you mentioned, this is not needed for tracing, since it uses plain-text headers; I am just listing some use cases where it could be useful.
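To make the header-based idea a bit more concrete, here is a rough sketch. The `content-type` header name, the `decoders` registry and `decodeByHeader` are hypothetical names picked for illustration, not part of KafkaJS; the sketch only assumes that header values arrive as Buffers or strings.

```javascript
// Hypothetical registry mapping a content-type header value to a decoder.
const decoders = new Map([
  ['application/json', buf => JSON.parse(buf.toString('utf8'))],
  ['text/plain', buf => buf.toString('utf8')],
])

// Returns a copy of the message with `value` decoded according to its
// "content-type" header. Messages with a missing or unknown type, or with a
// null value, are returned untouched.
const decodeByHeader = message => {
  const raw = message.headers && message.headers['content-type']
  const type = raw == null ? undefined : raw.toString()
  const decode = decoders.get(type)

  return decode && message.value != null
    ? { ...message, value: decode(message.value) }
    : message
}
```

Protobuf or Avro would need a schema lookup on top of this, so their decoders would likely be asynchronous.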

Nevon commented 4 years ago

> Interception points around message production and consumption, like in the Java client. I did not mean adding support for those formats specifically; it's just much easier to have a fixed interceptor API than to proxy the consumers and producers ourselves in a hacky manner, which opens up a lot of points of failure.

I see. We've actually discussed this in the past (there is an issue somewhere about a plugin interface), but as far as possible, we prefer these kinds of plugins to use the same interface as the user does. I wouldn't say that it's a hacky solution, and it will give you the exact same API stability as some hypothetical interception API.

Take the example use case from the Kafka Java documentation for ConsumerInterceptor: intercepting and possibly mutating messages, such as decoding Avro. I would implement that as a simple higher-order function, which seems rather idiomatic for JavaScript:

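// Wraps an eachMessage handler so that it receives decoded keys and values.
// `decode` stands in for whatever decoder you use (for example Avro).
const intercept = eachMessage => async ({ message, ...rest }) => {
  const decodedMessage = {
    ...message,
    value: await decode(message.value),
    key: await decode(message.key)
  }

  return eachMessage({
    ...rest,
    message: decodedMessage
  })
}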

await consumer.run({ 
  eachMessage: intercept(eachMessage)
})

In your case, I guess you want to start a trace when the user receives a message and stop the trace when it finishes:

const trace = eachMessage => async ({ message, ...rest }) => {
  const { headers } = message

  // simplified, but you know what I mean
  const traceId = headers[HttpHeaders.TraceId] || createRootId()

  const stop = startSpan(traceId)
  try {
    // await here so that the span is stopped only after the handler settles
    return await eachMessage({ message, ...rest })
  } finally {
    stop()
  }
}

await consumer.run({
  eachMessage: trace(eachMessage) 
})

This kind of design gives a lot more flexibility for the user. For example, it would allow people to integrate it directly into eachBatch, which wouldn't be possible otherwise:

// `trace` is the same function as in the previous example

await consumer.run({
  eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
    const eachMessage = trace(async ({ message }) => {
      await processMessage(message)
      resolveOffset(message.offset)
    })

    for (let message of batch.messages) {
      await eachMessage({ message })
      await heartbeat()
    }
  }
})
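
Since wrappers like these all share the shape "handler in, handler out", they can also be stacked. A minimal sketch, assuming a hypothetical `compose` helper (not part of KafkaJS) and two stand-in wrappers in place of real tracing and decoding:

```javascript
// Hypothetical helper: applies wrappers right to left, so the first wrapper
// listed ends up as the outermost one.
const compose = (...wrappers) => handler =>
  wrappers.reduceRight((wrapped, wrap) => wrap(wrapped), handler)

// Stand-in for a tracing wrapper: records when handling starts and stops.
const withLogging = log => eachMessage => async payload => {
  log.push('start')
  try {
    return await eachMessage(payload)
  } finally {
    log.push('stop')
  }
}

// Stand-in for a decoding wrapper: turns the value Buffer into a string.
const withUtf8Value = eachMessage => async ({ message, ...rest }) =>
  eachMessage({
    ...rest,
    message: { ...message, value: message.value.toString('utf8') },
  })

// Usage with a real consumer would look something like:
//   await consumer.run({
//     eachMessage: compose(withLogging([]), withUtf8Value)(eachMessage)
//   })
```

The order matters: the outermost wrapper sees the raw message, and the user's handler sees whatever the innermost wrapper passes along.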

chkp-talron commented 4 years ago

+1

It would be great if there were a single point at which we could instrument incoming messages, instead of doing so in each consumer. For me, this relates to OpenTracing.

Any plans for this feature?
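
Until something like that exists, one way to get a single instrumentation point in userland might be to create all consumers through one small factory. This is only a sketch: `instrumentedConsumerFactory` is a hypothetical helper, and it relies on nothing beyond the public `kafka.consumer(config)` and `consumer.run({ eachMessage })` calls.

```javascript
// Hypothetical helper, not a KafkaJS API. `wrap` is any function that takes
// an eachMessage handler and returns an instrumented one.
const instrumentedConsumerFactory = (kafka, wrap) => config => {
  const consumer = kafka.consumer(config)

  // Derive a new object instead of mutating the consumer: every other method
  // stays reachable through the prototype chain, only `run` is overridden.
  return Object.assign(Object.create(consumer), {
    run: ({ eachMessage, ...options } = {}) =>
      consumer.run({
        ...options,
        ...(eachMessage ? { eachMessage: wrap(eachMessage) } : {}),
      }),
  })
}
```

Every consumer created through the factory then gets the same wrapper, while `eachBatch` users are left alone and can apply the wrapper themselves.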

ankon commented 4 years ago

From a discussion in the Slack channel about this topic: here's an OpenTelemetry plugin that might serve as inspiration for implementing tracing in other systems: https://github.com/aspecto-io/opentelemetry-ext-js/tree/master/packages/plugin-kafkajs

(see https://kafkajs.slack.com/archives/CF6RFPF6K/p1597371469246600)

krvajal commented 3 years ago

You can inject the trace in the message headers; see how Datadog does it: https://github.com/DataDog/dd-trace-js/blob/master/packages/datadog-plugin-kafkajs/src/index.js#L29
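
On the producer side, injecting headers can be done with the same kind of wrapper. The sketch below is not how the Datadog plugin is implemented; `sendWithTraceHeaders` and `getTraceHeaders` are hypothetical names, and the only KafkaJS call assumed is `producer.send({ topic, messages })`.

```javascript
// Returns a `send` function that adds trace headers to every message before
// delegating to `producer.send`. `getTraceHeaders` is a hypothetical callback
// supplied by the tracing library; headers set explicitly on a message win.
const sendWithTraceHeaders = (producer, getTraceHeaders) => ({ messages, ...rest }) =>
  producer.send({
    ...rest,
    messages: messages.map(message => ({
      ...message,
      headers: { ...getTraceHeaders(), ...message.headers },
    })),
  })
```

The consumer can then read the same headers back to continue the trace.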