spring-cloud / spring-cloud-stream

Framework for building Event-Driven Microservices
http://cloud.spring.io/spring-cloud-stream
Apache License 2.0

SCS closing the observation scope before executing the user-provided function #2782

Closed: maslailleron closed this issue 1 year ago

maslailleron commented 1 year ago

I'm using the spring-cloud-stream-binder-kafka library to handle messages exchanged between microservices, and I noticed a problem with micrometer.observation after migrating from spring-boot 3.0.8 to spring-boot 3.0.9. Before the migration, the observation in the DefaultContextSnapshot was:

Context snapshot: DefaultContextSnapshot{micrometer.observation={name=spring.cloud.function(null), error=null, context=name='spring.cloud.function', contextualName='null', error='null', lowCardinalityKeyValues=[spring.cloud.function.definition='importSaveEventHandler'], highCardinalityKeyValues=[], map=[class io.micrometer.tracing.handler.TracingObservationHandler$TracingContext='TracingContext{span=53ff897199d2ff81/51ac0549fa93c152}', class io.micrometer.core.instrument.Timer$Sample='io.micrometer.core.instrument.Timer$Sample@e4c50df', class io.micrometer.core.instrument.LongTaskTimer$Sample='SampleImpl{duration(seconds)=0.075336514, duration(nanos)=7.5336514E7, startTimeNanos=2255617003442595}'], parentObservation={name=spring.cloud.function(null), error=null, context=name='spring.cloud.function', contextualName='null', error='null', lowCardinalityKeyValues=[spring.cloud.function.definition='importsTransferSaveIn'], highCardinalityKeyValues=[], map=[class io.micrometer.tracing.handler.TracingObservationHandler$TracingContext='TracingContext{span=53ff897199d2ff81/fc3cef0dc19191b2}', class io.micrometer.core.instrument.Timer$Sample='io.micrometer.core.instrument.Timer$Sample@3435936b', class io.micrometer.core.instrument.LongTaskTimer$Sample='SampleImpl{duration(seconds)=0.076250186, duration(nanos)=7.6250186E7, startTimeNanos=2255617002642833}'], parentObservation={name=spring.kafka.listener(null), error=null, context=name='spring.kafka.listener', contextualName='null', error='null', lowCardinalityKeyValues=[spring.kafka.listener.id='KafkaConsumerDestination{consumerDestinationName='imports', partitions=0, dlqName='null'}.container-0'], highCardinalityKeyValues=[], map=[class io.micrometer.tracing.handler.TracingObservationHandler$TracingContext='TracingContext{span=53ff897199d2ff81/13e5ae1940a1a55f}', class io.micrometer.core.instrument.Timer$Sample='io.micrometer.core.instrument.Timer$Sample@39180743', class io.micrometer.core.instrument.LongTaskTimer$Sample='SampleImpl{duration(seconds)=0.077807016, duration(nanos)=7.7807016E7, startTimeNanos=2255617001138405}'], parentObservation=null}}}}

It contains a parentObservation, which is spring.kafka.listener. After the migration, the snapshot is:

Context snapshot: DefaultContextSnapshot{micrometer.observation={name=spring.cloud.function(null), error=null, context=name='spring.cloud.function', contextualName='null', error='null', lowCardinalityKeyValues=[spring.cloud.function.definition='importSaveEventHandler'], highCardinalityKeyValues=[], map=[class io.micrometer.core.instrument.LongTaskTimer$Sample='SampleImpl{duration(seconds)=0.160052083, duration(nanos)=1.60052083E8, startTimeNanos=2000434300721519}', class io.micrometer.tracing.handler.TracingObservationHandler$TracingContext='TracingContext{span=59a4b3fce3198567/852a259423fb6d3a}', class io.micrometer.core.instrument.Timer$Sample='io.micrometer.core.instrument.Timer$Sample@5ad5dc55'], parentObservation={name=spring.cloud.function(null), error=null, context=name='spring.cloud.function', contextualName='null', error='null', lowCardinalityKeyValues=[spring.cloud.function.definition='importsTransferSaveIn'], highCardinalityKeyValues=[], map=[class io.micrometer.core.instrument.LongTaskTimer$Sample='SampleImpl{duration(seconds)=0.161990362, duration(nanos)=1.61990362E8, startTimeNanos=2000434298889185}', class io.micrometer.tracing.handler.TracingObservationHandler$TracingContext='TracingContext{span=59a4b3fce3198567/5ccd2090de4e967a}', class io.micrometer.core.instrument.Timer$Sample='io.micrometer.core.instrument.Timer$Sample@11bb5222'], parentObservation=io.micrometer.observation.NoopObservation@64d1a92c}}}

It no longer contains this parentObservation; instead, parentObservation=io.micrometer.observation.NoopObservation.

I print the DefaultContextSnapshot from inside the Kafka message handler function as follows:

final ContextSnapshotFactory SNAPSHOT_FACTORY = ContextSnapshotFactory.builder()
        .contextRegistry(ContextRegistry.getInstance())
        .clearMissing(false)
        .captureKeyPredicate(key -> true)
        .build();
final ContextSnapshot context = SNAPSHOT_FACTORY.captureAll();
System.out.println("Context snapshot: " + context);

Related to the Micrometer issue "Scoping issue using Spring Cloud Stream and Micrometer 1.10.9".

The Micrometer team's explanation of the problem is:

An issue was fixed in Micrometer/Reactor that kept scopes open even if they should have been closed. This issue was fixed in Micrometer 1.10.9 and now users see the intended behavior of Spring Cloud Stream (closing the scope before executing the user-provided function instead of keeping the scope open). So in Micrometer 1.10.9 and Spring Cloud Stream this works as intended and if one wants to change this behavior they should open a new issue for Spring Cloud Stream and request this feature.

Given this explanation, I have a question: why is the scope closed before executing a user-provided function which is, in my opinion, part of Kafka message handling?

davidmelia commented 1 year ago

Hi, is there any update on this, or any workaround while we wait for a fix? I'm keen to upgrade to the latest Spring Boot, but this is a blocker. Thanks.

sobychacko commented 1 year ago

@davidmelia Did you try this workaround proposed by @jonatan-ivanov here? The problem is that you are using a reactive function with a non-reactive Kafka binder. As indicated in the referenced Micrometer issue, they fixed the underlying bug by closing the scope when the user function is exited. There is no async handoff to the reactive parts. cc @olegz do we have any workarounds from Spring Cloud Function where the Flux conversion occurs? Looks tricky, though.

davidmelia commented 1 year ago

@sobychacko thanks for the quick reply.

tap introduces a different problem: if you check out https://github.com/davidmelia/spring-boot-kafka-consumer-tracing/tree/boot_3.0.9_trace_issue_potential_fix, you will see that the tid changes from producer to consumer, whereas it should be preserved to aid tracing.

[aid=spring-boot-kafka-consumer-tracing,tid=64d4944d66b2f77156c2d3d561f2bb63,sid=56c2d3d561f2bb63,cusname=] 67017 --- [ctor-http-nio-2] com.example.demo.KafkaController         : Message sent to binding = GenericMessage [payload={Key=Value @ 2023-08-10T07:39:57.398323Z}, headers={kafka_messageKey=5e438da6-9e75-4880-aabe-55f95b0adb09, id=5efa3f28-b0eb-2969-1745-f02b09350670, timestamp=1691653197398}]
[aid=spring-boot-kafka-consumer-tracing,tid=64d494416d4f5d9504e21127161c87b1,sid=04e21127161c87b1,cusname=] 67017 --- [container-0-C-1] c.e.demo.consumer.TestMessagesConsumer   : Kafka Binder: This log statement has the trace id
[aid=spring-boot-kafka-consumer-tracing,tid=64d494416d4f5d9504e21127161c87b1,sid=04e21127161c87b1,cusname=] 67017 --- [     parallel-3] c.e.demo.consumer.TestMessagesConsumer   : Kafka Binder: This log statement also has the trace id in micrometer 1.10.8 but NOT in micrometer 1.10.9

Regarding your comment "you are using a reactive function with a non-reactive Kafka binder": we would love to use the reactive binder, but it also has an outstanding trace id issue: https://github.com/spring-cloud/spring-cloud-stream/issues/2647

olegz commented 1 year ago

@davidmelia While we're looking, I wanted to remind you that:

  1. Reactive functions are not your typical message handlers, where the unit of work is a message. In reactive functions the unit of work is the Flux itself, and such a function is invoked only once, during initialisation. After that the framework (SCF or Stream) plays no role at all.
  2. As @sobychacko mentioned, using reactive functions with non-reactive binders is almost meaningless, as you are not benefiting from any reactive features other than the API itself.

olegz commented 1 year ago

We just had a team discussion on this, so here is what's going to happen.

By default, the scope is closed as soon as the function has been executed. That is the correct behavior. The bug that existed in Micrometer did not do that, hence the fix. For non-reactive functions it is obviously the intended behavior. But for reactive functions we do not know what the intended behavior is or should be, since we do not know the user's intentions with regard to the scope of observation. A single message within the Flux? Several messages after buffering? What if there were no messages at all during, let's say, the allotted time frame for buffering? I can go on... But what I want to emphasise is that with reactive functions we can't assume anything. That is the same reason why we do not provide error handling or retries for reactive functions and instead ask the user to fall back on the reactive API itself for that functionality (see docs here). And that is what we're going to recommend for observability as well.

We will get together in a few weeks at SpringOne and will compile documentation, a sample project, and test cases demonstrating how to handle observability manually when implementing a Flux stream (effectively an observability extension to the reactive API), but we will not be providing any support for reactive functions in the context of Stream, for the reasons outlined above.
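
The timing described above can be illustrated with a small, library-free model. This is only a sketch under stated assumptions: the class `ScopeTimingSketch`, the `CURRENT` thread-local, and both methods are hypothetical stand-ins, not Micrometer, Spring Cloud Function, or Spring Cloud Stream code. It shows why per-message code sees the scope in the imperative case but not in the reactive case, where the function is called once to assemble a pipeline and items flow only after the scope has been closed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/** Simplified model of scope timing; NOT the real Micrometer or Spring Cloud Stream implementation. */
final class ScopeTimingSketch {

    // Hypothetical holder for the "current observation", standing in for an open scope.
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    /** Imperative style: the function is invoked once per message, inside the scope. */
    static List<String> imperative(List<String> messages) {
        List<String> seen = new ArrayList<>();
        Function<String, String> handler = msg -> {
            seen.add(msg + " -> " + CURRENT.get());
            return msg;
        };
        for (String msg : messages) {
            CURRENT.set("spring.kafka.listener");   // scope opened
            try {
                handler.apply(msg);                  // user code runs while the scope is open
            } finally {
                CURRENT.remove();                    // scope closed after the function returns
            }
        }
        return seen;
    }

    /** Reactive style: the function is invoked once to build a pipeline; items arrive later. */
    static List<String> reactive(List<String> messages) {
        List<String> seen = new ArrayList<>();
        Supplier<Consumer<String>> userFunction =
                () -> msg -> seen.add(msg + " -> " + CURRENT.get());

        Consumer<String> pipeline;
        CURRENT.set("spring.cloud.function");        // scope opened
        try {
            pipeline = userFunction.get();           // only pipeline assembly happens here
        } finally {
            CURRENT.remove();                        // scope closed as soon as the function returns
        }
        messages.forEach(pipeline);                  // per-item work runs with no scope open
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(imperative(List.of("a", "b"))); // [a -> spring.kafka.listener, b -> spring.kafka.listener]
        System.out.println(reactive(List.of("a", "b")));   // [a -> null, b -> null]
    }
}
```

In this model the framework has no natural place to reopen a scope for the reactive case, because it never sees individual items; that decision has to be made inside the pipeline itself.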

davidmelia commented 1 year ago

@olegz in summary do you mean:

1) The non-reactive Kafka binder will not support automatic tid propagation per item in a Flux.

2) The reactive Kafka binder will also not support automatic tid propagation per item in a Flux.

3) You will provide examples, for the reactive Kafka binder use case, of propagating tids via manual code.

Collectively, it would be great to be clear on which use cases SCS does and doesn't solve. For example, should I just be using the Spring reactive Kafka template directly?

olegz commented 1 year ago

1 - Yes. 3 - Yes, for the non-reactive binders (Rabbit and Kafka). As for #2, and the reactive binders in general, I would delegate this to @sobychacko.

sobychacko commented 1 year ago

@davidmelia After the SpringOne conference, we will look into how we can support it from the reactive Kafka binder perspective. Thanks!

olegz commented 1 year ago

I will update the docs and publish a sample shortly, but after discussing it with the observability team, here is what you do...

And now in your reactive code you use the tap method to inject the Micrometer observation registry. Note that you are in complete control of the unit of observation (i.e. a single item, a set of items, or...). For example (complete example), here the unit of observation is a single item (message), as if it were an imperative function.

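import java.util.function.Function;

import io.micrometer.observation.ObservationRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import reactor.core.observability.micrometer.Micrometer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;

@SpringBootApplication
public class DemoStreamApplication {

    Logger logger = LoggerFactory.getLogger(DemoStreamApplication.class);

    public static void main(String[] args) {
        // Must be enabled before the application starts assembling reactive pipelines.
        Hooks.enableAutomaticContextPropagation();
        SpringApplication.run(DemoStreamApplication.class, args);
    }

    @Bean
    public Function<Flux<String>, Flux<String>> uppercase(ObservationRegistry registry) {
        // Each item is wrapped in its own Mono, so each item is one unit of observation.
        return flux -> flux.flatMap(item -> {
            return Mono.just(item)
                    .map(value -> value.toUpperCase())
                    .doOnNext(v -> logger.info(v))
                    .tap(Micrometer.observation(registry));
        });
    }
}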

Also, note the call to Hooks.enableAutomaticContextPropagation(); in main.
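
To make the idea of context propagation across a thread hop more concrete, here is a conceptual, library-free sketch. It is an assumption-laden model, not how Reactor implements the hook: the class `ContextCaptureSketch`, the `TRACE_ID` thread-local, and `withCapturedContext` are all hypothetical names. The sketch captures a thread-local value on the submitting thread and restores it around the task on the worker thread, which is roughly the effect one wants when a log statement on a different thread should still carry the trace id.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Conceptual model of capturing and restoring thread-local context; NOT Reactor's implementation. */
final class ContextCaptureSketch {

    // Hypothetical thread-local standing in for the trace id held by the current observation.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Wraps a task so the submitter's TRACE_ID is visible on whichever thread runs it. */
    static <T> Callable<T> withCapturedContext(Callable<T> task) {
        String captured = TRACE_ID.get();            // captured on the submitting thread
        return () -> {
            String previous = TRACE_ID.get();
            TRACE_ID.set(captured);                  // restored on the executing thread
            try {
                return task.call();
            } finally {
                // Put back whatever the worker thread had before, so nothing leaks.
                if (previous == null) {
                    TRACE_ID.remove();
                } else {
                    TRACE_ID.set(previous);
                }
            }
        };
    }

    /** Returns what the worker thread sees without and with context capture. */
    static String[] run() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        TRACE_ID.set("trace-123");
        try {
            Callable<String> readTraceId = () -> String.valueOf(TRACE_ID.get());
            String plain = worker.submit(readTraceId).get();
            String propagated = worker.submit(withCapturedContext(readTraceId)).get();
            return new String[] {plain, propagated};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            TRACE_ID.remove();
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] result = run();
        System.out.println("without capture: " + result[0]); // without capture: null
        System.out.println("with capture: " + result[1]);    // with capture: trace-123
    }
}
```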

cmergenthaler commented 1 year ago

Hi @olegz, thanks for the detailed docs about observation! Just to make sure: is the behaviour for observation the same regardless of whether the binder is reactive or non-reactive? So will we always need to handle observation manually with .tap? Or is this something that could be added by #2650 for reactive binders?

olegz commented 1 year ago

@cmergenthaler I am not sure what (if anything) is going to be added with the reactive Kafka binder, but I want to make sure that everyone understands the actual issue. Sure, for certain cases we could potentially figure out some observability support. But we can't do it for all cases, and what is even more concerning is that adding support infrastructure to handle one scenario will most likely get in the way of another scenario. Remember, the initial support for reactive message handlers did handle retries and error handling at the framework level, but it didn't last long given the issues it created for more advanced reactive use cases (e.g. range(), limitRate()). We were able to accomplish this by wrapping each item in the incoming flux into a new Flux.just(message). And while it satisfied some very basic use cases (use cases that would be better handled with imperative functions anyway), it immediately created a problem for the more advanced use cases of range(), limitRate(), etc. So, if we can't handle all, we can't handle any: that is the mentality. Instead we'd rather work with the respective teams (such as the observability team in this case) to find an elegant solution that empowers users to do exactly what they want to do.
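
The per-item wrapping problem mentioned above can be sketched without Reactor. The following is an analogy only: `java.util.stream.Stream` stands in for Flux, `limit(2)` stands in for any operator that reasons about more than one item, and the class and method names are hypothetical. When the user function is applied to the whole stream, the operator sees every item; when a framework wraps each item into its own single-element stream, the operator is applied per item and its cross-item meaning is lost.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Analogy using java.util.stream; NOT how Spring Cloud Function handled Flux internally. */
final class PerItemWrappingSketch {

    /** The user function receives the whole stream, as a reactive function receives the Flux. */
    static List<Integer> applyToWholeStream(
            Function<Stream<Integer>, Stream<Integer>> fn, List<Integer> items) {
        return fn.apply(items.stream()).collect(Collectors.toList());
    }

    /** The framework wraps each item in its own one-element stream before calling the function. */
    static List<Integer> applyPerItem(
            Function<Stream<Integer>, Stream<Integer>> fn, List<Integer> items) {
        return items.stream()
                .flatMap(item -> fn.apply(Stream.of(item)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // User intent: only ever let the first two items through.
        Function<Stream<Integer>, Stream<Integer>> firstTwo = s -> s.limit(2);
        List<Integer> items = List.of(1, 2, 3, 4, 5);

        System.out.println(applyToWholeStream(firstTwo, items)); // [1, 2]
        System.out.println(applyPerItem(firstTwo, items));       // [1, 2, 3, 4, 5]
    }
}
```

The second result shows the user's cross-item logic silently changing meaning, which is the kind of interference the comment describes.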