Closed HelvetiaAppDev closed 2 years ago
/cc @cescoffier, @ozangunalp
I need to check why we activate the request context. It's not clear when it should be deactivated, as the record processing is async (at least the acknowledgment will be) and so the thread can be used to manage other records. Basically, I would rather disable the request context when receiving messages.
@mkouba WDYT?
Hm, I cannot find the place where the request context is activated. Let me try to dig deeper...
@cescoffier The request context is activated by SmallRye Context Propagation, and not only once for a given Kafka message. I can see similar stack traces:
```
io.quarkus.arc.impl.RequestContext.activate(RequestContext.java:119)
io.quarkus.arc.runtime.context.ArcContextProvider$ContextSnapshot.begin(ArcContextProvider.java:151)
io.smallrye.context.impl.SlowActiveContextState.<init>(SlowActiveContextState.java:29)
io.smallrye.context.impl.SlowCapturedContextState.begin(SlowCapturedContextState.java:34)
io.smallrye.context.impl.SlowCapturedContextState.begin(SlowCapturedContextState.java:13)
io.smallrye.context.impl.wrappers.SlowContextualConsumer.accept(SlowContextualConsumer.java:20)
io.smallrye.mutiny.operators.uni.UniOnItemConsume$UniOnItemComsumeProcessor.invokeEventHandler(UniOnItemConsume.java:77)
io.smallrye.mutiny.operators.uni.UniOnItemConsume$UniOnItemComsumeProcessor.onItem(UniOnItemConsume.java:42)
io.smallrye.mutiny.operators.uni.UniOnItemTransform$UniOnItemTransformProcessor.onItem(UniOnItemTransform.java:43)
io.smallrye.mutiny.operators.uni.UniEmitOn$UniEmitOnProcessor.lambda$onItem$0(UniEmitOn.java:34)
io.smallrye.reactive.messaging.kafka.commit.KafkaThrottledLatestProcessedCommit.lambda$received$6(KafkaThrottledLatestProcessedCommit.java:199)
```
That's why I wasn't able to find the reason in reactive messaging itself.
When context propagation is enabled, we capture the context at subscription and restore it in each Mutiny stage. We could try to configure context propagation not to propagate the request scope using SmallRye-specific annotations, but I'm not totally sure where I would need to put them.
@FroMage any idea?
Any update on this? Should we go forward assuming that we cannot use the request context (the way we thought) when processing Kafka records?
No progress, but you're right, you should not use the RequestContext when processing records as there is no real "request".
This is probably a very subjective topic, but we do feel that there should be a new request for every record that is processed. That just seems logical and useful to us. We'd appreciate it if you could find a way to make this work. Thank you.
We have the same problem here and I have the same opinion as @HelvetiaAppDev: It would feel natural to have a new request for every consumed record.
So for the moment, is the recommended workaround to terminate and reactivate the request-scoped context on every record consumption with `Arc.container().requestContext().terminate()` and `Arc.container().requestContext().activate()`?
We have tried to use "context"; it leads to terrible effects, especially on performance. We recommend using Message metadata, as it is propagated along the pipeline.
Sorry, I don't get what you mean by

We recommend using Message metadata, as they are propagated along the pipeline.

My use case is: the Kafka consumer calls our business logic. In this business logic, some values are read from a @RequestScoped bean. I need to extract this info from the headers of the Kafka record and put it into this bean. It's a generic mechanism, and I don't want to pass the info as a parameter.
Maybe you could explain your solution a little more?
What you do won't work, because you get the request context from the very beginning of the app (and that's a bug/leak that we need to fix).
Message metadata are objects associated with a message (see https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/3.13/concepts.html). They are propagated during the processing (if you do not break the chain).
If I understand you correctly, @cescoffier, it is not recommended to save metadata of a message in a request context?
What would then be the recommended way of doing data access control during message processing for an authenticated user that is identified in the message header?
My straightforward approach would be:
What are the issues with that implementation?
In your first comment you state that the ack of the message is async, and thus the thread might process other records. Would these other records overwrite the request context of the message to ack? Shouldn't context propagation handle that for us?
Thanks in advance for more information on this.
@anchou-bockhorn The rule is simple: do not use the request scope when dealing with Kafka messages. It will not do what you expect it to do.
Currently, the only way to propagate data in your pipeline is to inject a custom metadata object into the message. You can do things like this:
```java
@Incoming("in")
@Outgoing("out")
public Message<T> addMetadata(Message<T> m) {
    return m.addMetadata(new MyMetadataWithMyData(...));
}
```
Then, you can retrieve it using:

```java
message.getMetadata(MyMetadataWithMyData.class).orElse(null);
```
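To make the two halves concrete, here is a minimal self-contained sketch of the pattern described above. The class names (`MetadataPipeline`, `UserMetadata`), channel names, and the `"alice"` value are hypothetical; only `addMetadata`/`getMetadata` come from the snippets above.

```java
import java.util.concurrent.CompletionStage;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class MetadataPipeline {

    // Hypothetical metadata carrier: any immutable POJO will do.
    public static class UserMetadata {
        public final String userId;

        public UserMetadata(String userId) {
            this.userId = userId;
        }
    }

    @Incoming("in")
    @Outgoing("enriched")
    public Message<String> attach(Message<String> m) {
        // addMetadata returns a copy of the message carrying the metadata
        return m.addMetadata(new UserMetadata("alice"));
    }

    @Incoming("enriched")
    public CompletionStage<Void> consume(Message<String> m) {
        // read the per-message data from the metadata instead of a @RequestScoped bean
        UserMetadata user = m.getMetadata(UserMetadata.class).orElse(null);
        return m.ack();
    }
}
```

The metadata travels with the message itself, so it survives thread switches without relying on any thread-local or CDI context.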
We are working on another solution to propagate data implicitly using "context locals", but that's a work in progress and I cannot give an ETA yet.
@cescoffier Sorry to bother you again, but I'm not sure we are talking about the same thing. I'm talking about @Incoming only, not outgoing of any kind. And I'm not sure what exactly this has to do with metadata. Consider the following snippet:
```java
@ApplicationScoped
public class MyConsumer {

    @Inject
    MyBusinessService service;

    @Inject
    MyRequestScopedBean requestScopedBean;

    @Incoming("my-channel")
    @ActivateRequestContext
    public void process(ConsumerRecord<Void, String> msg) {
        requestScopedBean.setValueFromHeaders(msg.headers());
        service.doSomethingUseful(msg.value());
    }
}
```
MyBusinessService (or any subsidiary bean) would then read values from MyRequestScopedBean. If that were really impossible, how would all the other things like transactions, etc. work in MyBusinessService? And how would the metadata of the message help in that situation?
Thanks again for your answer.
@wicksim I'm not totally sure what it will do now. Initially, it was taking the wrong request scope. Now, it may create a new one and deactivate it after the process method. @mkouba, can you confirm?
This should be fixed now with #27802.
To access request-scoped beans, one will need to explicitly use @ActivateRequestContext on incoming methods. Note that the request context will be active only during the method call and will be destroyed when the processing completes.
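A minimal sketch of the supported pattern after that fix (the bean name `RequestScopedHolder` and channel name `quotes` are hypothetical):

```java
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.control.ActivateRequestContext;
import javax.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class QuoteProcessor {

    @Inject
    RequestScopedHolder holder; // hypothetical @RequestScoped bean

    @Incoming("quotes")
    @ActivateRequestContext
    public void process(String quote) {
        // a fresh request context is active for the duration of this call
        holder.setValue(quote);
        // ...business logic reading the holder...
    } // the request context is destroyed when process() returns
}
```

Note the lifecycle caveat from the comment above: the context does not outlive the method, so anything asynchronous started inside `process()` cannot rely on it.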
Hello, I know it's a closed issue, but I would like to understand the reasons for the performance issue when activating the request context from the beginning of the flow:
We have tried to use "context"; it leads to terrible effects, especially on performance. We recommend using Message metadata, as it is propagated along the pipeline.
Also, how is using the Context API different? And what type of references can we put in this Context?
In a nutshell, we have a user API that is supposed to work seamlessly with quarkus-resteasy and also with quarkus-reactive-messaging. This implementation uses @RequestScoped objects, and we expect the same programming experience for both. So does that mean quarkus-reactive-messaging does not offer a consistent CDI programming experience in Quarkus?
Regards Kevin
Unfortunately, it's a bit more convoluted than this. We have been thinking of adding a "MessageScope" in reactive messaging, but the request scope was heavily used in the past (when used in reactive messaging), and it caused a lot of issues. Depending on the signature, the request scope can be very different; for methods consuming Multis, it's the request scope of the application (so it is only terminated when the application stops). It has been the source of many bugs and issues. Thus, we decided to have something slightly different to avoid confusion. You can enable the request scope per method, and it will be terminated when the method completes.
Many thanks for your answer.
One subsidiary question: can we store heap references in the Context? Explanation: we have a set of PublisherDecorators that need to define metadata consumed by CDI beans, and other @Incoming/@Outgoing methods that inject those beans and use them. I would like to use the Context to store/retrieve references in order to rebuild each request-scoped CDI bean before it gets injected into a method using @ActivateRequestContext: the Context, as you said, would be correctly computed/propagated for any kind of method. One concern: creating a @RequestScoped bean is probably not a free operation and could become a bottleneck at some point.
Regards
If you are using the Context API, you wouldn't need to use @ActivateRequestContext in messaging methods, right?
You can keep object references in the Context; it is not serialized, and as long as you avoid keeping references to it, you shouldn't have any leaks.
Do not forget that we cannot guarantee that it won't be accessed concurrently.
Typically, that will happen if you use @Blocking(ordered = false) or @RunOnVirtualThread.
If you are using the Context API you wouldn't need to use @ActivateRequestContext in messaging methods right?

Yes, it could be done by using the Context (this is what I intend to do behind the scenes), but I would like to use CDI to have consistency in our code base.
Do not forget that we cannot guarantee that it won't be accessed concurrently. Typically, it will happen if you use @Blocking(ordered=false) or @RunOnVirtualThread.

OK, I bet this is by design.
And I just saw that we probably can't use it with Subscriber/PublisherDecorator, because they handle Multis to implement the decoration; reading this part of the documentation, it says it doesn't cope with Multis.
I could use an interceptor, but I don't know the name of the channel in advance.
It depends when/where you want to access it. If your callback is invoked during an "onItem" event, the context will be set and accessible. If you try to access it at assembly time, it will not work (as there is no event):
```java
public Multi<X> reshape(Multi<X> multi) {
    // context NOT accessible here (assembly time, no event yet)
    return multi.onItem().transform(x -> {
        // context accessible here (invoked on each onItem event)
        return x;
    });
}
```
You know why it can be confusing :-D
ok nice, thanks for the precision. Very helpful
OK, so we finally implemented the activation of the request context in a PublisherDecorator and close it inside a SubscriberDecorator (before sending the message into a connector) using the RequestContext API:
We saw that the execution of Multi#transform is done using a DuplicatedContext, and this context is maintained by Quarkus along the whole execution (inbound connector) -> (outbound connector) of the same message. In case we have a non-blocking execution, I assume we will be provided with a brand new duplicated context during the execution, so the request scope is not propagated and raises an error -> which is fine.
So from a purely technical point of view: do you know what is propagating the request context in my case?
If it is context propagation, we may fall into the consideration below, and we know we are paying a price for that:
The MicroProfile Context Propagation specification addresses this issue. It saves and restores the values stored in thread locals whenever we switch to another processing unit. However, that model is expensive. Context locals (also known as duplicated context) is another way to do this and requires fewer mechanics.
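The "context locals" idea from the quote can be sketched with the Vert.x Context API. This is an illustrative sketch, not the author's implementation: it assumes the code runs on the duplicated Vert.x context that Quarkus assigns to each message (e.g. inside an onItem callback), and the `"user-id"` key is arbitrary.

```java
import io.vertx.core.Context;
import io.vertx.core.Vertx;

public class ContextLocalSketch {

    // Store a per-message value in the current duplicated context.
    // currentContext() returns null when not running on a Vert.x context,
    // so this must be called from within the message-processing pipeline.
    public static void store(String userId) {
        Context ctx = Vertx.currentContext();
        if (ctx != null) {
            ctx.putLocal("user-id", userId); // key name is arbitrary
        }
    }

    // Retrieve the value later, in another stage of the same pipeline;
    // the duplicated context travels with the message, not with the thread.
    public static String retrieve() {
        Context ctx = Vertx.currentContext();
        return ctx != null ? ctx.getLocal("user-id") : null;
    }
}
```

Because the duplicated context is per message rather than per thread, this avoids the save/restore cost of thread-local-based context propagation mentioned in the quote.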
Describe the bug
When the Kafka processor receives a record, the request context gets automatically activated. The problem is that the data of request-scoped objects is not cleared.
Expected behavior
Every record received via Kafka starts with no (or a new) request context.
Actual behavior
Every record received via Kafka uses the same request context. When incrementing the counter variable in the request-context object, the counter increases with every record. The expected behaviour would be that the counter goes from zero to one for each record.
How to Reproduce?
use kafka-quickstart project
When creating new quotes via the UI and processing them in class QuotesProcessor in method process(String quoteRequest), using the following call:

```java
Arc.container().requestContext().isActive()
```

you can see the request context is active, but that's wrong; it should not be active. That's why you can see the counter increase in class Counter when receiving multiple quotes. Manually deactivating and reactivating the context in class QuotesProcessor in method process(String quoteRequest) works:

```java
Arc.container().requestContext().deactivate();
Arc.container().requestContext().activate();
```

Anyway, it should also work without adding these lines.
kafka-quickstart.zip
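For reference, a minimal sketch of the request-scoped counter described above (field and method names are assumed; the actual Counter class is in the attached reproducer):

```java
import javax.enterprise.context.RequestScoped;

@RequestScoped
public class Counter {

    private int count = 0;

    // With a fresh request context per record, this should always return 1.
    // With the leaked, never-terminated context, the value keeps growing
    // across records, which is the behaviour reported above.
    public int increment() {
        return ++count;
    }
}
```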
Output of uname -a or ver
No response

Output of java -version
Java(TM) SE Runtime Environment 18.9 (build 11.0.5+10-LTS)

GraalVM version (if different from Java)
No response

Quarkus version or git rev
2.4.1.Final

Build tool (ie. output of mvnw --version or gradlew --version)
maven 3.6.3

Additional information
No response