spring-cloud / spring-cloud-schema-registry

A schema registry implementation for Spring Cloud Stream
Spring Cloud Stream SSL authentication to Schema Registry: 401 Unauthorized #5

Open sabbyanandan opened 5 years ago

sabbyanandan commented 5 years ago

@mraliagha commented on Sun Apr 21 2019

I am trying to use a Spring Cloud Stream application (which is based on Spring Boot) with Confluent Schema Registry. The authentication method is SSL, and I cannot get past the connection step. Either the configuration parameters are incorrect or I am hitting a bug. I would appreciate any help in understanding how to resolve this issue.

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: sensor-topic
      schemaRegistryClient:
        endpoint: https://[schema-registry-url]:[port]
      schema:
        avro:
          schema-locations: classpath:avro/sensor.avsc
server.port: 9999
spring.cloud.stream.kafka.streams.binder:
  brokers: https://[kafka-url]:[port]
  configuration:
    bootstrap.servers: SSL://[kafka-url]:[port]
    security.protocol: SSL
    ssl.truststore.location: client.truststore.jks
    ssl.truststore.password: secret
    ssl.keystore.type: PKCS12
    ssl.keystore.location: client.keystore.p12
    ssl.keystore.password: secret
    ssl.key.password: secret
    key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    schema.registry.url: https://[schema-registry-url]:[port]
    basic.auth.credentials.source: USER_INFO
    basic.auth.user.info: [username]:[pass]

Exception:

Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'avroSchemaMessageConverter' defined in class path resource [org/springframework/cloud/stream/schema/avro/AvroMessageConverterAutoConfiguration.class]: Invocation of init method failed; nested exception is java.lang.RuntimeException: Failed to register subject sensor, server replied with status 401
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1778) ~[spring-beans-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:593) ~[spring-beans-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:515) ~[spring-beans-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320) ~[spring-beans-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222) ~[spring-beans-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318) ~[spring-beans-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199) ~[spring-beans-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:277) ~[spring-beans-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.addCandidateEntry(DefaultListableBeanFactory.java:1463) ~[spring-beans-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.findAutowireCandidates(DefaultListableBeanFactory.java:1427) ~[spring-beans-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveMultipleBeans(DefaultListableBeanFactory.java:1318) ~[spring-beans-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1205) ~[spring-beans-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1167) ~[spring-beans-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:857) ~[spring-beans-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:760) ~[spring-beans-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    ... 171 common frames omitted
Caused by: java.lang.RuntimeException: Failed to register subject sensor, server replied with status 401
    at org.springframework.cloud.stream.schema.client.ConfluentSchemaRegistryClient.register(ConfluentSchemaRegistryClient.java:99) ~[spring-cloud-stream-schema-2.2.0.BUILD-20190416.000801-103.jar:2.2.0.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.registerSchema(AvroSchemaRegistryClientMessageConverter.java:395) ~[spring-cloud-stream-schema-2.2.0.BUILD-20190416.000801-103.jar:2.2.0.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.lambda$afterPropertiesSet$3(AvroSchemaRegistryClientMessageConverter.java:253) ~[spring-cloud-stream-schema-2.2.0.BUILD-20190416.000801-103.jar:2.2.0.BUILD-SNAPSHOT]
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[na:1.8.0_191]
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) ~[na:1.8.0_191]
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580) ~[na:1.8.0_191]
    at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:270) ~[na:1.8.0_191]
    at java.util.stream.ReferencePipeline$11$1.accept(ReferencePipeline.java:373) ~[na:1.8.0_191]
    at java.util.stream.DistinctOps$1$2.accept(DistinctOps.java:175) ~[na:1.8.0_191]
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) ~[na:1.8.0_191]
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948) ~[na:1.8.0_191]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[na:1.8.0_191]
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[na:1.8.0_191]
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) ~[na:1.8.0_191]
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174) ~[na:1.8.0_191]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[na:1.8.0_191]
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) ~[na:1.8.0_191]
    at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.afterPropertiesSet(AvroSchemaRegistryClientMessageConverter.java:245) ~[spring-cloud-stream-schema-2.2.0.BUILD-20190416.000801-103.jar:2.2.0.BUILD-SNAPSHOT]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1837) ~[spring-beans-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1774) ~[spring-beans-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    ... 185 common frames omitted
Caused by: org.springframework.web.client.HttpClientErrorException$Unauthorized: 401 Unauthorized
    at org.springframework.web.client.HttpClientErrorException.create(HttpClientErrorException.java:81) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.client.DefaultResponseErrorHandler.handleError(DefaultResponseErrorHandler.java:122) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.client.DefaultResponseErrorHandler.handleError(DefaultResponseErrorHandler.java:102) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.client.ResponseErrorHandler.handleError(ResponseErrorHandler.java:63) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.client.RestTemplate.handleResponse(RestTemplate.java:778) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:736) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:670) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:579) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
    at org.springframework.cloud.stream.schema.client.ConfluentSchemaRegistryClient.register(ConfluentSchemaRegistryClient.java:93) ~[spring-cloud-stream-schema-2.2.0.BUILD-20190416.000801-103.jar:2.2.0.BUILD-SNAPSHOT]
    ... 204 common frames omitted

@mraliagha commented on Wed Apr 24 2019

I have dug deeper to understand what is going on. Please correct me if I am wrong, but it looks like creating an SSL connection while also providing a username and password has not been implemented in Spring Cloud Stream yet.

I could not find anything related to creating an SSL context in the following classes: SchemaRegistryClientConfiguration, ConfluentSchemaRegistryClient, DefaultSchemaRegistryClient.
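
For context, the username and password part of this setup is HTTP Basic authentication sent over the TLS connection: with `basic.auth.credentials.source: USER_INFO`, the `user:password` string from `basic.auth.user.info` is Base64-encoded into an `Authorization` header. A 401 from the registry is consistent with that header being missing or rejected, although the thread does not confirm which. The following is a minimal JDK-only sketch of how such a header value is formed; the class name `BasicAuthHeaderSketch`, the method `fromUserInfo`, and the credentials are hypothetical and not part of Spring Cloud Stream or the Confluent client.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper, for illustration only.
final class BasicAuthHeaderSketch {

    /** Turns a "user:password" string into the value of an HTTP Authorization header. */
    static String fromUserInfo(String userInfo) {
        String encoded = Base64.getEncoder()
                .encodeToString(userInfo.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    public static void main(String[] args) {
        // Placeholder credentials, not taken from the issue.
        System.out.println("Authorization: " + fromUserInfo("user:pass"));
    }
}
```

A client that talks to the registry through its own HTTP stack would have to attach this header to each request in addition to presenting its client certificate.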


@olegz commented on Mon Apr 29 2019

That is correct. We need to do some work there on the SSL side.


@mraliagha commented on Wed May 01 2019

@olegz Any idea when that will be added?

I can see that one of the constructors accepts a RestTemplate, so I tried adding the SSL context through RestTemplate customization. That works at first, but when the application is about to send the message to Kafka, it validates the message against Schema Registry, and for that it uses the Confluent Schema Registry client. Unfortunately, the Confluent package does not provide any details on SSL setup for the schema registry client, so I am stuck and cannot proceed further.


@olegz commented on Wed May 01 2019

@mraliagha Unfortunately, we had to push it to 2.3, which will be some time in August. We basically didn't have a chance to look into this for the upcoming release. However, if you can help out and contribute something that works in the form of a PR, we can help you polish it and merge it. We are currently targeting the 2.2 release for Monday, May 6.


@mraliagha commented on Wed May 01 2019

@olegz I would submit a PR if I could manage to get past the issue that I am facing. I will do my best ...


@olegz commented on Wed May 01 2019

@mraliagha I understand, but what I am also trying to say is: do what you think is necessary to make it work, and we'll help you make it right. So if you feel like modifying any code, go for it. We can polish it later.

Thank you for volunteering with this!


@mraliagha commented on Tue Aug 13 2019

@olegz We have actually found a workaround for this, but I am wondering whether this has been added to the code base.


@sobychacko commented on Wed Aug 14 2019

@mraliagha This is not yet addressed. Do you mind sharing details about the workaround?


@mraliagha commented on Wed Aug 14 2019

@sobychacko

Of course not. Please find it below. I think it might be useful to add it as an example or to the official documentation for the time being.

ConfluentSchemaRegistryClient has a constructor that accepts a RestTemplate object.

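import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.security.KeyStore;
import java.security.cert.X509Certificate;
import javax.net.ssl.SSLContext;
import org.apache.http.client.HttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.TrustStrategy;
import org.springframework.cloud.stream.schema.client.ConfluentSchemaRegistryClient;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;

// Fragment: keystoreLocation, truststoreLocation, keyStorePass, trustStorePass, and keypass
// are assumed to be supplied by the enclosing configuration class.
final KeyStore keyStore = KeyStore.getInstance("PKCS12");
try (InputStream in = new FileInputStream(new File(keystoreLocation))) {
    keyStore.load(in, keyStorePass.toCharArray());
}

final KeyStore trustStore = KeyStore.getInstance("JKS");
try (InputStream in = new FileInputStream(new File(truststoreLocation))) {
    trustStore.load(in, trustStorePass.toCharArray());
}

// Returning true trusts every server certificate chain without consulting the truststore.
TrustStrategy acceptingTrustStrategy = (X509Certificate[] chain, String authType) -> true;

SSLContext sslContext = SSLContextBuilder
    .create()
    .loadKeyMaterial(keyStore, keypass.toCharArray())
    .loadTrustMaterial(trustStore, acceptingTrustStrategy)
    .build();

// Pass the SSL-enabled HTTP client to the RestTemplate used by the registry client.
HttpClient httpClient = HttpClients.custom().setSSLContext(sslContext).build();
ClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(
    httpClient);
schemaRegistryClient = new ConfluentSchemaRegistryClient(
    new RestTemplate(requestFactory));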

For this to work, some extra configuration must be supplied alongside this code. The lookup appears to go beyond the Spring Cloud Stream classes and into Spring Kafka to find the required settings.

spring:
  kafka: 
    properties:
      basic.auth.credentials.source: USER_INFO
      basic.auth.user.info: ${Schema_Registry_Credentials}
      schema.registry.url: ${Schema_Registry_URL}
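
One caveat about the workaround code: its `acceptingTrustStrategy` returns `true` for every certificate chain, so the registry's certificate is accepted without being checked against the truststore. Where that is not acceptable, the context can be built so that the truststore is enforced. The following is a minimal sketch using only JDK classes; the class name `StrictSslContextSketch` and the method `build` are hypothetical, and the empty in-memory stores in `main` only demonstrate the call shape (a real setup would load the PKCS12 and JKS files as in the workaround).

```java
import java.security.KeyStore;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

// Hypothetical helper, for illustration only.
final class StrictSslContextSketch {

    /** Builds a TLS context that presents the client key and validates servers against trustStore. */
    static SSLContext build(KeyStore keyStore, char[] keyPassword, KeyStore trustStore)
            throws Exception {
        // Key managers supply the client certificate for mutual TLS.
        KeyManagerFactory kmf =
                KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(keyStore, keyPassword);

        // Trust managers check the server certificate against the given truststore.
        TrustManagerFactory tmf =
                TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(trustStore);

        SSLContext context = SSLContext.getInstance("TLS");
        context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
        return context;
    }

    public static void main(String[] args) throws Exception {
        // Empty in-memory stores stand in for the real keystore and truststore files.
        KeyStore keyStore = KeyStore.getInstance("PKCS12");
        keyStore.load(null, null);
        KeyStore trustStore = KeyStore.getInstance("JKS");
        trustStore.load(null, null);

        SSLContext context = build(keyStore, "secret".toCharArray(), trustStore);
        System.out.println(context.getProtocol());
    }
}
```

The resulting `SSLContext` could be handed to `HttpClients.custom().setSSLContext(...)` in place of the one built with the accept-all strategy; whether the registry's certificate chain then validates depends on the contents of the truststore.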
alinazemian commented 3 years ago

We have been working to upgrade our Spring Cloud Stream implementations to the latest version (3.1.1). Have there been any updates on this, and do we still need to keep the workaround in place?