confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Cannot create stream because getting timeout while initializing transaction to the KSQL command topic #7953

Open wowo opened 3 years ago

wowo commented 3 years ago

Describe the bug: Cannot create a stream because of a timeout while initializing a transaction to the KSQL command topic. I'm trying to create a stream; sometimes it succeeds, but most of the time it fails. I guess the stream definition doesn't really matter, but here it is (below, after the version details):

To Reproduce: KSQL version confluentinc/ksqldb-server:0.19.0, running on Google Kubernetes Engine. Kafka runs on virtual machines.

CREATE OR REPLACE STREAM WALLET_BALANCE WITH (KAFKA_TOPIC='WALLET_BALANCE', VALUE_FORMAT='JSON')
AS SELECT
    'WALLET_BALANCE' MESSAGE_TYPE,
    EVENTS.CLIENTID CLIENTID,
    CAST(EXTRACTJSONFIELD(EVENTS.MESSAGE, '$.Balance') AS DOUBLE)  BALANCE,
    EXTRACTJSONFIELD(EVENTS.MESSAGE, '$.CurrencyId') CURRENCY,
    ROWTIME ROW_TIME,
    TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss', 'Europe/London') ROW_TIME_FORMATTED
FROM EVENTS EVENTS
WHERE (EVENTS.MESSAGETYPE = 'WalletBalance')
EMIT CHANGES;

Expected behavior: The stream is created.

Actual behaviour: CLI output:

Timeout while initializing transaction to the KSQL command topic.
If you're running a single Kafka broker, ensure that the following configs are set to 1 on the broker:
- transaction.state.log.replication.factor
- transaction.state.log.min.isr
- offsets.topic.replication.factor
Caused by: Timeout expired after 60000 milliseconds while awaiting
    InitProducerId

Exception in logs:

io.confluent.ksql.util.KsqlServerException: Timeout while initializing transaction to the KSQL command topic.
    at io.confluent.ksql.rest.server.computation.DistributingExecutor.execute(DistributingExecutor.java:132)
    at io.confluent.ksql.rest.server.execution.RequestHandler.lambda$executeStatement$0(RequestHandler.java:122) 
    at io.confluent.ksql.rest.server.execution.RequestHandler.executeStatement(RequestHandler.java:125)
    at io.confluent.ksql.rest.server.execution.RequestHandler.execute(RequestHandler.java:99)
    at io.confluent.ksql.rest.server.resources.KsqlResource.handleKsqlStatements(KsqlResource.java:304) 
    at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeKsqlRequest$2(KsqlServerEndpoints.java:191)
    at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeOldApiEndpointOnWorker$23(KsqlServerEndpoints.java:342) 
    at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeOnWorker$22(KsqlServerEndpoints.java:328)
    at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$2(ContextImpl.java:313)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000 milliseconds while awaiting InitProducerId

Command topic details:

/opt/bitnami/kafka/bin/kafka-topics.sh --zookeeper 10.164.0.32:2181 --describe --topic '_confluent-ksql-default__command_topic'
Topic: _confluent-ksql-default__command_topic   PartitionCount: 1       ReplicationFactor: 1    Configs: cleanup.policy=delete,min.insync.replicas=1,retention.ms=-1,unclean.leader.election.enable=false
        Topic: _confluent-ksql-default__command_topic   Partition: 0    Leader: 1002    Replicas: 1002  Isr: 1002

Additional context: I'm running a 3-node Kafka cluster. All three brokers are passed in KSQL_BOOTSTRAP_SERVERS (previously there was only one value; now all three are passed). All existing streams are working fine.

wowo commented 3 years ago

Added stack trace from logs ☝️

wowo commented 3 years ago

Also, previously I was getting an error like this: Failed to get offsets by times in 30000ms

wowo commented 3 years ago

Update: I've run /opt/bitnami/kafka/bin/kafka-reassign-partitions.sh --zookeeper 10.164.0.32:2181 --reassignment-json-file increase-replication-factor.json --execute and increased the replication factor of the command topic to 3. The first 3 streams were created without a problem, but now I'm getting the error again...
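For reference, a reassignment file that raises the command topic to replication factor 3 would look roughly like the sketch below (broker ids other than the leader 1002 shown earlier are assumptions):

{
  "version": 1,
  "partitions": [
    { "topic": "_confluent-ksql-default__command_topic", "partition": 0, "replicas": [1002, 1001, 1003] }
  ]
}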

colinhicks commented 3 years ago

@wowo, there are a number of environmental factors that could be causing this symptom. The Caused by error message, Timeout expired after 60000 milliseconds while awaiting InitProducerId, is a signal to look for exceptions in the logs of your Kafka brokers and to check whether your Kafka cluster has any partitions that are below the minimum ISR.
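Both checks can be done with the standard Kafka tools; a sketch, assuming a broker listener on 10.164.0.30:9092 and a Kafka version new enough for kafka-topics.sh --bootstrap-server:

# Any partitions currently below min.insync.replicas?
/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server 10.164.0.30:9092 --describe --under-min-isr-partitions

# State of the transaction coordinator's internal topic (InitProducerId goes through it)
/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server 10.164.0.30:9092 --describe --topic __transaction_state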

Was the number of brokers in the Kafka cluster increased at some point?

wowo commented 3 years ago

@colinhicks Thanks for the answer and sorry for the late reply. No, there have always been 3 brokers in the cluster. I've recently added the other 2 IPs to the KSQL configuration as a remedy for the problem, but it didn't help (and I think it doesn't really matter).

wowo commented 3 years ago

Another error related to this is now happening: "There is a newer producer with the same transactionalId which fences the current one."

wowo commented 3 years ago

Also, drop stream <any of them> works immediately...

wowo commented 3 years ago

Another problem when I run describe <stream> extended:

[2021-08-27 11:04:05,458] ERROR Failed to list Kafka consumer groups offsets
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting
    for a node assignment. Call: listConsumerGroupOffsets
Caused by: Timed out waiting for a node assignment. Call:
    listConsumerGroupOffsets (io.confluent.ksql.cli.console.Console:344)

I suspect some problems with Zookeeper, but a restart didn't help...
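To check whether that same admin call also fails outside ksqlDB, it can be issued directly with the Kafka tools; a sketch, assuming the CLI tools are reachable from the host running the ksql CLI:

# If this also times out, the problem sits between this host and the brokers rather than in ksqlDB
/opt/bitnami/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 10.164.0.30:9092 --list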

colinhicks commented 3 years ago

The drop stream command does not attempt to connect to Kafka, whereas the other commands do. These symptoms point toward the ksqlDB cluster not being able to connect to the brokers.

colinhicks commented 3 years ago

One suggestion is to test network reachability between the ksqlDB hosts and the brokers more directly.

For example, running telnet from the ksql host(s) to the brokers should succeed:

ksql-host1:~$ telnet <broker1-dns-name-or-ip> 9092
Trying <ip address>...
Connected to <broker1-dns-name-or-ip>.
Escape character is '^]'.
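A TCP connect alone does not exercise the Kafka protocol, so a follow-up check with the client tooling can help too; a sketch, assuming the Kafka CLI tools are available on the ksql host:

# Succeeds only if a full Kafka protocol handshake with the broker works
kafka-broker-api-versions.sh --bootstrap-server <broker1-dns-name-or-ip>:9092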

wowo commented 3 years ago

@colinhicks I did it, telnet works just fine; ping looks healthy too:

ping 10.164.0.30
64 bytes from 10.164.0.30: seq=811 ttl=63 time=0.354 ms
64 bytes from 10.164.0.30: seq=812 ttl=63 time=0.277 ms
^C
--- 10.164.0.30 ping statistics ---
813 packets transmitted, 813 packets received, 0% packet loss
round-trip min/avg/max = 0.157/0.275/1.549 ms

The networking layer looks okay to me, but I'm open to investigating further.

Btw, I've created a Stack Overflow thread as well: https://stackoverflow.com/questions/68953691/kafka-ksql-cant-create-streams-due-to-timeouts

wowo commented 3 years ago

Also, I'm getting the following outputs when querying the REST API:

$ curl http://localhost:8088/info
{"KsqlServerInfo":{"version":"0.19.0","kafkaClusterId":"jjJyaiWBQPaI3_TrFSQmxw","ksqlServiceId":"prod-ksqldb-server","serverStatus":"ERROR"}}
$ curl http://localhost:8088/healthcheck
{"isHealthy":false,"details":{"metastore":{"isHealthy":true},"kafka":{"isHealthy":true},"commandRunner":{"isHealthy":false}}}

wowo commented 3 years ago

I've found a workaround by using ksql.queries.file and running ksqlDB in headless mode. It works for me as I don't really need the interactive version.
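For anyone trying the same workaround, the relevant setting is roughly the sketch below; the queries file path is a placeholder, and the env-var form assumes the ksqldb-server Docker image convention of prefixing properties with KSQL_:

# ksql-server.properties
ksql.queries.file=/etc/ksqldb/queries.sql

# or, as an environment variable on the ksqldb-server container
KSQL_KSQL_QUERIES_FILE=/etc/ksqldb/queries.sql

The queries file then contains statements such as the CREATE OR REPLACE STREAM above.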

anoo-muthuswaami commented 2 years ago

On the broker, I set the replication factors below to 1 and it worked with ksqlDB in interactive mode.

- transaction.state.log.replication.factor
- transaction.state.log.min.isr
- offsets.topic.replication.factor

Reference: https://github.com/confluentinc/cp-all-in-one/blob/7.2.2-post/cp-all-in-one/docker-compose.yml (look at the broker configuration; a sketch of the relevant lines is below).
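The relevant broker settings in that compose file look roughly like this sketch (the image tag is an assumption, and these values are only sensible for a single-broker setup):

broker:
  image: confluentinc/cp-kafka:7.2.2
  environment:
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1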