linkedin / brooklin

An extensible distributed system for reliable nearline data streaming at scale
BSD 2-Clause "Simplified" License
919 stars 137 forks source link

InstanceAlreadyExistsException: kafka.producer:type=app-info,id=brooklin-producer-1 #674

Open rantav opened 4 years ago

rantav commented 4 years ago

Subject of the issue

When creating kafka mirroring tasks there's an exception regarding jmx. It seems to be a race condition b/c it doesn't always happen, and tends to happen more when maxTasks is higher.

[2019-11-27 14:35:37,129] WARN Error registering AppInfo mbean (org.apache.kafka.common.utils.AppInfoParser)
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=brooklin-producer-1
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:451)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:304)
    at com.linkedin.datastream.kafka.factory.SimpleKafkaProducerFactory.createProducer(SimpleKafkaProducerFactory.java:27)
    at com.linkedin.datastream.kafka.KafkaProducerWrapper.initializeProducer(KafkaProducerWrapper.java:176)
    at com.linkedin.datastream.kafka.KafkaProducerWrapper.maybeGetKafkaProducer(KafkaProducerWrapper.java:147)
    at com.linkedin.datastream.kafka.KafkaProducerWrapper.send(KafkaProducerWrapper.java:194)
    at com.linkedin.datastream.kafka.KafkaTransportProvider.send(KafkaTransportProvider.java:151)
    at com.linkedin.datastream.server.EventProducer.send(EventProducer.java:191)
    at com.linkedin.datastream.connectors.kafka.AbstractKafkaBasedConnectorTask.sendDatastreamProducerRecord(AbstractKafkaBasedConnectorTask.java:274)
    at com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorTask.sendDatastreamProducerRecord(KafkaMirrorMakerConnectorTask.java:246)
    at com.linkedin.datastream.connectors.kafka.AbstractKafkaBasedConnectorTask.translateAndSendBatch(AbstractKafkaBasedConnectorTask.java:229)
    at com.linkedin.datastream.connectors.kafka.AbstractKafkaBasedConnectorTask.processRecords(AbstractKafkaBasedConnectorTask.java:481)
    at com.linkedin.datastream.connectors.kafka.AbstractKafkaBasedConnectorTask.run(AbstractKafkaBasedConnectorTask.java:315)
    at java.lang.Thread.run(Thread.java:748)

Your environment

I am using this docker image dongjinleekr/brooklin:1.0.0-2, which internally uses brooklin 1.0.0

Steps to reproduce

Start a mirror task with maxTasks 50 (other values could also work, my guess is that higher the value the bigger the chance)

For example:

 bin/brooklin-rest-client.sh -o CREATE -u http://localhost:32311/ -n mirror -s "kafka://x.x.x.x.:9092/topic5" -c kafkaMirroringConnector -t kafkaTransportProvider -m '{"owner":"test-user","system.reuseExistingDestination":"false","maxTasks":"50"}' 2>/dev/null

Expected behaviour

There should not be an error in the log and the mbean(s) should successfully register n

Actual behaviour

There is an exception in the log and I'm guessing (but didn't check) that the mbean is not properly registered.

sposec commented 4 years ago

I am looking into this as I have the same problem. Seems related to the attribution of ClientID / GroupID.

youngfuture commented 4 years ago

I had the same problem yesterday I had the same problem yesterday while I restart my application