Open adurai81 opened 5 years ago
Here is my code -
Consumer 1
public static void subscribe() {
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerrConfig);
BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider =
(operationName, consumerRecord) -> "SignalsConsumer";
TracingKafkaConsumer<String, String> tracingKafkaConsumer = new TracingKafkaConsumer<String, String>(consumer, tracer(), consumerSpanNameProvider);
tracingKafkaConsumer.subscribe(Arrays.asList(listenTopic));
while (true) {
ConsumerRecords<String, String> records = tracingKafkaConsumer.poll(100);
for (final ConsumerRecord<String, String> record : records) {
String leadJson = record.value();
Lead lead = convertJsonToLead(leadJson);
System.out.printf("CustomerEmail: %s; LeadName: %s; CustomerId: %s; Record-Key: %s; Record-Partition: %d; Record-offset: %d\n",
lead.getCustomerEmail(), lead.getLeadName(), lead.getCustomerId(),
record.key(), record.partition(), record.offset());
publish(lead); // This calls Producer 2
}
}
}
Producer 2
private static void publish(Lead lead) {
BiFunction<String, ProducerRecord, String> producerSpanNameProvider =
(operationName, producerRecord) -> "EnrichedLeadProducer";
Producer<String, String> producer = createProducer();
TracingKafkaProducer<String, String> tracingKafkaProducer = new TracingKafkaProducer<String, String>(producer, tracer(), producerSpanNameProvider);
lead = enrichLead(lead);
String leadJsonStr = convertLeadToJsonStr(lead);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(publishTopic,
lead.getLeadName(), leadJsonStr);
tracingKafkaProducer.send(record);
tracingKafkaProducer.close();
}
@burkaa01 @thall any thoughts on this, please?
@adurai81 in the consumer you can get the SpanContext from the ConsumerRecord:
SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
Then you can use this spanContext as the parent of a new span, which should be active when you publish.
Another solution is to add a new method to the producer, send(record, spanContext), that uses spanContext as the parent of the producer span.
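To make the "should be active when you publish" part concrete, here is a minimal sketch of the idea in plain Java. It is a toy model, not the OpenTracing or java-kafka-client API: the class ActiveSpanSketch, the Ctx type, the header names and the id counter are all made up for illustration. It assumes the producer looks up its parent from a per-thread "active" span, as the comment above suggests, and shows that a span which is started but never activated is invisible to the producer.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: these types stand in for a tracer and are NOT the OpenTracing API.
class ActiveSpanSketch {
    /** Stand-in for a span context: the trace it belongs to and its own id. */
    static final class Ctx {
        final long traceId;
        final long spanId;
        Ctx(long traceId, long spanId) { this.traceId = traceId; this.spanId = spanId; }
    }

    private static long nextId = 1;
    // The "active" span of the current thread, as a scope manager would track it.
    private static final ThreadLocal<Ctx> ACTIVE = new ThreadLocal<>();

    static synchronized long newId() { return nextId++; }

    /** Starts a span under the given parent, or opens a new trace if parent is null. */
    static Ctx startSpan(Ctx parent) {
        long traceId = (parent != null) ? parent.traceId : newId();
        return new Ctx(traceId, newId());
    }

    static void activate(Ctx ctx) { ACTIVE.set(ctx); }
    static void deactivate() { ACTIVE.remove(); }

    /** Producer side: the producer span's parent is whatever is active right now. */
    static Map<String, String> send() {
        Ctx producerSpan = startSpan(ACTIVE.get());
        Map<String, String> headers = new HashMap<>();
        headers.put("trace-id", Long.toString(producerSpan.traceId)); // hypothetical header names
        headers.put("span-id", Long.toString(producerSpan.spanId));
        return headers;
    }

    /** Consumer side: rebuild the context carried in the record headers. */
    static Ctx extract(Map<String, String> headers) {
        return new Ctx(Long.parseLong(headers.get("trace-id")),
                       Long.parseLong(headers.get("span-id")));
    }

    public static void main(String[] args) {
        Ctx fromConsumer = extract(send());      // context read from the first hop

        Ctx started = startSpan(fromConsumer);   // started but never activated
        Map<String, String> unlinked = send();   // producer sees no active span

        activate(started);                       // now the producer can find it
        Map<String, String> linked = send();
        deactivate();

        System.out.println("same trace without activation: "
                + (extract(unlinked).traceId == fromConsumer.traceId));
        System.out.println("same trace with activation: "
                + (extract(linked).traceId == fromConsumer.traceId));
    }
}
```

In this model only the second send() continues the consumer's trace. With the real tracer the equivalent step would be activating the child span through the tracer's scope manager before calling send, and closing the scope afterwards; the exact call depends on the OpenTracing version in use.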
Thanks @malafeev for the response.
For option-1: I tried this -
Tracer tracer = tracer();
tracer.buildSpan("EnrichedLeadProducer").asChildOf(consumerSpanContext).start();
TracingKafkaProducer<String, String> tracingKafkaProducer = new TracingKafkaProducer<String, String>(producer, tracer, producerSpanNameProvider);
lead = enrichLead(lead);
String leadJsonStr = convertLeadToJsonStr(lead);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(publishTopic,
lead.getLeadName(), leadJsonStr);
tracingKafkaProducer.send(record);
tracingKafkaProducer.close();
The value for consumerSpanContext is fetched from -
SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer());
For option-2:
Are you suggesting overloading the send API in TracingKafkaProducer with this signature?
public Future<RecordMetadata> send(ProducerRecord<K, V> record, SpanContext span) {
Thanks for the idea @malafeev
I could inject the consumer's SpanContext into the 2nd producer with this.
tracer.inject(consumerSpanContext, Format.Builtin.TEXT_MAP, new LeadHeadersMapInjectAdapter(record.headers()));
@malafeev Should inject in TracingKafkaUtils be made public, so that consumers don't have to work around this when injecting the parent SpanContext?
@adurai81 this example has the multi-hop-through-Kafka scenario: https://github.com/PacktPublishing/Mastering-Distributed-Tracing/tree/master/Chapter05
@adurai81 yes, I propose to overload send API in TracingKafkaProducer:
public Future<RecordMetadata> send(ProducerRecord<K, V> record, SpanContext parentSpanContext) {
The provided parentSpanContext will then be used as the parent of the producer span.
I don't think inject in TracingKafkaUtils should be public.
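For illustration, the proposed overload can be modeled roughly as below. This is a toy sketch in plain Java, not the actual TracingKafkaProducer code: ExplicitParentSketch, Record, Ctx, the header names and the id counter are hypothetical stand-ins. It only shows the intended behavior, namely that a caller-supplied parent context keeps the producer span in the same trace.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: NOT the real TracingKafkaProducer; it mirrors the shape of the proposed overload.
class ExplicitParentSketch {
    /** Stand-in for a span context: the trace it belongs to and its own id. */
    static final class Ctx {
        final long traceId;
        final long spanId;
        Ctx(long traceId, long spanId) { this.traceId = traceId; this.spanId = spanId; }
    }

    /** A record reduced to what matters here: a value plus string headers. */
    static final class Record {
        final String value;
        final Map<String, String> headers = new HashMap<>();
        Record(String value) { this.value = value; }
    }

    private static long nextId = 1;

    /** Send without a parent: in this model the producer span opens a new trace. */
    static synchronized Record send(Record record) {
        return send(record, null);
    }

    /** Overload: the caller hands in the parent context explicitly. */
    static synchronized Record send(Record record, Ctx parent) {
        long traceId = (parent != null) ? parent.traceId : nextId++;
        Ctx producerSpan = new Ctx(traceId, nextId++);
        // Write the producer span into the headers so the next consumer can continue it.
        record.headers.put("trace-id", Long.toString(producerSpan.traceId)); // hypothetical names
        record.headers.put("span-id", Long.toString(producerSpan.spanId));
        return record;
    }

    /** Consumer side: rebuild the context carried in the record headers. */
    static Ctx extract(Record record) {
        return new Ctx(Long.parseLong(record.headers.get("trace-id")),
                       Long.parseLong(record.headers.get("span-id")));
    }

    public static void main(String[] args) {
        Record hop1 = send(new Record("lead"));                        // first producer
        Ctx consumerCtx = extract(hop1);                               // first consumer
        Record hop2 = send(new Record("enriched lead"), consumerCtx);  // second producer
        System.out.println("hop 2 stays in the consumer's trace: "
                + (extract(hop2).traceId == consumerCtx.traceId));
    }
}
```

The design point is that the caller never touches the headers: it passes the extracted context to send, and the injection stays inside the producer wrapper.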
Thanks @yurishkuro I'll take a look at the example.
@malafeev, yes, an overload would help. How do I take this forward? Will this issue be used to provide the overloaded API?
@adurai81 let's use this issue for that
done in #62
Thanks @malafeev , I'll test this out and let you know.
@adurai81 have you tested it?
I have producer-1 (SignalsProducer), whose records are consumed by consumer-1 (SignalsConsumer). This consumer then enriches each record and acts as producer-2 (EnrichedLeadProducer), whose records are in turn consumed by consumer-2 (EnrichedLeadConsumer).
This leads to 2 different traces (each having 2 spans), looking like this -
Sample spans within the first trace (similarly the 2nd trace) -
Expectation: In this case there should be only 1 trace with 4 spans.
Question: How do I propagate the span context from Consumer-1 to Producer-2?
Reviewed issue - https://github.com/opentracing-contrib/java-kafka-client/issues/49
The above issue seems very similar to my ask (maybe slightly different). Is there an example of how the context could be propagated from consumer to producer?