Closed. garyrussell closed this issue 7 years ago.
Will this include honoring @Transactional as well?
I think so. Since @Transactional starts a transaction in the current thread, it won't be a surprise for the producer to pick that transaction up and participate in it.
Looks like this issue duplicates #258
@jorgheymans Yes - providing the implementation of PlatformTransactionManager provides that functionality.
Is there a recommended way to set the "transactionIdPrefix" on the DefaultKafkaProducerFactory? The documentation that was added does not appear to go into that detail.
A transactionIdPrefix is exactly about ProducerConfig.TRANSACTIONAL_ID_CONFIG: https://kafka.apache.org/documentation/#producerconfigs:
The TransactionalId to use for transactional delivery. This enables reliability semantics which span multiple producer sessions since it allows the client to guarantee that transactions using the same TransactionalId have been completed prior to starting any new transactions. If no TransactionalId is provided, then the producer is limited to idempotent delivery. Note that enable.idempotence must be enabled if a TransactionalId is configured. The default is empty, which means transactions cannot be used.
So, if you are familiar with that, it should not be complicated to see that this transactionIdPrefix is for distinguishing one transactional producer from another at the broker level during troubleshooting. Therefore you can set it to something like myAwesomeProducer on one DefaultKafkaProducerFactory and theirBadProducer on another. Or it can be based on the IP address or some other resource that identifies the location of your producer. You can use any arbitrary string for this property.
Hope that helps.
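For example, a minimal sketch of setting such a prefix (the prefix value, generic types, and bootstrap address here are illustrative assumptions, not values from this thread):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;

// Sketch: give each factory a distinguishing transactionIdPrefix.
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address

DefaultKafkaProducerFactory<String, String> pf =
        new DefaultKafkaProducerFactory<>(props);
pf.setTransactionIdPrefix("myAwesomeProducer."); // arbitrary identifying string
```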
I am not sure what you mean by
The documentation that was added does not appear to get into that detail.
See the documentation.
Transactions are enabled by providing the DefaultKafkaProducerFactory with a transactionIdPrefix. In that case, instead of managing a single shared Producer, the factory maintains a cache of transactional producers. When the user close()s a producer, it is returned to the cache for reuse instead of actually being closed. The transactional.id property of each producer is transactionIdPrefix + n, where n starts with 0 and is incremented for each new producer.
Please explain what you think is "missing" and we'd be happy to clarify it.
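The naming scheme quoted above can be illustrated with plain Java (the prefix here is a hypothetical value):

```java
import java.util.ArrayList;
import java.util.List;

// Each transactional producer the factory creates is assigned
// transactional.id = transactionIdPrefix + n, with n starting at 0.
String transactionIdPrefix = "tx-"; // hypothetical prefix
List<String> transactionalIds = new ArrayList<>();
for (int n = 0; n < 3; n++) {
    transactionalIds.add(transactionIdPrefix + n);
}
System.out.println(transactionalIds); // prints [tx-0, tx-1, tx-2]
```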
I think I was a little hasty with my question, it is more of a spring-boot and spring-kafka question. I am using spring-boot and was relying on the KafkaAutoConfiguration to inject all of the beans needed for me to Autowire my KafkaTemplate. I can see from the spring-kafka documentation that to create the ProducerFactory I need to specify the @Bean method to construct the ProducerFactory and pass the configuration into the Factory:
@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    ...
    return props;
}
When using the KafkaAutoConfiguration with spring-boot I can set the producer properties in my application.yml and rely on the following method to do the work for me:
@Bean
@ConditionalOnMissingBean(ProducerFactory.class)
public ProducerFactory<?, ?> kafkaProducerFactory() {
    return new DefaultKafkaProducerFactory<>(
            this.properties.buildProducerProperties());
}
This is using the KafkaProperties configuration properties class. I was looking for a configuration property I could use to set the transactionIdPrefix on the producer factory, but it does not appear that the spring boot KafkaAutoConfiguration and related configuration properties support that. Therefore I think I need to do something like this:
@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaTransactionalProducerFactory<K, V> {

    private final KafkaProperties properties;

    public KafkaTransactionalProducerFactory(final KafkaProperties properties) {
        this.properties = properties;
    }

    @Bean
    public ProducerFactory<K, V> kafkaProducerFactory() {
        final DefaultKafkaProducerFactory<K, V> defaultKafkaProducerFactory =
                new DefaultKafkaProducerFactory<K, V>(
                        this.properties.buildProducerProperties());
        defaultKafkaProducerFactory.setTransactionIdPrefix("my.transaction.id");
        return defaultKafkaProducerFactory;
    }

}
@cortexcompiler,
I think this is a good catch!
Indeed, we should add that transactionIdPrefix as a configuration property to Spring Boot.
Please raise an issue there and link it here.
Moreover, I think we should also auto-configure a KafkaTransactionManager, and maybe even inject it into the default ConcurrentKafkaListenerContainerFactory if the consumer transactional option is true, too...
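Until such auto-configuration exists, a KafkaTransactionManager can be declared manually. A minimal sketch (the class name and generic types are assumptions for illustration):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;

@Configuration
public class KafkaTxConfig { // hypothetical configuration class

    // Sketch: wires a KafkaTransactionManager around the transactional
    // producer factory (which must have a transactionIdPrefix set).
    @Bean
    public KafkaTransactionManager<String, String> kafkaTransactionManager(
            ProducerFactory<String, String> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

}
```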
- template.executeInTransaction(KafkaOperationsCallback callback)
- PlatformTransactionManager
- DefaultKafkaProducerFactory to hand out a new Producer instead of a singleton (when transactions are enabled).
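A usage sketch for the executeInTransaction operation mentioned above (topic name, key, and values are hypothetical; kafkaTemplate is assumed to be built from a factory with a transactionIdPrefix):

```java
// Sketch: all sends inside the callback participate in one Kafka
// transaction; it is committed when the callback returns normally
// and rolled back if the callback throws.
kafkaTemplate.executeInTransaction(template -> {
    template.send("myTopic", "key", "value-1"); // hypothetical topic/values
    template.send("myTopic", "key", "value-2");
    return true;
});
```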