Apicurio / apicurio-registry

An API/Schema registry - stores APIs and Schemas.
https://www.apicur.io/registry/
Apache License 2.0

Kafka storage health check not failing after runtime exception #5295

Open FediNOP opened 1 month ago

FediNOP commented 1 month ago

Description

Registry Version: 2.5.8.Final, 2.6.3.Final
Persistence type: kafkasql

Environment

Steps to Reproduce

You need to somehow disconnect Apicurio from Kafka at runtime. I also found this in KafkaSqlRegistryStorage: (image)

Disable client authentication in Keycloak

  1. Set the access token lifespan for the client to about 1 minute
  2. Start Apicurio and check that the Kafka consumer is connected and the status is stable
  3. Disable client authentication in Keycloak
  4. When the access token expires, the Kafka client throws a LoginException and disconnects

Expected vs Actual Behaviour

Expected

  1. The storage health check fails
  2. After the connection or authentication is restored, the registry reconnects to Kafka

Actual Behaviour

  1. The storage health check still reports up
  2. The pod stays alive like a zombie and does not try to reconnect to Kafka

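
To make the expected behaviour concrete, here is a minimal sketch of the kind of liveness tracking a storage health check could rely on. It is not Apicurio's actual implementation: the class `StorageLiveness` and its methods are hypothetical names, and it assumes the consumer loop can report each successful poll and each caught exception.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical helper (not part of Apicurio Registry): tracks whether the consumer loop is alive. */
final class StorageLiveness {
    private final long maxSilenceMillis;
    private final AtomicLong lastPollMillis;
    private final AtomicReference<Throwable> lastFailure = new AtomicReference<>();

    StorageLiveness(Duration maxSilence, long nowMillis) {
        this.maxSilenceMillis = maxSilence.toMillis();
        this.lastPollMillis = new AtomicLong(nowMillis);
    }

    /** Called by the consumer loop after each successful poll. */
    void recordPoll(long nowMillis) {
        lastPollMillis.set(nowMillis);
        lastFailure.set(null); // a successful poll clears an earlier failure
    }

    /** Called when the loop catches an exception, for example an authentication failure. */
    void recordFailure(Throwable t) {
        lastFailure.set(t);
    }

    /** UP only if no failure is recorded and the last successful poll is recent enough. */
    boolean isUp(long nowMillis) {
        return lastFailure.get() == null
                && nowMillis - lastPollMillis.get() <= maxSilenceMillis;
    }
}
```

A health endpoint could then report DOWN whenever `isUp(System.currentTimeMillis())` returns false, so the orchestrator restarts the pod or the registry triggers its own reconnect. Passing the clock value in as a parameter is only a choice made here to keep the sketch easy to test.
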
carlesarnal commented 1 month ago

@FediNOP do you have any stacktrace? I would like to identify exactly where this issue arises for you. I have started working on this, but I would like to have as much info as possible.

FediNOP commented 1 month ago

Here are some logs from after disabling client authentication in Keycloak:

2024-10-02 12:09:35 INFO <> [io.strimzi.kafka.oauth.common.HttpUtil] (kafka-expiring-relogin-thread-8410d359-4213-435c-8ca6-642f9db356c8) Action failed on try no. 1: io.strimzi.kafka.oauth.common.HttpException: POST request to https://your-keycloak/auth/realms/your-realm/protocol/openid-connect/token failed with status 401: {"error":"unauthorized_client","error_description":"Invalid client or Invalid client credentials"}
    at io.strimzi.kafka.oauth.common.HttpUtil.handleResponse(HttpUtil.java:535)
    at io.strimzi.kafka.oauth.common.HttpUtil.request(HttpUtil.java:492)
    at io.strimzi.kafka.oauth.common.HttpUtil.post(HttpUtil.java:272)
    at io.strimzi.kafka.oauth.common.OAuthAuthenticator.lambda$post$0(OAuthAuthenticator.java:543)
    at io.strimzi.kafka.oauth.common.HttpUtil.doWithRetries(HttpUtil.java:89)
    at io.strimzi.kafka.oauth.common.OAuthAuthenticator.post(OAuthAuthenticator.java:542)
    at io.strimzi.kafka.oauth.common.OAuthAuthenticator.loginWithClientSecret(OAuthAuthenticator.java:167)
    at io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler.handleCallback(JaasClientOauthLoginCallbackHandler.java:400)
    at io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler.handle(JaasClientOauthLoginCallbackHandler.java:369)
    at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316)
    at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
    at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
    at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:679)
    at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:677)
    at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
    at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:677)
    at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:587)
    at org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.reLogin(ExpiringCredentialRefreshingLogin.java:390)
    at org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.access$600(ExpiringCredentialRefreshingLogin.java:40)
    at org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin$Refresher.run(ExpiringCredentialRefreshingLogin.java:106)
    at java.base/java.lang.Thread.run(Thread.java:840)

2024-10-02 12:09:35 DEBUG <> [io.strimzi.kafka.oauth.common.HttpUtil] (kafka-expiring-relogin-thread-8410d359-4213-435c-8ca6-642f9db356c8) Action failed after 1 tries
2024-10-02 12:09:35 DEBUG <> [org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule] (kafka-expiring-relogin-thread-8410d359-4213-435c-8ca6-642f9db356c8) Nothing here to abort
2024-10-02 12:09:35 WARN <> [org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin$Refresher] (kafka-expiring-relogin-thread-8410d359-4213-435c-8ca6-642f9db356c8) [Principal=:8410d359-4213-435c-8ca6-642f9db356c8]: LoginException during login retry; will sleep 10 seconds before trying again.: javax.security.auth.login.LoginException: java.lang.IllegalStateException: Unexpected exception while sending HTTP POST request
    at io.strimzi.kafka.oauth.common.OAuthAuthenticator.post(OAuthAuthenticator.java:563)
    at io.strimzi.kafka.oauth.common.OAuthAuthenticator.loginWithClientSecret(OAuthAuthenticator.java:167)
    at io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler.handleCallback(JaasClientOauthLoginCallbackHandler.java:400)
    at io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler.handle(JaasClientOauthLoginCallbackHandler.java:369)
    at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316)
    at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301)
    at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
    at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:679)
    at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:677)
    at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
    at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:677)
    at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:587)
    at org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.reLogin(ExpiringCredentialRefreshingLogin.java:390)
    at org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.access$600(ExpiringCredentialRefreshingLogin.java:40)
    at org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin$Refresher.run(ExpiringCredentialRefreshingLogin.java:106)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: io.strimzi.kafka.oauth.common.HttpException: POST request to https://your-keycloak/auth/realms/your-realm/protocol/openid-connect/token failed with status 401: {"error":"unauthorized_client","error_description":"Invalid client or Invalid client credentials"}
    at io.strimzi.kafka.oauth.common.HttpUtil.handleResponse(HttpUtil.java:535)
    at io.strimzi.kafka.oauth.common.HttpUtil.request(HttpUtil.java:492)
    at io.strimzi.kafka.oauth.common.HttpUtil.post(HttpUtil.java:272)
    at io.strimzi.kafka.oauth.common.OAuthAuthenticator.lambda$post$0(OAuthAuthenticator.java:543)
    at io.strimzi.kafka.oauth.common.HttpUtil.doWithRetries(HttpUtil.java:89)
    at io.strimzi.kafka.oauth.common.OAuthAuthenticator.post(OAuthAuthenticator.java:542)
    ... 15 more

    at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:850)
    at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:679)
    at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:677)
    at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
    at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:677)
    at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:587)
    at org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.reLogin(ExpiringCredentialRefreshingLogin.java:390)
    at org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.access$600(ExpiringCredentialRefreshingLogin.java:40)
    at org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin$Refresher.run(ExpiringCredentialRefreshingLogin.java:106)
    at java.base/java.lang.Thread.run(Thread.java:840)

2024-10-02 12:09:35 DEBUG <> [java.lang.System$Logger] (sentry-pool-1-thread-1) sun.net.www.MessageHeader@18a20e409 pairs: {POST /api/588/store/ HTTP/1.1: null}{User-Agent: sentry-java/1.7.30-7a445}{X-Sentry-Auth: Sentry sentry_version=6,sentry_client=sentry-java/1.7.30-7a445,sentry_key=4d29f4b38c494176a5584096e75e6183}{Content-Type: application/json}{Content-Encoding: gzip}{Host: sentry-host.com}{Accept: */*}{Connection: keep-alive}{Content-Length: 1309}
2024-10-02 12:09:35 DEBUG <> [java.lang.System$Logger] (sentry-pool-1-thread-1) sun.net.www.MessageHeader@6483b6037 pairs: {null: HTTP/1.1 200 OK}{Date: Wed, 02 Oct 2024 12:09:35 GMT}{Content-Type: application/json}{Content-Length: 41}{Connection: keep-alive}{access-control-expose-headers: x-sentry-error, x-sentry-rate-limits, retry-after}{vary: Origin}
2024-10-02 12:09:35 DEBUG <> [org.apache.kafka.common.security.authenticator.SaslClientAuthenticator] (kafka-producer-network-thread | dh-apicurio-registry-producer) [Producer clientId=dh-apicurio-registry-producer] Set SASL client state to FAILED
2024-10-02 12:09:35 INFO <> [org.apache.kafka.common.network.Selector] (kafka-producer-network-thread | dh-apicurio-registry-producer) [Producer clientId=dh-apicurio-registry-producer] Failed authentication with broker-1-kafka-your-host.com.io/your-ip (channelId=1) (Token expired at: 1727867581000 (2024-10-02T11:13:01 UTC) (ErrId: 4b4d0f28))
2024-10-02 12:09:35 INFO <> [org.apache.kafka.clients.NetworkClient] (kafka-producer-network-thread | dh-apicurio-registry-producer) [Producer clientId=dh-apicurio-registry-producer] Node 1 disconnected.
2024-10-02 12:09:35 ERROR <> [org.apache.kafka.clients.NetworkClient] (kafka-producer-network-thread | dh-apicurio-registry-producer) [Producer clientId=dh-apicurio-registry-producer] Connection to node 1 (broker-1-kafka-your-host.com/ip_address) failed authentication due to: Token expired at: 1727867581000 (2024-10-02T11:13:01 UTC) (ErrId: 4b4d0f28)
2024-10-02 12:09:35 DEBUG <> [java.lang.System$Logger] (sentry-pool-1-thread-1) sun.net.www.MessageHeader@44ea4ee57 pairs: {null: HTTP/1.1 200 OK}{Date: Wed, 02 Oct 2024 12:09:35 GMT}{Content-Type: application/json}{Content-Length: 41}{Connection: keep-alive}{access-control-expose-headers: x-sentry-error, x-sentry-rate-limits, retry-after}{vary: Origin}
2024-10-02 12:09:35 DEBUG <> [org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater] (kafka-producer-network-thread | dh-apicurio-registry-producer) [Producer clientId=dh-apicurio-registry-producer] Initialize connection to node broker-2-kafka-your-host.com.io:9095 (id: 2 rack: null) for sending metadata request
2024-10-02 12:09:35 DEBUG <> [org.apache.kafka.clients.ClientUtils] (kafka-producer-network-thread | dh-apicurio-registry-producer) Resolved host broker-2-kafka-your-host.com.io as your_ip
2024-10-02 12:09:35 DEBUG <> [org.apache.kafka.clients.NetworkClient] (kafka-producer-network-thread | dh-apicurio-registry-producer) [Producer clientId=dh-apicurio-registry-producer] Initiating connection to node broker-2-kafka-your-host.com.io:9095 (id: 2 rack: null) using address broker-2-kafka-your-host.com.io/your_ip
2024-10-02 12:09:35 DEBUG <> [org.apache.kafka.common.security.authenticator.SaslClientAuthenticator] (kafka-producer-network-thread | dh-apicurio-registry-producer) [Producer clientId=dh-apicurio-registry-producer] Set SASL client state to SEND_APIVERSIONS_REQUEST
2024-10-02 12:09:35 DEBUG <> [org.apache.kafka.common.security.authenticator.SaslClientAuthenticator] (kafka-producer-network-thread | dh-apicurio-registry-producer) [Producer clientId=dh-apicurio-registry-producer] Creating SaslClient: client=null;service=kafka;serviceHostname=broker-2-kafka-your-host.com.io;mechs=[OAUTHBEARER]
2024-10-02 12:09:35 DEBUG <> [org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient] (kafka-producer-network-thread | dh-apicurio-registry-producer) Setting SASL/OAUTHBEARER client state to SEND_CLIENT_FIRST_MESSAGE
2024-10-02 12:09:35 DEBUG <> [org.apache.kafka.common.network.Selector] (kafka-producer-network-thread | dh-apicurio-registry-producer) [Producer clientId=dh-apicurio-registry-producer] Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2
2024-10-02 12:09:35 DEBUG <> [org.apache.kafka.common.security.authenticator.SaslClientAuthenticator] (kafka-producer-network-thread | dh-apicurio-registry-producer) [Producer clientId=dh-apicurio-registry-producer] Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
2024-10-02 12:09:35 DEBUG <> [org.apache.kafka.clients.NetworkClient] (kafka-producer-network-thread | dh-apicurio-registry-producer) [Producer clientId=dh-apicurio-registry-producer] Completed connection to node 2. Fetching API versions.
2024-10-02 12:09:35 DEBUG <> [org.apache.kafka.common.security.authenticator.SaslClientAuthenticator] (kafka-producer-network-thread | dh-apicurio-registry-producer) [Producer clientId=dh-apicurio-registry-producer] Set SASL client state to SEND_HANDSHAKE_REQUEST
2024-10-02 12:09:35 DEBUG <> [org.apache.kafka.common.security.authenticator.SaslClientAuthenticator] (kafka-producer-network-thread | dh-apicurio-registry-producer) [Producer clientId=dh-apicurio-registry-producer] Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE
2024-10-02 12:09:35 DEBUG <> [org.apache.kafka.common.security.authenticator.SaslClientAuthenticator] (kafka-producer-network-thread | dh-apicurio-registry-producer) [Producer clientId=dh-apicurio-registry-producer] Set SASL client state to INITIAL
2024-10-02 12:09:35 DEBUG <> [org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient] (kafka-producer-network-thread | dh-apicurio-registry-producer) Setting SASL/OAUTHBEARER client state to RECEIVE_SERVER_FIRST_MESSAGE
2024-10-02 12:09:35 DEBUG <> [org.apache.kafka.common.security.authenticator.SaslClientAuthenticator] (kafka-producer-network-thread | dh-apicurio-registry-producer) [Producer clientId=dh-apicurio-registry-producer] Set SASL client state to INTERMEDIATE
2024-10-02 12:09:36 DEBUG <> [org.apache.kafka.common.security.authenticator.SaslClientAuthenticator] (kafka-producer-network-thread | dh-apicurio-registry-producer) [Producer clientId=dh-apicurio-registry-producer] Set SASL client state to FAILED
2024-10-02 12:09:36 INFO <> [org.apache.kafka.common.network.Selector] (kafka-producer-network-thread | dh-apicurio-registry-producer) [Producer clientId=dh-apicurio-registry-producer] Failed authentication with broker-2-kafka-your-host.com.io/your-ip (channelId=2) (Token expired at: 1727867581000 (2024-10-02T11:13:01 UTC) (ErrId: 3d97dddf))
2024-10-02 12:09:36 INFO <> [org.apache.kafka.clients.NetworkClient] (kafka-producer-network-thread | dh-apicurio-registry-producer) [Producer clientId=dh-apicurio-registry-producer] Node 2 disconnected.
2024-10-02 12:09:36 ERROR <> [org.apache.kafka.clients.NetworkClient] (kafka-producer-network-thread | dh-apicurio-registry-producer) [Producer clientId=dh-apicurio-registry-producer] Connection to node 2 (broker-2-kafka-your-host.com/your-ip) failed authentication due to: Token expired at: 1727867581000 (2024-10-02T11:13:01 UTC) (ErrId: 3d97dddf)