salesforce / reactive-grpc

Reactive stubs for gRPC
BSD 3-Clause "New" or "Revised" License

[reactor-grpc] Reactor context does not propagate if stub is used #195

Open krakowski opened 4 years ago

krakowski commented 4 years ago

Description

Whenever a stub is used in a chain together with Reactor's Context API, the provided context does not propagate correctly. My guess is that using subscribe() within ClientCalls prevents the context from propagating up the chain.

Example

package com.salesforce.reactorgrpc;

import com.salesforce.grpc.testing.contrib.NettyGrpcServerRule;
import org.junit.Rule;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.util.context.Context;

import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;

public class ReactorContextPropagationTest {

    @Rule
    public NettyGrpcServerRule serverRule = new NettyGrpcServerRule();

    private static class SimpleGreeter extends ReactorGreeterGrpc.GreeterImplBase {
        @Override
        public Mono<HelloResponse> sayHello(Mono<HelloRequest> request) {
            return request.map(HelloRequest::getName)
                    .map(name -> HelloResponse.newBuilder().setMessage("Hello " + name).build());
        }

        @Override
        public Mono<HelloResponse> sayHelloReqStream(Flux<HelloRequest> request) {
            return request.map(HelloRequest::getName)
                    .collect(Collectors.joining("and"))
                    .map(names -> HelloResponse.newBuilder().setMessage("Hello " + names).build());
        }
    }

    @Test
    public void contextDoesNotPropagate() {
        serverRule.getServiceRegistry().addService(new SimpleGreeter());

        ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(serverRule.getChannel());
        Mono<HelloRequest> req = Mono.just(HelloRequest.newBuilder().setName("reactor").build());

        Mono<HelloResponse> resp = req
                // This assertion will fail 
                .doOnEach(signal -> assertThat(signal.getContext().getOrEmpty("key")).isNotEmpty())
                .compose(stub::sayHello)
                // This assertion won't fail
                .doOnEach(signal -> assertThat(signal.getContext().getOrEmpty("key")).isNotEmpty())
                .subscriberContext(Context.of("key", "dummy"));

        StepVerifier.create(resp)
                .expectNextMatches(response -> response.getMessage().equals("Hello reactor"))
                .verifyComplete();
    }
}

krakowski commented 4 years ago

I could fix this in the one-to-one case by using flatMap instead of subscribe, though I'm not sure if this is correct since I removed the SubscribeOnlyOnceLifter.

public static <TRequest, TResponse> Mono<TResponse> oneToOne(
        Mono<TRequest> monoSource,
        BiConsumer<TRequest, StreamObserver<TResponse>> delegate) {
    try {
        return monoSource.flatMap(request -> Mono.create(callback -> {
            delegate.accept(request, new StreamObserver<TResponse>() {
                @Override
                public void onNext(TResponse tResponse) {
                    callback.success(tResponse);
                }

                @Override
                public void onError(Throwable throwable) {
                    callback.error(throwable);
                }

                @Override
                public void onCompleted() {
                    // do nothing
                }
            });
        }));
    } catch (Throwable throwable) {
        return Mono.error(throwable);
    }
}

krakowski commented 4 years ago

I implemented a failing test for this issue.

dyangelo-grullon commented 4 years ago

@rmichela a team of mine is thinking of using this library, but it's concerning that this bug has been open since October 2019 without a response from the maintainers. Is there any documentation on how to properly propagate the Reactor context? Is this an actual bug?

rmichela commented 4 years ago

Is there any documentation on how to properly propagate the reactor context?

I've found the documentation and examples for using reactor context to be sorely lacking.

Is this an actual bug?

I'm not entirely sure. The absence of "me too" reports has me wondering if this is a real issue.

rmichela commented 4 years ago

Do you use reactor context? If so, are you experiencing this issue?

rmichela commented 4 years ago

Following up. Is this still an issue for you?

krakowski commented 4 years ago

Hi all,

just rebased my tests from last year on the current master branch and ran them again.

tests

I use Reactor's Context API to propagate metadata (e.g. JWT or tracing information) inside reactive sequences. Using the gRPC Context API directly within reactive sequences is not possible for me, since the context is stored within a ThreadLocal.

The default implementation will put the current context in a ThreadLocal.

After switching threads the context would thus no longer be accessible.
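The thread-switch problem described above can be reproduced without gRPC or Reactor. The following is a minimal sketch using only the JDK; `CURRENT_TOKEN` and `readOnWorkerThread` are hypothetical names standing in for any ThreadLocal-backed context store, not part of the gRPC API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadLocalLossDemo {

    // Hypothetical stand-in for a ThreadLocal-backed context store.
    static final ThreadLocal<String> CURRENT_TOKEN = new ThreadLocal<>();

    /** Sets a value on the calling thread, then reads the ThreadLocal from a worker thread. */
    static String readOnWorkerThread(String value) {
        CURRENT_TOKEN.set(value);
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            // The worker thread has its own, still empty, slot for CURRENT_TOKEN.
            return CompletableFuture.supplyAsync(CURRENT_TOKEN::get, worker).join();
        } finally {
            worker.shutdown();
            CURRENT_TOKEN.remove();
        }
    }

    public static void main(String[] args) {
        // prints "worker thread sees: null"
        System.out.println("worker thread sees: " + readOnWorkerThread("jwt-123"));
    }
}
```

Any operator that moves work to another thread has the same effect on a ThreadLocal-backed value, which is why a context carried by the subscription itself is attractive here.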

I also tried to propagate tracing information between separate services over gRPC. For this purpose I wrote a ClientInterceptor which transfers tracing information between Reactor context and gRPC headers (TracingInterceptor). Unfortunately, accessing the Reactor context within the ClientInterceptor always yielded an empty Mono.

krakowski commented 3 years ago

Since I will most likely be using this library again in the near future, I took another look at the issue. Shortly after my last message last year, the following post was written regarding context loss detection, which discusses in which cases the Reactor context can be lost.

Basically, in case of a subscription, the context must be passed on via the CoreSubscriber#currentContext() method, otherwise the interface's default implementation takes effect, which returns an empty context.

In addition, there is a hook that detects the loss of a context and can be enabled using Hooks.enableContextLossTracking();.

Therefore, I think this issue could be solved easily by implementing the mentioned method in ReactorSubscriberAndClientProducer, ReactorSubscriberAndServerProducer and SubscribeOnlyOnceLifter.
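The default-method fallback described above can be modelled without any dependencies. This is a simplified sketch, not Reactor code: `MiniSubscriber`, `lossy` and `forwarding` are hypothetical names, and a plain `Map` stands in for Reactor's `Context`. It only illustrates why a wrapping subscriber that does not override the context accessor hides the downstream context.

```java
import java.util.Collections;
import java.util.Map;

// Simplified stand-in, NOT Reactor's CoreSubscriber: it only models the default-method fallback.
interface MiniSubscriber {
    void onNext(String value);

    // Like the default described above: returns an empty context unless overridden.
    default Map<String, String> currentContext() {
        return Collections.emptyMap();
    }
}

final class ContextDefaultDemo {

    /** Terminal subscriber that actually holds the context. */
    static MiniSubscriber terminal(Map<String, String> context) {
        return new MiniSubscriber() {
            @Override public void onNext(String value) { /* consume the value */ }
            @Override public Map<String, String> currentContext() { return context; }
        };
    }

    /** Wrapper that forwards signals but inherits the empty default context. */
    static MiniSubscriber lossy(MiniSubscriber downstream) {
        return new MiniSubscriber() {
            @Override public void onNext(String value) { downstream.onNext(value); }
        };
    }

    /** Wrapper that also delegates the context lookup to the downstream subscriber. */
    static MiniSubscriber forwarding(MiniSubscriber downstream) {
        return new MiniSubscriber() {
            @Override public void onNext(String value) { downstream.onNext(value); }
            @Override public Map<String, String> currentContext() { return downstream.currentContext(); }
        };
    }

    public static void main(String[] args) {
        MiniSubscriber end = terminal(Collections.singletonMap("key", "dummy"));
        System.out.println(lossy(end).currentContext().containsKey("key"));      // prints "false"
        System.out.println(forwarding(end).currentContext().containsKey("key")); // prints "true"
    }
}
```

In this model the fix is a one-line override in the wrapper; whether the same holds for the library classes named above is the assumption the comment makes, not something this sketch verifies.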

@rmichela What do you think?


Just tried to run the included integration tests using mvn -pl :reactor-grpc-test test which does not execute any tests (other modules work fine). Am I missing something? I rarely use Maven.

mvn test -pl :reactor-grpc-test
...
[INFO] -------------------------------------------------------
[INFO]  T E S T S
[INFO] -------------------------------------------------------
[INFO] 
[INFO] Results:
[INFO] 
[INFO] Tests run: 0, Failures: 0, Errors: 0, Skipped: 0

Freddv2 commented 2 years ago

Hello, we're having the same issue here. Activating enableContextLossTracking() shows that the context was lost at:

.transform(Operators.lift(new SubscribeOnlyOnceLifter<TResponse>()));
    at reactor.core.publisher.ContextTrackingFunctionWrapper.lambda$apply$0(ContextTrackingFunctionWrapper.java:50)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoLift] :
    reactor.core.publisher.Mono.transform(Mono.java:4705)
    com.salesforce.reactorgrpc.stub.ClientCalls.oneToOne(ClientCalls.java:57)
Error has been observed at the following site(s):
    |_     Mono.transform ⇢ at com.salesforce.reactorgrpc.stub.ClientCalls.oneToOne(ClientCalls.java:57)

code-uri commented 1 year ago

Hello everyone,

Any update on this? Any workaround?

code-uri commented 1 year ago

Hello again, I tried reproducing it, but the test is passing. Am I missing something?

code-uri commented 1 year ago

I tried the test again using subscribeOn(), and this time the test failed.

krakowski commented 1 year ago

Hey @code-uri,

I rebased my tests from 3 years ago again on top of the current master to check if this was fixed (120 commits since then) and they still seem to fail. You can try out the tests in my personal branch which reproduces this issue.

In my case the results are the same as 3 years ago:

[Screenshot of the test results, 2022-11-04 13-29-45]

I haven't found any workaround for this issue.

denychen commented 10 months ago

Currently on version 1.2.4, and I'm running into this bug as well.

rmichela commented 10 months ago

I've got to be honest, I've tried my best with the edge cases of Reactor context propagation and its interactions with gRPC thread pools. But I'm out of my depth.

I could really use some help with this issue.

code-uri commented 10 months ago

I've got to be honest, I've tried my best with the edge cases of Reactor context propagation and its interactions with gRPC thread pools. But I'm out of my depth.

I could really use some help with this issue.

I agree that context propagation is not working. Apart from the test case you mentioned, could you explain the use case? In my case, for example, I wanted to pass the information on to the gRPC layer in the form of headers, like this. I hope this helps.

    ReactorGreeterGrpc.ReactorGreeterStub targetStub = ...
    Mono<HelloRequest> req = Mono.just(HelloRequest.newBuilder().setName("reactor").build());
    req.flatMap(helloRequest -> Mono.deferContextual(contextView -> {
            // Copy the value from the Reactor context into a gRPC header.
            Metadata header = new Metadata();
            Metadata.Key<String> key =
                    Metadata.Key.of("KEY", Metadata.ASCII_STRING_MARSHALLER);
            header.put(key, contextView.getOrDefault("key", "dummy"));

            // A lambda cannot reassign the captured targetStub, so use a new local variable.
            ReactorGreeterGrpc.ReactorGreeterStub stubWithHeaders =
                    targetStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(header));
            // Return the call so that flatMap subscribes to it.
            return stubWithHeaders.sayHello(helloRequest);
        }))
        .contextWrite(context -> Context.of("key", "dummy"))
        .block();

code-uri commented 10 months ago

@rmichela Also, Reactor now supports ThreadLocal propagation magic; you can use that as well to read the values from a ThreadLocal in the gRPC layer.

rmichela commented 10 months ago

Is there a doc link you can point me to?