confluentinc / kafka-rest

Confluent REST Proxy for Kafka
https://docs.confluent.io/current/kafka-rest/docs/index.html
Other
45 stars 646 forks source link

Rest Proxy throws KafkaException when configuring metric.reporters #517

Open brianfh opened 5 years ago

brianfh commented 5 years ago

The rest proxy configuration "metric.reporters" should accept a class that implements the interface MetricsReporter in the io.confluent.common.metrics package. This appears to work for the KafkaRestConfigs part of initialization, as my log file contains this:

[2018-12-06 16:02:50,415] INFO KafkaRestConfig values: metric.reporters = [com.redfin.kafka.metrics.KafkaRestProxyMetricsReporter] client.security.protocol = PLAINTEXT bootstrap.servers = PLAINTEXT://localhost:9092 response.mediatype.default = application/vnd.kafka.v1+json ... (io.confluent.kafkarest.KafkaRestConfig:170)

But immediately after this, the ProducerConfig initialization fails, with this stack trace:

[2018-12-06 16:02:51,506] INFO ProducerConfig values: ... max.request.size = 1048576 metadata.max.age.ms = 300000 metric.reporters = [com.redfin.kafka.metrics.KafkaRestProxyMetricsReporter] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 ... transactional.id = null value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer (org.apache.kafka.clients.producer.ProducerConfig:223) [2018-12-06 16:02:51,508] INFO Closing the Kafka producer with timeoutMillis = 0 ms. (org.apache.kafka.clients.producer.KafkaProducer:1017) [2018-12-06 16:02:51,511] ERROR Server died unexpectedly: (io.confluent.kafkarest.KafkaRestMain:63) org.apache.kafka.common.KafkaException: Failed to construct kafka producer at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:416) at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:278) at io.confluent.kafkarest.ProducerPool.buildNoSchemaProducer(ProducerPool.java:116) at io.confluent.kafkarest.ProducerPool.buildBinaryProducer(ProducerPool.java:102) at io.confluent.kafkarest.ProducerPool.(ProducerPool.java:75) at io.confluent.kafkarest.ProducerPool.(ProducerPool.java:64) at io.confluent.kafkarest.ProducerPool.(ProducerPool.java:56) at io.confluent.kafkarest.extension.KafkaRestContextProvider.initialize(KafkaRestContextProvider.java:66) at io.confluent.kafkarest.KafkaRestApplication.setupInjectedResources(KafkaRestApplication.java:100) at io.confluent.kafkarest.KafkaRestApplication.setupResources(KafkaRestApplication.java:82) at io.confluent.kafkarest.KafkaRestApplication.setupResources(KafkaRestApplication.java:45) at io.confluent.rest.Application.createServer(Application.java:157) at io.confluent.rest.Application.start(Application.java:489) at io.confluent.kafkarest.KafkaRestMain.main(KafkaRestMain.java:56) Caused by: java.lang.ClassCastException: class com.redfin.kafka.metrics.KafkaRestProxyMetricsReporter at java.lang.Class.asSubclass(Class.java:3404) at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:300) at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:286) at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:263) at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:320) ... 13 more

I believe this happens because the producer expects its metrics reporter to implement the MetricsReporter interface in a different package: org.apache.kafka.common.metrics (instead of io.confluent.common.metrics).

It looks like I can work around this by removing the metric.reporters entry from the Map passed to the KafkaRestProxyMetricsReporter's configure method, but this is extremely hacky! And I'm not sure if this workaround is a cause for #516.

jdm2212 commented 5 years ago

I just hit this same issue -- KafkaProducer expects the metric.reporters key to be a Kafka org.apache.kafka.common.metrics.MetricsReporter, and Rest Proxy wants the metric.reporters key to be a Confluent io.confluent.common.metrics.MetricsReporter.

You can't have a single class implement both interfaces (at least not in a type-safe way) because one of their methods has the same erasure. Specifically, both have a void init(List).