Closed spring-projects-issues closed 4 years ago
Marcin Grzejszczak commented
Hi!
I guess a combination of your requirements will be done via Sleuth. There is a work in progress done for this issue (https://github.com/spring-cloud/spring-cloud-sleuth/issues/344) also on the reactor side (https://github.com/reactor/reactor-core/pull/447). That way reactor will be able to pass the context within a flux. Sleuth will be able to reuse that context to pass tracing information. The next step in Sleuth will be to stop propagating spans but start propagating a context whose part of which would be a span (https://github.com/spring-cloud/spring-cloud-sleuth/issues/143). Then I guess all of your requirements would be satisfied. A similar requirement comes from the spring-metrics project. So we will have to somehow combine all of these into a single project that will propagate context within the process / network boundaries.
Rossen Stoyanchev commented
Doron Gold thanks for raising this.
Since in Reactive Web a request is not bound to a single processing thread..
Indeed for this reason it is a feature that could only come from the reactive library which is aware of how an async flow was constructed and can correlate at the time of execution. As Marcin Grzejszczak mentioned there is a Reactor feature in progress coming in Reactor 3.1 M3 and once there is enough of it ready I'll update this ticket with some more details. Note also that this could only work if the same reactive library is in use which in the case of WebFlux means using Reactor. That said I'm not aware of such a feature in RxJava to begin with.
Rossen Stoyanchev commented
I'm scheduling for #20239 and we'll see what if we anything we need in the Spring Framework in addition to the Reactor context feature.
Doron Gold commented
I believe that the related work being done on reactor-core and other spring projects (such as spring-sleuth) will undoubtedly help. However, I think that in addition to that, there should be a high-level, generic solution, similar to RequestContextHolder in spring-mvc.
+We have a very basic and general use-case for this:+ As a micro-service which is part of a micro-service architecture I want to be able to receive various request headers, save them as request context, then pass them on to other micro-services to which I call. Without having the request context ability, at the point where I call other services (using WebClient) I can’t pass on to these services the same headers that I received. The content of these headers doesn’t really matter, it can be trace ID for logging, currently logged in user, or anything else. Furthermore, additional such headers may be added in the future as our system evolves. The point is that a single user-request initiates a flow that spans across multiple micro-services and It’s essential to be able to retain data for crosscutting concerns. This data should not be lost neither between method calls nor between http calls to other services.
Rossen Stoyanchev commented
Any solution in WebFlux around this will have to rely on underlying support from Reactor. There are tickets in reactor-core and work in progress but to validate the API smaldini and I have been experimenting in a sample project created specifically for this ticket.
You will see a WebFilter
inserts context and later a service (invoked from a controller) accesses the context. Note the feature expects the context-accessing component to operate in the same Mono/Flux chain as the context-inserting component. This is the case with WebFlux but it does mean that the service has to return a Flux/Mono and/or compose on a Flux/Mono input argument (e.g. request body). Take a look at what we have currently give us some feedback based on your use cases.
Rossen Stoyanchev commented
Moving to GA to allow more time for feedback. At any rate this is currently not Spring-specific, i.e. it's a Reactor feature. Doron Gold it would be great hear what you run into so we can evolve and harden the API..
Doron Gold commented
Hi,
We integrated this feature into our project.
At the top level we have a WebFilter
that builds a context.
It basically takes data from the request headers, builds a AcmeRequestContext
object and puts it in the context. Like so:
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
// build acmeRequestContext
Mono<Void> filteredChain = chain.filter(exchange);
return mono.contextStart(context -> context.put(AcmeRequestContext.class, acmeRequestContext));
}
At the bottom level we have several clientServices, one for each microservice we have to communicate with.
A clientService holds a WebClient
which is instantiated and configured at the constructor.
We configure the WebClient
with an ExchangeFilterFunction
that takes the context from the current Mono (the one returned by WebClient
) and adds its content to the outgoing request header.
It looks like so:
/**
* A client for the Acme Telemetry microservice
*/
@Component
public class AcmeTelemetryClientService {
private WebClient webClient;
@Autowired
public AcmeTelemetryClientService (ContextExchangeFilterFunction contextExchangeFilterFunction) {
webClient = WebClient.builder()
.filter(contextExchangeFilterFunction)
.build();
}
}
/**
* Adds a header to the outgoing request.
* The header key is {@link AcmeRequestContext#CONTEXT_HEADER_KEY}
* The header value is the content of {@link Mono#currentContext()} of the {@code Mono} that is returned by this filter.
* <p>
* The purpose of this filter is to propagate the request context
* from one microservice to another microservice.
* <p>
* This filter should be included on each {@link org.springframework.web.reactive.function.client.WebClient} that
* sends requests from one acme microservice to another acme microservice.
*/
@Component
public class ContextExchangeFilterFunction implements ExchangeFilterFunction {
@Override
public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
return Mono.currentContext()
.flatMap(context -> {
ClientRequest newRequest = ClientRequest.from(request)
.header(AcmeRequestContext.CONTEXT_HEADER_KEY,
context.get(AcmeRequestContext.class)
.toJson())
.build();
return next.exchange(newRequest);
});
}
}
This works very well.
It allows for passing the context from the top level (WebFilter
) to the bottom level (WebClient
) without even having the developer that writes the business logic aware of this.
It suits most of our use cases, since we usually have a single Mono chain from top to bottom:
each method returns a Mono and the calling method continues the chain by using Mono operators.
However, there is one particular use-case which is very prevalent: logger invocations.
Because a logger invocation does not receive a Mono and is not part of a Mono chain, we still have the problem that the logger can't implicitly "know" what the current context is.
So for every call to the logger we have to wrap the entire block of code with Mono.currentContext()
.
Something like this:
//boilerplate wrap to get the current context
Mono.currentContext()
.flatMap(context -> {
// code (potentially long and complicated) that returns a mono
logger.info(context.get(AcmeRequestContext.class), "something happened");
//more code
logger.info(context.get(AcmeRequestContext.class), "something else happened");
}
}
Notice that in addition to the boilerplate, we have to implement a custom logger, because we have to pass in the context. The boilerplate discourages developers from logging as much as possible (which is bad for maintenance).
With Spring MVC, we simply used the MDC or NDC features of log4j to insert the relevant request data (request ID, etc.) into ThreadLocal. We did it in a HandlerInterceptor
, which is the equivalent of WebFilter
It would be really great if you could find a way to have a logger implicitly able to include context data.
I would at least hope for not having to implement a custom logger myself (with all signatures having the additional context
parameter).
Rossen Stoyanchev commented
Thanks for the comments.
I'm setting this for after 5.0 since we need more to time to discuss. I'll get back to you on your question. Good to know the context feature gets you pretty far.
Ryan Dunckel commented
Have there been any follow up discussions on this and/or alternative solutions proposed? I'm stuck in the same situation that @dorongold
summarized well above. I also looked into log4j2's support of a custom "ContextDataInjector", but, without the logger being part of the subscriber chain, I can't find a way to feed the reactor context into the log context.
Alfred Thomas commented
I have the same requirement as Doron and Ryan, and I have been using Doron's code as a base for my own. But I have the same problem as Ryan, as in I don't have an elegant solution for getting the context to slf4j. I have to set trace-ids and I previously used a filter with MDC to accomplish this.
Ryan Dunckel commented
Bumping this again.... any update or more thoughts on this?
Michael Kohout commented
Bump. Are there any simple adapters or examples out there where people are attaching values to a MDC/ThreadLocal for logging?
Marcin Grzejszczak commented
I still don't understand why wouldn't Sleuth be a proper solution to this. We already set logging values there and pass data via threads etc.
Simon Baslé commented
I have looked for a solution that would be simpler to use than the hook and scheduler decorator solutions and have come up with something that can probably cover 80% of use cases (with less hidden caveats).
It involves using the Context
via doOnEach
, with a bit of logging statement boilerplate. See my blog for a presentation of said solution: https://simonbasle.github.io/2018/02/contextual-logging-with-reactor-context-and-mdc/
VP commented
i use Schedulers.setFactory and override Factory.decorateExecutorService to provide a ScheduledExecutorService that propagates MDC.
Schedulers.setFactory(new Factory() {
@Override
public ScheduledExecutorService decorateExecutorService(String schedulerType,
Supplier<? extends ScheduledExecutorService> actual) {
// guava WrappingScheduledExecutorService
return new WrappingScheduledExecutorService(actual.get()) {
@Override
protected <T> Callable<T> wrapTask(Callable<T> callable) {
return new MDCCallable(callable);
}
};
}
});
static class MDCCallable<T> implements Callable<T> {
private Map<String, String> ctxMap;
private Callable<T> target;
public MDCCallable(Callable<T> target) {
this.target = target;
ctxMap = MDC.getCopyOfContextMap();
}
@Override
public T call() throws Exception {
MDC.setContextMap(ctxMap);
return target.call();
}
}
any thoughts?
rick gong commented
There is a good solution, thanks to spring-cloud-sleuth-core:
Marcin Grzejszczak commented
I keep saying that (about the Sleuth solution) but I get constantly ignored by the reporter so maybe after @rickgong
's comment it might be considered as a proper answer.
raghunktn commented
Marcin Grzejszczak I am working on a project which does not use spring-cloud. how would the sleuth solution be helpfull on my case?
Rossen Stoyanchev commented
Note that for 5.2 we're going to address #21746. I suppose this issue could be considered superseded by it? Although I'm not entirely sure since there have been many comments.
There is a working solution to put data from reactor context to mdc using hooks by @ivansenic https://www.novatec-gmbh.de/en/blog/how-can-the-mdc-context-be-used-in-the-reactive-spring-applications
And a java version for this solution: https://github.com/archie-swif/webflux-mdc/tree/master/src/main/java/com/example/webfluxmdc
There is a working solution to put data from reactor context to mdc using hooks by @ivansenic https://www.novatec-gmbh.de/en/blog/how-can-the-mdc-context-be-used-in-the-reactive-spring-applications
We recognized that the solution above has a problem that a context is not correctly propagated in case of exceptions in the reactive sequence. Just as a note for people to know.
We recognized that the solution above has a problem that a context is not correctly propagated in case of exceptions in the reactive sequence.
@ivansenic could you please elaborate on this problem or point us to a more correct solution?
@ivansenic could you please elaborate on this problem or point us to a more correct solution?
I think the best person to explain this is @carlosbarragan as he was the one to point the problem to me after I wrote my blog. He might have additional solutions as well.
@carlosbarragan Carlos can you elaborate, please?
@torstenwerner @ivansenic @carlosbarragan I have tested few common error scenarios, with or w/o error handling. It seems to be working fine in this sample
Do you recall by chance what was the faulty case? Thank you!
@archie-swif If I recall correctly, the problem with this approach arises when an error occurs immediately in a call.
mono.map{}
.flatMap{}
.onErrorMap{} // if an error occurs before one of the maps operations is called, the context is not copied
@archie-swif I think a possible solution would be to try to copy the context on MdcContextLifter.onError()
I haven't tried myself, but that could probably work
FYI: @ivansenic
@archie-swif
I also tested this a bit and @carlosbarragan was correct. The Mono.onErrorResume()
is not copying the MDC context correctly. And yes, to overcome this you should also copy the context to MDC on MdcContextLifter.onError()
.
Thank you @ivansenic !
👍 to @ivansenic's approach (gratitude to you and @archie-swif as well). We used this in conjunction with a WebFilter
to copy request context information (conserved headers, correlation ids, etc) in to the Context
like so:
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
return chain.filter(exchange)
.subscriberContext((context) -> {
context = context.put("foo", "bar")
return context;
});
}
Looks promising so far.
@scottjohnson @archie-swif please be aware that the context should remain as small as possible. Instead of adding a new key-value entry, you should simply add a new key that references a map. That is what is recommended in the JavaDocs of the Context.
So you need to do something like this:
.subscriberContext((context) -> {
Map<String, String> map = new HashMap();
map.put("foo","bar");
context = context.put("myContextKey",map)
return context;
});
Could this be an option to retrain the MDC context?
.subscriberContext((context) -> {
context = context.put("mdc", MDC.getCopyOfContextMap())
return context;
});
👍 to @ivansenic's approach (gratitude to you and @archie-swif as well). We used this in conjunction with a
WebFilter
to copy request context information (conserved headers, correlation ids, etc) in to theContext
like so:public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) { return chain.filter(exchange) .subscriberContext((context) -> { context = context.put("foo", "bar") return context; }); }
Looks promising so far.
Can you share some example, I am trying to access context value in some services class but getting Empty context
Mono.subscriberContext().flatMap(c -> {
c.getOrDefault("version", "1");
return c.get("version");
}).subscribe();
@SudBisht I guess, it's impossible to do outside of WebFlux controllers. @ivansenic and @archie-swif do you know how to use it outside of controllers. At the first glance, it looks very ugly solution to pass a map everywhere from controller to low level abstractions.
@SudBisht you have to be sure that you are retrieving the Context
from the same stream. Where are you calling Mono.subscriberContext()
and, more important, where are you creating the context?
@SudBisht Did you get a way out for this one ? I'm stuck with the same issue.
@carlosbarragan When you say that we should be retrieving the Context
from the same stream, do you mean we should be populating the Context
and fetching it in a service in the same reactive chain ?
Thanks everyone. The original request here has been fulfilled through a combination of the Reactor Context API and the ServerWebExchangeContextFilter
from #21746 that makes ServerWebExchange
available in all parts of the reactive execution chain.
In regards to logging with MDC, please check this earlier comment and the linked blog post from a member of the Reactor team. I've also created https://github.com/reactor/reactor-core/issues/1985 to for Reactor to provide official guidance. If you have further comments on the topic, please comment there.
@torstenwerner @ivansenic @carlosbarragan @archie-swif We have followed https://github.com/archie-swif/webflux-mdc/tree/master/src/main/java/com/example/webfluxmdc and we are logging a context called correlationId which is being sent in the header of a particular api request. But also for other simultaneous api calls with no correlationId header, the same correlationId gets logged. For ex: When we hit GET /offers endpoint with header correlationId 'abc’ , if there are simultaneous api calls for GET /metrics endpoint without any header correlationId, we get the same correlationId ‘abc’ for /metrics calls also. How can we get rid of this issue?
In continuation to my first comment- Also we are using the HttpWebHandlerAdapter debug logs(one for api path, other for success/error code) as inbound logs for our service, but we are getting the context for only 1 of the 2 HttpWebHandlerAdapter logs. Does anyone know how do I ensure that both the logs get the context?
@ankitp-kr The code you went for was written a long time ago, I am not sure if there were any changes in the reactor api and impl since then.. You should check first the official MDC guide as stated in the repo: https://projectreactor.io/docs/core/release/reference/#faq.mdc. You can also add logging statements in each subscriber method to print the current context, I guess there is some cleanup missing..
@torstenwerner @ivansenic @carlosbarragan @archie-swif We have followed https://github.com/archie-swif/webflux-mdc/tree/master/src/main/java/com/example/webfluxmdc and we are logging a context called correlationId which is being sent in the header of a particular api request. But also for other simultaneous api calls with no correlationId header, the same correlationId gets logged. For ex: When we hit GET /offers endpoint with header correlationId 'abc’ , if there are simultaneous api calls for GET /metrics endpoint without any header correlationId, we get the same correlationId ‘abc’ for /metrics calls also. How can we get rid of this issue?
Hi @ankitp-kr ,
I'm also trying to use MdcContextLifter
proposed by @archie-swif , since unfortunately the approach recommended in https://projectreactor.io/docs/core/release/reference/#faq.mdc is not sufficient for my use case ( I need to produce some logging that come from internal libraries invoked within each operator, and I would like to have this libraries log the MDC variables as well)
So I've opted for a similar approach as the one @archie-swif used, but using a closeable MDC for adding my correlationId, could you maybe try this to see if you get rid of the problem in where simultaneous invocations, one with correlation ID and one without it, would print same value?
class MdcContextLifter<T> implements CoreSubscriber<T> {
CoreSubscriber<T> coreSubscriber;
public MdcContextLifter(CoreSubscriber<T> coreSubscriber) {
this.coreSubscriber = coreSubscriber;
}
@Override
public void onSubscribe(Subscription subscription) {
coreSubscriber.onSubscribe(subscription);
}
@Override
public void onNext(T obj) {
injectMdc(() -> coreSubscriber.onNext(obj));
}
@Override
public void onError(Throwable t) {
injectMdc(() -> coreSubscriber.onError(t));
}
@Override
public void onComplete() {
injectMdc(() -> coreSubscriber.onComplete());
}
@Override
public Context currentContext() {
return coreSubscriber.currentContext();
}
/**
* Adding correlationId in MDC as closeable statement.
* @param task
*/
private void injectMdc(Runnable task) {
final Object mdcVal = coreSubscriber.currentContext().getOrDefault("correlationId", null);
if(mdcVal != null ) {
try(MDC.MDCCloseable ignored = MDC.putCloseable("correlationId", mdcVal.toString())) {
task.run();
}
} else {
task.run();
}
}
}
@torstenwerner @ivansenic @carlosbarragan @archie-swif We have followed https://github.com/archie-swif/webflux-mdc/tree/master/src/main/java/com/example/webfluxmdc and we are logging a context called correlationId which is being sent in the header of a particular api request. But also for other simultaneous api calls with no correlationId header, the same correlationId gets logged. For ex: When we hit GET /offers endpoint with header correlationId 'abc’ , if there are simultaneous api calls for GET /metrics endpoint without any header correlationId, we get the same correlationId ‘abc’ for /metrics calls also. How can we get rid of this issue?
Hi @ankitp-kr ,
I'm also trying to use
MdcContextLifter
proposed by @archie-swif , since unfortunately the approach recommended in https://projectreactor.io/docs/core/release/reference/#faq.mdc is not sufficient for my use case ( I need to produce some logging that come from internal libraries invoked within each operator, and I would like to have this libraries log the MDC variables as well)So I've opted for a similar approach as the one @archie-swif used, but using a closeable MDC for adding my correlationId, could you maybe try this to see if you get rid of the problem in where simultaneous invocations, one with correlation ID and one without it, would print same value?
class MdcContextLifter<T> implements CoreSubscriber<T> { CoreSubscriber<T> coreSubscriber; public MdcContextLifter(CoreSubscriber<T> coreSubscriber) { this.coreSubscriber = coreSubscriber; } @Override public void onSubscribe(Subscription subscription) { coreSubscriber.onSubscribe(subscription); } @Override public void onNext(T obj) { injectMdc(() -> coreSubscriber.onNext(obj)); } @Override public void onError(Throwable t) { injectMdc(() -> coreSubscriber.onError(t)); } @Override public void onComplete() { injectMdc(() -> coreSubscriber.onComplete()); } @Override public Context currentContext() { return coreSubscriber.currentContext(); } /** * Adding correlationId in MDC as closeable statement. * @param task */ private void injectMdc(Runnable task) { final Object mdcVal = coreSubscriber.currentContext().getOrDefault("correlationId", null); if(mdcVal != null ) { try(MDC.MDCCloseable ignored = MDC.putCloseable("correlationId", mdcVal.toString())) { task.run(); } } else { task.run(); } } }
The server does not start with this change:
Sorry, its starting now , must be something in my environment. Can some one please share 'when' the MDC gets cleared before the processing of the next request in the solution proposed by @archie-swif
Doron Gold opened SPR-15680 and commented
It is necessary to have a way to associate attributes with the current request and be able to fetch these attributes during the request lifetime.
The ability to set and get such attributes is essential for use-cases such as:
In webmvc this is possible by calling a static methods on
org.springframework.web.context.request.RequestContextHolder
. A developer could implement HandlerInterceptor which intercepts all incoming requests and sets contextual attributes on RequestContextHolder. These attributes are then accessible from anywhere in the code (via a static method) during the request lifetime.Since in Reactive Web a request is not bound to a single processing thread, a simple use of ThreadLocal (what RequestContextHolder does) is not enough. The Spring framework should offer a more sophisticated solution.
A simple but extremely intrusive workaround would be to pass in ServerWebExchange to all methods in all components - starting from controller endpoints down to all services. But even if this is done, there is no good way to have a logger take attributes form the request (for the purpose of including trace/correlation ID) without implementing a wrapper around the logger class that receives an extra parameter.
Reference URL: https://stackoverflow.com/questions/43975761/how-to-get-the-context-of-the-current-request-in-spring-webflux
Issue Links:
21746 Create a WebFilter for ServerWebExchange Reactor Context
20108 Upgrade to Reactor 3.1 RC1 (including Reactive Streams 1.0.1)
19 votes, 34 watchers