spring-projects / spring-kafka

Provides Familiar Spring Abstractions for Apache Kafka
https://projects.spring.io/spring-kafka
Apache License 2.0

Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION #433

Closed agrawal93 closed 6 years ago

agrawal93 commented 7 years ago

To add transaction support to my application, I followed the approach below:

  1. Configuration File

```java
@Configuration
@EnableKafka
@EnableTransactionManagement
public class KafkaProducerConfig {

    @Autowired
    private KafkaConfiguration kafkaConfiguration; // default configurations from application.properties

    @Bean
    public Map<String, Object> producerConfigurations() {
        Map<String, Object> configurations = new HashMap<>(kafkaConfiguration.getProducer());
        configurations.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
        configurations.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
        return configurations;
    }

    @Bean
    public DefaultKafkaProducerFactory<String, String> producerFactory() {
        DefaultKafkaProducerFactory<String, String> producerFactory =
                new DefaultKafkaProducerFactory<>(producerConfigurations());
        producerFactory.setTransactionIdPrefix("transaction.prefix.");
        return producerFactory;
    }

    @Bean
    public KafkaTransactionManager<String, String> kafkaTransactionManager() {
        KafkaTransactionManager<String, String> transactionManager =
                new KafkaTransactionManager<>(producerFactory());
        transactionManager.setNestedTransactionAllowed(true);
        return transactionManager;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}
```

  2. Implementation (call to the method below)

```java
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Transactional
public void publishAsTransaction(String topic, String[] messages) {
    for (String message : messages) {
        kafkaTemplate.send(topic, message);
    }
}
```
After calling `publishAsTransaction` with an appropriate topic and messages array, the first run succeeds and all messages are published to the topic. Every subsequent run, however, fails with the error `Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION`.

Is there anything wrong with my approach? Kindly guide me.
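For context on what the message means: it is raised by the producer's internal transaction state machine when `beginTransaction()` is called while a previous transaction is still open (i.e. was never committed or aborted). The sketch below is not Kafka's actual `TransactionManager` code, just a minimal hypothetical model that reproduces the same message:

```java
import java.util.EnumSet;
import java.util.Set;

public class TxStateSketch {

    enum State { READY, IN_TRANSACTION, COMMITTING }

    // Hypothetical miniature of a transactional producer's state machine:
    // beginTransaction is only legal from READY; commit returns to READY.
    static class TxStateMachine {
        State state = State.READY;

        void beginTransaction() {
            transitionTo(State.IN_TRANSACTION, EnumSet.of(State.READY));
        }

        void commitTransaction() {
            transitionTo(State.COMMITTING, EnumSet.of(State.IN_TRANSACTION));
            state = State.READY; // commit completed, producer reusable
        }

        private void transitionTo(State target, Set<State> legalSources) {
            if (!legalSources.contains(state)) {
                throw new IllegalStateException("Invalid transition attempted from state "
                        + state + " to state " + target);
            }
            state = target;
        }
    }

    public static void main(String[] args) {
        TxStateMachine tm = new TxStateMachine();
        tm.beginTransaction();
        tm.commitTransaction();  // producer back to READY
        tm.beginTransaction();   // fine after a commit
        try {
            tm.beginTransaction();  // begin again without commit/abort
        } catch (IllegalStateException e) {
            // prints: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
            System.out.println(e.getMessage());
        }
    }
}
```

So the symptom suggests the producer cached for `transaction.prefix.0` is left in `IN_TRANSACTION` after the first request, i.e. the transaction is never committed before it is reused.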

garyrussell commented 7 years ago

What version are you testing with? Can you get a DEBUG log? I just ran this test and everything worked as expected...

```java
@Test
public void testDeclarativeIntegration() throws Exception {
    AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(DeclarativeConfig.class,
            EmbeddedConfig.class);
    Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testTxString", "false", embeddedKafka);
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
    cf.setKeyDeserializer(new StringDeserializer());
    Consumer<String, String> consumer = cf.createConsumer();
    embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "fiz");
    Tx2 tx2 = ctx.getBean(Tx2.class);
    tx2.anotherTxMethod();
    ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer, "fiz");
    assertThat(record.value()).isEqualTo("buz");
    tx2.anotherTxMethod();
    record = KafkaTestUtils.getSingleRecord(consumer, "fiz");
    assertThat(record.value()).isEqualTo("buz");
    ctx.close();
}

public static class Tx2 {

    @SuppressWarnings("rawtypes")
    private final KafkaTemplate template;

    @SuppressWarnings("rawtypes")
    public Tx2(KafkaTemplate template) {
        this.template = template;
    }

    @SuppressWarnings("unchecked")
    @Transactional
    public void anotherTxMethod() {
        template.send("fiz", "buz");
    }

}
```

11:28:56.422 DEBUG [main][org.springframework.kafka.transaction.KafkaTransactionManager] Creating new transaction with name [org.springframework.kafka.core.KafkaTemplateTransactionTests$Tx2.anotherTxMethod]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT; ''
11:28:58.936 DEBUG [main][org.springframework.kafka.transaction.KafkaTransactionManager] Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@327c7bea]]
11:28:59.184 DEBUG [main][org.springframework.kafka.transaction.KafkaTransactionManager] Initiating transaction commit
11:28:59.350 DEBUG [main][org.springframework.kafka.transaction.KafkaTransactionManager] Creating new transaction with name [org.springframework.kafka.core.KafkaTemplateTransactionTests$Tx2.anotherTxMethod]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT; ''
11:28:59.350 DEBUG [main][org.springframework.kafka.transaction.KafkaTransactionManager] Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@327c7bea]]
11:28:59.350 DEBUG [main][org.springframework.kafka.transaction.KafkaTransactionManager] Initiating transaction commit

agrawal93 commented 7 years ago
agrawal93 commented 7 years ago

Hi Gary,

Thanks for your quick response.

Please find the requested details below:

  1. Kafka Version: 0.11.0.0-cp1
  2. Spring-Kafka Version: 1.3.0.RC1
  3. Log for first attempt on valid set of transactions:
    2017-09-23 23:11:36.582  INFO 12000 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : Instantiated a transactional producer.
    2017-09-23 23:11:36.582  INFO 12000 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : Overriding the default max.in.flight.requests.per.connection to 1 since idempontence is enabled.
    2017-09-23 23:11:36.590 DEBUG 12000 --- [nio-8080-exec-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name bufferpool-wait-time
    2017-09-23 23:11:36.598 DEBUG 12000 --- [nio-8080-exec-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name buffer-exhausted-records
    2017-09-23 23:11:36.602 DEBUG 12000 --- [nio-8080-exec-1] org.apache.kafka.clients.Metadata        : Updated cluster metadata version 1 to Cluster(id = null, nodes = [ec2-xxxx.eu-west-1.compute.amazonaws.com:9092 (id: -1 rack: null)], partitions = [])
    2017-09-23 23:11:36.654 DEBUG 12000 --- [nio-8080-exec-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name produce-throttle-time
    2017-09-23 23:11:36.670 DEBUG 12000 --- [nio-8080-exec-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name connections-closed:
    2017-09-23 23:11:36.670 DEBUG 12000 --- [nio-8080-exec-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name connections-created:
    2017-09-23 23:11:36.670 DEBUG 12000 --- [nio-8080-exec-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name bytes-sent-received:
    2017-09-23 23:11:36.670 DEBUG 12000 --- [nio-8080-exec-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name bytes-sent:
    2017-09-23 23:11:36.674 DEBUG 12000 --- [nio-8080-exec-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name bytes-received:
    2017-09-23 23:11:36.674 DEBUG 12000 --- [nio-8080-exec-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name select-time:
    2017-09-23 23:11:36.674 DEBUG 12000 --- [nio-8080-exec-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name io-time:
    2017-09-23 23:11:36.682 DEBUG 12000 --- [nio-8080-exec-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name batch-size
    2017-09-23 23:11:36.682 DEBUG 12000 --- [nio-8080-exec-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name compression-rate
    2017-09-23 23:11:36.682 DEBUG 12000 --- [nio-8080-exec-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name queue-time
    2017-09-23 23:11:36.682 DEBUG 12000 --- [nio-8080-exec-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name request-time
    2017-09-23 23:11:36.682 DEBUG 12000 --- [nio-8080-exec-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name records-per-request
    2017-09-23 23:11:36.682 DEBUG 12000 --- [nio-8080-exec-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name record-retries
    2017-09-23 23:11:36.686 DEBUG 12000 --- [nio-8080-exec-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name errors
    2017-09-23 23:11:36.686 DEBUG 12000 --- [nio-8080-exec-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name record-size-max
    2017-09-23 23:11:36.686 DEBUG 12000 --- [nio-8080-exec-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name batch-split-rate
    2017-09-23 23:11:36.690  WARN 12000 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig    : The configuration 'producer.type' was supplied but isn't a known config.
    2017-09-23 23:11:36.690  WARN 12000 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig    : The configuration 'auto.commit.interval.ms' was supplied but isn't a known config.
    2017-09-23 23:11:36.690  WARN 12000 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig    : The configuration 'block.on.buffer.full' was supplied but isn't a known config.
    2017-09-23 23:11:36.690  WARN 12000 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig    : The configuration 'schema.registry.url' was supplied but isn't a known config.
    2017-09-23 23:11:36.694  INFO 12000 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.11.0.0
    2017-09-23 23:11:36.694  INFO 12000 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : cb8625948210849f
    2017-09-23 23:11:36.702 DEBUG 12000 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : Starting Kafka producer I/O thread.
    2017-09-23 23:11:36.702 DEBUG 12000 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : Kafka producer started
    2017-09-23 23:11:36.734 DEBUG 12000 --- [nio-8080-exec-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.0] Transition from state UNINITIALIZED to INITIALIZING
    2017-09-23 23:11:36.734  INFO 12000 --- [nio-8080-exec-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.0] ProducerId set to -1 with epoch -1
    2017-09-23 23:11:36.742 DEBUG 12000 --- [nio-8080-exec-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.0] Enqueuing transactional request (type=InitProducerIdRequest, transactionalId=transaction.prefix.0, transactionTimeoutMs=60000)
    2017-09-23 23:11:36.746 DEBUG 12000 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.0] Enqueuing transactional request (type=FindCoordinatorRequest, coordinatorKey=transaction.prefix.0, coordinatorType=TRANSACTION)
    2017-09-23 23:11:36.746 DEBUG 12000 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.0] Enqueuing transactional request (type=InitProducerIdRequest, transactionalId=transaction.prefix.0, transactionTimeoutMs=60000)
    2017-09-23 23:11:36.859 DEBUG 12000 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Initiating connection to node -1 at ec2-xxxx.eu-west-1.compute.amazonaws.com:9092.
    2017-09-23 23:11:37.081 DEBUG 12000 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name node--1.bytes-sent
    2017-09-23 23:11:37.085 DEBUG 12000 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name node--1.bytes-received
    2017-09-23 23:11:37.085 DEBUG 12000 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name node--1.latency
    2017-09-23 23:11:37.085 DEBUG 12000 --- [ad | producer-1] o.apache.kafka.common.network.Selector   : Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
    2017-09-23 23:11:37.089 DEBUG 12000 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Completed connection to node -1.  Fetching API versions.
    2017-09-23 23:11:37.089 DEBUG 12000 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Initiating API versions fetch from node -1.
    2017-09-23 23:11:37.524 DEBUG 12000 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Recorded API versions for node -1: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0])
    2017-09-23 23:11:37.524 DEBUG 12000 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : [TransactionalId transaction.prefix.0] Sending transactional request (type=FindCoordinatorRequest, coordinatorKey=transaction.prefix.0, coordinatorType=TRANSACTION) to node ec2-xxxx.eu-west-1.compute.amazonaws.com:9092 (id: -1 rack: null)
    2017-09-23 23:11:37.717 DEBUG 12000 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Initiating connection to node 2 at ec2-xxxx.eu-west-1.compute.amazonaws.com:9093.
    2017-09-23 23:11:37.889 DEBUG 12000 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name node-2.bytes-sent
    2017-09-23 23:11:37.889 DEBUG 12000 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name node-2.bytes-received
    2017-09-23 23:11:37.889 DEBUG 12000 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name node-2.latency
    2017-09-23 23:11:37.889 DEBUG 12000 --- [ad | producer-1] o.apache.kafka.common.network.Selector   : Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2
    2017-09-23 23:11:37.889 DEBUG 12000 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Completed connection to node 2.  Fetching API versions.
    2017-09-23 23:11:37.889 DEBUG 12000 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Initiating API versions fetch from node 2.
    2017-09-23 23:11:37.967 DEBUG 12000 --- [ad(kafka1:2181)] org.apache.zookeeper.ClientCnxn          : Got ping response for sessionid: 0x15e99d378c20175 after 182ms
    2017-09-23 23:11:38.050 DEBUG 12000 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Recorded API versions for node 2: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0])
    2017-09-23 23:11:38.154 DEBUG 12000 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : [TransactionalId transaction.prefix.0] Sending transactional request (type=InitProducerIdRequest, transactionalId=transaction.prefix.0, transactionTimeoutMs=60000) to node ec2-xxxx.eu-west-1.compute.amazonaws.com:9093 (id: 2 rack: null)
    2017-09-23 23:11:38.407  INFO 12000 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.0] ProducerId set to 19001 with epoch 10
    2017-09-23 23:11:38.407 DEBUG 12000 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.0] Transition from state INITIALIZING to READY
    2017-09-23 23:11:38.407 DEBUG 12000 --- [nio-8080-exec-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.0] Transition from state READY to IN_TRANSACTION
    2017-09-23 23:11:38.427 DEBUG 12000 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Sending metadata request (type=MetadataRequest, topics=TEST_TOPIC) to node -1
    2017-09-23 23:11:38.637 DEBUG 12000 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : Updated cluster metadata version 2 to Cluster(id = uep9RADnSWqQ3gWLpwlAbA, nodes = [ec2-xxxx.eu-west-1.compute.amazonaws.com:9092 (id: 1 rack: null), ec2-xxxx.eu-west-1.compute.amazonaws.com:9093 (id: 2 rack: null), ec2-xxxx.eu-west-1.compute.amazonaws.com:9094 (id: 3 rack: null)], partitions = [Partition(topic = TEST_TOPIC, partition = 0, leader = 1, replicas = [1], isr = [1])])
    2017-09-23 23:11:38.661 DEBUG 12000 --- [nio-8080-exec-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.0] Begin adding new partition TEST_TOPIC-0 to transaction
    2017-09-23 23:11:38.681 DEBUG 12000 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.0] Enqueuing transactional request (type=AddPartitionsToTxnRequest, transactionalId=transaction.prefix.0, producerId=19001, producerEpoch=10, partitions=[TEST_TOPIC-0])
    2017-09-23 23:11:38.681 DEBUG 12000 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : [TransactionalId transaction.prefix.0] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=transaction.prefix.0, producerId=19001, producerEpoch=10, partitions=[TEST_TOPIC-0]) to node ec2-xxxx.eu-west-1.compute.amazonaws.com:9093 (id: 2 rack: null)
    2017-09-23 23:11:38.709 DEBUG 12000 --- [nio-8080-exec-1] m.m.a.RequestResponseBodyMethodProcessor : Written [Successfully Published] as "text/plain" using [org.springframework.http.converter.StringHttpMessageConverter@3b24087d]
    2017-09-23 23:11:38.709 DEBUG 12000 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Null ModelAndView returned to DispatcherServlet with name 'dispatcherServlet': assuming HandlerAdapter completed request handling
    2017-09-23 23:11:38.709 DEBUG 12000 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Successfully completed request
  4. Log for the second attempt (same messages):
    
    2017-09-23 23:15:36.564 DEBUG 12000 --- [nio-8080-exec-3] o.s.web.servlet.DispatcherServlet        : Could not complete request

org.apache.kafka.common.KafkaException: TransactionalId transaction.prefix.0: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
    at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:497) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:491) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:185) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:546) ~[kafka-clients-0.11.0.0.jar:na]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.beginTransaction(DefaultKafkaProducerFactory.java:272) ~[spring-kafka-1.3.0.RC1.jar:na]
    at org.springframework.kafka.core.ProducerFactoryUtils.getTransactionalResourceHolder(ProducerFactoryUtils.java:60) ~[spring-kafka-1.3.0.RC1.jar:na]
    at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:368) ~[spring-kafka-1.3.0.RC1.jar:na]
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:310) ~[spring-kafka-1.3.0.RC1.jar:na]
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:166) ~[spring-kafka-1.3.0.RC1.jar:na]
    at com.agrawal93.test.controller.KafkaController.publishTransactionAnnotation(KafkaController.java:62) ~[classes/:na]
    at com.agrawal93.test.controller.KafkaController.publishMessageInValidTransaction(KafkaController.java:41) ~[classes/:na]
    at com.agrawal93.test.controller.KafkaController$$FastClassBySpringCGLIB$$d6e50f0a.invoke() ~[classes/:na]
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) ~[spring-core-4.3.10.RELEASE.jar:4.3.10.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:669) ~[spring-aop-4.3.10.RELEASE.jar:4.3.10.RELEASE]
    at com.agrawal93.test.controller.KafkaController$$EnhancerBySpringCGLIB$$e424e0f4.publishMessageInValidTransaction() ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_111]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_111]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_111]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_111]
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205) ~[spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE]
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133) ~[spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:97) ~[spring-webmvc-4.3.10.RELEASE.jar:4.3.10.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827) ~[spring-webmvc-4.3.10.RELEASE.jar:4.3.10.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738) ~[spring-webmvc-4.3.10.RELEASE.jar:4.3.10.RELEASE]
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85) ~[spring-webmvc-4.3.10.RELEASE.jar:4.3.10.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:967) ~[spring-webmvc-4.3.10.RELEASE.jar:4.3.10.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:901) ~[spring-webmvc-4.3.10.RELEASE.jar:4.3.10.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:970) [spring-webmvc-4.3.10.RELEASE.jar:4.3.10.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:861) [spring-webmvc-4.3.10.RELEASE.jar:4.3.10.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:635) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:846) [spring-webmvc-4.3.10.RELEASE.jar:4.3.10.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:742) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) [tomcat-embed-websocket-8.5.16.jar:8.5.16]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.springframework.boot.web.filter.ApplicationContextHeaderFilter.doFilterInternal(ApplicationContextHeaderFilter.java:55) [spring-boot-1.5.6.RELEASE.jar:1.5.6.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) [spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.springframework.boot.actuate.trace.WebRequestTraceFilter.doFilterInternal(WebRequestTraceFilter.java:110) [spring-boot-actuator-1.5.6.RELEASE.jar:1.5.6.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) [spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:208) [spring-security-web-4.2.3.RELEASE.jar:4.2.3.RELEASE]
    at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:177) [spring-security-web-4.2.3.RELEASE.jar:4.2.3.RELEASE]
    at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:346) [spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE]
    at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:262) [spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) [spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) [spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:105) [spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) [spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81) [spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) [spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:197) [spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) [spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.springframework.boot.actuate.autoconfigure.MetricsFilter.doFilterInternal(MetricsFilter.java:106) [spring-boot-actuator-1.5.6.RELEASE.jar:1.5.6.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) [spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:478) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:80) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:799) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1455) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_111]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_111]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.5.16.jar:8.5.16]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]

2017-09-23 23:15:36.568 DEBUG 12000 --- [nio-8080-exec-3] o.s.b.w.f.OrderedRequestContextFilter : Cleared thread-bound request context: org.apache.catalina.connector.RequestFacade@54575fa3
2017-09-23 23:15:36.568 ERROR 12000 --- [nio-8080-exec-3] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.apache.kafka.common.KafkaException: TransactionalId transaction.prefix.0: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION] with root cause

org.apache.kafka.common.KafkaException: TransactionalId transaction.prefix.0: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:497) ~[kafka-clients-0.11.0.0.jar:na] at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:491) ~[kafka-clients-0.11.0.0.jar:na] at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:185) ~[kafka-clients-0.11.0.0.jar:na] at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:546) ~[kafka-clients-0.11.0.0.jar:na] at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.beginTransaction(DefaultKafkaProducerFactory.java:272) ~[spring-kafka-1.3.0.RC1.jar:na] at org.springframework.kafka.core.ProducerFactoryUtils.getTransactionalResourceHolder(ProducerFactoryUtils.java:60) ~[spring-kafka-1.3.0.RC1.jar:na] at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:368) ~[spring-kafka-1.3.0.RC1.jar:na] at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:310) ~[spring-kafka-1.3.0.RC1.jar:na] at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:166) ~[spring-kafka-1.3.0.RC1.jar:na] at com.agrawal93.test.controller.KafkaController.publishTransactionAnnotation(KafkaController.java:62) ~[classes/:na] at com.agrawal93.test.controller.KafkaController.publishMessageInValidTransaction(KafkaController.java:41) ~[classes/:na] at com.agrawal93.test.controller.KafkaController$$FastClassBySpringCGLIB$$d6e50f0a.invoke() ~[classes/:na] at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) ~[spring-core-4.3.10.RELEASE.jar:4.3.10.RELEASE] at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:669) ~[spring-aop-4.3.10.RELEASE.jar:4.3.10.RELEASE] 
at com.agrawal93.test.controller.KafkaController$$EnhancerBySpringCGLIB$$e424e0f4.publishMessageInValidTransaction() ~[classes/:na] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_111] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_111] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_111] at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_111] at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205) ~[spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE] at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133) ~[spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE] at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:97) ~[spring-webmvc-4.3.10.RELEASE.jar:4.3.10.RELEASE] at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827) ~[spring-webmvc-4.3.10.RELEASE.jar:4.3.10.RELEASE] at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738) ~[spring-webmvc-4.3.10.RELEASE.jar:4.3.10.RELEASE] at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85) ~[spring-webmvc-4.3.10.RELEASE.jar:4.3.10.RELEASE] at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:967) ~[spring-webmvc-4.3.10.RELEASE.jar:4.3.10.RELEASE] at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:901) ~[spring-webmvc-4.3.10.RELEASE.jar:4.3.10.RELEASE] at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:970) ~[spring-webmvc-4.3.10.RELEASE.jar:4.3.10.RELEASE] at 
org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:861) ~[spring-webmvc-4.3.10.RELEASE.jar:4.3.10.RELEASE] at javax.servlet.http.HttpServlet.service(HttpServlet.java:635) ~[tomcat-embed-core-8.5.16.jar:8.5.16] at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:846) ~[spring-webmvc-4.3.10.RELEASE.jar:4.3.10.RELEASE] at javax.servlet.http.HttpServlet.service(HttpServlet.java:742) ~[tomcat-embed-core-8.5.16.jar:8.5.16] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) ~[tomcat-embed-core-8.5.16.jar:8.5.16] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.16.jar:8.5.16] at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) ~[tomcat-embed-websocket-8.5.16.jar:8.5.16] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.16.jar:8.5.16] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.16.jar:8.5.16] at org.springframework.boot.web.filter.ApplicationContextHeaderFilter.doFilterInternal(ApplicationContextHeaderFilter.java:55) ~[spring-boot-1.5.6.RELEASE.jar:1.5.6.RELEASE] at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.16.jar:8.5.16] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.16.jar:8.5.16] at org.springframework.boot.actuate.trace.WebRequestTraceFilter.doFilterInternal(WebRequestTraceFilter.java:110) ~[spring-boot-actuator-1.5.6.RELEASE.jar:1.5.6.RELEASE] at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) 
~[spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.16.jar:8.5.16] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.16.jar:8.5.16] at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:208) ~[spring-security-web-4.2.3.RELEASE.jar:4.2.3.RELEASE] at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:177) ~[spring-security-web-4.2.3.RELEASE.jar:4.2.3.RELEASE] at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:346) ~[spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE] at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:262) ~[spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.16.jar:8.5.16] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.16.jar:8.5.16] at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) ~[spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE] at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.16.jar:8.5.16] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.16.jar:8.5.16] at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:105) ~[spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE] at 
org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.16.jar:8.5.16] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.16.jar:8.5.16] at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81) ~[spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE] at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.16.jar:8.5.16] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.16.jar:8.5.16] at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:197) ~[spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE] at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.16.jar:8.5.16] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.16.jar:8.5.16] at org.springframework.boot.actuate.autoconfigure.MetricsFilter.doFilterInternal(MetricsFilter.java:106) ~[spring-boot-actuator-1.5.6.RELEASE.jar:1.5.6.RELEASE] at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-4.3.10.RELEASE.jar:4.3.10.RELEASE] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.16.jar:8.5.16] at 
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.16.jar:8.5.16] at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198) ~[tomcat-embed-core-8.5.16.jar:8.5.16] at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) [tomcat-embed-core-8.5.16.jar:8.5.16] at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:478) [tomcat-embed-core-8.5.16.jar:8.5.16] at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140) [tomcat-embed-core-8.5.16.jar:8.5.16] at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:80) [tomcat-embed-core-8.5.16.jar:8.5.16] at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) [tomcat-embed-core-8.5.16.jar:8.5.16] at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342) [tomcat-embed-core-8.5.16.jar:8.5.16] at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:799) [tomcat-embed-core-8.5.16.jar:8.5.16] at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-8.5.16.jar:8.5.16] at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868) [tomcat-embed-core-8.5.16.jar:8.5.16] at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1455) [tomcat-embed-core-8.5.16.jar:8.5.16] at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-8.5.16.jar:8.5.16] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_111] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_111] at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.5.16.jar:8.5.16] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_111]

2017-09-23 23:15:36.584 DEBUG 12000 --- [nio-8080-exec-3] o.a.c.c.C.[Tomcat].[localhost] : Processing ErrorPage[errorCode=0, location=/error] 2017-09-23 23:15:36.588 DEBUG 12000 --- [nio-8080-exec-3] o.s.web.servlet.DispatcherServlet : DispatcherServlet with name 'dispatcherServlet' processing GET request for [/error] 2017-09-23 23:15:36.588 DEBUG 12000 --- [nio-8080-exec-3] o.s.b.a.e.mvc.EndpointHandlerMapping : Looking up handler method for path /error 2017-09-23 23:15:36.592 DEBUG 12000 --- [nio-8080-exec-3] o.s.b.a.e.mvc.EndpointHandlerMapping : Did not find handler method for [/error] 2017-09-23 23:15:36.592 DEBUG 12000 --- [nio-8080-exec-3] s.w.s.m.m.a.RequestMappingHandlerMapping : Looking up handler method for path /error 2017-09-23 23:15:36.592 DEBUG 12000 --- [nio-8080-exec-3] s.w.s.m.m.a.RequestMappingHandlerMapping : Returning handler method [public org.springframework.http.ResponseEntity<java.util.Map<java.lang.String, java.lang.Object>> org.springframework.boot.autoconfigure.web.BasicErrorController.error(javax.servlet.http.HttpServletRequest)] 2017-09-23 23:15:36.592 DEBUG 12000 --- [nio-8080-exec-3] o.s.b.f.s.DefaultListableBeanFactory : Returning cached instance of singleton bean 'basicErrorController' 2017-09-23 23:15:36.592 DEBUG 12000 --- [nio-8080-exec-3] o.s.web.servlet.DispatcherServlet : Last-Modified value for [/error] is: -1 2017-09-23 23:15:36.636 DEBUG 12000 --- [nio-8080-exec-3] o.s.w.s.m.m.a.HttpEntityMethodProcessor : Written [{timestamp=Sat Sep 23 23:15:36 IST 2017, status=500, error=Internal Server Error, exception=org.apache.kafka.common.KafkaException, message=TransactionalId transaction.prefix.0: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION, path=/kafka/transaction/$valid}] as "application/json" using [org.springframework.http.converter.json.MappingJackson2HttpMessageConverter@5042e3d0] 2017-09-23 23:15:36.636 DEBUG 12000 --- [nio-8080-exec-3] o.s.web.servlet.DispatcherServlet : Null ModelAndView 
returned to DispatcherServlet with name 'dispatcherServlet': assuming HandlerAdapter completed request handling 2017-09-23 23:15:36.636 DEBUG 12000 --- [nio-8080-exec-3] o.s.web.servlet.DispatcherServlet : Successfully completed request



Also, I noticed that during the first attempt, all 5 messages are written to Kafka. But even though the second attempt fails, one record still gets written to Kafka. Maybe I am going wrong somewhere?
garyrussell commented 7 years ago

I need DEBUG logging enabled for at least org.springframework.kafka and org.springframework.transaction too.

agrawal93 commented 6 years ago

I tried moving that piece of code to a separate project and it works fine there; I am not facing any issues with it. Maybe I'm using it incorrectly in my main project.

Apologies for the inconvenience. I'll close the issue.

garyrussell commented 6 years ago

Thanks; but if you determine the root cause, please post it here in case there's something we can do in the framework to detect such a misconfiguration.

agrawal93 commented 6 years ago

Hi @garyrussell,

Actually, this was an issue with synchronization. Once I configured my KafkaTransactionManager with transaction synchronization set to SYNCHRONIZATION_ALWAYS, everything worked for me.
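In case it helps others, a minimal sketch of that change (assuming the `producerFactory()` bean from my configuration above; the constant comes from Spring's `AbstractPlatformTransactionManager`, which `KafkaTransactionManager` extends):

```java
@Bean
public KafkaTransactionManager kafkaTransactionManager() {
    KafkaTransactionManager transactionManager = new KafkaTransactionManager(producerFactory());
    // SYNCHRONIZATION_ALWAYS registers Spring transaction synchronization, so the
    // transactional producer is committed/aborted and released at the end of
    // every @Transactional method, instead of being left in IN_TRANSACTION.
    transactionManager.setTransactionSynchronization(
            AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
    return transactionManager;
}
```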

But now, if I remove the @Transactional annotation from the method, I expected it to work in non-transactional mode. Instead, I experience the same issue as before: the first run is successful, but subsequent runs fail with the error Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION.

I am attaching the trace logs for org.apache.kafka.clients.producer, org.springframework.kafka and org.springframework.transaction below:

2017-10-13 11:04:04.731  INFO 9160 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : Instantiated a transactional producer.
2017-10-13 11:04:04.731  INFO 9160 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : Overriding the default max.in.flight.requests.per.connection to 1 since idempontence is enabled.
2017-10-13 11:04:04.739  WARN 9160 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig    : The configuration 'producer.type' was supplied but isn't a known config.
2017-10-13 11:04:04.739  WARN 9160 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig    : The configuration 'auto.commit.interval.ms' was supplied but isn't a known config.
2017-10-13 11:04:04.739 DEBUG 9160 --- [ad | producer-2] o.a.k.clients.producer.internals.Sender  : Starting Kafka producer I/O thread.
2017-10-13 11:04:04.739  WARN 9160 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig    : The configuration 'block.on.buffer.full' was supplied but isn't a known config.
2017-10-13 11:04:04.743  INFO 9160 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.11.0.0
2017-10-13 11:04:04.743  INFO 9160 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : cb8625948210849f
2017-10-13 11:04:04.743 DEBUG 9160 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : Kafka producer started
2017-10-13 11:04:04.743 DEBUG 9160 --- [nio-8080-exec-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.1507872798892.0] Transition from state UNINITIALIZED to INITIALIZING
2017-10-13 11:04:04.743  INFO 9160 --- [nio-8080-exec-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.1507872798892.0] ProducerId set to -1 with epoch -1
2017-10-13 11:04:04.743 DEBUG 9160 --- [nio-8080-exec-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.1507872798892.0] Enqueuing transactional request (type=InitProducerIdRequest, transactionalId=transaction.prefix.1507872798892.0, transactionTimeoutMs=60000)
2017-10-13 11:04:04.747 TRACE 9160 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.1507872798892.0] Request (type=InitProducerIdRequest, transactionalId=transaction.prefix.1507872798892.0, transactionTimeoutMs=60000) dequeued for sending
2017-10-13 11:04:04.747 DEBUG 9160 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.1507872798892.0] Enqueuing transactional request (type=FindCoordinatorRequest, coordinatorKey=transaction.prefix.1507872798892.0, coordinatorType=TRANSACTION)
2017-10-13 11:04:04.747 DEBUG 9160 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.1507872798892.0] Enqueuing transactional request (type=InitProducerIdRequest, transactionalId=transaction.prefix.1507872798892.0, transactionTimeoutMs=60000)
2017-10-13 11:04:04.847 TRACE 9160 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.1507872798892.0] Request (type=FindCoordinatorRequest, coordinatorKey=transaction.prefix.1507872798892.0, coordinatorType=TRANSACTION) dequeued for sending
2017-10-13 11:04:05.257 DEBUG 9160 --- [ad | producer-2] o.a.k.clients.producer.internals.Sender  : [TransactionalId transaction.prefix.1507872798892.0] Sending transactional request (type=FindCoordinatorRequest, coordinatorKey=transaction.prefix.1507872798892.0, coordinatorType=TRANSACTION) to node ec2-xxxx.compute.amazonaws.com:9094 (id: -3 rack: null)
2017-10-13 11:04:05.462 TRACE 9160 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.1507872798892.0] Received transactional response FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=ec2-xxxx.compute.amazonaws.com:9093 (id: 2 rack: null)) for request (type=FindCoordinatorRequest, coordinatorKey=transaction.prefix.1507872798892.0, coordinatorType=TRANSACTION)
2017-10-13 11:04:05.462 TRACE 9160 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.1507872798892.0] Request (type=InitProducerIdRequest, transactionalId=transaction.prefix.1507872798892.0, transactionTimeoutMs=60000) dequeued for sending
2017-10-13 11:04:05.974 DEBUG 9160 --- [ad | producer-2] o.a.k.clients.producer.internals.Sender  : [TransactionalId transaction.prefix.1507872798892.0] Sending transactional request (type=InitProducerIdRequest, transactionalId=transaction.prefix.1507872798892.0, transactionTimeoutMs=60000) to node ec2-xxxx.compute.amazonaws.com:9093 (id: 2 rack: null)
2017-10-13 11:04:06.180 TRACE 9160 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.1507872798892.0] Received transactional response InitProducerIdResponse(error=NONE, producerId=19011, producerEpoch=0, throttleTimeMs=0) for request (type=InitProducerIdRequest, transactionalId=transaction.prefix.1507872798892.0, transactionTimeoutMs=60000)
2017-10-13 11:04:06.180  INFO 9160 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.1507872798892.0] ProducerId set to 19011 with epoch 0
2017-10-13 11:04:06.180 DEBUG 9160 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.1507872798892.0] Transition from state INITIALIZING to READY
2017-10-13 11:04:06.180 DEBUG 9160 --- [nio-8080-exec-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.1507872798892.0] Transition from state READY to IN_TRANSACTION
2017-10-13 11:04:06.188 TRACE 9160 --- [nio-8080-exec-1] .s.t.s.TransactionSynchronizationManager : Bound value [org.springframework.kafka.core.KafkaResourceHolder@54b81934] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@30506c0d] to thread [http-nio-8080-exec-1]
2017-10-13 11:04:06.188 TRACE 9160 --- [nio-8080-exec-1] o.s.kafka.core.KafkaTemplate             : Sending: ProducerRecord(topic=ATEST-1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=H, timestamp=null)
2017-10-13 11:04:06.208 TRACE 9160 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : Requesting metadata update for topic ATEST-1.
2017-10-13 11:04:07.204 TRACE 9160 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : Sending record ProducerRecord(topic=ATEST-1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=H, timestamp=null) with callback org.springframework.kafka.core.KafkaTemplate$1@b0c3ef to topic ATEST-1 partition 0
2017-10-13 11:04:07.208 DEBUG 9160 --- [nio-8080-exec-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.1507872798892.0] Begin adding new partition ATEST-1-0 to transaction
2017-10-13 11:04:07.208 TRACE 9160 --- [nio-8080-exec-1] o.a.k.c.p.internals.RecordAccumulator    : Allocating a new 16384 byte message buffer for topic ATEST-1 partition 0
2017-10-13 11:04:07.216 TRACE 9160 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : Waking up the sender since topic ATEST-1 partition 0 is either full or getting a new batch
2017-10-13 11:04:07.220 TRACE 9160 --- [nio-8080-exec-1] o.s.kafka.core.KafkaTemplate             : Sent: ProducerRecord(topic=ATEST-1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=H, timestamp=null)
2017-10-13 11:04:07.220 TRACE 9160 --- [nio-8080-exec-1] .s.t.s.TransactionSynchronizationManager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@54b81934] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@30506c0d] bound to thread [http-nio-8080-exec-1]
2017-10-13 11:04:07.220 TRACE 9160 --- [nio-8080-exec-1] o.s.kafka.core.KafkaTemplate             : Sending: ProducerRecord(topic=ATEST-1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=H, timestamp=null)
2017-10-13 11:04:07.220 DEBUG 9160 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.1507872798892.0] Enqueuing transactional request (type=AddPartitionsToTxnRequest, transactionalId=transaction.prefix.1507872798892.0, producerId=19011, producerEpoch=0, partitions=[ATEST-1-0])
2017-10-13 11:04:07.220 TRACE 9160 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.1507872798892.0] Request (type=AddPartitionsToTxnRequest, transactionalId=transaction.prefix.1507872798892.0, producerId=19011, producerEpoch=0, partitions=[ATEST-1-0]) dequeued for sending
2017-10-13 11:04:07.220 DEBUG 9160 --- [ad | producer-2] o.a.k.clients.producer.internals.Sender  : [TransactionalId transaction.prefix.1507872798892.0] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=transaction.prefix.1507872798892.0, producerId=19011, producerEpoch=0, partitions=[ATEST-1-0]) to node ec2-xxxx.compute.amazonaws.com:9093 (id: 2 rack: null)
2017-10-13 11:04:07.220 TRACE 9160 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : Sending record ProducerRecord(topic=ATEST-1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=H, timestamp=null) with callback org.springframework.kafka.core.KafkaTemplate$1@180560ed to topic ATEST-1 partition 0
2017-10-13 11:04:07.220 TRACE 9160 --- [nio-8080-exec-1] o.s.kafka.core.KafkaTemplate             : Sent: ProducerRecord(topic=ATEST-1, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=H, timestamp=null)
2017-10-13 11:04:07.408 TRACE 9160 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.1507872798892.0] Received transactional response AddPartitionsToTxnResponse(errors={ATEST-1-0=NONE}, throttleTimeMs=0) for request (type=AddPartitionsToTxnRequest, transactionalId=transaction.prefix.1507872798892.0, producerId=19011, producerEpoch=0, partitions=[ATEST-1-0])
2017-10-13 11:04:07.408 DEBUG 9160 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [TransactionalId transaction.prefix.1507872798892.0] Successfully added partitions [ATEST-1-0] to transaction
2017-10-13 11:04:07.412 DEBUG 9160 --- [ad | producer-2] o.a.k.c.p.internals.RecordAccumulator    : Assigning sequence number 0 from producer (producerId=19011, epoch=0) to dequeued batch from partition ATEST-1-0 bound for ec2-xxxx.compute.amazonaws.com:9093 (id: 2 rack: null).
2017-10-13 11:04:07.416 TRACE 9160 --- [ad | producer-2] o.a.k.clients.producer.internals.Sender  : Nodes with data ready to send: [ec2-xxxx.compute.amazonaws.com:9093 (id: 2 rack: null)]
2017-10-13 11:04:07.424 TRACE 9160 --- [ad | producer-2] o.a.k.clients.producer.internals.Sender  : Sent produce request to 2: (type=ProduceRequest, magic=2, acks=-1, timeout=30000, partitionRecords=({ATEST-1-0=[(record=DefaultRecord(offset=0, timestamp=1507872847204, key=0 bytes, value=7 bytes)), (record=DefaultRecord(offset=1, timestamp=1507872847220, key=0 bytes, value=7 bytes))]}), transactionalId='transaction.prefix.1507872798892.0'
2017-10-13 11:04:07.612 TRACE 9160 --- [ad | producer-2] o.a.k.clients.producer.internals.Sender  : Received produce response from node 2 with correlation id 6
2017-10-13 11:04:07.612 DEBUG 9160 --- [ad | producer-2] o.a.k.clients.producer.internals.Sender  : Incremented sequence number for topic-partition ATEST-1-0 to 2
2017-10-13 11:04:07.612 TRACE 9160 --- [ad | producer-2] o.a.k.c.p.internals.ProducerBatch        : Successfully produced messages to ATEST-1-0 with base offset 41.

From what I understand, the transactional producer begins a new transaction on the first run and moves to the IN_TRANSACTION state, but it never commits or aborts, so the producer stays in IN_TRANSACTION. Yet even though the transaction is never committed, I can still see the published record in Kafka. So I'm quite unsure what exactly is happening.

Maybe you could help me understand if I am going wrong somewhere?

garyrussell commented 6 years ago

But, now, if I remove the @Transactional annotation from the method, I was expecting it to work in a non-transactional mode.

You also have to configure the producer factory to disable transactions; otherwise, a transaction will be started but never committed.
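For example (a sketch; the bean names are illustrative, and `producerConfigurations()` refers to the map bean from your configuration): a second producer factory without a `transactionIdPrefix` creates plain, non-transactional producers.

```java
@Bean
public DefaultKafkaProducerFactory<String, String> plainProducerFactory() {
    // No setTransactionIdPrefix(...) here, so producers created by this
    // factory are not transactional and never call beginTransaction().
    return new DefaultKafkaProducerFactory<>(producerConfigurations());
}

@Bean
public KafkaTemplate<String, String> plainKafkaTemplate() {
    return new KafkaTemplate<>(plainProducerFactory());
}
```

Use `plainKafkaTemplate` for sends that should not participate in a transaction, and keep the transactional template for the `@Transactional` paths.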

I am not sure if we can detect such a mis-configuration, but we will take a look.

Even if it is a new transaction and it doesn't commit, I'm still able to see the published record in kafka. So, I'm quite unsure of what exactly is happening.

I don't know why the messages would be visible, since the transaction is not committed. I suggest you test with a pure Kafka producer (no spring-kafka); if you get the same results, ask the Kafka team; if you get different results, report it back here.
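Something along these lines (a sketch, not tested; the broker address, transactional id, and topic are placeholders) would reproduce the scenario with the plain client:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PlainTxProducerTest {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test.tx.0");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("ATEST-1", "H"));
            // Deliberately no commitTransaction()/abortTransaction(): a consumer
            // with isolation.level=read_committed should NOT see this record.
        }
    }
}
```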

tomaszszlek commented 6 years ago

Hi @garyrussell,

I have exactly the same exception as @agrawal93. I use:

- Spring-Kafka: 2.1.0.RELEASE
- kafka-clients / Kafka: 1.0.0
- Spring Boot: 2.0.0.M3 / M7 (tried both)

I use a KafkaTransactionManager with one listener whose method (onEvent) is annotated with @Transactional. My issue occurs randomly in integration tests, where I use a non-transactional KafkaTemplate to send messages to transactional Kafka listeners.

[UPDATED with more logs from TransactionManager]

13:12:46.852 [DEBUG] [TestEventLogger]     2017-12-06 13:12:46.852  INFO --- [intContainer#4-0-C-1] o.a.k.c.p.internals.TransactionManager   DEV:427e021a-f607-4c20-9329-e06fc1ced14c : [Producer clientId=producer-2, transactionalId=MyApplication-transaction-0] ProducerId set to -1 with epoch -1
13:12:46.854 [DEBUG] [TestEventLogger]     2017-12-06 13:12:46.853 DEBUG --- [intContainer#4-0-C-1] o.a.k.c.p.internals.TransactionManager   DEV:427e021a-f607-4c20-9329-e06fc1ced14c : [Producer clientId=producer-2, transactionalId=MyApplication-transaction-0] Enqueuing transactional request (type=InitProducerIdRequest, transactionalId=MyApplication-transaction-0, transactionTimeoutMs=60000)
13:12:46.854 [DEBUG] [TestEventLogger]     2017-12-06 13:12:46.854 DEBUG --- [-thread | producer-2] o.a.k.c.p.internals.TransactionManager    : [Producer clientId=producer-2, transactionalId=MyApplication-transaction-0] Enqueuing transactional request (type=FindCoordinatorRequest, coordinatorKey=MyApplication-transaction-0, coordinatorType=TRANSACTION)
13:12:46.855 [DEBUG] [TestEventLogger]     2017-12-06 13:12:46.854 DEBUG --- [-thread | producer-2] o.a.k.c.p.internals.TransactionManager    : [Producer clientId=producer-2, transactionalId=MyApplication-transaction-0] Enqueuing transactional request (type=InitProducerIdRequest, transactionalId=MyApplication-transaction-0, transactionTimeoutMs=60000)
13:12:47.065 [DEBUG] [TestEventLogger]     2017-12-06 13:12:47.065  INFO --- [-thread | producer-2] o.a.k.c.p.internals.TransactionManager    : [Producer clientId=producer-2, transactionalId=MyApplication-transaction-0] ProducerId set to 0 with epoch 6
13:12:47.065 [DEBUG] [TestEventLogger]     2017-12-06 13:12:47.065 DEBUG --- [-thread | producer-2] o.a.k.c.p.internals.TransactionManager    : [Producer clientId=producer-2, transactionalId=MyApplication-transaction-0] Transition from state INITIALIZING to READY
13:12:47.065 [DEBUG] [TestEventLogger]     2017-12-06 13:12:47.065 DEBUG --- [intContainer#4-0-C-1] o.a.k.c.p.internals.TransactionManager   DEV:427e021a-f607-4c20-9329-e06fc1ced14c : [Producer clientId=producer-2, transactionalId=MyApplication-transaction-0] Transition from state READY to IN_TRANSACTION
13:12:47.070 [DEBUG] [TestEventLogger]     2017-12-06 13:12:47.069 DEBUG --- [intContainer#4-0-C-1] o.a.k.c.p.internals.TransactionManager   DEV:427e021a-f607-4c20-9329-e06fc1ced14c : [Producer clientId=producer-2, transactionalId=MyApplication-transaction-0] Begin adding new partition topic1-0 to transaction
13:12:47.072 [DEBUG] [TestEventLogger]     2017-12-06 13:12:47.071 DEBUG --- [-thread | producer-2] o.a.k.c.p.internals.TransactionManager    : [Producer clientId=producer-2, transactionalId=MyApplication-transaction-0] Enqueuing transactional request (type=AddPartitionsToTxnRequest, transactionalId=MyApplication-transaction-0, producerId=0, producerEpoch=6, partitions=[topic1-0])
13:12:47.078 [DEBUG] [TestEventLogger]     2017-12-06 13:12:47.077 DEBUG --- [-thread | producer-2] o.a.k.c.p.internals.TransactionManager    : [Producer clientId=producer-2, transactionalId=MyApplication-transaction-0] Successfully added partitions [topic1-0] to transaction
13:12:47.100 [DEBUG] [TestEventLogger]     2017-12-06 13:12:47.100 DEBUG --- [intContainer#4-0-C-1] o.a.k.c.p.internals.TransactionManager   DEV:427e021a-f607-4c20-9329-e06fc1ced14c : [Producer clientId=producer-2, transactionalId=MyApplication-transaction-0] Begin adding new partition topic2-0 to transaction
13:12:47.102 [DEBUG] [TestEventLogger]     2017-12-06 13:12:47.102 DEBUG --- [-thread | producer-2] o.a.k.c.p.internals.TransactionManager    : [Producer clientId=producer-2, transactionalId=MyApplication-transaction-0] Enqueuing transactional request (type=AddPartitionsToTxnRequest, transactionalId=MyApplication-transaction-0, producerId=0, producerEpoch=6, partitions=[topic2-0])
13:12:47.108 [DEBUG] [TestEventLogger]     2017-12-06 13:12:47.107 DEBUG --- [-thread | producer-2] o.a.k.c.p.internals.TransactionManager    : [Producer clientId=producer-2, transactionalId=MyApplication-transaction-0] Successfully added partitions [topic2-0] to transaction
13:12:47.400 [DEBUG] [TestEventLogger]     org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.example.Listener.onEvent(com.example.Request)' threw exception; nested exception is org.apache.kafka.common.KafkaException: TransactionalId MyApplication-transaction-0: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
13:12:47.400 [DEBUG] [TestEventLogger]          at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:255)
13:12:47.400 [DEBUG] [TestEventLogger]          at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:80)
13:12:47.400 [DEBUG] [TestEventLogger]          at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51)
13:12:47.400 [DEBUG] [TestEventLogger]          at org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter.onMessage(FilteringMessageListenerAdapter.java:71)
13:12:47.400 [DEBUG] [TestEventLogger]          at org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter.onMessage(FilteringMessageListenerAdapter.java:36)
13:12:47.401 [DEBUG] [TestEventLogger]          at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:963)
13:12:47.401 [DEBUG] [TestEventLogger]          at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:943)
13:12:47.401 [DEBUG] [TestEventLogger]          at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:894)
13:12:47.401 [DEBUG] [TestEventLogger]          at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:763)
13:12:47.401 [DEBUG] [TestEventLogger]          at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:646)
13:12:47.401 [DEBUG] [TestEventLogger]          at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
13:12:47.401 [DEBUG] [TestEventLogger]          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
13:12:47.401 [DEBUG] [TestEventLogger]          at java.lang.Thread.run(Thread.java:745)
13:12:47.401 [DEBUG] [TestEventLogger]     Caused by: org.apache.kafka.common.KafkaException: TransactionalId MyApplication-transaction-0: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
13:12:47.401 [DEBUG] [TestEventLogger]          at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:755)
13:12:47.401 [DEBUG] [TestEventLogger]          at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:749)
13:12:47.401 [DEBUG] [TestEventLogger]          at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:215)
13:12:47.401 [DEBUG] [TestEventLogger]          at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:564)
13:12:47.401 [DEBUG] [TestEventLogger]          at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.beginTransaction(DefaultKafkaProducerFactory.java:285)
13:12:47.401 [DEBUG] [TestEventLogger]          at org.springframework.kafka.core.ProducerFactoryUtils.getTransactionalResourceHolder(ProducerFactoryUtils.java:60)
13:12:47.401 [DEBUG] [TestEventLogger]          at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:368)
13:12:47.401 [DEBUG] [TestEventLogger]          at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:310)
13:12:47.401 [DEBUG] [TestEventLogger]          at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:197)
13:12:47.401 [DEBUG] [TestEventLogger]          at com.example.Service.process(Service.java:67)
13:12:47.401 [DEBUG] [TestEventLogger]          at com.example.Listener.onEvent(Listener.java:58)
13:12:47.401 [DEBUG] [TestEventLogger]          at com.example.Listener$$FastClassBySpringCGLIB$$43759362.invoke(<generated>)
13:12:47.402 [DEBUG] [TestEventLogger]          at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
13:12:47.402 [DEBUG] [TestEventLogger]          at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:747)
13:12:47.402 [DEBUG] [TestEventLogger]          at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
13:12:47.402 [DEBUG] [TestEventLogger]          at org.springframework.aop.aspectj.AspectJAfterAdvice.invoke(AspectJAfterAdvice.java:47)
13:12:47.402 [DEBUG] [TestEventLogger]          at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:174)
13:12:47.402 [DEBUG] [TestEventLogger]          at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92)
13:12:47.402 [DEBUG] [TestEventLogger]          at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185)
13:12:47.402 [DEBUG] [TestEventLogger]          at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689)
13:12:47.402 [DEBUG] [TestEventLogger]          at com.example.Listener$$EnhancerBySpringCGLIB$$48828413.onEvent(<generated>)
13:12:47.402 [DEBUG] [TestEventLogger]          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
13:12:47.402 [DEBUG] [TestEventLogger]          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
13:12:47.402 [DEBUG] [TestEventLogger]          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
13:12:47.402 [DEBUG] [TestEventLogger]          at java.lang.reflect.Method.invoke(Method.java:497)
13:12:47.403 [DEBUG] [TestEventLogger]          at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181)
13:12:47.403 [DEBUG] [TestEventLogger]          at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114)
13:12:47.403 [DEBUG] [TestEventLogger]          at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
13:12:47.403 [DEBUG] [TestEventLogger]          at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:236)
13:12:47.403 [DEBUG] [TestEventLogger]          ... 12 common frames omitted

I would be thankful for any advice.
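For reference, the client-side rule this error enforces can be sketched as a toy state machine (illustration only, not Kafka's actual internal `TransactionManager` class): `beginTransaction()` is only legal from `READY`, so a second begin without an intervening commit or abort is exactly the `IN_TRANSACTION` to `IN_TRANSACTION` transition reported above.

```java
// Toy model of the producer transaction state machine; the real states live in
// org.apache.kafka.clients.producer.internals.TransactionManager.
public class Main {

    enum State { READY, IN_TRANSACTION, COMMITTING_TRANSACTION }

    static class TxnStateMachine {
        State state = State.READY; // assume initTransactions() already completed

        void beginTransaction() {
            if (state != State.READY) {
                throw new IllegalStateException("Invalid transition attempted from state "
                        + state + " to state IN_TRANSACTION");
            }
            state = State.IN_TRANSACTION;
        }

        void commitTransaction() {
            if (state != State.IN_TRANSACTION) {
                throw new IllegalStateException("Invalid transition attempted from state "
                        + state + " to state COMMITTING_TRANSACTION");
            }
            state = State.COMMITTING_TRANSACTION;
            state = State.READY; // broker acknowledged the EndTxnRequest
        }
    }

    public static void main(String[] args) {
        TxnStateMachine txn = new TxnStateMachine();
        txn.beginTransaction();
        txn.commitTransaction();
        txn.beginTransaction(); // fine: state is READY again after the commit
        try {
            txn.beginTransaction(); // second begin with no commit in between
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

So the symptom means the underlying producer was still inside an open transaction when `beginTransaction()` was called again.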

artembilan commented 6 years ago
13:12:47.401 [DEBUG] [TestEventLogger]          at org.springframework.kafka.core.ProducerFactoryUtils.getTransactionalResourceHolder(ProducerFactoryUtils.java:60)
13:12:47.401 [DEBUG] [TestEventLogger]          at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:368)

That is possible only if producerFactory.transactionCapable() returns true. But at the same time you say:

I use a non-transactional KafkaTemplate

It looks like that is not the case: your DefaultKafkaProducerFactory is configured with a transactionIdPrefix. My guess is that you use Spring Boot and specify the producer.transactionIdPrefix application property there.
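To illustrate the point: whether a factory is "transaction capable" is simply a matter of whether a transactionIdPrefix was set, and transactional producers then get ids of the form prefix plus a counter. This is a toy model with hypothetical names, not the actual DefaultKafkaProducerFactory source:

```java
// Toy model of the capability check; names are illustrative, not spring-kafka API.
public class Main {

    static class ProducerFactoryModel {
        private String transactionIdPrefix;
        private int counter = 0;

        void setTransactionIdPrefix(String prefix) { this.transactionIdPrefix = prefix; }

        // Setting any prefix is what flips the factory into transactional mode.
        boolean transactionCapable() { return this.transactionIdPrefix != null; }

        String nextTransactionalId() {
            if (!transactionCapable()) {
                throw new IllegalStateException("factory is not transactional");
            }
            return transactionIdPrefix + counter++;
        }
    }

    public static void main(String[] args) {
        ProducerFactoryModel factory = new ProducerFactoryModel();
        System.out.println("capable before prefix: " + factory.transactionCapable());
        factory.setTransactionIdPrefix("MyApplication-transaction-");
        System.out.println("capable after prefix: " + factory.transactionCapable());
        System.out.println("transactional.id: " + factory.nextTransactionalId());
    }
}
```

Note how that produces exactly the transactionalId seen in the logs above (MyApplication-transaction-0).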

Any thoughts from your side, please?

tomaszszlek commented 6 years ago

Sorry if I was not clear enough. My application looks as follows:

```java
@Configuration
@EnableTransactionManagement
@EnableKafka
public class ProdKafkaConfiguration {

    @Bean
    public ProducerFactory<String, String> kafkaProducerFactory() {
        Map<String, Object> props = ImmutableMap.<String, Object> builder()
                .put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                .put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                .put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers).build();
        DefaultKafkaProducerFactory<String, String> defaultKafkaProducerFactory =
                new DefaultKafkaProducerFactory<>(props);
        defaultKafkaProducerFactory.setTransactionIdPrefix("MyApplication-transaction-");
        return defaultKafkaProducerFactory;
    }

    @Bean
    public KafkaTemplate<String, String> myKafkaTemplate() {
        return createTemplate();
    }
}
```

so the Kafka producer has transactions enabled. Then I have a listener (one of many; the rest have exactly the same construction) that listens on a Kafka topic:

```java
@Component
@Transactional
class Listener {

    @Autowired
    Service service;

    @KafkaListener(topics = "topic1")
    void onEvent(Request request) {
        service.process(request);
    }
}
```

The service method uses the KafkaTemplate declared in ProdKafkaConfiguration:

```java
@Service
class Service {

    @Autowired
    @Qualifier("myKafkaTemplate")
    KafkaTemplate<String, String> myKafkaTemplate;

    void process(Request request) {
        myKafkaTemplate.send(...);
    }
}
```

Then I have a set of integration tests where I use a TestKafkaConfiguration with a test KafkaTemplate to send messages from the test to my production Kafka listener (Listener):

```java
public class TestKafkaConfiguration {

    @Bean
    public ProducerFactory<String, String> kafkaProducerFactory() {
        Map<String, Object> props = ImmutableMap.<String, Object> builder()
                .put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                .put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                .put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers).build();
        // please note that transactions are not enabled here
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> myTestKafkaTemplate() {
        KafkaTemplate<String, String> testKafkaTemplate = ...
        return testKafkaTemplate;
    }
}
```

and finally the usage in a test:

```java
@RunWith(SpringRunner.class)
@SpringBootTest
@Import(TestKafkaConfiguration.class)
public class ListenerTest {

    @Autowired
    @Qualifier("myTestKafkaTemplate")
    KafkaTemplate<String, String> myTestKafkaTemplate;

    @Test
    public void test() throws Exception {
        myTestKafkaTemplate.send(...)
        ...
        // then I read messages using a KafkaMessageListenerContainer and do assertions
    }
}
```

One important thing: I am not using embedded Kafka; I run the integration tests against a real Kafka cluster. It looks like, for some reason, when a test finishes the transaction stays in the IN_TRANSACTION state, so when the next test runs it throws the invalid-transition exception mentioned in the previous post.
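That theory is consistent with how transactional producers are cached and reused between sends: if one run returns the shared producer to the cache while it is still IN_TRANSACTION (because the commit or abort never happened), the next borrower's beginTransaction() fails immediately. A minimal simulation of that reuse pattern, with purely illustrative names rather than spring-kafka API:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simulation of a producer cache: a producer released while still inside a
// transaction poisons the next run that borrows it. Names are illustrative.
public class Main {

    enum State { READY, IN_TRANSACTION }

    static class FakeProducer {
        State state = State.READY;

        void beginTransaction() {
            if (state == State.IN_TRANSACTION) {
                throw new IllegalStateException(
                    "Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION");
            }
            state = State.IN_TRANSACTION;
        }

        void commitTransaction() { state = State.READY; }
    }

    static final Deque<FakeProducer> cache = new ArrayDeque<>();

    static FakeProducer borrow() {
        FakeProducer p = cache.poll();
        return (p != null) ? p : new FakeProducer();
    }

    static void release(FakeProducer p) { cache.push(p); } // no state check on return!

    public static void main(String[] args) {
        // "Run 1": transaction opened but never committed before release.
        FakeProducer p1 = borrow();
        p1.beginTransaction();
        release(p1);

        // "Run 2": gets the same cached producer and trips over the stale state.
        FakeProducer p2 = borrow();
        try {
            p2.beginTransaction();
        } catch (IllegalStateException e) {
            System.out.println("second run fails: " + e.getMessage());
        }
    }
}
```

In this sketch, committing (or aborting) before `release(p1)` would put the producer back to `READY` and the second run would succeed, matching the observed "first run works, every subsequent run fails" pattern.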

artembilan commented 6 years ago

Please wrap all of that into a simple Spring Boot project and share it with us via GitHub. We will clone it locally and experiment. That's too much code to tackle through comments alone. (Well, as usual for any Kafka application...)

hajlaoui-nader commented 6 years ago

Hello @agrawal93, I have the same issue. Have you found a solution? Thank you!

garyrussell commented 6 years ago

@hajlaoui-nader - you need to provide your configuration and DEBUG logs.

hajlaoui-nader commented 6 years ago

Hello @garyrussell,

I'm using Spring Boot 1.5.2 and Spring Kafka 2.0.0.

Here's my configuration:

```java
@Value("${kafka.bootstrap-servers}")
private String bootstrapServer;

@Value("${kafka.transactionid.prefix}")
private String transactionIdPrefix;

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    DefaultKafkaProducerFactory<String, String> producer = new DefaultKafkaProducerFactory<>(configProps);
    producer.setTransactionIdPrefix(transactionIdPrefix);
    return producer;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public KafkaTransactionManager transactionManager() {
    return new KafkaTransactionManager(producerFactory());
}

@Bean
public JpaTransactionManager transactionManager(EntityManagerFactory em) {
    return new JpaTransactionManager(em);
}
```

The service:

```java
@Slf4j
@Component
public class CommandeEventService {

    @Transactional
    public void send(final CommandeEvent event) {
        CommandeEvent savedEvent = commandeRequestEventRepository.save(event);
        CommandeEventAck commandeEventAck = CommandeEventAck.initCommandeEventAck(savedEvent,
                CommandeEventAck.AckStatus.PENDING);
        commandeEventAckRepository.save(commandeEventAck);

        kafkaEventRepository.send(savedEvent, commandeEventAck.getEventId());
    }
}
```

My Kafka repository:

```java
@Slf4j
@Repository
public class KafkaEventRepository {

    public void send(CommandeEvent event, UUID eventId) {
        log.info("sending event='{}' to topic='{}'", event, topicName);
        kafkaTemplate.send(topicName, serializeEvent(event, eventId));
    }
}
```

Scenario: I send a first message (OK, no problems); then I send a second message and a third, and I get this trace.

Logs after sending the first message:


2018-01-03 15:30:14.341  INFO [correlationId:] 5483 --- [nio-8091-exec-5] d.red.repository.KafkaEventRepository    : sending event='CommandeEvent(id=1, date=2011-11-12 01:15:00.0, numeroCommande=179257761, eventType=xxx, data={"idPdv":1139,"idCommande":"20624794","idPreparateur":"10"})' to topic='Event'
2018-01-03 15:30:14.365  INFO [correlationId:] 5483 --- [nio-8091-exec-5] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [localhost:9092]
    buffer.memory = 33554432
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = commandesEvent0
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2018-01-03 15:30:14.379  INFO [correlationId:] 5483 --- [nio-8091-exec-5] o.a.k.clients.producer.KafkaProducer     : Instantiated a transactional producer.
2018-01-03 15:30:14.379  INFO [correlationId:] 5483 --- [nio-8091-exec-5] o.a.k.clients.producer.KafkaProducer     : Overriding the default retries config to the recommended value of 2147483647 since the idempotent producer is enabled.
2018-01-03 15:30:14.379  INFO [correlationId:] 5483 --- [nio-8091-exec-5] o.a.k.clients.producer.KafkaProducer     : Overriding the default max.in.flight.requests.per.connection to 1 since idempontence is enabled.
2018-01-03 15:30:14.379  INFO [correlationId:] 5483 --- [nio-8091-exec-5] o.a.k.clients.producer.KafkaProducer     : Overriding the default acks to all since idempotence is enabled.
2018-01-03 15:30:14.383 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] org.apache.kafka.common.metrics.Metrics  : Added sensor with name bufferpool-wait-time
2018-01-03 15:30:14.386 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] org.apache.kafka.common.metrics.Metrics  : Added sensor with name buffer-exhausted-records
2018-01-03 15:30:14.388 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] org.apache.kafka.clients.Metadata        : Updated cluster metadata version 1 to Cluster(id = null, nodes = [localhost:9092 (id: -1 rack: null)], partitions = [])
2018-01-03 15:30:14.392 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] org.apache.kafka.common.metrics.Metrics  : Added sensor with name produce-throttle-time
2018-01-03 15:30:14.397 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] org.apache.kafka.common.metrics.Metrics  : Added sensor with name connections-closed:
2018-01-03 15:30:14.397 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] org.apache.kafka.common.metrics.Metrics  : Added sensor with name connections-created:
2018-01-03 15:30:14.397 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] org.apache.kafka.common.metrics.Metrics  : Added sensor with name bytes-sent-received:
2018-01-03 15:30:14.398 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] org.apache.kafka.common.metrics.Metrics  : Added sensor with name bytes-sent:
2018-01-03 15:30:14.398 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] org.apache.kafka.common.metrics.Metrics  : Added sensor with name bytes-received:
2018-01-03 15:30:14.398 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] org.apache.kafka.common.metrics.Metrics  : Added sensor with name select-time:
2018-01-03 15:30:14.398 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] org.apache.kafka.common.metrics.Metrics  : Added sensor with name io-time:
2018-01-03 15:30:14.402 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] org.apache.kafka.common.metrics.Metrics  : Added sensor with name batch-size
2018-01-03 15:30:14.402 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] org.apache.kafka.common.metrics.Metrics  : Added sensor with name compression-rate
2018-01-03 15:30:14.402 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] org.apache.kafka.common.metrics.Metrics  : Added sensor with name queue-time
2018-01-03 15:30:14.402 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] org.apache.kafka.common.metrics.Metrics  : Added sensor with name request-time
2018-01-03 15:30:14.402 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] org.apache.kafka.common.metrics.Metrics  : Added sensor with name records-per-request
2018-01-03 15:30:14.403 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] org.apache.kafka.common.metrics.Metrics  : Added sensor with name record-retries
2018-01-03 15:30:14.403 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] org.apache.kafka.common.metrics.Metrics  : Added sensor with name errors
2018-01-03 15:30:14.403 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] org.apache.kafka.common.metrics.Metrics  : Added sensor with name record-size-max
2018-01-03 15:30:14.404 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] org.apache.kafka.common.metrics.Metrics  : Added sensor with name batch-split-rate
2018-01-03 15:30:14.405 DEBUG [correlationId:] 5483 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : Starting Kafka producer I/O thread.
2018-01-03 15:30:14.407  INFO [correlationId:] 5483 --- [nio-8091-exec-5] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.11.0.0
2018-01-03 15:30:14.407  INFO [correlationId:] 5483 --- [nio-8091-exec-5] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : cb8625948210849f
2018-01-03 15:30:14.407 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] o.a.k.clients.producer.KafkaProducer     : Kafka producer started
2018-01-03 15:30:14.408 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] o.a.k.c.p.internals.TransactionManager   : [TransactionalId commandesEvent0] Transition from state UNINITIALIZED to INITIALIZING
2018-01-03 15:30:14.409  INFO [correlationId:] 5483 --- [nio-8091-exec-5] o.a.k.c.p.internals.TransactionManager   : [TransactionalId commandesEvent0] ProducerId set to -1 with epoch -1
2018-01-03 15:30:14.412 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] o.a.k.c.p.internals.TransactionManager   : [TransactionalId commandesEvent0] Enqueuing transactional request (type=InitProducerIdRequest, transactionalId=commandesEvent0, transactionTimeoutMs=60000)
2018-01-03 15:30:14.414 DEBUG [correlationId:] 5483 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId commandesEvent0] Enqueuing transactional request (type=FindCoordinatorRequest, coordinatorKey=commandesEvent0, coordinatorType=TRANSACTION)
2018-01-03 15:30:14.414 DEBUG [correlationId:] 5483 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId commandesEvent0] Enqueuing transactional request (type=InitProducerIdRequest, transactionalId=commandesEvent0, transactionTimeoutMs=60000)
2018-01-03 15:30:14.519 DEBUG [correlationId:] 5483 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Initiating connection to node -1 at localhost:9092.
2018-01-03 15:30:14.543 DEBUG [correlationId:] 5483 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name node--1.bytes-sent
2018-01-03 15:30:14.544 DEBUG [correlationId:] 5483 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name node--1.bytes-received
2018-01-03 15:30:14.545 DEBUG [correlationId:] 5483 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name node--1.latency
2018-01-03 15:30:14.546 DEBUG [correlationId:] 5483 --- [ad | producer-1] o.apache.kafka.common.network.Selector   : Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
2018-01-03 15:30:14.548 DEBUG [correlationId:] 5483 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Completed connection to node -1.  Fetching API versions.
2018-01-03 15:30:14.549 DEBUG [correlationId:] 5483 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Initiating API versions fetch from node -1.
2018-01-03 15:30:14.642 DEBUG [correlationId:] 5483 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Recorded API versions for node -1: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0])
2018-01-03 15:30:14.643 DEBUG [correlationId:] 5483 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : [TransactionalId commandesEvent0] Sending transactional request (type=FindCoordinatorRequest, coordinatorKey=commandesEvent0, coordinatorType=TRANSACTION) to node localhost:9092 (id: -1 rack: null)
2018-01-03 15:30:14.646 DEBUG [correlationId:] 5483 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Initiating connection to node 1 at 10.224.89.189:9092.
2018-01-03 15:30:14.647 DEBUG [correlationId:] 5483 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name node-1.bytes-sent
2018-01-03 15:30:14.647 DEBUG [correlationId:] 5483 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name node-1.bytes-received
2018-01-03 15:30:14.647 DEBUG [correlationId:] 5483 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name node-1.latency
2018-01-03 15:30:14.647 DEBUG [correlationId:] 5483 --- [ad | producer-1] o.apache.kafka.common.network.Selector   : Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 1
2018-01-03 15:30:14.647 DEBUG [correlationId:] 5483 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Completed connection to node 1.  Fetching API versions.
2018-01-03 15:30:14.647 DEBUG [correlationId:] 5483 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Initiating API versions fetch from node 1.
2018-01-03 15:30:14.648 DEBUG [correlationId:] 5483 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Recorded API versions for node 1: (Produce(0): 0 to 3 [usable: 3], Fetch(1): 0 to 5 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 4 [usable: 4], LeaderAndIsr(4): 0 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0])
2018-01-03 15:30:14.748 DEBUG [correlationId:] 5483 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : [TransactionalId commandesEvent0] Sending transactional request (type=InitProducerIdRequest, transactionalId=commandesEvent0, transactionTimeoutMs=60000) to node 10.224.89.189:9092 (id: 1 rack: null)
2018-01-03 15:30:14.756  INFO [correlationId:] 5483 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId commandesEvent0] ProducerId set to 0 with epoch 17
2018-01-03 15:30:14.756 DEBUG [correlationId:] 5483 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId commandesEvent0] Transition from state INITIALIZING to READY
2018-01-03 15:30:14.760 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] o.a.k.c.p.internals.TransactionManager   : [TransactionalId commandesEvent0] Transition from state READY to IN_TRANSACTION
2018-01-03 15:30:14.778 DEBUG [correlationId:] 5483 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : Sending metadata request (type=MetadataRequest, topics=Event) to node -1
2018-01-03 15:30:14.786 DEBUG [correlationId:] 5483 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : Updated cluster metadata version 2 to Cluster(id = ILhaPwluTWiEAkhuk6rtwg, nodes = [10.224.89.189:9092 (id: 1 rack: null)], partitions = [Partition(topic = Event, partition = 0, leader = 1, replicas = [1], isr = [1])])
2018-01-03 15:30:14.805 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] o.a.k.c.p.internals.TransactionManager   : [TransactionalId commandesEvent0] Begin adding new partition Event-0 to transaction
2018-01-03 15:30:14.828 DEBUG [correlationId:] 5483 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId commandesEvent0] Enqueuing transactional request (type=AddPartitionsToTxnRequest, transactionalId=commandesEvent0, producerId=0, producerEpoch=17, partitions=[Event-0])
2018-01-03 15:30:14.828 DEBUG [correlationId:] 5483 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : [TransactionalId commandesEvent0] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=commandesEvent0, producerId=0, producerEpoch=17, partitions=[Event-0]) to node 10.224.89.189:9092 (id: 1 rack: null)
2018-01-03 15:30:14.833 DEBUG [correlationId:] 5483 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId commandesEvent0] Successfully added partitions [Event-0] to transaction
2018-01-03 15:30:14.840 DEBUG [correlationId:] 5483 --- [ad | producer-1] o.a.k.c.p.internals.RecordAccumulator    : Assigning sequence number 0 from producer (producerId=0, epoch=17) to dequeued batch from partition Event-0 bound for 10.224.89.189:9092 (id: 1 rack: null).
2018-01-03 15:30:14.850 DEBUG [correlationId:] 5483 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name topic.Event.records-per-batch
2018-01-03 15:30:14.852 DEBUG [correlationId:] 5483 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name topic.Event.bytes
2018-01-03 15:30:14.854 DEBUG [correlationId:] 5483 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name topic.Event.compression-rate
2018-01-03 15:30:14.854 DEBUG [correlationId:] 5483 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name topic.Event.record-retries
2018-01-03 15:30:14.854 DEBUG [correlationId:] 5483 --- [ad | producer-1] org.apache.kafka.common.metrics.Metrics  : Added sensor with name topic.Event.record-errors
2018-01-03 15:30:14.877 DEBUG [correlationId:] 5483 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : Incremented sequence number for topic-partition Event-0 to 1
2018-01-03 15:30:14.885 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] o.a.k.c.p.internals.TransactionManager   : [TransactionalId commandesEvent0] Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION
2018-01-03 15:30:14.887 DEBUG [correlationId:] 5483 --- [nio-8091-exec-5] o.a.k.c.p.internals.TransactionManager   : [TransactionalId commandesEvent0] Enqueuing transactional request (type=EndTxnRequest, transactionalId=commandesEvent0, producerId=0, producerEpoch=17, result=COMMIT)
2018-01-03 15:30:14.888 DEBUG [correlationId:] 5483 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : [TransactionalId commandesEvent0] Sending transactional request (type=EndTxnRequest, transactionalId=commandesEvent0, producerId=0, producerEpoch=17, result=COMMIT) to node 10.224.89.189:9092 (id: 1 rack: null)
2018-01-03 15:30:14.894 DEBUG [correlationId:] 5483 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId commandesEvent0] Transition from state COMMITTING_TRANSACTION to READY

Logs after the second message:


2018-01-03 15:33:22.101 DEBUG [correlationId:] 5483 --- [nio-8091-exec-7] o.a.k.c.p.internals.TransactionManager   : [TransactionalId commandesEvent0] Transition from state READY to IN_TRANSACTION
2018-01-03 15:33:22.101 DEBUG [correlationId:] 5483 --- [nio-8091-exec-7] o.a.k.c.p.internals.TransactionManager   : [TransactionalId commandesEvent0] Begin adding new partition Event-0 to transaction
2018-01-03 15:33:22.101 DEBUG [correlationId:] 5483 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId commandesEvent0] Enqueuing transactional request (type=AddPartitionsToTxnRequest, transactionalId=commandesEvent0, producerId=0, producerEpoch=17, partitions=[Event-0])
2018-01-03 15:33:22.102 DEBUG [correlationId:] 5483 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : [TransactionalId commandesEvent0] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=commandesEvent0, producerId=0, producerEpoch=17, partitions=[Event-0]) to node 10.224.89.189:9092 (id: 1 rack: null)
2018-01-03 15:33:22.106 DEBUG [correlationId:] 5483 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [TransactionalId commandesEvent0] Successfully added partitions [Event-0] to transaction
2018-01-03 15:33:22.106 DEBUG [correlationId:] 5483 --- [ad | producer-1] o.a.k.c.p.internals.RecordAccumulator    : Assigning sequence number 1 from producer (producerId=0, epoch=17) to dequeued batch from partition Event-0 bound for 10.224.89.189:9092 (id: 1 rack: null).
2018-01-03 15:33:22.109 DEBUG [correlationId:] 5483 --- [ad | producer-1] o.a.k.clients.producer.internals.Sender  : Incremented sequence number for topic-partition Event-0 to 2

And logs after the third message:


2018-01-03 15:34:19.243 ERROR [correlationId:] 5483 --- [nio-8091-exec-3] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.apache.kafka.common.KafkaException: TransactionalId commandesEvent0: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION] with root cause

org.apache.kafka.common.KafkaException: TransactionalId commandesEvent0: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
    at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:497) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:491) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:185) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:546) ~[kafka-clients-0.11.0.0.jar:na]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.beginTransaction(DefaultKafkaProducerFactory.java:272) ~[spring-kafka-2.0.0.RELEASE.jar:2.0.0.RELEASE]
    at org.springframework.kafka.core.ProducerFactoryUtils.getTransactionalResourceHolder(ProducerFactoryUtils.java:60) ~[spring-kafka-2.0.0.RELEASE.jar:2.0.0.RELEASE]
    at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:368) ~[spring-kafka-2.0.0.RELEASE.jar:2.0.0.RELEASE]
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:310) ~[spring-kafka-2.0.0.RELEASE.jar:2.0.0.RELEASE]
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:166) ~[spring-kafka-2.0.0.RELEASE.jar:2.0.0.RELEASE]

Thank you for your help!

garyrussell commented 6 years ago

Your transaction managers both have the same bean name, `transactionManager`, so you don't really have a `KafkaTransactionManager` bean.

The JPA transaction is started by the @Transactional but there's no logic to commit the Kafka transaction.

You either need to use a `ChainedTransactionManager` to encompass both TMs, or use `kafkaTemplate.executeInTransaction(...)`.

However, in the latter case, the Kafka TX will commit before your `serializeEvent` unless you also do that within the scope of the `executeInTransaction`.

The reason you see the first event in a consumer is that the default isolation level is `READ_UNCOMMITTED`.
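For readers landing here, a minimal sketch of the `executeInTransaction(...)` approach; the template wiring and topic name come from the original report, but the callback body is an assumption, not the reporter's actual code:

```java
// Sketch: publishing all messages inside one producer-local Kafka transaction.
// Requires a transactional ProducerFactory (transactionIdPrefix set), as in the
// configuration shown at the top of this issue.
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void publishAsTransaction(String topic, String[] messages) {
    kafkaTemplate.executeInTransaction(template -> {
        for (String message : messages) {
            template.send(topic, message);
        }
        return true; // the TX commits when the callback returns normally, aborts on exception
    });
}
```

Unlike `@Transactional`, `executeInTransaction` begins and commits the producer transaction itself, so the producer is always returned to the `READY` state before it is reused.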

hajlaoui-nader commented 6 years ago

@garyrussell thank you for your help! I tried both solutions: with `ChainedTransactionManager` I got the same errors, but `kafkaTemplate.executeInTransaction(...)` works perfectly :ok_hand:

tomaszszlek commented 6 years ago

Hi @artembilan. Sorry for the late response. I created a sample application with exactly the same config as my application: kafka_transactions_demo. You would need to run a real Kafka cluster to run the test, as it doesn't use embedded Kafka. Unfortunately, after many tries I wasn't able to reproduce the exception mentioned in my first post using this sample application, but maybe you will notice some misconfiguration that causes the invalid-transition exception in my other app. Thank you in advance for your help.

artembilan commented 6 years ago

@tomaszszlek ,

See @garyrussell 's and @hajlaoui-nader 's comments above.

The problem with this "invalid TX status" is that we reuse a producer for the next TX when the previous one has not been committed. So, `executeInTransaction()` should be used.

Although I think we have to do some TX synchronization registration for the @Transactional. Since we start the TX on the producer, we have to commit/abort it in the end anyway...
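To illustrate the producer state machine behind the error, here is a minimal raw kafka-clients sketch (broker address, transactional id, and topic name are placeholders); every `beginTransaction()` must be paired with a commit or abort before the next one:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TxLifecycleSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("transactional.id", "demo-tx-id");       // placeholder
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();   // UNINITIALIZED -> READY
            producer.beginTransaction();   // READY -> IN_TRANSACTION
            producer.send(new ProducerRecord<>("demo-topic", "msg"));
            producer.commitTransaction();  // IN_TRANSACTION -> COMMITTING_TRANSACTION -> READY
            // Calling beginTransaction() again WITHOUT the commit/abort above is
            // exactly the IN_TRANSACTION -> IN_TRANSACTION transition that throws.
        }
    }
}
```

In the issue above, `@Transactional` began the Kafka TX via the (shadowed) `KafkaTransactionManager`, but nothing ever committed it, so the second request found the producer still in `IN_TRANSACTION`.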

tomaszszlek commented 6 years ago

Hi, I thought that by using @Transactional, Spring would commit the transaction at the end of the transactional method. It is described in the documentation:

> You can use the KafkaTransactionManager with normal Spring transaction support (@Transactional, TransactionTemplate etc). If a transaction is active, any KafkaTemplate operations performed within the scope of the transaction will use the transaction’s Producer. The manager will commit or rollback the transaction depending on success or failure.

Please note that I don't use db transactions at all.

garyrussell commented 6 years ago

We have test cases that prove @Transactional works; there must be something missing from your configuration.

artembilan commented 6 years ago

I think the situation is that it works only if we have a KafkaTransactionManager; at the same time, we start a new transaction from the producer even if the TX manager is not a KafkaTransactionManager.

Is that your case @tomaszszlek and @hajlaoui-nader ?

garyrussell commented 6 years ago

@Transactional on the listener should begin/commit the transaction ok.

However, for this app, you should be injecting the TM into the kafkaListenerContainerFactory so the container can send the topic offsets to the transaction.

When using @Transactional, the container knows nothing about the transaction, so it can't send the offsets to it and will perform a non-transactional offset commit.

tomaszszlek commented 6 years ago

Hi @garyrussell, thank you very much for the explanation. I wasn't aware that Spring won't inject the Kafka TM into my listener container automatically. After I injected the TM into the kafkaListenerContainerFactory, the exception disappeared and the application works fine. If anyone is interested, this is the needed piece of code: `factory.getContainerProperties().setTransactionManager(kafkaTransactionManager());`
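For context, that one-liner sits inside a listener container factory bean; a hedged sketch of the full wiring (bean names and generic types are illustrative, not taken from the reporter's project):

```java
// Sketch: wiring the KafkaTransactionManager into the listener container factory,
// so the container starts the Kafka TX and sends consumed offsets to it.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory,
        KafkaTransactionManager<String, String> kafkaTransactionManager) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // The container now manages begin/commit and performs a transactional offset commit
    factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
    return factory;
}
```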

@hajlaoui-nader I am wondering why ChainedTransactionManager didn't work for you. Maybe you should explicitly indicate the TM to use by specifying it in the annotation parameter: @Transactional(value = "chainedTM")?

hajlaoui-nader commented 6 years ago

@tomaszszlek I set the correct value in my transaction annotation. I've done some debugging: I can see both the JpaTransactionManager and the KafkaTransactionManager in my ChainedTransactionManager, but when a rollback is triggered, only one TM does the rollback.

@tomaszszlek can you please provide your example ?

tomaszszlek commented 6 years ago

Does it make a difference if you declare the JpaTransactionManager and KafkaTransactionManager in a different order? I would like to use ChainedTransactionManager in another project, and I am wondering if I need to know some trick to make it work with Spring Data and Kafka. @hajlaoui-nader I put a link to my sample project in a previous post: kafka_transactions_demo; however, this demo project does not use DB transactions at all.

hajlaoui-nader commented 6 years ago

I think the order matters; here's the rollback code in ChainedTransactionManager:

```java
for (PlatformTransactionManager transactionManager : reverse(transactionManagers)) {
    try {
        multiTransactionStatus.rollback(transactionManager);
    } catch (Exception ex) {
        if (rollbackException == null) {
            rollbackException = ex;
            rollbackExceptionTransactionManager = transactionManager;
        } else {
            LOGGER.warn("Rollback exception (" + transactionManager + ") " + ex.getMessage(), ex);
        }
    }
}
```

Notice the `reverse(transactionManagers)` call.

garyrussell commented 6 years ago

FYI - this works fine for me...

```java
@SpringBootApplication
@EnableTransactionManagement
public class Kgh433Application {

    public static void main(String[] args) {
        ConfigurableApplicationContext ctx = SpringApplication.run(Kgh433Application.class, args);
        Map<String, PlatformTransactionManager> tms = ctx.getBeansOfType(PlatformTransactionManager.class);
        System.out.println(tms);
        ctx.close();
    }

    @Bean
    public ApplicationRunner runner(Foo foo) {
        return args -> foo.sendToKafkaAndDB();
    }

    @Bean
    public JpaTransactionManager transactionManager() {
        return new JpaTransactionManager();
    }

    @Bean
    public ChainedTransactionManager chainedTxM(JpaTransactionManager jpa, KafkaTransactionManager<?, ?> kafka) {
        return new ChainedTransactionManager(jpa, kafka);
    }

    @Component
    public static class Foo {

        @Autowired
        private KafkaTemplate<Object, Object> template;

        @Autowired
        private MessageRepo repo;

        @Transactional(transactionManager = "chainedTxM")
        public void sendToKafkaAndDB() throws Exception {
            this.repo.save(new Message("foo", 1L));
            System.out.println(this.template.send("kgh433", "bar").get());
            System.out.println(this.template.send("kgh433", "baz").get());
        }

    }

}
```

```properties
spring.jpa.generate-ddl=true
spring.kafka.producer.transaction-id-prefix=txId
```
tomaszszlek commented 6 years ago

Hi @garyrussell, it works for me too; both DB and Kafka transactions are rolled back when an exception is thrown in the transactional method. Thanks again for your help, guys.

msfkos commented 5 years ago

Hi @garyrussell . I also am facing the same issue.

Spring Boot version: 2.0.0.RELEASE, Spring Kafka version: 2.1.4.RELEASE, Kafka version: 2.11_1.0.1

Code is here

garyrussell commented 5 years ago

Don't add comments to an existing issue, especially one that has been closed for nearly a year; open a new issue if you believe there is a bug. If you simply have a question, ask it on Stack Overflow with the spring-kafka tag.