codefromthecrypt closed this issue 3 years ago
Here is an example which doesn't work as expected. The creation of the sender and receiver options is omitted:
// sender
sender = KafkaSender.create(senderOptions);
processor = DirectProcessor.<Tuple2<String,String>>create().serialize();
sink = processor.sink();
sender.send(processor.map(objects -> SenderRecord.create(new ProducerRecord<>("topic1", objects.getT1(), objects.getT2()),objects.getT2())))
.doOnNext(senderResult -> log.info(senderResult.correlationMetadata()))
.subscribe();
// receiver
receiver = KafkaReceiver.create(receiverOptions);
receiver
.receive()
.doOnNext(receiverRecord -> log.info("key: {}, value: {}", receiverRecord.key(), receiverRecord.value()))
.subscribe();
Sending the message eg. from a rest call:
sink.next(Tuples.of("key", ""+System.nanoTime()));
Log messages:
2020-05-29 00:07:43.063 INFO [demo,,,] 31028 --- [ single-1] com.example.demo.DemoApplication : 88843267100400
2020-05-29 00:07:43.064 INFO [demo,,,] 31028 --- [ parallel-2] com.example.demo.DemoApplication : key: key, value: 88843267100400
As this uses static methods to start the flow, tracing would require some sort of static registration. Let's also say that tracing and context propagation are separate concerns. Your question minimally needs context propagation, not necessarily tracing (e.g. making spans for the various stages).
Sleuth has this class, which propagates the context (and so makes logging work): https://github.com/spring-cloud/spring-cloud-sleuth/blob/master/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/ScopePassingSpanSubscriber.java
What this does is attach the scope on subscribe. In a static case like the code above, hooks are needed, as I said, and in Sleuth this part attaches them: https://github.com/spring-cloud/spring-cloud-sleuth/blob/a27d6346453a103dbe07cb32073f6dd8b92c5e1e/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/TraceReactorAutoConfiguration.java#L131
This is a bit un-Spring-like if you think about it: relying on static hooks means some weird dancing on context refresh etc. In reactor-netty I opened an issue, as ideally the things needed for propagation could be specified at construction time: https://github.com/reactor/reactor-netty/issues/1036
Things like ScopePassingSpanSubscriber arguably should live in reactor, as multiple components need this, and doing it well requires reactor expertise more than anything. For example, in a couple of years no one has figured out how to support fusion, or even fixed most of the bugs. The act of "try/finally" isn't the key expertise here; rather, it is the actual lifecycle of reactive commands. While this paragraph is a separate discussion, it is important, as it has limited ambitions over the last couple of years.
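The scope-passing idea described above (capture the current context, then re-apply it around the work with try/finally) can be sketched in plain Java without reactor. This is only an illustration under stated assumptions, not Sleuth's ScopePassingSpanSubscriber: the class ScopePassingSketch, the TRACE_ID thread-local and the helper names are hypothetical, and a plain executor stands in for a reactor scheduler.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical stand-in for scope passing; not part of Sleuth, Brave or reactor.
final class ScopePassingSketch {
    // Stand-in for the tracing context that logging (e.g. an MDC) would read.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Captures the caller's context now and re-applies it around the task,
    // restoring whatever the worker thread held before (the "try/finally" part).
    static <T> Supplier<T> withCapturedScope(Supplier<T> task) {
        final String captured = TRACE_ID.get();
        return () -> {
            String previous = TRACE_ID.get();
            TRACE_ID.set(captured);
            try {
                return task.get();
            } finally {
                if (previous == null) {
                    TRACE_ID.remove();
                } else {
                    TRACE_ID.set(previous);
                }
            }
        };
    }

    // Runs the task on a freshly created thread, simulating a thread hop.
    static <T> T runOnOtherThread(Supplier<T> task) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture.supplyAsync(task, pool).join();
        } finally {
            pool.shutdown();
        }
    }

    // Sets a context on the calling thread, then reads it from another thread,
    // with or without the scope-passing wrapper.
    static String readOnOtherThread(boolean propagate) {
        TRACE_ID.set("trace-1");
        try {
            Supplier<String> read = TRACE_ID::get;
            Supplier<String> task = propagate ? withCapturedScope(read) : read;
            return runOnOtherThread(task);
        } finally {
            TRACE_ID.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(readOnOtherThread(true));  // prints "trace-1"
        System.out.println(readOnOtherThread(false)); // prints "null"
    }
}
```

Without the wrapper, the worker thread sees no context, which is why log lines written after a thread hop lose their trace IDs. The hard part in reactor is not this wrapper but deciding where in the operator lifecycle to apply it.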
Hello Adrian, thank you for the explanation. I've tried to instrument the receiver and the sender with kafkaTracing.consumer() and kafkaTracing.producer() (see below). Now I'm getting the propagation of the context (B3 headers) down to the Kafka receiver. The log messages look promising (enriched with span ID and parent ID), at least on the sender side, and also seem to be properly traced to Zipkin. The only problem I can't figure out is the logging on the receiver side: the B3 header seems to be extracted by TracingConsumer but is not propagated to the MDC context. Here are the improved parts:
receiverSubscription = KafkaReceiver.create(consumerFactory, receiverOptions)
.........
senderSubscription = KafkaSender.create(producerFactory, senderOptions)
and
@Component
public class TracingConsumerFactory extends ConsumerFactory {
private final KafkaTracing kafkaTracing;
protected TracingConsumerFactory(KafkaTracing kafkaTracing) {
this.kafkaTracing = kafkaTracing;
}
@Override
public <K, V> Consumer<K, V> createConsumer(ReceiverOptions<K, V> config) {
return kafkaTracing.consumer(super.createConsumer(config));
}
}
@Component
public class TracingProducerFactory extends ProducerFactory {
private final KafkaTracing kafkaTracing;
protected TracingProducerFactory(KafkaTracing kafkaTracing) {
this.kafkaTracing = kafkaTracing;
}
@Override
public <K, V> Producer<K, V> createProducer(SenderOptions<K, V> senderOptions) {
return kafkaTracing.producer(super.createProducer(senderOptions));
}
}
How can I complete the circle and see the B3 information on the receiver (reactive consumer) side in a log, so I can propagate (log and trace) it further (e.g. to a WebClient call)?
Regards, Gena
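For readers unfamiliar with what the B3 information on a record looks like: B3 can travel either as several X-B3-* headers or as one b3 header of the form {TraceId}-{SpanId}-{SamplingState}-{ParentSpanId}, where the last two fields are optional. The sketch below assumes the single-header form and only shows how such a value breaks down into fields that a log context could hold. The class B3SingleSketch and the map keys are hypothetical; a real application would rely on Brave's own extraction rather than this parser.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Brave or reactor-kafka.
final class B3SingleSketch {
    // Parses a "b3" single-header value:
    // {TraceId}-{SpanId}[-{SamplingState}[-{ParentSpanId}]].
    // Returns an empty map for values this sketch does not handle,
    // such as a missing header or a lone sampling flag like "0".
    static Map<String, String> parse(byte[] headerValue) {
        Map<String, String> fields = new LinkedHashMap<>();
        if (headerValue == null) {
            return fields;
        }
        String[] parts = new String(headerValue, StandardCharsets.UTF_8).split("-");
        if (parts.length < 2 || parts.length > 4) {
            return fields;
        }
        fields.put("traceId", parts[0]);
        fields.put("spanId", parts[1]);
        if (parts.length > 2) {
            fields.put("sampled", parts[2]);
        }
        if (parts.length > 3) {
            fields.put("parentId", parts[3]);
        }
        return fields;
    }

    public static void main(String[] args) {
        byte[] value = "80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1-05e3ac9a4f6e3b90"
                .getBytes(StandardCharsets.UTF_8);
        // Print each parsed field, e.g. to compare with what the logs show.
        parse(value).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Parsing the header is the easy half. The fields still have to be placed into the logging context on whichever thread handles the record, which is the propagation problem this thread is about.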
@ghenadiibatalski as far as I know, it is the same problem. Once in reactor you can hop threads, and when that happens the context is not re-applied, so logging etc. does not work. Perhaps this issue isn't titled properly, but the basic problem is that propagation is not implemented once you pass control to reactor.
ConsumerFactory indeed exists for use cases like wrapping the Consumer. Keep in mind that reactor-kafka uses reactor-core under the hood, and it needs to be instrumented as well if you rely on thread-locals and the like.
Why is this issue closed? Where is the working example provided? I'm facing some issues trying to trace the Kafka Consumer using Reactor Kafka and Spring Sleuth. Can you please point out a working example of how to configure/instrument the tracing?
@ghenadiibatalski did you manage to complete the instrumentation of these reactive components? If that is the case, do you have a working example out there you can share?
Hi @jfsanchez91, my instrumentation doesn't work well and causes memory leaks. After the initial instrumentation above, one should propagate the subscriber context or the like. I've heard that the latest Sleuth has improved handling of reactive pipelines, so you may want to take a look at these improvements. If you have success, please leave a hint here.
@leoandrea7 asked about sleuth and reactor-kafka
Motivation
Someone using Sleuth should be able to join traces coming from Kafka messages and propagate them at producer/sender time.
Desired solution
An example using a producer wrapper like so https://github.com/openzipkin/brave/tree/master/instrumentation/kafka-clients
We may still need support for the scope subscriber in sleuth, so it is possible this integration will have to live in sleuth (to access that library) until there's a general reactor context library in reactor or brave directly.
Considered alternatives
Possibly the user could avoid using this library directly and instead use an abstraction over it that supports tracing.
Additional context
@jeqo knows a lot about tracing and kafka