strimzi / strimzi-kafka-oauth

OAuth2 support for Apache Kafka® to work with many OAuth2 authorization servers
Apache License 2.0

Kafka brokers don't seem to refresh token after receiving a corrupt token? #120

Closed jrivers96 closed 2 years ago

jrivers96 commented 3 years ago

Summary
We see io.strimzi.kafka.oauth.validator.TokenValidationException: Token validation failed: Unknown signing key in all Kafka clients. A rolling update of the cluster's Kafka brokers allows clients to connect again after the restart.

We have seen problems with Keycloak and retrieving tokens reliably. My guess is that the token got corrupted during transfer, as in issue 109.

The expected behavior is that the Kafka brokers will immediately try to retrieve another valid token, per the Strimzi OAuth adapter documentation (and from reading the code).

Versions: Strimzi 0.20.1 Kafka 2.6.0

Configuration

        authentication:
          type: oauth
          clientId: kafka-broker
          clientSecret:
            key: secret
            secretName: broker-oauth-secret
          disableTlsHostnameVerification: false
          jwksEndpointUri: REDACT
          validIssuerUri: REDACT
          userNameClaim: preferred_username
          tlsTrustedCertificates:
          - secretName: ca-truststore
            certificate: ca.crt
          jwksExpirySeconds: 960
          jwksRefreshSeconds: 300

Logs:

I'm still collecting the Kafka broker logs, but the log snippet below is representative of any client.

2021-06-30 11:37:07.560 ERROR 7 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [fooa95b20f3-213d-4a76-a02f-7c92dec5d250-StreamThread-1] Encountered the following exception during processing and the thread is going to shut down:
org.apache.kafka.streams.errors.StreamsException: Could not create topic foosymbolStateStore-changelog.
at org.apache.kafka.streams.processor.internals.InternalTopicManager.getNumPartitions(InternalTopicManager.java:210) ~[kafka-streams-2.6.0.jar!/:na]
at org.apache.kafka.streams.processor.internals.InternalTopicManager.validateTopics(InternalTopicManager.java:227) ~[kafka-streams-2.6.0.jar!/:na]
at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:103) ~[kafka-streams-2.6.0.jar!/:na]
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareChangelogTopics(StreamsPartitionAssignor.java:686) ~[kafka-streams-2.6.0.jar!/:na]
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToClients(StreamsPartitionAssignor.java:712) ~[kafka-streams-2.6.0.jar!/:na]
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:377) ~[kafka-streams-2.6.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:590) ~[kafka-clients-2.6.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689) ~[kafka-clients-2.6.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1400(AbstractCoordinator.java:111) ~[kafka-clients-2.6.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:602) ~[kafka-clients-2.6.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:575) ~[kafka-clients-2.6.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1132) ~[kafka-clients-2.6.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1107) ~[kafka-clients-2.6.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) ~[kafka-clients-2.6.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) ~[kafka-clients-2.6.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) ~[kafka-clients-2.6.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602) ~[kafka-clients-2.6.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412) ~[kafka-clients-2.6.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) ~[kafka-clients-2.6.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-2.6.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1301) ~[kafka-clients-2.6.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) ~[kafka-clients-2.6.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[kafka-clients-2.6.0.jar!/:na]
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766) ~[kafka-streams-2.6.0.jar!/:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624) ~[kafka-streams-2.6.0.jar!/:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551) ~[kafka-streams-2.6.0.jar!/:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510) ~[kafka-streams-2.6.0.jar!/:na]
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed due to an invalid token: io.strimzi.kafka.oauth.validator.TokenValidationException: Token validation failed: Unknown signing key (kid:REDACT)
mstruk commented 3 years ago

@jrivers96 Can you find some of the stacktraces for when the jwks keys refresh was attempted but failed?

Looking at your config I would expect jwks keys refresh to be attempted every 5 minutes, and fail 3 consecutive times after which the currently cached keys are no longer considered valid.
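The 3-attempt figure follows directly from the configuration posted in the issue; a quick sanity check, using the jwksRefreshSeconds and jwksExpirySeconds values copied from the reported config:

```java
// Back-of-the-envelope check of the refresh/expiry interplay described above.
public class JwksRefreshMath {

    // Whole refresh intervals that fit inside the expiry window, i.e. how many
    // consecutive refresh failures can pass before the cached keys expire and
    // validation starts failing with "Unknown signing key".
    static int attemptsBeforeExpiry(int refreshSeconds, int expirySeconds) {
        return expirySeconds / refreshSeconds;
    }

    public static void main(String[] args) {
        int jwksRefreshSeconds = 300; // jwksRefreshSeconds: refresh attempted every 5 minutes
        int jwksExpirySeconds = 960;  // jwksExpirySeconds: cached keys stale after 16 minutes
        System.out.println(attemptsBeforeExpiry(jwksRefreshSeconds, jwksExpirySeconds)); // prints 3
    }
}
```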

The stacktraces should give some indication of why the refresh is failing.

mstruk commented 3 years ago

It's also possible that your authorization server (e.g. Keycloak) is restarted, and that it's not properly configured to preserve the signing keys between restarts.

jrivers96 commented 3 years ago

That sounds probable. Let me look into that.

Isn't the broker supposed to request a new token under that condition?

mstruk commented 3 years ago

The broker will keep requesting the latest signing keys periodically. Since Strimzi version 0.23.0 it will also repeat the fetching of the latest signing keys immediately if it encounters an unknown kid.

The problem is that client apps cache the fetched access token and reuse it, so you have to initialise a new KafkaProducer / KafkaConsumer object in order to force the client to obtain a fresh access token from your authorization server.

As a result, the JWKS keyset cached by the Kafka broker can get out of sync with the authorization server for a configured period, until it refreshes and obtains the latest keys. At the same time, a client can get out of sync with the authorization server and hold on to an old, now invalid, access token. The Kafka broker can also become synced with the authorization server while clients still are not, or new clients can be in sync with the authorization server while the Kafka broker is not. All of these out-of-sync situations result in authentication failures. That's why a properly configured authorization server that doesn't forget its signing keys is important, and why clients should request a new access token when they encounter a TokenValidationException.
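The client-side behavior described here (a cached token lives inside the client object, so only building a new client forces a fresh token grant) can be sketched as a pattern. This is plain Java with a placeholder Client interface standing in for KafkaConsumer / KafkaProducer, not Strimzi or Kafka API:

```java
import java.util.function.Supplier;

// Sketch: on a token validation failure, close the client and construct a new
// one via the factory. In a real OAuth-enabled Kafka client, constructing a
// new client is what triggers a fresh access-token grant.
public class RecreateOnAuthFailure {

    static class AuthenticationException extends RuntimeException {
        AuthenticationException(String msg) { super(msg); }
    }

    // Placeholder for KafkaConsumer/KafkaProducer.
    interface Client extends AutoCloseable {
        void poll();
        @Override void close();
    }

    /** Polls the client `attempts` times; on an auth failure, closes it and
     *  builds a fresh one. Returns how many times the client was recreated. */
    static int pollWithRecreate(Supplier<Client> factory, int attempts) {
        int recreations = 0;
        Client client = factory.get();
        for (int i = 0; i < attempts; i++) {
            try {
                client.poll();
            } catch (AuthenticationException e) {
                client.close();
                client = factory.get(); // fresh client => fresh access token
                recreations++;
            }
        }
        client.close();
        return recreations;
    }

    public static void main(String[] args) {
        // Simulated client that fails auth on its first poll only.
        final boolean[] failedOnce = {false};
        Supplier<Client> factory = () -> new Client() {
            public void poll() {
                if (!failedOnce[0]) {
                    failedOnce[0] = true;
                    throw new AuthenticationException("Unknown signing key");
                }
            }
            public void close() {}
        };
        System.out.println(pollWithRecreate(factory, 3)); // prints 1
    }
}
```

In a real application the catch clause would match org.apache.kafka.common.errors.SaslAuthenticationException wrapping the TokenValidationException shown in the logs above.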

jrivers96 commented 3 years ago

The signing key that the broker was reporting as "unknown" is the same one that Keycloak is currently using successfully.

After the broker restart, the same signing key worked. Keycloak also did not have any restart event during the time of the problem, and the key did not change.

2021-06-30 12:59:59,874 INFO [SocketServer brokerId=11] Failed authentication with 100-80-53-157.kubelet.kube-system.svc.cluster.local/100.80.53.157 (Authentication failed due to an invalid token: io.strimzi.kafka.oauth.validator.TokenValidationException: Token validation failed: Unknown signing key (kid:REDACT)) (org.apache.kafka.common.network.Selector) [data-plane-kafka-network-thread-11-ListenerName(EXTERNAL1-9094)-SASL_SSL-9]
WARN The cached public key with id 'REDACT' is expired! (io.strimzi.kafka.oauth.validator.JWTSignatureValidator) [data-plane-kafka-network-thread-4-ListenerName(EXTERNAL1-9094)-SASL_SSL-9]
mstruk commented 3 years ago

Sounds like the problem wasn't the keys on Keycloak then, but an inability to refresh them over several attempts until jwksExpirySeconds was reached. After that, the keys were considered invalid and were removed from the local cache.

mstruk commented 3 years ago

Try updating your Strimzi to version 0.24.0. There have been some improvements to keys refresh since your version (0.20.1) which may help you diagnose the problem better. Also, in your particular case, the keys refresh will be attempted much more often once 'unknown kid' errors start occurring.

jrivers96 commented 3 years ago

I reviewed the commits you are referring to. Thanks for the advice.

jrivers96 commented 2 years ago

We are still seeing this with the upgrade. I've improved my tooling to take a stack trace the next time I see this occur.

I'm wondering whether you think readTimeoutSeconds would help with this problem? Could a hung read cause the refresh thread to stop working?

mstruk commented 2 years ago

Can you be more specific about what you see vs. what you expect? When an unknown kid is encountered, the server will still reject the token. However, it will make an out-of-band JWKS request to get the new public keys within a matter of seconds (depending on the config), rather than waiting for the next regular refresh job.

jrivers96 commented 2 years ago

We upgraded to strimzi 0.25 per your recommendations above.

Every few months or so, we see a broker unable to refresh its keys and we have to "roll the cluster". It appears that the refresh thread has hung, as we no longer see the log output from the refresh process.

The expected behavior would be the broker successfully refreshing the public key as necessary.

I took a look at the following commit and it looks related - https://github.com/strimzi/strimzi-kafka-oauth/commit/9521d2ad86dd980978820180979c2c21e5bff5a0

We are thinking about upgrading to strimzi 0.28.

mstruk commented 2 years ago

Next time try to perform a thread dump on the offending broker.

kubectl exec -ti my-cluster-kafka-0 -- /bin/sh

Then identify the java process by inspecting /proc (since there is no ps in the container):

find /proc/ -maxdepth 1 -name '[0-9]*' -print0 | while read -r -d $'\0' pidpath; do echo ${pidpath} $(cat ${pidpath}/cmdline | tr '\000' ' ' | cut -d' ' -f1) ; done

It's usually a process id around 80.

Then trigger the thread dump (assuming the PID is 80):

kill -QUIT 80

This will print the full thread dump to your log, which you can view with the regular kubectl logs my-cluster-kafka-0.

And certainly upgrade to Strimzi 0.28 where connect and read timeouts are enabled by default and configurable.
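For reference, a minimal sketch of how those timeouts could be made explicit in the listener authentication section on Strimzi 0.28+. The connectTimeoutSeconds and readTimeoutSeconds field names are taken from the Strimzi API reference; the endpoint values mirror the redacted config above, and the timeout values here are illustrative assumptions (both default to 60 seconds when unset):

```yaml
        authentication:
          type: oauth
          clientId: kafka-broker
          jwksEndpointUri: REDACT
          validIssuerUri: REDACT
          # Illustrative values; both settings default to 60 seconds in 0.28+.
          connectTimeoutSeconds: 10
          readTimeoutSeconds: 10
```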

jrivers96 commented 2 years ago

The problem occurred during an upgrade of our OAuth setup. It looks like the Strimzi 0.28 upgrade will help with the problem. I'll close this ticket for now.

"pool-7-thread-1" #44 daemon prio=5 os_prio=0 cpu=57969.05ms elapsed=3875305.60s tid=0x00007fec8e00c800 nid=0x286 runnable  [0x00007feb95cf6000]
   java.lang.Thread.State: RUNNABLE
    at java.net.SocketInputStream.socketRead0(java.base@11.0.12/Native Method)
    at java.net.SocketInputStream.socketRead(java.base@11.0.12/SocketInputStream.java:115)
    at java.net.SocketInputStream.read(java.base@11.0.12/SocketInputStream.java:168)
    at java.net.SocketInputStream.read(java.base@11.0.12/SocketInputStream.java:140)
    at sun.security.ssl.SSLSocketInputRecord.read(java.base@11.0.12/SSLSocketInputRecord.java:478)
    at sun.security.ssl.SSLSocketInputRecord.readHeader(java.base@11.0.12/SSLSocketInputRecord.java:472)
    at sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(java.base@11.0.12/SSLSocketInputRecord.java:70)
    at sun.security.ssl.SSLSocketImpl.readApplicationRecord(java.base@11.0.12/SSLSocketImpl.java:1374)
    at sun.security.ssl.SSLSocketImpl$AppInputStream.read(java.base@11.0.12/SSLSocketImpl.java:985)
    at java.io.BufferedInputStream.fill(java.base@11.0.12/BufferedInputStream.java:252)
    at java.io.BufferedInputStream.read1(java.base@11.0.12/BufferedInputStream.java:292)
    at java.io.BufferedInputStream.read(java.base@11.0.12/BufferedInputStream.java:351)
    - locked <0x00000006e4e2b760> (a java.io.BufferedInputStream)
    at sun.net.www.http.HttpClient.parseHTTPHeader(java.base@11.0.12/HttpClient.java:754)
    at sun.net.www.http.HttpClient.parseHTTP(java.base@11.0.12/HttpClient.java:689)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(java.base@11.0.12/HttpURLConnection.java:1615)
    - locked <0x00000006e4e2b7b8> (a sun.net.www.protocol.https.DelegateHttpsURLConnection)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream(java.base@11.0.12/HttpURLConnection.java:1520)
    - locked <0x00000006e4e2b7b8> (a sun.net.www.protocol.https.DelegateHttpsURLConnection)
    at java.net.HttpURLConnection.getResponseCode(java.base@11.0.12/HttpURLConnection.java:527)
    at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(java.base@11.0.12/HttpsURLConnectionImpl.java:334)
    at io.strimzi.kafka.oauth.common.HttpUtil.handleResponse(HttpUtil.java:149)
    at io.strimzi.kafka.oauth.common.HttpUtil.request(HttpUtil.java:143)
    at io.strimzi.kafka.oauth.common.HttpUtil.request(HttpUtil.java:86)
    at io.strimzi.kafka.oauth.common.HttpUtil.get(HttpUtil.java:50)
    at io.strimzi.kafka.oauth.validator.JWTSignatureValidator.fetchKeys(JWTSignatureValidator.java:232)
    at io.strimzi.kafka.oauth.validator.JWTSignatureValidator.lambda$new$0(JWTSignatureValidator.java:151)
    at io.strimzi.kafka.oauth.validator.JWTSignatureValidator$$Lambda$663/0x000000084058f840.run(Unknown Source)
    at io.strimzi.kafka.oauth.validator.BackOffTaskScheduler$RunnableTask.run(BackOffTaskScheduler.java:142)
    at java.util.concurrent.Executors$RunnableAdapter.call(java.base@11.0.12/Executors.java:515)
    at java.util.concurrent.FutureTask.run(java.base@11.0.12/FutureTask.java:264)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(java.base@11.0.12/ScheduledThreadPoolExecutor.java:304)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.12/ThreadPoolExecutor.java:1128)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.12/ThreadPoolExecutor.java:628)
    at java.lang.Thread.run(java.base@11.0.12/Thread.java:829)