spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

Set custom topic configs in NewTopic object at KafkaTopicProvisioner.class #330

Closed moscovig closed 6 years ago

moscovig commented 6 years ago

Currently it is impossible to pass custom parameters to the topic creation method, so there is no way to set configs on the NewTopic object.

We are trying to set the message.format.version config at the topic level when creating a topic.

Topic creation source code in the KafkaTopicProvisioner class:

NewTopic newTopic = new NewTopic(topicName, effectivePartitionCount,
        (short) configurationProperties.getReplicationFactor());
CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(newTopic));

It would be helpful to be able to define yml configuration params for topics, as is already possible for the producer and consumer.
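
As a rough illustration of the request above, here is a minimal sketch of how topic-level settings could be collected from flat properties into a map. The class name TopicConfigSketch, the prefix "topic.properties." and the value "0.10.2" are hypothetical and chosen only for this example; they are not existing binder properties. The sketch uses only the JDK. With kafka-clients on the classpath, the resulting map is the kind of argument that NewTopic.configs(Map<String, String>) accepts.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

class TopicConfigSketch {

    // Hypothetical prefix, for illustration only; not an existing binder property.
    static final String PREFIX = "topic.properties.";

    /** Collects every property under PREFIX into a map keyed by the bare Kafka config name. */
    static Map<String, String> topicConfigs(Properties props) {
        Map<String, String> configs = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(PREFIX)) {
                // Strip the prefix so the key is the plain topic-level config name.
                configs.put(name.substring(PREFIX.length()), props.getProperty(name));
            }
        }
        return configs;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("topic.properties.message.format.version", "0.10.2"); // example value
        props.setProperty("replication-factor", "1"); // no prefix, so it is skipped

        Map<String, String> configs = topicConfigs(props);

        // Assumption: inside the provisioner, the map would be applied before createTopics, e.g.
        //   newTopic.configs(configs);
        System.out.println(configs);
    }
}
```

Keeping the topic settings as an open-ended map, rather than one field per setting, would let any topic-level Kafka config pass through without further changes to the provisioner.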

Thanks

artembilan commented 6 years ago

Can't a KafkaAdmin and a NewTopic declared as a @Bean help you as a workaround for now? https://docs.spring.io/spring-kafka/docs/2.1.4.RELEASE/reference/html/_reference.html#_configuring_topics

garyrussell commented 6 years ago

@artembilan Good suggestion, but that will only work if the binder is at the top level, it won't work with multiple binders (via environment).

artembilan commented 6 years ago

Well, my idea here is to create the topics on the Kafka broker in advance. That is exactly what the end user can do in his/her application context. When the Kafka Binder reaches its own configuration, it will see that the topic is already there and will do nothing.

I'm not sure how user-defined beans for this are related to the child application context of the binder... That part isn't clear to me, @garyrussell. I'm not talking here about what we have to do as a fix. This is strictly a workaround suggestion for the end user's request.

I might be missing something, though; I won't mind being corrected.

Thanks

garyrussell commented 6 years ago

Let's say he has two Kafka clusters and 2 binders; the main application context (where your suggested NewTopic @Beans would be) has no Kafka configuration.

I agree it's a good workaround for the simple case, but this will not be hard to fix.

moscovig commented 6 years ago

We can create the topics as you suggested, @artembilan, but we already have a live KafkaTopicProvisioner bean, and it contains a live but inaccessible AdminClient. The logic is there, but it doesn't support topic-level parameters...

artembilan commented 6 years ago

OK. Let's fix it on the framework level and forget my suggestion!

doron-levi commented 6 years ago

Thanks @artembilan, could you also take a look at KafkaTopicProvisioner and its relation to the AdminClient at the same time?

Can the AdminClient be a Bean so that it can be reused in other use cases? Alternatively, can the client be closed automatically after operations (or after some idle time, or by some other policy)? I think I have observed it "spitting" errors to the log in some cases; it is probably doing stuff in the background...