apollographql / apollo-kotlin

:rocket:  A strongly-typed, caching GraphQL client for the JVM, Android, and Kotlin multiplatform.
https://www.apollographql.com/docs/kotlin
MIT License
3.77k stars 656 forks source link

Try the experimental WebSocketNetworkTransport #5862

Open martinbonnin opened 7 months ago

martinbonnin commented 7 months ago

Description

Historically, WebSockets have been one of the most complex and error-prone parts of Apollo Kotlin.

Starting with 4.0.0-beta.6, apollo-runtime has an experimental ApolloWebSocketNetworkTransport aiming at simplifying the WebSocket setup.

If you have had issues with WebSockets, try it out. More documentation and migration guide is available at https://www.apollographql.com/docs/kotlin/advanced/experimental-websockets

dmitrytavpeko commented 4 months ago

Great job, thanks! I don't observe kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed errors anymore that prevented from reinstating my subscriptions! While migrating to the new implementation, I encountered these issues:

  1. API doesn't allow to set an OkHttpClient. In the old implementation we have .webSocketEngine(DefaultWebSocketEngine(mySharedOkHttpClient))
  2. The documentation says that it's possible to customize the retry logic by using addRetryOnErrorInterceptor. However, such an API doesn't exist. I used to customize the backoff in the following way:
                        .reopenWhen { throwable, attempt ->
                            val delayBeforeNextAttemptMs =
                                2.0.pow(attempt.toInt()).coerceAtMost(30.0).toLong() * 1_000L
                            delay(delayBeforeNextAttemptMs)
                            true
                        }

    Will these issue be addressed in the future?

martinbonnin commented 4 months ago

Thanks for trying it out and for the details!

  1. Good catch. JvmWebSocketEngine is currently internal. I'll open a public constructor function in next release.

  2. Good catch as well, the method is called retryOnErrorInterceptor, I'll update the docs. You can use it like so:

    val apolloClient = ApolloClient.Builder()
        .retryOnErrorInterceptor(MyRetryOnErrorInterceptor())
        .build()

    class MyRetryOnErrorInterceptor : ApolloInterceptor {
      object RetryException: Exception()

      override fun <D : Operation.Data> intercept(request: ApolloRequest<D>, chain: ApolloInterceptorChain): Flow<ApolloResponse<D>> {
        var attempt = 0
        return chain.proceed(request).onEach {
              if (request.retryOnError == true && it.exception != null && it.exception is ApolloNetworkException) {
                throw RetryException
              } else {
                attempt = 0
              }
            }.retryWhen { cause, _ ->
              if (cause is RetryException) {
                attempt++
                delay(2.0.pow(attempt).seconds)
                true
              } else {
                // Not a RetryException, probably a programming error, pass it through
                false
              }
            }
      }
    }

I'm aware this is significantly more verbose than the old way but the old way used to do a bunch of assumptions that didn't always work. At least this method should give you full control over what exceptions you want to retry, exponential backoff, etc...

martinbonnin commented 4 months ago

@dmitrytavpeko WebSocketEngine(WebSocket.Factory) is now available in the SNAPSHOTs if you want to try it out.

PS: I also realized we can simplify renewing the uuid (see https://github.com/apollographql/apollo-kotlin/pull/6075)

dmitrytavpeko commented 4 months ago

@martinbonnin Thank you for the quick response!

jvanderwee commented 3 months ago

Hey @martinbonnin 👋 it looks like there are some changes required when authenticating websockets which aren't mentioned in the migration guide, specifically setting connection payload and refreshing token. Would it be possible to add this to the migration doc?

martinbonnin commented 3 months ago

@jvanderwee Can you share your existing code here? I'll add the matching snippet to the doc

jvanderwee commented 3 months ago
class WebSocketReconnectException: Exception("The WebSocket needs to be reopened")
val apolloClient = ApolloClient.Builder()
    .serverUrl("http://localhost:8080/graphql")
    .subscriptionNetworkTransport(
        WebSocketNetworkTransport.Builder()
            .serverUrl("http://localhost:8080/subscriptions")
            .protocol(
                GraphQLWsProtocol.Factory(
                    connectionPayload = {
                        mapOf("Authorization" to token)
                    },
                ),
            )
            .reopenWhen { throwable, attempt ->
                if (throwable is WebSocketReconnectException) {
                    true
                } else {
                    // Another WebSocket error happened, decide what to do with it
                    // Here we're trying to reconnect at most 3 times
                    attempt < 3
                }
            }
            .build()
    )
    .build()
apolloClient.subscriptionNetworkTransport.closeConnection(WebSocketReconnectException())
martinbonnin commented 3 months ago

Hi @jvanderwee thanks for sending this!

Looks like you're calling closeConnection() to force a reconnect? closeConnection() is still around so you can do something similar with:

class WebSocketReconnectException: Exception("The WebSocket needs to be reopened")
class RetryException: Exception("The WebSocket needs to be retried")
class MyInterceptor: ApolloInterceptor {
  override fun <D : Operation.Data> intercept(request: ApolloRequest<D>, chain: ApolloInterceptorChain): Flow<ApolloResponse<D>> {
    return chain.proceed(request)
        .onEach {
          if (request.retryOnError == true && it.exception != null && it.exception is ApolloNetworkException) {
            throw RetryException()
          }
        }
        .retryWhen { cause, attempt ->
          when (cause) {
            is RetryException -> {
              attempt < 3
            }
            is WebSocketReconnectException -> {
              true
            }
            else -> false
          }
        }.catch { 
          if (it !is RetryException) {
            throw it
          }
        }
  }
}
val apolloClient = ApolloClient.Builder()
    .serverUrl("http://localhost:8080/graphql")
    .subscriptionNetworkTransport(
        WebSocketNetworkTransport.Builder()
            .serverUrl("http://localhost:8080/subscriptions")
            .protocol(
                GraphQLWsProtocol(
                    connectionParams = {
                        mapOf("Authorization" to token)
                    },
                ),
            )
            .retryOnError { it.operation is Subscription }
            .retryOnErrorInterceptor(MyInterceptor())
            .build()
    )
    .build()
apolloClient.subscriptionNetworkTransport.closeConnection(WebSocketReconnectException())

While the above should be working, it's pretty verbose and I'd love to take this opportunity to do things a bit better.

Can you share what triggers the call to closeConnection()? Is that a sepcific GraphQL message? A WebSocket close frame? Or something else completely out of band?

jvanderwee commented 3 months ago

When we change the import for closeConnection

-import com.apollographql.apollo.network.ws.closeConnection
+import com.apollographql.apollo.network.websocket.closeConnection

the exception is expected to be of type ApolloException, so we're forced to use something like DefaultApolloException:

apolloClient.subscriptionNetworkTransport.closeConnection(DefaultApolloException("WebSocket needs to be closed"))

but then this means we need is DefaultApolloException in the retryWhen which doesn't seem quite right:

.retryWhen { cause, attempt ->
    when (cause) {
        is DefaultApolloException -> {
            true
        }
        ...
    }
}

could we add a specific subclass of ApolloException to be used for closing the connection?

if we don't update the import an IllegalArgumentException is thrown because websocket.WebSocketNetworkTransport is not an instance of ws.WebSocketNetworkTransport

we close the connection whenever we need to update the authorisation header in the payload

martinbonnin commented 3 months ago

Thanks for the follow up!

could we add a specific subclass of ApolloException to be used for closing the connection?

We can make a closeConnection() that reports a WebSocketManuallyClosedException. I'd still keep the closeConnection(ApolloException) in case someone needs to tunnel data.

we close the connection whenever we need to update the authorisation header in the payload

That's the question. When do you need to update the authorization header? Ideally I would expect the server to notify its listeners directly in the websocket with some message ("token expired") or even a WebSocket close frame.

This seems like the best way and would allow to handle websocket authentication in a single interceptor, just like for HTTP. That'd be a lot more symmetrical. But maybe your server doesn't signal that? In which case I'm curious how you can tell that you need to refresh the token (timeout, something else?)

jvanderwee commented 3 months ago

We can make a closeConnection() that reports a WebSocketManuallyClosedException

👌 thanks!

I would expect the server to notify its listeners directly in the websocket

very good point! we've been closing the connection as a side-effect of our HTTP authorisation interceptor, but it makes way more sense for this to be handle my the websocket itself. I'll look into this! we will still need to update the authorisation header when the user logs in though

martinbonnin commented 3 months ago

we've been closing the connection as a side-effect of our HTTP authorisation interceptor

Interesting! What happens if you're on a screen that listens to a subscription without ever doing a query/mutation? The subscription has no way to know when to renew its token in that case?

jvanderwee commented 3 months ago

correct! very much an oversight on our part 🙃

martinbonnin commented 3 months ago

@jvanderwee Thanks for the follow up!

I did not mention closeConnection() explicitely in there because I'd like to avoid promoting it too much. While it's technically the only way to get your setup working for now, I'd like to encourage the backend to send something when the token needs to be updated instead.

This probably will require new APIs to update the token from an ApolloInterceptor, just like for the Authorization header for HTTP.

Right now I'm thinking something like this:

class AuthorizationInterceptor : ApolloInterceptor {
  override fun <D : Operation.Data> intercept(request: ApolloRequest<D>, chain: ApolloInterceptorChain): Flow<ApolloResponse<D>> {
    return flow {
      val token = getOrRefreshToken()
      emit(
          request.newBuilder()
              // for HTTP transport
              .addHttpHeader("Authorization", "Bearer $token")
              // for WebSocket transport
              //.webSocketConnectionPayload(mapOf("token" to token))
              .build()
      )
    }.flatMapConcat {
      chain.proceed(it)
    }.onEach { 
      if (it.errors.contains("Token expired")) {
        throw TokenExpired
      }
    }.retryWhen { cause, attempt -> 
      cause is TokenExpired
    }
  }
}

I haven't tested it yet but assuming the server can return the 401 information is a regular GraphQL response, that would allow handling authentication consistently between HTTP and WebSocket. Any thoughts? Could that work for you?

jvanderwee commented 3 months ago

That would work for us! Thanks @martinbonnin

jvanderwee commented 1 month ago

👋 we're seeing the following crash occur on Android in 4.0.1 after migrating to experimental web sockets. unsure when this happens at the moment, will update if we narrow it down

          Fatal Exception: java.lang.NullPointerException: Attempt to invoke interface method 'void w6.u.close(int, java.lang.String)' on a null object reference
       at com.apollographql.apollo.network.websocket.internal.SubscribableWebSocket.shutdown(SubscribableWebSocket.kt:119)
       at com.apollographql.apollo.network.websocket.internal.SubscribableWebSocket.onError(SubscribableWebSocket.kt:220)
       at com.apollographql.apollo.network.websocket.JvmWebSocket.onFailure(WebSocketEngine.jvm.kt:67)
       at okhttp3.internal.ws.RealWebSocket.failWebSocket(RealWebSocket.kt:592)
       at okhttp3.internal.ws.RealWebSocket$connect$1.onFailure(RealWebSocket.kt:202)
martinbonnin commented 1 month ago

@jvanderwee thanks for the feedback. This can happen if the onError callback is called while opening the WebSocket. I expected OkHttp to dispatch the callback but looks like this is not the case. I made a fix here.

For the record, would you have the rest of the stacktrace so we can see what triggers this error?

jvanderwee commented 1 month ago

Thanks!

Fatal Exception: java.lang.NullPointerException: Attempt to invoke interface method 'void w6.u.close(int, java.lang.String)' on a null object reference
       at com.apollographql.apollo.network.websocket.internal.SubscribableWebSocket.shutdown(SubscribableWebSocket.kt:119)
       at com.apollographql.apollo.network.websocket.internal.SubscribableWebSocket.onError(SubscribableWebSocket.kt:220)
       at com.apollographql.apollo.network.websocket.JvmWebSocket.onFailure(WebSocketEngine.jvm.kt:67)
       at okhttp3.internal.ws.RealWebSocket.failWebSocket(RealWebSocket.kt:592)
       at okhttp3.internal.ws.RealWebSocket$connect$1.onFailure(RealWebSocket.kt:202)
       at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:525)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:644)
       at java.lang.Thread.run(Thread.java:1012)
martinbonnin commented 1 month ago

So it dispatches from a separate thread but that thread executes concurrently with the constructor... While the fix should work, I'm not sure it's 100% correct. I'll revisit.

martinbonnin commented 1 month ago

New try here. If you get a chance to try the SNAPSHOTs, let us know how that goes.

jvanderwee commented 1 month ago

Thanks @martinbonnin! When do you plan on publishing the 4.0.2 release?

martinbonnin commented 1 month ago

Currently planned for next week

jvanderwee commented 1 month ago

Thanks @martinbonnin 🙏 just trying out the snapshot, unable to pull in apollo-testing-support due to this:

Could not find apollo-kotlin:apollo-mockserver:unspecified.
     Searched in the following locations:
       - https://s01.oss.sonatype.org/content/repositories/snapshots/apollo-kotlin/apollo-mockserver/unspecified/apollo-mockserver-unspecified.pom
       - https://dl.google.com/dl/android/maven2/apollo-kotlin/apollo-mockserver/unspecified/apollo-mockserver-unspecified.pom
       - https://repo.maven.apache.org/maven2/apollo-kotlin/apollo-mockserver/unspecified/apollo-mockserver-unspecified.pom
     Required by:
         project :shared > com.apollographql.apollo:apollo-testing-support:4.0.2-SNAPSHOT:20241023.101530-26 > com.apollographql.apollo:apollo-testing-support-jvm:4.0.2-SNAPSHOT:20241023.101530-26
martinbonnin commented 1 month ago

Ah this is interesting! Probably due to the moving of mockserver. Thanks for reporting it, I'll take a look!

martinbonnin commented 1 month ago

@jvanderwee the new SNAPSHOT is currently building. In the meantime, you can workaround with

configurations.configureEach {
    exclude(group= "apollo-kotlin", module = "apollo-mockserver")
}