testcontainers / testcontainers-java

Testcontainers is a Java library that supports JUnit tests, providing lightweight, throwaway instances of common databases, Selenium web browsers, or anything else that can run in a Docker container.
https://testcontainers.org
MIT License

KafkaContainer transactions: Timeout expired while initializing transactional state in 60000ms. #1816

Open ernesthill opened 5 years ago

ernesthill commented 5 years ago

I'm trying to create a test that uses Kafka transactions. If I use a local instance of Kafka instead of KafkaContainer everything works fine, but when I use KafkaContainer it fails and I see the following:

org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.

Attached are the source for my test and the logfile from the run. kafka.log Test.java.txt

kiview commented 5 years ago

I just had a quick check with our KafkaContainerTest, added a transaction example, and I see the same behaviour, but with a slightly different error message:

Timeout expired after 60000milliseconds while awaiting InitProducerId
org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

It happens on producer.initTransactions().
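
For reference, a minimal sketch of the failing call path (an illustration, assuming a producer configured with a transactional.id against a single-broker container that still has the default transaction-state-log settings):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-tx-1");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
  // Blocks for max.block.ms (60s by default), then throws
  // org.apache.kafka.common.errors.TimeoutException ... awaiting InitProducerId
  producer.initTransactions();
}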

bsideup commented 5 years ago

@gAmUssA any idea? :)

gAmUssA commented 5 years ago

@bsideup @kiview Gents, let me take a look.

gAmUssA commented 5 years ago

@ernesthill

you need to configure some of the broker parameters so that the transaction state store is initialized correctly. Here's a correct test.

NOTE: it's always good to enable log output from the container to debug this kind of issue. I use Slf4jLogConsumer from TC.

import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;

import java.util.Properties;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;

@Slf4j
public class ProducerTransactionTest {

  public static KafkaContainer kafka = new KafkaContainer("5.2.1")
      .withLogConsumer(new Slf4jLogConsumer(log));

  @BeforeClass
  public static void prep() {
    // see https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-NewConfigurations
    // A single-broker container cannot satisfy the defaults (RF=3, min ISR=2),
    // so the __transaction_state topic is never created without these overrides:
    kafka.addEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1");
    kafka.addEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1");
    kafka.start();
  }

  @Test
  public void testIt() {
    Properties props = new Properties();
    props.put(BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ENABLE_IDEMPOTENCE_CONFIG, "true");
    props.put(TRANSACTIONAL_ID_CONFIG, "prod-0");
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    ProducerRecord<String, String> record = new ProducerRecord<>("something", "A message");
    producer.initTransactions();
    producer.beginTransaction();
    producer.send(record);
    producer.commitTransaction();
    producer.close();
  }
}
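
With recent Testcontainers versions the same overrides can also be chained fluently (a sketch, equivalent to the addEnv calls above, since withEnv is inherited from GenericContainer):

public static KafkaContainer kafka = new KafkaContainer("5.2.1")
    .withLogConsumer(new Slf4jLogConsumer(log))
    // broker-side overrides needed for a single-broker setup
    .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
    .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1");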

Let me know if you have any questions.

@bsideup @kiview do you want me to send PR with a test for this issue?

ernesthill commented 5 years ago

Thanks for the information.

debop commented 5 years ago

Thanks for this information!!!

seglo commented 5 years ago

This should be part of the default Kafka test container config, since (AFAIK) it always runs in a single-broker configuration.

zenglian commented 4 years ago

The default value of transaction.state.log.replication.factor is 3 and transaction.state.log.min.isr is 2. So if the broker count in your cluster is less than 3, the Kafka server fails to (automatically) create the __transaction_state topic, and the client gets the timeout error.
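
One way to check this from a test: the __transaction_state topic is created lazily on the first InitProducerId request, so after a failed initTransactions() you can ask the broker whether it exists. A minimal sketch, assuming the container is started and an AdminClient can reach it:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

static void assertTransactionStateTopicExists(String bootstrapServers) throws Exception {
  Properties adminProps = new Properties();
  adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  try (AdminClient admin = AdminClient.create(adminProps)) {
    // Fails (ExecutionException wrapping UnknownTopicOrPartitionException)
    // when the broker could not create the topic, e.g. RF=3 on a single broker.
    admin.describeTopics(Collections.singleton("__transaction_state")).all().get();
  }
}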

zaanwer commented 4 years ago

I did have this transaction.state.log.replication.factor set to 1 but I am still seeing this error in producer.initTransactions():

"Timeout expired while initializing transactional state in 60000ms"

zaanwer commented 4 years ago

It went into the following code: it gets the result from transactionManager.initializeTransactions(), but times out on the last line, result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS):

public void initTransactions() {
    throwIfNoTransactionManager();
    throwIfProducerClosed();
    TransactionalRequestResult result = transactionManager.initializeTransactions();
    sender.wakeup();
    result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
}
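
That await is bounded by the producer's max.block.ms (60000 ms by default), which is where the 60000ms in the message comes from. Raising it only makes the client wait longer for a broker that will never create the topic; a sketch of the client-side knob:

props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "120000"); // extends the client wait; does not fix the broker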

zenglian commented 4 years ago

I did have this transaction.state.log.replication.factor set to 1 but I am still seeing this error in producer.initTransactions():

"Timeout expired while initializing transactional state in 60000ms"

Note there are 2 props to change. Do not try to debug the client code; it's a server-side error.

zaanwer commented 4 years ago

I have both properties set and I have numBroker=1.

properties.put("transaction.state.log.replication.factor", Short.valueOf("1")); properties.put("transaction.state.log.min.isr", 1);

ivanprostran commented 4 years ago

I have a 10-node cluster with the following configuration:

Kafka Streams log (exactly-once enabled, static group membership enabled):

transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

2020-01-30 11:23:00 [Processor-StreamThread-1] ERROR o.a.k.s.p.internals.StreamTask - stream-thread [Processor-StreamThread-1] task [0_0] Timeout exception caught when initializing transactions for task 0_0. This might happen if the broker is slow to respond, if the network connection to the broker was interrupted, or if similar circumstances arise. You can increase producer parameter max.block.ms to increase this timeout.
..
..
org.apache.kafka.common.errors.TimeoutException: Timeout expired after 240000milliseconds while awaiting InitProducerId

org.apache.kafka.streams.errors.StreamsException: stream-thread [Processor-StreamThread-1] Failed to rebalance.
..
..
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired after 240000milliseconds while awaiting InitProducerId

This is a big problem and I don't know what to do. The only solution is to restart the cluster.

This happens occasionally, and the Kafka Streams application (client) cannot be started (i.e. transition to the RUNNING state) until the brokers are restarted manually. (I tried restarting the client application several times, but the problem was not resolved until the brokers were restarted.)

Additional info:

Kafka client/broker: 2.4.0
Nodes are up & running (alive):

[zk: localhost:2181(CONNECTED) 0] ls /kafka_cluster/brokers/ids
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

I appreciate any help!

Thank you

bsideup commented 4 years ago

@ivanprostran this sounds like a problem with Kafka / your app and does not seem to be related to Testcontainers' Kafka module.

ivanprostran commented 4 years ago

Thank you for the info.

I saw the same error and I am desperate.

ivanprostran commented 4 years ago

I will post it to a different group (sorry for the inconvenience).

pancudaniel7 commented 4 years ago

@gAmUssA I've tried your KafkaContainer setup and added all the necessary env variables for transactional mode, but I still hit the same problem: Timeout expired while initializing transactional state in 60000ms.

I debugged the Spring library code, and on my side it gets stuck on this.producerFactory.createProducer(); at line 275 of KafkaTemplate.

gAmUssA commented 4 years ago

@pancudaniel7 could you share a reproducer? Thank you

cemkayan commented 4 years ago

I am also getting the same exception. I am using KafkaTemplate with executeInTransaction.

props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-0"); // I am really not clear what this is for. I set it once in the producerConfig, then create the KafkaTemplate and use it many times...

This is how I call the Kafka template:

kafkaTemplate.setTransactionIdPrefix(message.getGuid()); // I am also not sure about this line. I tried to put a unique value to prevent the same message from being created twice.

kafkaTemplate.executeInTransaction(template -> {

    ListenableFuture<SendResult<String, Event>> future = template.send(topic, message);

    future.addCallback(new ListenableFutureCallback<SendResult<String, Event>>() {

        @Override
        public void onSuccess(SendResult<String, Event> result) {
        }

        @Override
        public void onFailure(Throwable ex) {
        }
    });

    return null;
});

This is what I receive

org.apache.kafka.common.errors.TimeoutException: Timeout expired after 10000milliseconds while awaiting InitProducerId
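
A side note on the two questions in the comments above: transactional.id identifies a logical producer across restarts so the broker can fence zombie instances, so it should be stable per producer rather than unique per message; a fresh GUID per send defeats fencing and forces a new transactional producer each time. With Spring Kafka the prefix is typically set once on the producer factory. A minimal sketch, assuming Spring Kafka's DefaultKafkaProducerFactory:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(config);
factory.setTransactionIdPrefix("tx-"); // stable prefix set once; Spring appends a per-producer suffix
KafkaTemplate<String, String> template = new KafkaTemplate<>(factory);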

maver1ck commented 4 years ago

Same problem here :(

I'm using Flink job with FlinkKafkaProducer.Semantic.EXACTLY_ONCE.

When running the same job without Testcontainers, everything works fine.

bsideup commented 4 years ago

@maver1ck the transactions are not enabled by default, see @gAmUssA's answer: https://github.com/testcontainers/testcontainers-java/issues/1816#issuecomment-529992060

maver1ck commented 4 years ago

@bsideup checking this

EDIT: It works. Thanks.

MrMikeFloyd commented 3 years ago

(Quoting @gAmUssA's answer above, i.e. setting KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR and KAFKA_TRANSACTION_STATE_LOG_MIN_ISR to 1 before starting the container.)

This did the trick for me 👆❤️

In case anyone has the same issue using EmbeddedKafka in a Spring Boot test, the values can be set when defining the config for the embedded Kafka instance as follows:

@EmbeddedKafka(topics = "kafka-test", ports = 9099, brokerProperties = {
    "transaction.state.log.replication.factor=1",
    "transaction.state.log.min.isr=1"
})
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
    properties = {
        "spring.kafka.bootstrap-servers=localhost:9099"
    })
class KafkaEmbeddedIT {
...
}
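
A related note for tests that also consume the transactional topic: aborted or uncommitted records are visible to a consumer by default, so the consumer should opt into committed-only reads. A one-line sketch (assuming a plain KafkaConsumer config in the same test):

props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // only committed records are returned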

kcotzen commented 3 years ago

Hi everyone, unfortunately I have the same error:

2020-11-26 11:27:08.246  INFO [poc-test,,,] 506391 --- [ producer-tx-3] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-tx-3, transactionalId=tx-3] Cluster ID: -nEH5zcySOSTk7pnaSsZOg
2020-11-26 11:28:08.238 ERROR [poc-test,,,] 506391 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Failed to obtain partition information

org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId 

In the console output I see this:

2020-11-26 11:27:08.218  INFO [poc-test,,,] 506391 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = -1
    batch.size = 16384
    bootstrap.servers = [PLAINTEXT://localhost:33193]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id = producer-tx-3
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = true
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 1
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = tx-3
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

My configuration of KafkaContainer:

        lateinit var kafka: KafkaContainer

        init {
            configureKafka()
        }

        private fun configureKafka() {
            kafka = KafkaContainer("5.3.2-1")
            kafka.addEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "2")
            kafka.addEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
            kafka.start()
        }

And my configuration in the application.yml looks like this:

spring:
  cloud:
    stream:
      kafka:
        binder:
          transaction:
            transaction-id-prefix: tx-
          producer-properties:
            retries: 1
            acks: all

I can't get it to start up successfully. Has anyone been able to overcome this? Thanks in advance.

maver1ck commented 3 years ago

kafka.addEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "2") Why 2?

kcotzen commented 3 years ago

kafka.addEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "2") Why 2?

If I use :

            kafka.addEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "3")
            kafka.addEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "2")

The error is the same. If I use 1 for both, I get this:

2020-11-26 13:15:01.309  WARN [poc-test,,,] 523026 --- [| producer-tx-0] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-tx-0, transactionalId=tx-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2020-11-26 13:15:01.309  WARN [poc-test,,,] 523026 --- [| producer-tx-0] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-tx-0, transactionalId=tx-0] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

bsideup commented 3 years ago

@kcotzen it should be 1, unless you start & connect two brokers

kcotzen commented 3 years ago

I used value 1 for both, and I got this error:

2020-11-26 13:15:01.309  WARN [poc-test,,,] 523026 --- [| producer-tx-0] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-tx-0, transactionalId=tx-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2020-11-26 13:15:01.309  WARN [poc-test,,,] 523026 --- [| producer-tx-0] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-tx-0, transactionalId=tx-0] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

kcotzen commented 3 years ago

I made some modifications (I removed the use of chainedTransactionManager) and it works: zero errors in the console output. But when I use chainedTransactionManager:

2020-11-26 13:15:01.309  WARN [poc-test,,,] 523026 --- [| producer-tx-0] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-tx-0, transactionalId=tx-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2020-11-26 13:15:01.309  WARN [poc-test,,,] 523026 --- [| producer-tx-0] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-tx-0, transactionalId=tx-0] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

Sounds weird. Any ideas, please?

kcotzen commented 3 years ago

Well, for some reason the problem was caused by the chainedTransactionManager definition. I think the console output is very weird and it led me to confusion. Thanks anyway.

daoanhvu commented 2 months ago

I got the same issue while trying to use transactional producers. I fixed it by overriding these two settings on the server side:

transaction.state.log.min.isr = 1
transaction.state.log.replication.factor = 1

If you are using a Kafka container with Docker, then most likely you will go with:

KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1