shmoli / ccloud_metrics_source

Kafka Connect source connector that collects runtime metrics from Confluent Cloud and publishes them into Kafka.
Apache License 2.0

Doesn't work with Confluent Cloud Schema Registry #3

Closed griga23 closed 4 years ago

griga23 commented 4 years ago

I tried to run this connector with the following properties to use the Confluent Cloud Schema Registry:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=https://psrc-lgy7n.europe-west3.gcp.confluent.cloud
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=https://psrc-lgy7n.europe-west3.gcp.confluent.cloud
basic.auth.credentials.source=USER_INFO
basic.auth.user.info=key:pass

griga23 commented 4 years ago

[2020-09-18 15:45:11,059] ERROR Error encountered in task CCloudMetrics-0. Executing stage 'KEY_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where source record is = SourceRecord{sourcePartition={SourceCluster=lkc-wj5rm}, sourceOffset={ts=1600436658}} ConnectRecord{topic='CCloudMetrics', kafkaPartition=null, key=Struct{cluster_id=lkc-wj5rm,topic=CCloudMetrics,name=io.confluent.kafka.server/retained_bytes}, keySchema=Schema{metric.key:STRUCT}, value=Struct{name=io.confluent.kafka.server/retained_bytes,long_name=lkc-wj5rm|CCloudMetrics|io.confluent.kafka.server/retained_bytes,timestamp=1600436580,groupby_name=metric.label.topic,groupby_value=CCloudMetrics,dimensions=Struct{cluster_id=lkc-wj5rm,topic=CCloudMetrics,host=lkc-wj5rm},values=Struct{doubleValue=1.2670301E7}}, valueSchema=Schema{metric.value:STRUCT}, timestamp=1600436658000, headers=ConnectHeaders(headers=)}. (org.apache.kafka.connect.runtime.errors.LogReporter:62)
org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic CCloudMetrics :
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:87)
    at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:292)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:292)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:245)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"key","namespace":"metric","fields":[{"name":"cluster_id","type":"string"},{"name":"topic","type":"string"},{"name":"name","type":"string"}],"connect.name":"metric.key"}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:293)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:495)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:486)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:459)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:206)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:268)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:244)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:74)
    at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:138)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:84)
    at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$1(WorkerSourceTask.java:292)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:292)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:245)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
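The `Unauthorized; error code: 401` at the bottom of the trace means the Schema Registry client either sent no credentials or invalid ones. As an illustrative sketch (not part of the connector), this is how a `basic.auth.user.info=key:secret` value is turned into the HTTP `Authorization` header that Basic auth requires; if the `basic.auth.*` properties are not picked up by the converter, no such header is sent and the registry responds 401:

```python
import base64

def basic_auth_header(user_info: str) -> str:
    """Build the HTTP Basic Authorization header value for a
    'key:secret' credential pair, as used with
    basic.auth.credentials.source=USER_INFO."""
    token = base64.b64encode(user_info.encode("utf-8")).decode("ascii")
    return "Basic " + token

# Example with the placeholder credentials from the config above:
print(basic_auth_header("key:pass"))  # Basic a2V5OnBhc3M=
```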

griga23 commented 4 years ago

[2020-09-18 15:44:18,653] INFO AvroConverterConfig values:
    bearer.auth.token = [hidden]
    proxy.port = -1
    schema.reflection = false
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = USER_INFO
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    schema.registry.url = [https://psrc-lgy7n.europe-west3.gcp.confluent.cloud]
    basic.auth.user.info = [hidden]
    proxy.host =
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
 (io.confluent.connect.avro.AvroConverterConfig:179)
[2020-09-18 15:44:18,666] INFO KafkaAvroSerializerConfig values:
    bearer.auth.token = [hidden]
    proxy.port = -1
    schema.reflection = false
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = USER_INFO
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    schema.registry.url = [https://psrc-lgy7n.europe-west3.gcp.confluent.cloud]
    basic.auth.user.info = [hidden]
    proxy.host =
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
 (io.confluent.kafka.serializers.KafkaAvroSerializerConfig:179)

griga23 commented 4 years ago

It was a configuration issue. Both the key and value converters must have valid Schema Registry URLs and credentials configured.
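For reference, a sketch of a working configuration based on the resolution above: each converter is configured independently, so the `basic.auth.*` properties must be repeated under both the `key.converter.` and `value.converter.` prefixes rather than set once at the top level (endpoint and `key:pass` are the placeholders from the original report):

```properties
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=https://psrc-lgy7n.europe-west3.gcp.confluent.cloud
key.converter.basic.auth.credentials.source=USER_INFO
key.converter.basic.auth.user.info=key:pass

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=https://psrc-lgy7n.europe-west3.gcp.confluent.cloud
value.converter.basic.auth.credentials.source=USER_INFO
value.converter.basic.auth.user.info=key:pass
```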