The usual requirement is to have the Kafka transaction roll back if the DB commit fails so that the record is redelivered. If you want the reverse scenario, add a `ListenerContainerCustomizer` bean and modify the `ContainerProperties`: get the KTM from the properties, wrap it in a `ChainedKafkaTransactionManager` (a subclass of the chained TM), and set the container property to the chained TM.
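In code, that suggestion would look roughly like this (a sketch only, assuming the binder has already set a `KafkaTransactionManager` on the container properties; `dbTm` is a placeholder for your DB transaction manager, and the commit-order comment relies on `ChainedTransactionManager` committing in reverse declaration order):

```java
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.transaction.ChainedKafkaTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> containerCustomizer(
        DataSourceTransactionManager dbTm) {
    return (container, destinationName, group) -> {
        ContainerProperties props = container.getContainerProperties();
        // the binder has already configured a KafkaTransactionManager here
        PlatformTransactionManager ktm = props.getTransactionManager();
        // chained managers commit in reverse order of declaration, so with
        // (dbTm, ktm) the Kafka transaction commits first and the DB last
        props.setTransactionManager(new ChainedKafkaTransactionManager<>(dbTm, ktm));
    };
}
```

(As it turns out below, this only applies when the binder actually creates listener containers, i.e. when there are consumer bindings.)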
@garyrussell, thanks for your reply. I tried the approach you mentioned, but the call to `configure(Object container, String destinationName, String group)` never came, though I can see the newly created `ListenerContainerCustomizer` bean being passed here: https://github.com/spring-cloud/spring-cloud-stream/blob/2.1.x/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/AbstractMessageChannelBinder.java#L143
Then I checked and found that the `ListenerContainerCustomizer`'s `configure` method is only invoked here: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/2.1.x/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java#L490, which is for consumers, I think; but there are no consumers, as the application is producer-only.
Regarding your first point, I now understand why you mentioned "redelivered", but this is not a consumer application; it is producer-only, where some API calls trigger DB and Kafka calls.
Could you please suggest a way to reverse the default commit order on the producer side?
For producer-only transactions, you can start the transaction with `@Transactional`; you must use the transactional binder's `ProducerFactory` in the KTM so that the send participates in the transaction. `getTransactionalProducerFactory()` was added to the binder. It was supposed to be back-ported to 2.1.x but it was not.
@garyrussell thanks again for your reply. Yes, I came across the `getTransactionalProducerFactory` method but could not find it in 2.1.x. Is there any workaround you can suggest for now? Is there any plan to back-port it to 2.1.x? I could not see it in any of the other new releases either.
Also, does this mean that producer-only transactions are not possible as of now, even using the default order (DB commit/rollback first, then Kafka)? I think they are possible, because that worked for us, though not in the desired order; I just wanted to confirm.
@y-manish We missed back-porting the PR that @garyrussell mentioned above to both the 2.1.x and 2.2.x branches when it was originally merged to master. However, both of those branches are now updated with this back-port and the changes are available in the snapshots. Please try it and see if the issues you raised are resolved now.
@sobychacko, thanks. What snapshot path/name should I use? For the release version I use the following: ``
@y-manish `2.1.4.BUILD-SNAPSHOT` is the latest. It is coming from here.
@sobychacko, thanks for the info. I could use the snapshot version by defining the Spring snapshot repo path, and I think I now get the desired order of commit/rollback. Code:
```java
@Bean(name = "chainedTransactionManager")
public ChainedTransactionManager chainedTransactionManager(
        DataSourceTransactionManager dataSourceTransactionManager, BinderFactory binders) {
    // fetch the transactional producer factory from the Kafka binder
    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null, MessageChannel.class))
            .getTransactionalProducerFactory();
    KafkaTransactionManager<byte[], byte[]> ktm = new KafkaTransactionManager<>(pf);
    ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    // managers are committed in reverse order of declaration, so the Kafka
    // transaction commits before the DB transaction (the order we wanted)
    return new ChainedKafkaTransactionManager<Object, Object>(dataSourceTransactionManager, ktm);
}

// usage
@Transactional(transactionManager = "chainedTransactionManager")
public void targetMethod() {
    // DB call
    // Kafka produce
}
```
Will it be available anytime soon as 2.1.4.RELEASE?
Also, in this new version I can see a difference in logging (or maybe in config). Previously, with 2.1.0.RELEASE, I was getting the INFO log below for every topic, but now I only get it once:

```
20269 INFO [restartedMain] org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
```

Is this expected in the new version because of other improvements?
It's expected (but the previous behavior is not): the producer factory caches the producer(s). Perhaps you had multiple producer factories? Show your previous configuration.
Note that with producer-only transactions, the `transactional.id` must be unique in each application instance.
@y-manish We can look into a possible release. Will let you know.
@garyrussell,

> Perhaps you had multiple producer factories

Not sure, but I can see this behavior just by changing the spring-cloud-stream-binder-kafka version to the new snapshot version; everything else is the same.
The logs follow. As described earlier, with the old version there are `ProducerConfig values` logs for each different producer binding, each with a different `transactional.id`.
- Start-up logs for 2.1.0.RELEASE: https://gist.github.com/y-manish/6ba92a212ea20c58e6d170ae25e92623
- Start-up logs for 2.1.4.BUILD-SNAPSHOT: https://gist.github.com/y-manish/ba27ab4ae2338f390189b1d8f968bd79
- application.yaml (the same config is used in both scenarios): https://gist.github.com/y-manish/52108244fd5a53d4e45459620a4b927f
Also, yes, we will make sure to use a different transaction-id-prefix for each application instance by using the Docker container short id :)
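For example, something like this in application.yaml (a sketch only; it assumes Docker's default `HOSTNAME` env var carries the container short id, and the `tx-` prefix value is arbitrary):

```yaml
spring:
  cloud:
    stream:
      kafka:
        binder:
          transaction:
            # unique per application instance, as required for producer-only transactions
            transaction-id-prefix: tx-${HOSTNAME}-
```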
I can't explain why you got 3 producers before and only one now. It looks like they were being physically closed for some reason, rather than being cached for reuse.
This is addressed in https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/commit/c0d67a5c1603d76177397a4f33b00d1a8ccb6e4d. We are aiming to cut a point release on 2.1.x and 2.2.x some time this week.
I could not make spring-cloud-stream-binder-kafka work for the following use case: reversing the default transaction commit order (i.e., committing the Kafka transaction before the DB transaction).
Version: 2.1.x (but I think this applies to every version that supports transactions).
Because of the default order, there is a chance that the DB operation succeeds without the event being published to Kafka, if something fails after the DB commit. We can easily handle the scenario where the Kafka event is published and the DB fails by having a consistency check on the consumer side, but not the current default case (i.e., DB commit first and then Kafka).
With the plain spring-kafka project this seems possible by using a `ChainedTransactionManager`, providing the transaction managers in the required order, but I could not find a way to do the same in spring-cloud-stream-binder-kafka, since I can't get a reference to the `KafkaTransactionManager` from `KafkaMessageChannelBinder`: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/2.1.x/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java#L204
Also, we tried an extra `@Transactional(propagation = Propagation.REQUIRES_NEW)` on the method sending the Kafka events, which gives the expected result (see the sketch below), but this fetches an extra database connection from the pool unnecessarily and degrades performance.
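For reference, a minimal sketch of that workaround (the class, method, and channel names are hypothetical; the send must live in a separate bean so the transactional proxy is actually invoked):

```java
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

@Service
public class EventPublisher {

    private final MessageChannel output; // the binding's output channel

    public EventPublisher(MessageChannel output) {
        this.output = output;
    }

    // REQUIRES_NEW suspends the caller's DB transaction and runs the send in
    // its own inner transaction, which commits before the outer one resumes;
    // the cost is the extra pooled connection mentioned above
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void publish(byte[] payload) {
        output.send(MessageBuilder.withPayload(payload).build());
    }
}
```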
Any help (even custom code for now) would be appreciated.
PS: We can't switch back to plain spring-kafka, as a lot of development has already been done using spring-cloud-stream-binder-kafka's useful and time-saving features.