Closed christopher-gilbert closed 5 years ago
I am fine with your suggestion of a simple delegating facade, but I believe it applies to the Serializers too.
Further to my original suggestion, if we go with a single factory, I think it should only be for the key/value serializer/deserializer (don't include the streams SerDes, because kafka-streams is only an optional dependency and we don't want to make it a hard dep).
Hi Gary,
As Monday is a public holiday in the UK, and I was meant to be racing but I'm injured, I've had a chance to get started on this. My initial KafkaConsumer/ProducerFactory approach that just takes in suppliers of Deserializers/Serializers is fairly trivial to implement but not so client friendly, as Spring doesn't tend to take an approach of passing in functions so it is a bit incongruous.
I have started on a factory approach as suggested, but also with an annotation driven implementation. I'll describe the details in terms of Consumer/Deserializer as I'm starting with that, but will extend to Producer/Serializer if it seems like a good approach. The factory interface has methods to get key/value Serializers/Deserializers; there are actually 8 methods, because each one also has a variant that allows a KafkaConsumerFactory bean name to be optionally passed in.
DefaultKafkaConsumerFactory will delegate as described before to the new type of KafkaConsumerFactory (I'm considering the best name for it), passing in an implementation of the SerializerDeserializerFactory that just implements 2 methods by returning the single key and value Deserializer instances (or null if they are not set).
Alternatively, anyone is at liberty to create their own SerializerDeserializerFactory and give it to an instance of the new type of KafkaConsumerFactory when creating a KafkaListenerContainerFactory. But a simpler approach for the user is to annotate Deserializer beans or component classes as KeyDeserializer or ValueDeserializer and just set the KafkaListenerContainerFactory's consumerFactory as an instance of the new type of KafkaConsumerFactory.
The annotations take an optional list of KafkaConsumerFactory bean names so that if necessary you can apply different deserializers to different KafkaConsumerFactories, and I hope I can figure out how to apply meta-annotations so they are automatically prototype scoped beans (I can't see a downside here of enforcing prototype behaviour - if you really don't want separate instances per KafkaConsumer then just use DefaultKafkaConsumerFactory). Instances of the new type of KafkaConsumerFactory will create KafkaConsumers with Deserializers from the factory, which may be specific to that KafkaConsumerFactory instance (which is BeanNameAware), default if a default is in the factory and no specific one is defined, or null if none is defined at all, in which case we fall back to default Kafka client code which tries to create an instance of a property specified Deserializer class.
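The lookup order described above (a Deserializer specific to the named factory, then a default, then null) can be sketched roughly as follows. This is only an illustration: the class name `DeserializerLookupSketch` and its methods are hypothetical, the type parameter `D` stands in for a Deserializer type, and none of this is taken from the branch itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch of the resolution order; not the code from the branch.
class DeserializerLookupSketch<D> {

    // Suppliers registered for a specific KafkaConsumerFactory bean name.
    private final Map<String, Supplier<D>> specific = new HashMap<>();

    // Supplier used when no specific mapping exists; may stay null.
    private Supplier<D> defaultSupplier;

    void register(String factoryBeanName, Supplier<D> supplier) {
        specific.put(factoryBeanName, supplier);
    }

    void registerDefault(Supplier<D> supplier) {
        this.defaultSupplier = supplier;
    }

    // Specific mapping first, then the default, then null so that the caller
    // can fall back to the property-specified Deserializer class.
    D resolve(String factoryBeanName) {
        Supplier<D> supplier = specific.getOrDefault(factoryBeanName, defaultSupplier);
        return supplier == null ? null : supplier.get();
    }

    public static void main(String[] args) {
        DeserializerLookupSketch<String> lookup = new DeserializerLookupSketch<>();
        System.out.println(lookup.resolve("anyFactory"));
        lookup.registerDefault(() -> "default");
        lookup.register("ordersFactory", () -> "orders-specific");
        System.out.println(lookup.resolve("ordersFactory"));
        System.out.println(lookup.resolve("otherFactory"));
    }
}
```

Returning null rather than throwing keeps the fallback to the default Kafka client behaviour in the caller's hands.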
I'm looking at Deserializers/ConsumerFactory first and will share my fork once there is something vaguely presentable to see, so you can comment on the approach. Broadly speaking this takes inspiration from the way the KafkaListener Annotation is processed into the framework - a new BeanPostProcessor that is declared in KafkaBootstrapConfiguration would instantiate an AnnotationDrivenSerializerDeserializerFactory and then when postProcessing KafkaListenerContainerFactory beans, set that factory on their KafkaConsumerFactory beans if they are instances of the new type AND their SerializerDeserializerFactory hasn't already been explicitly set. The BeanPostProcessor class will also implement a lifecycle hook that runs after all beanDefinitions are in the factory but before ListenerContainers are started (need to find the right hook) in which it would find beans of type Serializer and Deserializer in the beanFactory and if annotated, add their names to the AnnotationDrivenSerializerDeserializerFactory, mapped to specific KafkaConsumerFactories if present on the annotation, else mapped as default, so the factory can get prototype instances of the correct bean from the BeanFactory. Additionally in this hook I can check that any KafkaConsumerFactory bean names specified in annotations do actually exist in the bean factory and are the right type, and also warn of any double mappings.
Will see how I get on today - and will try to avoid falling down a code rabbithole! It should all be reasonably straightforward.
As you can see above, I have pushed a few commits to a branch in my fork. There is plenty of tidying up to do, a bit more work to finish the consumer side of things, and tests to add and fix, to check if this actually works.
https://github.com/christopher-gilbert/spring-kafka/tree/GH-1080-factory
@christopher-gilbert Thanks, please go ahead and create a PR from your branch when you are ready and we'll start the review process.
Cheers Gary, just finishing off the producer side of things - slightly more tricky because there's an inner class referenced outside so the delegate approach won't work without potential impact. Subclassing does work and all tests pass so I will do that. Only issue in my mind is the resulting inconsistency between default consumer factory, which delegates and default producer factory, which extends. I think this is probably acceptable as the two sides have many other differences anyway - it is tempting to refactor the default consumer to extend as well for consistency of approach but I'm not going to do that now - instead I'll raise the pull request when done (hopefully this weekend) and take on any comments from there.
Thanks, Chris
I have just looked into all the Deserializer implementations on our classpath and none of them does anything meaningful in its close() implementation.
So, we might not even consider this an issue. If such a Deserializer does exist, then the recommendation is to go with properties configuration instead of injecting an explicit instance. That way a new instance is created by the Kafka client from the provided class every time we want a new instance of the Consumer.
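As a rough illustration of the properties-based option, the consumer configuration might look like the sketch below. The property keys are the standard Kafka consumer settings and the class name is Kafka's StringDeserializer; the broker address, the group id and the class name `ConsumerPropertiesSketch` are assumptions for the example only.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: builds the kind of config map that could be handed to a
// consumer factory instead of injecting Deserializer instances.
class ConsumerPropertiesSketch {

    static Map<String, Object> consumerConfigs() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        configs.put("group.id", "example-group");           // assumed group id
        // Class names rather than instances: the Kafka client instantiates a new
        // Deserializer through its no-arg constructor for every Consumer it builds.
        configs.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return configs;
    }

    public static void main(String[] args) {
        consumerConfigs().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

This only works when the Deserializer class has a no-arg constructor, which is the limitation raised later in the thread.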
This discussion and contribution are too long to digest, so I would go with a simple warning in the JavaDocs and docs regarding Deserializers injected into the DefaultKafkaConsumerFactory.
For any peculiar case we can always provide our own KafkaConsumerFactory implementation instead.
Sorry that my view contradicts what you have discussed here, but I'd like to consider all the possible options before we make a final decision on what to break and how.
Thanks for understanding.
Hi Artem,
I understand your point, and agree that all options should be considered, but I would strongly argue that there is an issue, though there are much simpler ways to address this.
Although you do not have any Deserializers with a meaningful close method, the fact is that Deserializers and Serializers are Closeable, so an implementation is perfectly correct if it takes some action that renders it unusable after closure.
Spring Kafka code currently has default consumer and producer factories that contain significant behaviour but limit the user's ability to choose Deserializers that may be uncommon but are perfectly valid (i.e. ones that implement close meaningfully AND do not have no-arg constructors), while not providing any extension points or hooks to get around that problem. We could provide our own KafkaConsumerFactory implementation, but would lose that valuable behaviour (and there is even more in DefaultKafkaProducerFactory).
I do have a current problem - I have to make use of an encrypting wrapper Deserializer that does not have a no-arg constructor (it needs the wrapped Deserializer to be passed in) and does have a close implementation. I don't own that implementation but have to use it. I could discuss (and have discussed) with the owning team ways of avoiding closing the way they have, but they quite reasonably argue that they are just closing a Closeable class, and the actual problem is with Spring Kafka.
The approach I took in this pull request did cause quite extensive movement of existing code in order to allow different factory implementations, and the failed test is indicative of a need to extend rather than delegate in the consumer factory to keep the test code backward compatible. I can completely understand that anyone would be highly reluctant to merge in large changes from an unknown contributor with little presence on GitHub - especially when their first pull request contains a test failure!
Could I suggest an alternative much simpler, almost trivial, change, which was my initial thought - just add one overloaded constructor to default producer and consumer factories that takes in suppliers of Deserializers/Serializers. In that way users have the existing options of passing in single instances to be shared, allowing instances to be created from configuration, and a new option of implementing supplier functions - this would cover all valid uses.
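A minimal sketch of what such an overloaded constructor could look like is shown below. It uses hypothetical stand-in classes (`SimpleDeserializer`, `SketchConsumer`, `SketchConsumerFactory`) rather than the real Kafka and Spring Kafka types, so it illustrates the idea only and is not the change that was eventually proposed.

```java
import java.io.Closeable;
import java.util.function.Supplier;

class SupplierFactorySketch {

    // Hypothetical stand-in for a Deserializer that is unusable once closed.
    static class SimpleDeserializer implements Closeable {
        private boolean closed;

        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    // Stand-in consumer: closing it cascades to its deserializer.
    static class SketchConsumer implements Closeable {
        final SimpleDeserializer valueDeserializer;

        SketchConsumer(SimpleDeserializer valueDeserializer) {
            this.valueDeserializer = valueDeserializer;
        }

        @Override
        public void close() {
            if (valueDeserializer != null) {
                valueDeserializer.close();
            }
        }
    }

    // Stand-in factory with the existing-style constructor plus the new overload.
    static class SketchConsumerFactory {
        private final Supplier<SimpleDeserializer> valueDeserializerSupplier;

        // Existing option: one shared instance, wrapped in a trivial supplier.
        SketchConsumerFactory(SimpleDeserializer shared) {
            this(() -> shared);
        }

        // New option: the supplier decides whether instances are shared or fresh.
        SketchConsumerFactory(Supplier<SimpleDeserializer> valueDeserializerSupplier) {
            this.valueDeserializerSupplier = valueDeserializerSupplier;
        }

        SketchConsumer createConsumer() {
            return new SketchConsumer(valueDeserializerSupplier.get());
        }
    }

    // True when closing one consumer leaves the next consumer's deserializer open.
    static boolean freshInstanceSurvivesClose() {
        SketchConsumerFactory factory = new SketchConsumerFactory(SimpleDeserializer::new);
        SketchConsumer first = factory.createConsumer();
        first.close();
        SketchConsumer second = factory.createConsumer();
        return first.valueDeserializer.isClosed() && !second.valueDeserializer.isClosed();
    }

    public static void main(String[] args) {
        System.out.println("fresh instance survives close: " + freshInstanceSurvivesClose());
    }
}
```

Because the single-instance constructor just wraps its argument in a supplier, existing callers would keep their current behaviour.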
If that sounds reasonable, I can ditch my current branch, create a new branch with that small change and raise a pull request for that.
Many thanks, Chris
To be honest, I haven't looked into your changes because I really do see that simpler solution with the Supplier option in the DefaultKafkaConsumerFactory 😄
Can we fix that for now, and then discuss in a separate issue whatever else you would like to improve?
All your other arguments are absolutely valid and I have nothing to object to, other than preferring the simpler approach with the Supplier 😉
Thank you for understanding!
Thanks Artem,
I'll put together the simple fix in a separate branch and raise a pull request - thanks for the discussion.
Best wishes, Chris
This relates to an issue I posted on StackOverflow. Essentially, DefaultKafkaConsumerFactory has a single instance of each deserializer (key and value) which is shared between all consumers that it creates. The problem is that consumers have a lifecycle which cascades down to the deserializers - if a consumer is closed then it calls close on the deserializers, and as a consequence a consumer can be subsequently created by the factory with a closed deserializer.
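The failure mode can be reproduced in miniature with stand-in classes. In the sketch below, `ClosingDeserializer`, `StandInConsumer` and `SharingFactory` are hypothetical simplifications invented for the example; they mimic the shared-instance behaviour described above and are not the real Kafka or Spring Kafka classes.

```java
import java.io.Closeable;
import java.nio.charset.StandardCharsets;

class SharedDeserializerDemo {

    // Hypothetical Deserializer whose close() is meaningful.
    static class ClosingDeserializer implements Closeable {
        private boolean closed;

        String deserialize(byte[] data) {
            if (closed) {
                throw new IllegalStateException("deserializer already closed");
            }
            return new String(data, StandardCharsets.UTF_8);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Stand-in consumer: its lifecycle cascades down to the deserializer.
    static class StandInConsumer implements Closeable {
        private final ClosingDeserializer valueDeserializer;

        StandInConsumer(ClosingDeserializer valueDeserializer) {
            this.valueDeserializer = valueDeserializer;
        }

        String read(byte[] record) {
            return valueDeserializer.deserialize(record);
        }

        @Override
        public void close() {
            valueDeserializer.close();
        }
    }

    // Stand-in factory that hands the same instance to every consumer.
    static class SharingFactory {
        private final ClosingDeserializer shared;

        SharingFactory(ClosingDeserializer shared) {
            this.shared = shared;
        }

        StandInConsumer createConsumer() {
            return new StandInConsumer(shared);
        }
    }

    // True when a consumer created after an earlier close cannot deserialize.
    static boolean secondConsumerFails() {
        SharingFactory factory = new SharingFactory(new ClosingDeserializer());
        StandInConsumer first = factory.createConsumer();
        first.read("a".getBytes(StandardCharsets.UTF_8));
        first.close();
        StandInConsumer second = factory.createConsumer();
        try {
            second.read("b".getBytes(StandardCharsets.UTF_8));
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second consumer fails: " + secondConsumerFails());
    }
}
```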
My initial thought for addressing this in a backward compatible way would be to move the bulk of code from DefaultKafkaConsumerFactory into a new class which allows clients to set a Supplier<Deserializer> rather than a Deserializer. DefaultKafkaConsumerFactory would retain the same interface and behaviour by delegating to the new class, passing a Supplier that just returns the single Deserializer instance.
Gary Russell commented on StackOverflow with a different approach involving a factory for Deserializers/Serializers and SerDes, which is a bigger solution than that required for the single issue raised, but would obviously have wider applicability and be potentially more useful. If I submit a contribution, I will carefully consider the pros and cons of each approach - any comments on the subject are very welcome.