k-wall opened 1 year ago
I've been kicking this idea around in my head over the last week. Keen to hear what others think. @racheljpg @devguyio @robobario @SamBarker
What if we made mutation separate from filtering instead? We could generate a set of ApiMessageMutators (and probably some Schema classes so we could provide typesafe mapping/filtering functions), whose job is to apply these mutations to an ApiMessage. Like:
FetchRequestMutator.builder()
        .mapping(FetchRequestSchema.topicName(), function)
        .filtering(FetchRequestSchema.nestedRecord().topicData(), filterFunc)
        .build().apply(apiMessage);
Edit: maybe more like:
FetchRequestMutator.builder()
        .mappingTopicName(function)
        .mutatingNestedRecord().filteringTopicData(filterFunc).end()
        .build().apply(apiMessage);
Sort of thing.
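Below is a minimal sketch of how the nested builder from the edit above could fit together. It uses toy message classes in place of Kafka's generated `FetchRequestData`. Every name here is hypothetical: `ToyFetchRequest`, `ToyFetchTopic`, `ToyFetchRequestMutator`, `mappingTopicName`, `filteringTopics`, `mutatingTopic`, `filteringPartitions` and `end`. The sketch shows two things. First, `end()` hands control back to the parent builder. Second, the built mutator only touches fields that have a function configured.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Hypothetical stand-ins for generated Kafka message classes.
class ToyFetchTopic {
    String name;
    List<Integer> partitions;

    ToyFetchTopic(String name, List<Integer> partitions) {
        this.name = name;
        this.partitions = new ArrayList<>(partitions);
    }
}

class ToyFetchRequest {
    List<ToyFetchTopic> topics = new ArrayList<>();
}

// Hypothetical mutator; a null function means "leave that part of the message alone".
final class ToyFetchRequestMutator {
    private final UnaryOperator<String> topicNameMapper;
    private final Predicate<ToyFetchTopic> topicFilter;
    private final IntPredicate partitionFilter;

    private ToyFetchRequestMutator(Builder b) {
        this.topicNameMapper = b.topicNameMapper;
        this.topicFilter = b.topicFilter;
        this.partitionFilter = b.partitionFilter;
    }

    static Builder builder() {
        return new Builder();
    }

    void apply(ToyFetchRequest request) {
        // Filtering runs first, so the topic filter sees the original (unmapped) names.
        if (topicFilter != null) {
            request.topics.removeIf(topicFilter.negate());
        }
        for (ToyFetchTopic topic : request.topics) {
            if (topicNameMapper != null) {
                topic.name = topicNameMapper.apply(topic.name);
            }
            if (partitionFilter != null) {
                topic.partitions.removeIf(p -> !partitionFilter.test(p));
            }
        }
    }

    static final class Builder {
        private UnaryOperator<String> topicNameMapper;
        private Predicate<ToyFetchTopic> topicFilter;
        private IntPredicate partitionFilter;

        Builder mappingTopicName(UnaryOperator<String> f) { topicNameMapper = f; return this; }
        Builder filteringTopics(Predicate<ToyFetchTopic> f) { topicFilter = f; return this; }
        // Descend one level into the nested record; end() returns to this builder.
        TopicBuilder mutatingTopic() { return new TopicBuilder(this); }
        ToyFetchRequestMutator build() { return new ToyFetchRequestMutator(this); }
    }

    static final class TopicBuilder {
        private final Builder parent;

        TopicBuilder(Builder parent) { this.parent = parent; }
        TopicBuilder filteringPartitions(IntPredicate f) { parent.partitionFilter = f; return this; }
        Builder end() { return parent; }
    }
}
```

Whether filters should see pre- or post-mapping names is a design choice. This sketch picks pre-mapping.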
As the filters are hot paths through the code (especially the Fetch/Produce RPCs), I think we'd want to make sure that the filtering/mutating code performs as well as possible. With the *RequestMutator idea you are expressing here, would this tree-walk the requests/responses at runtime? Would it perform sufficiently well?
One weakness of the current MultiTenant filter implementation is that it completely ignores RPC versioning. Some RPCs have completely changed shape (for instance OffsetFetchRequest, where the message changed from expressing a single group to many groups at version 8). We don't capture that at all. I think having the on*Request|Response methods express version-specific mutations/filtering would lead to a more robust implementation that is easier to reason about. This will be very tricky to get right unless we use code generation.
I really don't want to hand-code something that looks like this:
public void onOffsetFetchRequest(RequestHeaderData data, OffsetFetchRequestData request, KrpcFilterContext context) {
    if (data.requestApiVersion() < 8) {
        // mutate request.groupId()
        ...
    } else {
        // mutate request.groups()
        ...
    }
    context.forwardRequest(request);
}
The alternative is to have the multitenant filter implementation ignore the version. In the case of onOffsetFetchRequest, it would need to mutate both groups() and groupId() and have adequate null-safe guards around a null groupId().
public void onOffsetFetchRequest(RequestHeaderData data, OffsetFetchRequestData request, KrpcFilterContext context) {
    // handles the < 8 case
    Optional.ofNullable(request.groupId()).ifPresent(g -> applyTenantPrefix(context, request::groupId, request::setGroupId, false));
    // handles the 8+ case
    request.groups().forEach(requestGroup -> {
        applyTenantPrefix(context, requestGroup::groupId, requestGroup::setGroupId, false);
    });
    context.forwardRequest(request);
}
> As the filters are hot paths through the code (especially the Fetch/Produce RPCs), I think we'd want to make sure that the filtering/mutating code performs as well as possible. With the *RequestMutator idea you are expressing here, would this tree-walk the requests/responses at runtime? Would it perform sufficiently well?
What do you mean by walking them? Introspecting the fields at runtime? No.
I was imagining generating code that knows all the fields of each message type and how to mutate/filter them (recursively).
So if you have a mutator function configured for field X, the mutator would have code generated to get X, apply the function to the value and set X with the result. If you don't have a mutator set for a field, it doesn't have to do anything for that field.
In the examples, the mutator could be kept in a constant and applied to all messages of that type.
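To make "generated code, not runtime introspection" concrete, here is a hand-written sketch of what a generator might emit for a two-level message. It has one mutator class per message type and a null check per field, and it recurses by delegating to the child type's mutator. The `Toy*` classes and `TenantMutators` are hypothetical. The fixed `tenant1-` prefix is a placeholder: a real multi-tenant filter would derive the tenant from the connection rather than bake it into a constant.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical message classes: a group that contains topics.
class ToyTopic {
    String name;
    ToyTopic(String name) { this.name = name; }
}

class ToyGroup {
    String groupId;
    List<ToyTopic> topics = new ArrayList<>();
    ToyGroup(String groupId) { this.groupId = groupId; }
}

// What a generator might emit for ToyTopic: one straight-line null check per field.
final class ToyTopicMutator {
    private final UnaryOperator<String> name; // null means "leave this field alone"

    ToyTopicMutator(UnaryOperator<String> name) { this.name = name; }

    void apply(ToyTopic topic) {
        if (name != null) {
            topic.name = name.apply(topic.name);
        }
    }
}

// Generated for ToyGroup: handles its own field, then recurses via the child type's mutator.
final class ToyGroupMutator {
    private final UnaryOperator<String> groupId; // null: field untouched
    private final ToyTopicMutator topics;        // null: don't descend into topics

    ToyGroupMutator(UnaryOperator<String> groupId, ToyTopicMutator topics) {
        this.groupId = groupId;
        this.topics = topics;
    }

    void apply(ToyGroup group) {
        if (groupId != null) {
            group.groupId = groupId.apply(group.groupId);
        }
        if (topics != null && group.topics != null) {
            for (ToyTopic topic : group.topics) {
                topics.apply(topic);
            }
        }
    }
}

// Built once and shared across messages; the fixed prefix stands in for a per-tenant value.
final class TenantMutators {
    static final ToyGroupMutator GROUP = new ToyGroupMutator(
            id -> "tenant1-" + id,
            new ToyTopicMutator(n -> "tenant1-" + n));

    private TenantMutators() {}
}
```

No field is discovered at runtime. The per-field work is fixed when the code is generated, which is what should keep this cheap on the Fetch/Produce hot path.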
How would your idea look for OffsetFetchRequest, with the two expressions of group ID that I highlighted above? With your idea, would we still hand-write onOffsetFetchRequest in the multi-tenant filter?
Yes. I could imagine making the mutator apiVersion-aware, but you'd still have to hand-write each mutator, saying "apply transform X at version Y". Or, as you said, set up a mutator for both fields and have it be a no-op if the field is null.
I think I'm missing how your generated filter would handle this apiVersion case. Would it know from the JSON schema which fields are relevant at each apiVersion?
I'd be tending towards your idea of doing a null-safe mutate of both fields.
With filter generation, it could potentially handle new RPCs arriving in kafka-clients with string fields called topicName, etc., which is nice. My idea can't do that.
> I think I'm missing how your generated filter would handle this apiVersion case. Would it know from the JSON schema which fields are relevant at each apiVersion?
Yes, that's the (very embryonic) idea. I'm hoping we could extract the information we need from the model.
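Kafka's message JSON definitions express field availability as version-range strings. Examples are "0-7", "8+", a single version such as "3", or "none". A generator would need something like the parsing sketched below to decide which fields apply at a given apiVersion. `ToyVersionRange` is a hypothetical helper written for illustration, not a class taken from Kafka's generator.

```java
// Hypothetical helper: parses a Kafka-style version-range string and answers
// whether a field defined with that range exists at a given apiVersion.
record ToyVersionRange(short lowest, short highest) {
    // An empty range (lowest > highest), used for "none".
    static final ToyVersionRange NONE = new ToyVersionRange((short) 1, (short) 0);

    static ToyVersionRange parse(String spec) {
        String s = spec.trim();
        if (s.equals("none")) {
            return NONE;
        }
        if (s.endsWith("+")) {
            // "8+" is open-ended: version 8 and every later version.
            return new ToyVersionRange(Short.parseShort(s.substring(0, s.length() - 1)), Short.MAX_VALUE);
        }
        int dash = s.indexOf('-');
        if (dash >= 0) {
            // "0-7" is inclusive at both ends.
            return new ToyVersionRange(Short.parseShort(s.substring(0, dash)),
                    Short.parseShort(s.substring(dash + 1)));
        }
        short single = Short.parseShort(s);
        return new ToyVersionRange(single, single);
    }

    boolean contains(short version) {
        return version >= lowest && version <= highest;
    }
}
```

Using the ranges from this thread, a generator could see that `GroupId` ("0-7") and `Groups` ("8+") never coexist and emit one branch per range.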
I am unconvinced of the need for code generation here, especially at this stage of development.
I'm certainly not seeing it as something for now. I just wanted to put the idea out there.
Do you think we should just continue to ignore versions and write version-agnostic filter code that handles all the fields, regardless of which version of the RPC they come from? As we are only talking about a few fields (topic names, group IDs and transactional IDs) and the RPCs haven't changed that much, maybe this will be okay.
Let's put it in concrete terms using an example. OffsetFetchRequest used a GroupId at the top level for versions 0-7 and a Groups collection from version 8 onwards. Version-agnostic code for this would need to look like this. Note the Optional.ofNullable calls, which let the code process either form without throwing an NPE.
// 8+
Optional.ofNullable(request.groups()).ifPresent(groups -> groups.forEach(requestGroup -> {
    applyTenantPrefix(context, requestGroup::groupId, requestGroup::setGroupId, false);
    Optional.ofNullable(requestGroup.topics())
            .ifPresent(topics -> topics.forEach(topic -> applyTenantPrefix(context, topic::name, topic::setName, false)));
}));
// 0-7
Optional.ofNullable(request.groupId()).ifPresent(groupId -> applyTenantPrefix(context, request::groupId, request::setGroupId, false));
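Here is a self-contained sketch to check that version-agnostic code of this shape copes with both message shapes. It runs against cut-down stand-ins for `OffsetFetchRequestData` and its nested group/topic types. The `Toy*` classes and `VersionAgnosticPrefixer` are hypothetical. `applyTenantPrefix` is simplified to take a tenant string in place of the filter context, and it drops the boolean flag of the original helper, whose semantics aren't shown in the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical, cut-down stand-ins for OffsetFetchRequestData and its nested types.
class ToyOffsetFetchTopic {
    private String name;
    ToyOffsetFetchTopic(String name) { this.name = name; }
    String name() { return name; }
    void setName(String name) { this.name = name; }
}

class ToyOffsetFetchGroup {
    private String groupId;
    private List<ToyOffsetFetchTopic> topics; // may be null

    ToyOffsetFetchGroup(String groupId, List<ToyOffsetFetchTopic> topics) {
        this.groupId = groupId;
        this.topics = topics;
    }
    String groupId() { return groupId; }
    void setGroupId(String groupId) { this.groupId = groupId; }
    List<ToyOffsetFetchTopic> topics() { return topics; }
}

class ToyOffsetFetchRequest {
    private String groupId;                   // the v0-7 form
    private List<ToyOffsetFetchGroup> groups; // the v8+ form
    String groupId() { return groupId; }
    void setGroupId(String groupId) { this.groupId = groupId; }
    List<ToyOffsetFetchGroup> groups() { return groups; }
    void setGroups(List<ToyOffsetFetchGroup> groups) { this.groups = groups; }
}

final class VersionAgnosticPrefixer {
    // Simplified, hypothetical applyTenantPrefix: read via getter, write back via setter.
    static void applyTenantPrefix(String tenant, Supplier<String> getter, Consumer<String> setter) {
        setter.accept(tenant + "-" + getter.get());
    }

    static void onOffsetFetchRequest(String tenant, ToyOffsetFetchRequest request) {
        // 8+: each group's id and (if present) its topic names
        Optional.ofNullable(request.groups()).ifPresent(groups -> groups.forEach(group -> {
            applyTenantPrefix(tenant, group::groupId, group::setGroupId);
            Optional.ofNullable(group.topics())
                    .ifPresent(topics -> topics.forEach(t -> applyTenantPrefix(tenant, t::name, t::setName)));
        }));
        // 0-7: the single top-level group id
        Optional.ofNullable(request.groupId())
                .ifPresent(groupId -> applyTenantPrefix(tenant, request::groupId, request::setGroupId));
    }
}
```

Whichever form is absent is simply skipped, so the same method can be called for any version.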
I'm pondering whether the multi-tenant filter could be code-generated rather than hand-coded. There are patterns emerging:
- … (CreatableTopicResult)
- … (String)
I think we might be able to use a recursive descent algorithm that walks the JSON definitions, emitting the necessary code. We might pass JSON paths to the code generator as arguments, so we can tell it which object paths to target, perhaps together with a signal about which type of transform or filter to apply.
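Here is a minimal sketch of the recursive-descent idea. It assumes the JSON definition has already been parsed into a small tree; `SpecField` is a hypothetical record, not part of Kafka. Targets are given as dotted field paths such as `Groups.GroupId`. The generator emits source lines that call a hypothetical `prefix(getter, setter)` helper. It descends into `[]Struct` arrays only when some target lies beneath them, and it guards each array against null. Version awareness is deliberately left out. The struct type names in the usage are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical parsed form of one entry in a message definition's "fields" array.
record SpecField(String name, String type, List<SpecField> fields) {
    static SpecField leaf(String name, String type) {
        return new SpecField(name, type, List.of());
    }
}

// Hypothetical generator: walks the field tree and emits Java source lines.
final class PrefixCodeGen {
    private final Set<String> targets; // dotted paths, e.g. "Groups.GroupId"

    PrefixCodeGen(Set<String> targets) {
        this.targets = targets;
    }

    List<String> generate(List<SpecField> fields, String rootVar) {
        List<String> out = new ArrayList<>();
        walk(fields, "", rootVar, "", 1, out);
        return out;
    }

    private void walk(List<SpecField> fields, String pathPrefix, String var,
                      String indent, int depth, List<String> out) {
        for (SpecField f : fields) {
            String path = pathPrefix + f.name();
            String getter = Character.toLowerCase(f.name().charAt(0)) + f.name().substring(1);
            if (f.type().equals("string") && targets.contains(path)) {
                // Targeted string field: emit a prefixing call using getter/setter references.
                out.add(indent + "prefix(" + var + "::" + getter + ", " + var + "::set" + f.name() + ");");
            } else if (f.type().startsWith("[]") && hasTargetUnder(path + ".")) {
                // Array of structs with a target inside: null-guard, loop, then recurse.
                String list = var + "." + getter + "()";
                String child = "e" + depth;
                out.add(indent + "if (" + list + " != null) {");
                out.add(indent + "    for (" + f.type().substring(2) + " " + child + " : " + list + ") {");
                walk(f.fields(), path + ".", child, indent + "        ", depth + 1, out);
                out.add(indent + "    }");
                out.add(indent + "}");
            }
            // Any other field is not targeted, so no code is emitted for it.
        }
    }

    private boolean hasTargetUnder(String prefix) {
        return targets.stream().anyMatch(t -> t.startsWith(prefix));
    }
}
```

Given an OffsetFetchRequest-like tree and the targets `GroupId`, `Groups.GroupId` and `Groups.Topics.Name`, this emits a top-level `prefix(request::groupId, request::setGroupId);` followed by two nested, null-guarded loops. Array fields of primitives (such as partition indexes) produce nothing.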
The mechanism would also generate RPC-version-aware code (I'm thinking a switch statement with a case per version range expressed in the RPC definition).
The code generator would need to be sufficiently flexible to generate code overriding only those filter methods which need overriding. It would also need to generate a correct set of imports and the implements clause.
I'm wondering if we'd have the code gen emit a MultiTenantFilter interface with the on*Request|Response methods implemented as default methods. The actual MultiTenantTransformationFilter class would implement that interface. That would allow MultiTenantTransformationFilter to re-override the filter methods on a case-by-case basis if additional business logic is required. It could then use a MultiTenantFilter.super.onCreateTopicsResponse() call to reach the generated code (in Java, an interface's default method is invoked through InterfaceName.super, not a bare super).
I think this could reduce the manual effort.
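Here is one way the generated interface and the hand-written class could fit together. It is sketched with hypothetical `ToyMultiTenantFilter`, `ToyMultiTenantTransformationFilter` and `ToyOffsetFetchReq` types, and the v8+ group list is flattened to plain strings for brevity. The generated default method switches on apiVersion, with one case per version range. The hand-written class re-overrides that method and reaches the generated code through `ToyMultiTenantFilter.super`, which is the Java syntax for invoking an interface's default method from an implementing class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical message type; the v8+ groups are flattened to plain ids for brevity.
class ToyOffsetFetchReq {
    String groupId;                            // v0-7
    List<String> groupIds = new ArrayList<>(); // v8+
}

// Stand-in for the generated interface: behaviour lives in default methods.
interface ToyMultiTenantFilter {
    String tenantPrefix(); // hypothetical hook supplied by the hand-written class

    default void onOffsetFetchRequest(short apiVersion, ToyOffsetFetchReq request) {
        // One case per version range (GroupId: 0-7, Groups: 8+).
        switch (apiVersion) {
            case 0, 1, 2, 3, 4, 5, 6, 7 -> request.groupId = tenantPrefix() + request.groupId;
            default -> request.groupIds.replaceAll(id -> tenantPrefix() + id); // 8+
        }
    }
}

// Hand-written class: inherits the generated code and re-overrides only where needed.
final class ToyMultiTenantTransformationFilter implements ToyMultiTenantFilter {
    private final String tenant;
    final List<String> audit = new ArrayList<>(); // stands in for "additional business logic"

    ToyMultiTenantTransformationFilter(String tenant) {
        this.tenant = tenant;
    }

    @Override
    public String tenantPrefix() {
        return tenant + "-";
    }

    @Override
    public void onOffsetFetchRequest(short apiVersion, ToyOffsetFetchReq request) {
        // Default methods are invoked via InterfaceName.super; a bare super.x() would not compile here.
        ToyMultiTenantFilter.super.onOffsetFetchRequest(apiVersion, request);
        audit.add("offsetFetch v" + apiVersion);
    }
}
```

Keeping the generated code in an interface means regenerating it never touches the hand-written class. The class only has to change when a re-overridden method's signature changes.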