apache / pulsar

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org/
Apache License 2.0

Unauthorized; error code: 401 when using pulsar-io-kafka connector with schema-registry that requires auth #16181

Open brigen opened 2 years ago

brigen commented 2 years ago

I get com.google.common.util.concurrent.UncheckedExecutionException: java.lang.RuntimeException: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401 when using the Kafka source connector with a schema registry that requires basic auth.

Steps to reproduce

Start the connector so that it connects to a Kafka broker and uses KafkaAvroDeserializer, which fetches schemas from the schema registry. Protect the schema registry with basic auth.

Use className: org.apache.pulsar.io.kafka.KafkaBytesSource

Config file of the pulsar-io-kafka connector:

    bootstrapServers: "localhost:9092"
    topic: abcV1
    groupId: "group.V1.consumer-1"
    valueDeserializationClass: io.confluent.kafka.serializers.KafkaAvroDeserializer
    consumerConfigProperties:
      client.id: "avs.sit"
      security.protocol: "SASL_SSL"
      sasl.mechanism: "PLAIN"
      acks: "all"
      client.dns.lookup: use_all_dns_ips
      sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USER\" password=\"password\";"
      value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      basic.auth.credentials.source: USER_INFO
      specific.avro.reader: true
      schema.registry.url: https://your-schmea-registry-url
      basic.auth.user.info: USER:PASSWORD

System configuration

Pulsar version: 2.9.2

I already dug into the source code, and the problem appears to be in KafkaBytesSource.java, around line 110:

    private void initSchemaCache(Properties props) {
        KafkaAvroDeserializerConfig config = new KafkaAvroDeserializerConfig(props);
        List<String> urls = config.getSchemaRegistryUrls();
        int maxSchemaObject = config.getMaxSchemasPerSubject();
        SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(urls, maxSchemaObject);
        log.info("initializing SchemaRegistry Client, urls:{}, maxSchemasPerSubject: {}", urls, maxSchemaObject);
        schemaCache = new AvroSchemaCache(schemaRegistryClient);
    }

The CachedSchemaRegistryClient is constructed here without the properties, so it creates a RestService without them as well, and the default RestService never calls configure. As a result, even though we pass basic.auth.credentials.source and basic.auth.user.info, they never reach RestService, and the call to the schema registry is made without an Authorization header.
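To make the missing header concrete, here is a minimal, self-contained sketch of what a client would have to derive from the two basic-auth properties in the config above. The class BasicAuthHeaderSketch and its method authorizationHeader are hypothetical names used only for illustration. They are not part of Pulsar or the Confluent client, and the real client's internal logic may differ.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration only: not Pulsar or Confluent code.
class BasicAuthHeaderSketch {

    // Builds the HTTP Basic Authorization header value from consumer
    // properties, or returns empty when the properties are not supplied.
    public static Optional<String> authorizationHeader(Map<String, ?> props) {
        Object source = props.get("basic.auth.credentials.source");
        Object userInfo = props.get("basic.auth.user.info");
        if (!"USER_INFO".equals(source) || userInfo == null) {
            // Mirrors the situation described above: no properties reach
            // the REST layer, so no header can be produced.
            return Optional.empty();
        }
        String encoded = Base64.getEncoder()
                .encodeToString(userInfo.toString().getBytes(StandardCharsets.UTF_8));
        return Optional.of("Basic " + encoded);
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", "USER:PASSWORD");

        // With the properties available, a header can be built.
        System.out.println("with props:    " + authorizationHeader(props));
        // Without them, there is nothing to send, hence the 401.
        System.out.println("without props: " + authorizationHeader(new HashMap<>()));
    }
}
```

If the Confluent client version on the classpath offers a CachedSchemaRegistryClient constructor that also accepts a configuration map (an assumption worth checking against that version), passing the deserializer properties through at construction time may be enough to get the header sent.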

github-actions[bot] commented 2 years ago

The issue had no activity for 30 days, mark with Stale label.