strimzi / strimzi-kafka-oauth

OAuth2 support for Apache Kafka® to work with many OAuth2 authorization servers
Apache License 2.0

Error validating authz permissions #58

Open uwburn opened 4 years ago

uwburn commented 4 years ago

Hello, I'm setting up a Kafka-based system and trying to use Strimzi OAuth to handle authentication and authorization.

The system is going to include both Java-based services and Node.js services, while the auth server is Keycloak (access token lifespan 1h). Kafka runs using Confluent Platform images in Docker (tested with both 5.4.0 and 5.5.0), which I customized to include the Strimzi jars (0.5.0).

I have successfully set everything up: when I fire up the services, they authenticate successfully and get the correct authorization to access topics and groups, and data is correctly exchanged with Kafka.

Unfortunately, after some time (a few hours, longer than the token lifespan), I start getting warnings on Kafka, which is unable to validate the authorization anymore:

Kafka logs

[2020-05-29 19:30:45,976] WARN Unexpected status while fetching authorization data - will retry next time: POST request to https://<omitted> failed with status 401: {"error":"invalid_grant","error_description":"Invalid bearer token"} (io.strimzi.kafka.oauth.server.authorizer.KeycloakRBACAuthorizer)
[2020-05-29 19:30:45,976] DEBUG Authorization DENIED - user: User:service-account-kafka-rest, cluster: kafka-cluster, operation: Describe, resource: Topic:<omitted>, permissions: null (io.strimzi.kafka.oauth.server.authorizer.KeycloakRBACAuthorizer.deny)

The services stop exchanging data, only to start again some hours later. Do you have any idea what is causing this? I looked at #41, but I'm sure Keycloak hasn't restarted. I'm pretty sure it has to do with an expired token, but I'm not really sure how it's happening or how to stop it.

I also tried enabling connections.max.reauth.ms, which triggers Kafka to request re-authentication on the connection, but when the re-authentication is triggered I get another error in the client logs:

Java client (schema registry)

[2020-05-29 09:34:39,002] INFO Initiating re-login for service-account-schema-registry, logout() still needs to be called on a previous login = true (org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin)
[2020-05-29 09:34:39,101] INFO [Principal=service-account-schema-registry]: Expiring credential valid from Fri May 29 09:34:39 UTC 2020 to Fri May 29 09:49:39 UTC 2020 (org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin)
[2020-05-29 09:34:39,102] INFO [Principal=service-account-schema-registry]: Proposed refresh time of Fri May 29 09:47:15 UTC 2020 extends into the desired buffer time of 300 seconds before expiration, so refresh it at the desired buffer begin point, at Fri May 29 09:44:39 UTC 2020 (org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin)
[2020-05-29 09:34:39,102] INFO [Principal=:service-account-schema-registry]: Expiring credential re-login sleeping until: Fri May 29 09:44:39 UTC 2020 (org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin)

[2020-05-29 09:37:35,276] INFO [Consumer clientId=KafkaStore-reader-_schemas, groupId=schema-registry-schema-registry-8081] Failed re-authentication with kafka-1/10.0.2.46 (Cannot change principals during re-authentication from User.service-account-schema-registry: User.service-account-schema-registry) (org.apache.kafka.common.network.Selector)
[2020-05-29 09:37:35,281] ERROR [Consumer clientId=KafkaStore-reader-_schemas, groupId=schema-registry-schema-registry-8081] Connection to node 1 (kafka-1/10.0.2.46:9092) failed authentication due to: Cannot change principals during re-authentication from User.service-account-schema-registry: User.service-account-schema-registry (org.apache.kafka.clients.NetworkClient)
[2020-05-29 09:37:35,287] INFO [Consumer clientId=KafkaStore-reader-_schemas, groupId=schema-registry-schema-registry-8081] Error sending fetch request (sessionId=2105046314, epoch=1539) to node 1: {}. (org.apache.kafka.clients.FetchSessionHandler)
org.apache.kafka.common.errors.SaslAuthenticationException: Cannot change principals during re-authentication from User.service-account-schema-registry: User.service-account-schema-registry

NodeJS client (own service)

2020-05-29T10:11:49.412Z [KAFKAJS]    ERROR: KafkaJSSASLAuthenticationError: SASL OAUTHBEARER authentication failed: Cannot change principals during re-authentication from User.<omitted> User.<omitted>

Is the option connections.max.reauth.ms supported at the current stage?

scholzj commented 4 years ago

Any ideas @mstruk?

mstruk commented 4 years ago

It looks like an expired token, yes. Once the access token expires it won't be accepted by Keycloak when trying to fetch the list of grants for authorization. What seems to happen here is that the new session is successfully authenticated: the Kafka broker accepts the token as valid using a local JWT signature check, but due to a slight time mismatch or some other slight difference in expiration rules, the same token is no longer accepted by Keycloak a few moments later when the first operation is executed in the new session.

What do your client configuration and broker configuration look like for the Java client and for the JS client (which library do you use for JS OAuth support)?

Do you specify the access token directly (oauth.access.token) or do you let strimzi-kafka-oauth fetch it for you from Keycloak by using clientId and secret (oauth.client.id, oauth.client.secret)?

The connections.max.reauth.ms option should work as documented in the Kafka documentation, but I haven't tried it. It is a server-side option and should be automatically handled by the SASL OAUTHBEARER client code.

The thing is, if you directly set oauth.access.token then re-authentication cannot succeed, because the client can only ever present the same access token, which may no longer be valid.

At the moment the only suggestion would be to prolong the access token lifetime in the Keycloak realm configuration, and to periodically close the connections, reinitialising the producers/consumers on the client, before the access token expires.
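A workaround along these lines can be sketched as follows. This is only an illustration, not part of strimzi-kafka-oauth: the class and method names are made up, and a generic AutoCloseable stands in for a KafkaProducer/KafkaConsumer so the sketch has no external dependencies.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Sketch of the suggested workaround: periodically close and rebuild the
// Kafka client before the access token lifespan elapses, so each new
// connection authenticates with a freshly fetched token.
class RecyclingClient<T extends AutoCloseable> {
    private final Supplier<T> factory;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private volatile T current;

    RecyclingClient(Supplier<T> factory, long periodSeconds) {
        this.factory = factory;
        this.current = factory.get();
        // Recycle a bit before the token expires (e.g. lifespan minus a buffer).
        scheduler.scheduleAtFixedRate(this::recycle,
                periodSeconds, periodSeconds, TimeUnit.SECONDS);
    }

    T get() { return current; }

    // Closing the old client drops its connections; the replacement client
    // re-authenticates, which lets the login callback fetch a fresh token
    // via client ID/secret.
    void recycle() {
        T old = current;
        current = factory.get();
        try {
            old.close();
        } catch (Exception ignored) {
            // best-effort close of the retired client
        }
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

The recycle period would be chosen a few minutes below the configured access token lifespan.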

There are some improvements in the works regarding access token expiration: how it is handled on the server, how it is communicated to the clients, and how the clients can reliably detect it and recover from it.

uwburn commented 4 years ago

Thanks for the prompt reply.

For the moment I worked around it by lowering the token lifespan to 45 minutes; it doesn't happen anymore, so it's definitely an expired token. I need to check the system times on the two hosts (Keycloak and Kafka), but I would be surprised to see them skewed by more than 10 minutes, as they are pretty ordinary Ubuntu boxes in the same datacenter.

Here's my config for Kafka (docker-compose/swarm file):

version: '3.5'

services:
  zookeeper-1:
    image: <omitted>/cp-oauth/zookeeper:${CP_VERSION}
    deploy:
      restart_policy:
        condition: on-failure
        delay: 30s
        max_attempts: 10
      placement:
        constraints:
          - node.labels.kafka-1 == true
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    volumes:
      - zookeeper-1_data:/var/lib/zookeeper
    networks:
      kafka_wide:
  kafka-1:
    image: <omitted>/cp-oauth/kafka:${CP_VERSION}
    deploy:
      restart_policy:
        condition: on-failure
        delay: 30s
        max_attempts: 10
      placement:
        constraints:
          - node.labels.kafka-1 == true
    depends_on:
      - zookeeper-1
    ports:
      - "9010:9010"
      - "9020:9020"
    environment:
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO,kafka.authorizer=DEBUG,io.strimzi=INFO,io.strimzi.kafka.oauth.server.authorizer.KeycloakRBACAuthorizer.deny=DEBUG"
      KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper-1:2181"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_LISTENERS: DOCKERSASL://:9092,EXTA://:9010,EXTB://:9020
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKERSASL:SASL_PLAINTEXT,EXTA:SASL_PLAINTEXT,EXTB:SASL_PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: DOCKERSASL://kafka-1:9092,EXTA://<omitted>:9010,EXTB://<omitted>:9020
      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKERSASL
      KAFKA_SASL_ENABLED_MECHANISMS: OAUTHBEARER
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: OAUTHBEARER
      KAFKA_LISTENER_NAME_DOCKERSASL_OAUTHBEARER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
      KAFKA_LISTENER_NAME_DOCKERSASL_OAUTHBEARER_SASL_LOGIN_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
      KAFKA_LISTENER_NAME_DOCKERSASL_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
      KAFKA_LISTENER_NAME_EXTA_OAUTHBEARER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
      KAFKA_LISTENER_NAME_EXTA_OAUTHBEARER_SASL_LOGIN_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
      KAFKA_LISTENER_NAME_EXTA_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
      KAFKA_LISTENER_NAME_EXTB_OAUTHBEARER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
      KAFKA_LISTENER_NAME_EXTB_OAUTHBEARER_SASL_LOGIN_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
      KAFKA_LISTENER_NAME_EXTB_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler
      #KAFKA_CONNECTIONS_MAX_REAUTH_MS: 3540000
      OAUTH_CLIENT_ID: "kafka"
      OAUTH_CLIENT_SECRET: "<omitted>"
      OAUTH_TOKEN_ENDPOINT_URI: "https://<omitted>/protocol/openid-connect/token"
      OAUTH_VALID_ISSUER_URI: "https://<omitted>"
      OAUTH_JWKS_ENDPOINT_URI: "https://<omitted>/protocol/openid-connect/certs"
      OAUTH_USERNAME_CLAIM: preferred_username
      KAFKA_AUTHORIZER_CLASS_NAME: io.strimzi.kafka.oauth.server.authorizer.KeycloakRBACAuthorizer
      KAFKA_PRINCIPAL_BUILDER_CLASS: io.strimzi.kafka.oauth.server.authorizer.JwtKafkaPrincipalBuilder
      KAFKA_STRIMZI_AUTHORIZATION_KAFKA_CLUSTER_NAME: kafka-cluster
      KAFKA_STRIMZI_AUTHORIZATION_DELEGATE_TO_KAFKA_ACL: "false"
      KAFKA_SUPER_USERS: User:service-account-kafka
    volumes:
      - kafka-1_data:/var/lib/kafka
    networks:
      kafka_wide:
    secrets:
      - source: kafka-1_cli
        target: /etc/kafka/cli.properties

volumes:
  kafka-1_data:
  zookeeper-1_data:

networks:
  kafka_wide:
    external: true
    name: kafka_wide

secrets:
  kafka-1_cli:
    file: kafka-1/kafka-cli.properties

and Kafka REST/schema-registry (docker-compose/swarm file):

version: '3.5'

services:
  schema-registry:
    image: <omitted>/cp-oauth/schema-registry:${CP_VERSION}
    deploy:
      restart_policy:
        condition: on-failure
        delay: 30s
        max_attempts: 10
      placement:
        constraints:
          - node.labels.kafka-services == true
    environment:
      SCHEMA_REGISTRY_HOST_NAME: "schema-registry"
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: SASL_PLAINTEXT://kafka-1:9092
      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: SASL_PLAINTEXT
      SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM: OAUTHBEARER
      SCHEMA_REGISTRY_KAFKASTORE_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
      SCHEMA_REGISTRY_KAFKASTORE_SASL_LOGIN_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
      OAUTH_CLIENT_ID: "schema-registry"
      OAUTH_CLIENT_SECRET: "<omitted>"
      OAUTH_TOKEN_ENDPOINT_URI: "https://<omitted>/protocol/openid-connect/token"
      OAUTH_USERNAME_CLAIM: preferred_username
    networks:
      kafka_wide:
  rest-proxy:
    image: <omitted>/cp-oauth/kafka-rest:${CP_VERSION}
    deploy:
      restart_policy:
        condition: on-failure
        delay: 30s
        max_attempts: 10
      placement:
        constraints:
          - node.labels.kafka-services == true
    depends_on:
      - schema-registry
    environment:
      KAFKA_REST_HOST_NAME: "rest-proxy"
      KAFKA_REST_BOOTSTRAP_SERVERS: "kafka-1:9092"
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KAFKA_REST_CLIENT_SECURITY_PROTOCOL: SASL_PLAINTEXT
      KAFKA_REST_CLIENT_SASL_MECHANISM: OAUTHBEARER
      KAFKA_REST_CLIENT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
      KAFKA_REST_CLIENT_SASL_LOGIN_CALLBACK_HANDLER_CLASS: io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
      OAUTH_CLIENT_ID: "kafka-rest"
      OAUTH_CLIENT_SECRET: "<omitted>"
      OAUTH_TOKEN_ENDPOINT_URI: "https://<omitted>/protocol/openid-connect/token"
      OAUTH_USERNAME_CLAIM: preferred_username
    networks:
      kafka_wide:
  topics-ui:
    image: landoop/kafka-topics-ui
    deploy:
      restart_policy:
        condition: on-failure
        delay: 30s
        max_attempts: 10
      placement:
        constraints:
          - node.labels.kafka-services == true
    depends_on:
      - rest-proxy
    environment:
      KAFKA_REST_PROXY_URL: "http://rest-proxy:8082"
      PROXY: "true"
    networks:
      kafka_wide:
  schema-registry-ui:
    image: landoop/schema-registry-ui
    deploy:
      restart_policy:
        condition: on-failure
        delay: 30s
        max_attempts: 10
      placement:
        constraints:
          - node.labels.kafka-services == true
    depends_on:
      - rest-proxy
      - schema-registry
    environment:
      SCHEMAREGISTRY_URL: "http://schema-registry:8081"
      PROXY: "true"
    networks:
      kafka_wide:

networks:
  kafka_wide:
    external: true
    name: kafka_wide

As you can see, it's using client ID/secret to refresh the token.

For Node.js I'm using KafkaJS; OAUTHBEARER support is not yet in a stable release, as I'm contributing to add it.

I will try to deploy a version with some additional logging when the permission fetch fails, just to take a look at the token.

As for connections.max.reauth.ms, I suspect the cause might be an incorrect implementation of .equals in JwtKafkaPrincipal: in the end it performs a strict equality check over the JWT, whereas the token should probably be allowed to change, with equality checked only on the principal type and principal name. I will try to patch that as well and see how it goes.
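The change uwburn describes could look roughly like this. This is a sketch, not the actual Strimzi source; the class and field names are illustrative.

```java
import java.util.Objects;

// Sketch of the proposed principal comparison: keep the JWT on the
// principal, but base equals()/hashCode() only on principal type and name,
// so a re-authentication that presents a fresh token still resolves to the
// same principal.
class JwtPrincipalSketch {
    private final String principalType;
    private final String name;
    private final String jwt; // carried along, but excluded from equality

    JwtPrincipalSketch(String principalType, String name, String jwt) {
        this.principalType = principalType;
        this.name = name;
        this.jwt = jwt;
    }

    String jwt() { return jwt; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof JwtPrincipalSketch)) return false;
        JwtPrincipalSketch that = (JwtPrincipalSketch) o;
        // Deliberately ignore the token: it changes across re-authentication.
        return principalType.equals(that.principalType)
                && name.equals(that.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(principalType, name);
    }
}
```

With this comparison, two principals that differ only in the serialized token compare equal, which is exactly the situation the "Cannot change principals during re-authentication" error complains about.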

uwburn commented 4 years ago

I think I sorted out the first part of the problem: I managed to extract the failing token from the logs, and while it looked legitimate, introspecting it failed. I think that happens because in Keycloak I mistakenly left the SSO session idle timeout at 30 minutes while setting the access token lifespan to 60 minutes, so a token left unused for more than 30 minutes was rejected by Keycloak even though it had not yet expired.
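The interaction of the two settings can be shown with a small calculation. This is only an illustration of the behaviour described above, not Keycloak's actual implementation.

```java
// Illustrates the misconfiguration described above: the server stops
// accepting a token once EITHER its lifespan has elapsed OR the SSO session
// has been idle past the idle timeout, whichever comes first.
// All times are in seconds.
final class TokenCutoff {
    static long rejectedAt(long issuedAt, long lastUsedAt,
                           long lifespanSeconds, long idleSeconds) {
        long expiresAt = issuedAt + lifespanSeconds;
        long idleCutoff = lastUsedAt + idleSeconds;
        return Math.min(expiresAt, idleCutoff);
    }
}
```

With a 30-minute idle timeout and a 60-minute lifespan, a token issued and then left unused at t=0 is already refused from t=1800s, halfway through its nominal lifespan.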

I will check that no more errors arise, and then I will look at connections.max.reauth.ms.

uwburn commented 4 years ago

By removing the equals/hashCode overrides in JwtKafkaPrincipal, the errors with connections.max.reauth.ms go away.

I submitted a PR.

Sorry for the bother over what in the end was my own configuration mistake; at least I hope my contribution will help someone else.

MrFishFinger commented 4 years ago

hi there,

we are also experiencing a similar issue, whereby the token passed from a client to the broker fails the "uma-ticket" introspection by Keycloak because it has expired. This is despite the token issued to the client by Keycloak being valid at the time the client sends it to the broker.

this results in "TOPIC_AUTHORIZATION_FAILED" errors in the client, and "401: {"error":"invalid_grant","error_description":"Invalid bearer token"}" errors in the brokers.

checking the payload of those requests that return 401 responses (from broker->keycloak), shows the following:

audience=kafka-client-keycloak&grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Auma-ticket&response_mode=permissions

decoding the JWT in the authorization header for those requests, shows that it contains the token from the client, and that this token has already expired.

we tried increasing these realm-level token settings in keycloak:

SSO Session Idle:      60mins
Access Token Lifespan: 45mins 

which improved things somewhat, but we still see occasional TOPIC_AUTHORIZATION_FAILED errors...

any other ideas of what we can try?

mstruk commented 4 years ago

@MrFishFinger We're working on some changes to better handle token timeouts. Some of the fixes are already in master. For now I'd advise using the latest master with the JwtKafkaPrincipal equals()/hashCode() fix, and adding connections.max.reauth.ms=3600000 to your Kafka brokers' server.properties. This should fix re-authentication, which should prevent an old access token from being used to fetch grants, which is what causes the 401s you see.
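For reference, the broker-side setting mentioned above would look like this in server.properties (the value is the one from the comment, one hour in milliseconds):

```properties
# Force clients to re-authenticate their connections at most every hour,
# so the broker always holds a token fresh enough for grant fetching.
connections.max.reauth.ms=3600000
```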

MrFishFinger commented 4 years ago

hi @mstruk , thanks for that info.

We tried building the library from master and deploying it with the client. However, wouldn't we also need to deploy the updated library with the Kafka brokers when deployed on OpenShift? And to do that, we would either need to create a custom Strimzi Kafka broker image, or wait until these fixes are included in the next Strimzi release. Is that right, or am I misunderstanding?

thanks!

mstruk commented 4 years ago

@MrFishFinger You're correct about that. You can build a custom Kafka image based on the one released with Strimzi. There are instructions and an example of how to do that; see the documentation.

You can then configure 050-Deployment-strimzi-cluster-operator.yaml so that your Kafka image is used. You'll also need some extra magic to set connections.max.reauth.ms in /tmp/strimzi.properties.

MrFishFinger commented 4 years ago

hi @mstruk , just to close the loop: we placed the custom-built libraries into a custom Kafka image and deployed it to OpenShift. Since doing this, we have not seen the TOPIC_AUTHORIZATION_FAILED errors.

thanks for your help, and we look forward to these fixes making it into a release! :)