confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Create Stream fails with "The broker does not support INIT_PRODUCER_ID" #7508

Closed. lloiacono closed this issue 3 years ago.

lloiacono commented 3 years ago

I'm trying to create a stream from an existing topic and I get the following error:

Could not write the statement 'CREATE STREAM customers_stream (
  id BIGINT,
  contact_id BIGINT,
  email STRING,
  first_name STRING,
  last_name STRING,
  age INTEGER
) WITH (
  kafka_topic='customers',
  value_format='json'
);' into the command topic: The broker does not support INIT_PRODUCER_ID
Caused by: The broker does not support INIT_PRODUCER_ID

ksqlDB version: 6.1.1; Kafka version: kafka_2.13-2.8.0, running without ZooKeeper (KRaft mode)

I've searched for the error but found no information on how to fix or troubleshoot it. My Kafka server.properties uses the defaults (except for the listener and advertised listener settings).

Am I missing some configuration on the broker side?
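
One way to check what the broker actually advertises, assuming the stock CLI tools shipped with the Kafka distribution are on hand, is to query its supported API versions and look for the InitProducerId entry (the broker address below is the one from the logs further down):

bin/kafka-broker-api-versions.sh --bootstrap-server host.docker.internal:9092 | grep -i initproducerid

If that entry is missing, or its usable version range is too low for a transactional producer, the client fails with exactly this UnsupportedVersionException. (The exact output format varies a little between Kafka versions.)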

In the ksqlDB logs I see the following when trying to create the stream:

[2021-05-12 15:56:34,460] WARN Error registering AppInfo mbean (org.apache.kafka.common.utils.AppInfoParser)
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-default_
    at java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436)
    at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)
    at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)
    at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)
    at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)
    at java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:435)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
    at io.confluent.ksql.rest.server.computation.CommandStore.createTransactionalProducer(CommandStore.java:299)
    at io.confluent.ksql.rest.server.computation.DistributingExecutor.execute(DistributingExecutor.java:127)
    at io.confluent.ksql.rest.server.execution.RequestHandler.lambda$executeStatement$0(RequestHandler.java:123)
    at io.confluent.ksql.rest.server.execution.RequestHandler.executeStatement(RequestHandler.java:126)
    at io.confluent.ksql.rest.server.execution.RequestHandler.execute(RequestHandler.java:100)
    at io.confluent.ksql.rest.server.resources.KsqlResource.handleKsqlStatements(KsqlResource.java:275)
    at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeKsqlRequest$2(KsqlServerEndpoints.java:164)
    at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeOldApiEndpointOnWorker$22(KsqlServerEndpoints.java:302)
    at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeOnWorker$21(KsqlServerEndpoints.java:288)
    at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$2(ContextImpl.java:313)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:829)
[2021-05-12 15:56:34,466] INFO [Producer clientId=producer-default_, transactionalId=default_] Invoking InitProducerId for the first time in order to acquire a producer ID (org.apache.kafka.clients.producer.internals.TransactionManager)
[2021-05-12 15:56:34,470] INFO [Producer clientId=producer-default_, transactionalId=default_] Cluster ID: 2fx2KW7BQBi1IOMfYExX-g (org.apache.kafka.clients.Metadata)
[2021-05-12 15:56:34,475] INFO [Producer clientId=producer-default_, transactionalId=default_] Discovered transaction coordinator host.docker.internal:9092 (id: 1 rack: null) (org.apache.kafka.clients.producer.internals.TransactionManager)
[2021-05-12 15:56:34,587] INFO [Producer clientId=producer-default_, transactionalId=default_] Transiting to fatal error state due to org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support INIT_PRODUCER_ID (org.apache.kafka.clients.producer.internals.TransactionManager)
[2021-05-12 15:56:34,588] INFO Processed unsuccessfully: KsqlRequest{ksql='CREATE STREAM customers_stream (
  id BIGINT,
  contact_id BIGINT,
  email STRING,
  first_name STRING,
  last_name STRING,
  age INTEGER
) WITH (
  kafka_topic='customers',
  value_format='json'
);', configOverrides={}, requestProperties={}, commandSequenceNumber=Optional[-1]}, reason: Could not write the statement 'CREATE STREAM customers_stream (
  id BIGINT,
  contact_id BIGINT,
  email STRING,
  first_name STRING,
  last_name STRING,
  age INTEGER
) WITH (
  kafka_topic='customers',
  value_format='json'
);' into the command topic: The broker does not support INIT_PRODUCER_ID (io.confluent.ksql.rest.server.resources.KsqlResource)
[2021-05-12 15:56:34,592] INFO App info kafka.admin.client for adminclient-5 unregistered (org.apache.kafka.common.utils.AppInfoParser)
[2021-05-12 15:56:34,641] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)
[2021-05-12 15:56:34,642] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)
[2021-05-12 15:56:34,642] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
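
For context on why a CREATE STREAM triggers this at all: the stack trace above goes through CommandStore.createTransactionalProducer, i.e. ksqlDB writes every statement into its command topic using a transactional producer, and a transactional producer issues an InitProducerId request as soon as initTransactions() is called. Below is a minimal sketch using the plain Kafka clients (not ksqlDB's actual code; the broker address and transactional id are copied from the log, the class name and serializers are just illustrative) that reproduces the same failure against a broker without transaction support:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class InitProducerIdCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker address taken from the ksqlDB log output above.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "host.docker.internal:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Setting a transactional.id makes the producer send InitProducerId on initTransactions().
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "default_");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Against a broker without transaction support this throws
            // UnsupportedVersionException: The broker does not support INIT_PRODUCER_ID
            producer.initTransactions();
        }
    }
}

Running this against the same broker fails inside initTransactions() with the same message, independent of ksqlDB, which points at the broker rather than the ksqlDB configuration.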
lloiacono commented 3 years ago

Ok, after reading this README.md (the KRaft mode README shipped with Kafka) I understand why it is not working.

Finally, the following Kafka features have not yet been fully implemented: ... Support for transactions and exactly-once semantics ...

I'll close the issue; it took me some time to figure out the problem.
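
For anyone who lands here with the same error: since the missing piece is transaction support in the broker, one workaround (a sketch, not an official recommendation) is to run the same kafka_2.13-2.8.0 build in the classic ZooKeeper-backed mode instead of KRaft, with a server.properties along these lines (paths and hostnames are placeholders):

broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://host.docker.internal:9092
log.dirs=/var/lib/kafka/data
zookeeper.connect=localhost:2181

and start the broker with bin/kafka-server-start.sh config/server.properties rather than the KRaft startup path. Alternatively, move to a later Kafka release whose KRaft README no longer lists transactions and exactly-once semantics as unimplemented.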