reactor / reactor-kafka

Reactive Kafka Driver with Reactor
http://projectreactor.io
613 stars 227 forks source link

GH-321 / PR 325 - Observation propagation #368

Open janchristian-haddorp opened 10 months ago

janchristian-haddorp commented 10 months ago

Currently testing the enhancements added by PR #325. Great work!!

Is there a chance that observation context will be fully initialized on consumer side? I mean without manually calling API as described below?

When using HTTP calls tag/tap or Span.current() can be used on consumer side (Spring Boot 3.1.5). What is different when propagating observation via Kafka topic? Or how does Kafka limit observation propagation?

Because the reverse order nature of the Reactor context, the observation functionality on the KafkaReceiver is limited just to a single trace logging message for each received record. Restored tracing information will be correlated into logs if so configured for the logging system. If there are requirements to continue an observation on the consumer side, the KafkaReceiverObservation.RECEIVER_OBSERVATION API must be used manually in the record processing operator:

[source,java]
--------
KafkaReceiver.create(receiverOptions.subscription(List.of(topic)))
        .receive()
        .flatMap(record -> {
            Observation receiverObservation =
                KafkaReceiverObservation.RECEIVER_OBSERVATION.start(null,
                        KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
                        () ->
                                new KafkaRecordReceiverContext(
                                    record, "user.receiver", receiverOptions.bootstrapServers()),
                            observationRegistry);
            return Mono.just(record)
                    .flatMap(TARGET_RECORD_HANDLER)
                    .doOnTerminate(receiverObservation::stop)
                    .doOnError(receiverObservation::error)
                    .contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
        })
        .subscribe();
--------
patpatpat123 commented 10 months ago

upvoting this

artembilan commented 10 months ago

When you mention Spring Boot and its WebFlux server, you don't compare apples with apples in this project. The Reactor Kafka is just a library when you use an API for your own configuration in contradiction to an opinion provided for us in Spring Boot. There you just declare your @PostMapping for a Mono method and WebFlux server takes care for its injection in the proper place together with all the useful infrastructure like security and/or observation. With this library you are on your own with that KafkaReceiver.create() and it API for processing. There is just no the point where end-user can inject his/her function for records processing and have an expected observation around.

At the same time we accept this ticket as a feature improvement since we also think that something like Mono<Void> receiveaAndProcess(Function<Mono<ConsumerRecord<K, V>>, Mono<Void>> userFunction) would do that trick hiding an observation API behind the scene.

sinfull1 commented 9 months ago

upvoting

maciej-gromul commented 7 months ago

In general it seems it's impossible to make "seamless" tracing with flux publishers (whether it's reactor kafka or other flux source) since the context is bound with publisher as whole, when it's created and not with it's records.

Thus tracing for flux will always require significant code changes, since we need to change the logic from providing handler to Flux.flatMap to actually making a wrapper to Flux.flatMap that wraps a handler for Mono.flatMap that would have access to correct context as that mono would have special context designed for that 1 record on it's queue upon receiving that event.