apollographql / apollo-kotlin

:rocket: Β A strongly-typed, caching GraphQL client for the JVM, Android, and Kotlin multiplatform.
https://www.apollographql.com/docs/kotlin
MIT License
3.73k stars 655 forks source link

Websockets protocol over Phoenix Channels for Absinthe #3809

Closed ajacquierbret closed 2 years ago

ajacquierbret commented 2 years ago

Currently, there is no built-in way for the Apollo Client (v3) to communicate over websockets with an Absinthe server. An Absinthe official implementation is available for the JS client, but not for Kotlin.

I decided to build my own and share it with you so that everyone can use it.

Even better, if you (project contributors) think it's something developers out there would definitely use and if it meets the project's code quality requirements, feel free to include it in the repo. If that sounds good to you, I'd be happy to tweak a bit the code in order to meet those requirements.

Here's some important things to know :

I'd love to have some feedbacks about this !

Installation (be sure to use Apollo v3 !)

/* shared/build.gradle.kts */

dependencies {
  val androidMain by getting {
    dependencies {
      implementation("com.github.dsrees:JavaPhoenixClient:1.0.0")
    }
  }
}

/* /build.gradle.kts */

repositories {
  mavenCentral()
}

Usage

val apolloClient =
  ApolloClient.Builder()
    .wsProtocol(
      absintheWsProtocol.Factory(
        websocketServerUrl = "ws://some.ws.domain/socket",
        // Authentication example 
        headers = mapOf("token" to token)
      )
    )
    .build()

Expect protocol

/* shared/src/commonMain/kotlin/com/example/app/apollo/AbsintheWsProtocol.kt */

package com.example.app.apollo

expect class AbsintheWsProtocol(
    webSocketConnection: WebSocketConnection,
    listener: Listener,
    websocketServerUrl: String,
    headers: Map<String, Any?>?,
    connectionAcknowledgeTimeoutMs: Long
) : WsProtocol {

    class Factory(
        websocketServerUrl: String,
        headers: Map<String, Any?>?,
        connectionAcknowledgeTimeoutMs: Long = 10_000
    ) : WsProtocol.Factory
}

Actual Android protocol

/* shared/src/androidMain/kotlin/com/example/app/apollo/AbsintheWsProtocol.kt */

package com.example.app.apollo

import com.apollographql.apollo3.api.*
import com.apollographql.apollo3.api.http.DefaultHttpRequestComposer
import com.apollographql.apollo3.exception.ApolloNetworkException
import com.apollographql.apollo3.network.ws.*
import kotlinx.coroutines.*
import org.phoenixframework.Message
import org.phoenixframework.Socket
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

private const val ABSINTHE_CHANNEL_TOPIC = "__absinthe__:control"

/**
 * An [AbsintheWsProtocol] that allows [Apollo](https://github.com/apollographql/apollo-kotlin) to communicate with [Absinthe](https://github.com/absinthe-graphql/absinthe)
 * over [Phoenix Channels](https://hexdocs.pm/phoenix/channels.html)
 *
 * @param websocketServerUrl the address of the websocket server
 * @param headers Constant parameters to send when connecting (e.g. authentication headers). Defaults to null
 *
 * @author [@ajacquierbret](https://github.com/ajacquierbret)
 */
actual class AbsintheWsProtocol actual constructor(
    webSocketConnection: WebSocketConnection,
    listener: Listener,
    private val websocketServerUrl: String,
    private val headers: Map<String, Any?>?,
    private val connectionAcknowledgeTimeoutMs: Long,
): WsProtocol(webSocketConnection, listener) {

    private val socket = Socket(websocketServerUrl, headers)
    private val channel = socket.channel(ABSINTHE_CHANNEL_TOPIC)
    private val activeSubscriptions = mutableMapOf<String, String>()

    private suspend fun openSocket() = suspendCoroutine<Unit> { continuation ->
        socket.connect()

        socket.onOpen {
            continuation.resume(Unit)
        }
    }

    private suspend fun joinChannel() = suspendCoroutine<Message?> { continuation ->
        channel
            .join()
            .receive("ok") {
                continuation.resume(it)
            }
            .receive("error") {
                continuation.resume(it)
            }
    }

    @Suppress("UNCHECKED_CAST")
    private fun absintheMessageToApolloMessage(message: Message? = null, type: String? = null): Map<String, Any?> {
        val subscriptionKey =
            (message?.payload as Map<String, Any>?)?.get("subscriptionId") as String?
                ?: message?.topic

        val subscriptionId = activeSubscriptions[subscriptionKey]

        return mapOf(
            "id" to subscriptionId,
            "payload" to message?.payload,
            "type" to (type ?: message?.status)
        )
    }

    override suspend fun connectionInit() {
        openSocket()

        socket.onMessage {
            handleServerMessage(absintheMessageToApolloMessage(message = it, type = "ok"))
        }

        socket.onError { throwable, _ ->
            close()
            listener.networkError(throwable)
        }

        socket.onClose {
            close()
        }

        withTimeout(connectionAcknowledgeTimeoutMs) {
            val message = joinChannel()
            when (val status = message?.status) {
                "ok" -> return@withTimeout
                "error" -> throw ApolloNetworkException("Connection error while joining phoenix channel : $message")
                else -> println("Unhandled message status while joining phoenix channel : $status")
            }
        }
    }

    @Suppress("UNCHECKED_CAST")
    override fun handleServerMessage(messageMap: Map<String, Any?>) {
        println("MESSAGE FROM ABSINTHE SERVER : $messageMap")

        val subscriptionId = messageMap["id"] as String?
        val payload = messageMap["payload"] as Map<String, Any?>?

        if (subscriptionId != null) {
            when (messageMap["type"]) {
                "ok" -> if (payload !== null) {
                    val result = payload["result"] as Map<String, Any?>?

                    if (result !== null) {
                        listener.operationResponse(
                            id = subscriptionId,
                            payload = result
                        )
                    }
                }
                "error" -> listener.operationError(subscriptionId, payload)
                "complete" -> listener.operationComplete(subscriptionId)
                else -> Unit
            }
        } else {
            when (messageMap["type"]) {
                "error" -> listener.generalError(payload)
                else -> Unit
            }
        }
    }

    @Suppress("UNCHECKED_CAST")
    override fun <D : Operation.Data> startOperation(request: ApolloRequest<D>) {
        channel
            .push("doc", DefaultHttpRequestComposer.composePayload(request))
            .receive("ok") {
                val subscriptionKey = (it.payload as Map<String, Any>?)?.get("subscriptionId") as String?
                if (subscriptionKey !== null) {
                    activeSubscriptions[subscriptionKey] = request.requestUuid.toString()
                }
            }
            .receive("error") {
                val messageMap = absintheMessageToApolloMessage(it, type = "error")
                handleServerMessage(messageMap)
            }
    }

    @Suppress("UNCHECKED_CAST")
    override fun <D : Operation.Data> stopOperation(request: ApolloRequest<D>) {
        val subscriptionId = request.requestUuid.toString()
        val isActiveSubscription = activeSubscriptions.values.contains(subscriptionId)

        if (isActiveSubscription) {
            channel.push(
                "unsubscribe", mapOf(
                    "subscriptionId" to subscriptionId
                )
            ).receive("ok") {
                val subscriptionKey =
                    (it.payload as Map<String, Any>?)?.get("subscriptionId") as String?
                        ?: it.topic

                activeSubscriptions.remove(subscriptionKey)
                handleServerMessage(absintheMessageToApolloMessage(it, type = "complete"))
            }
        }
    }

    override suspend fun run() = Unit

    override fun close() {
        channel.leave()
        socket.disconnect()
    }

     actual class Factory actual constructor(
         private val websocketServerUrl: String,
         private val headers: Map<String, Any?>?,
         private val connectionAcknowledgeTimeoutMs: Long
    ) : WsProtocol.Factory {
        override val name: String
            get() = "absinthe-ws"

        override fun create(
            webSocketConnection: WebSocketConnection,
            listener: Listener,
            scope: CoroutineScope
        ): WsProtocol {
            return AbsintheWsProtocol(
                webSocketConnection = webSocketConnection,
                listener = listener,
                websocketServerUrl = websocketServerUrl,
                headers = headers,
                connectionAcknowledgeTimeoutMs = connectionAcknowledgeTimeoutMs,
            )
        }
    }

}

Actual iOS protocol (TODO)

/* shared/src/iosMain/kotlin/com/example/app/apollo/AbsintheWsProtocol.kt */

package com.example.app.apollo

import com.apollographql.apollo3.api.ApolloRequest
import com.apollographql.apollo3.api.Operation
import com.apollographql.apollo3.network.ws.WebSocketConnection
import com.apollographql.apollo3.network.ws.WsFrameType
import com.apollographql.apollo3.network.ws.WsProtocol
import kotlinx.coroutines.CoroutineScope

actual class AbsintheWsProtocol actual constructor(
    webSocketConnection: WebSocketConnection,
    listener: Listener,
    private val websocketServerUrl: String,
    private val headers: Map<String, Any?>?,
    private val connectionAcknowledgeTimeoutMs: Long
): WsProtocol(webSocketConnection, listener) {
    override suspend fun connectionInit() {
        TODO("Not yet implemented")
    }

    override fun handleServerMessage(messageMap: Map<String, Any?>) {
        TODO("Not yet implemented")
    }

    override fun <D : Operation.Data> startOperation(request: ApolloRequest<D>) {
        TODO("Not yet implemented")
    }

    override fun <D : Operation.Data> stopOperation(request: ApolloRequest<D>) {
        TODO("Not yet implemented")
    }

    actual class Factory actual constructor(
        private val websocketServerUrl: String,
        private val headers: Map<String, Any?>?,
        private val connectionAcknowledgeTimeoutMs: Long
    ) : WsProtocol.Factory {
        override val name: String
            get() = "absinthe-ws"

        override fun create(
            webSocketConnection: WebSocketConnection,
            listener: Listener,
            scope: CoroutineScope
        ): WsProtocol {
            return AbsintheWsProtocol(
                webSocketConnection = webSocketConnection,
                listener = listener,
                websocketServerUrl = websocketServerUrl,
                headers = headers,
                connectionAcknowledgeTimeoutMs= connectionAcknowledgeTimeoutMs
            )
        }
    }
}
martinbonnin commented 2 years ago

Hi πŸ‘‹ Thanks for sharing this πŸ’œ !

@mrctrifork might be interested as they asked about this some time ago.

I'm personally not super familiar with Absinthe but an early question is whether absinthe uses WebSockets or something else? Your implementation uses WsProtocol which takes a WebSocketConnection so it will get a WebSocket from WebSocketNetworkTransport. But it also opens a org.phoenixframework.Socket, which makes me think maybe not both are required? It might be worth moving to the NetworkTransport api to avoid opening a WebSocket.

BoD commented 2 years ago

From what I can see, is does use a WebSocket internally, with a protocol on top, which JavaPhoenixClient implements. In theory this means we could make this a WsProtocol:

With the current approach using JavaPhoenixClient (I also think it would make sense for it to be at the NetworkTransport level):

ajacquierbret commented 2 years ago

Hi @martinbonnin !

FYI, Absinthe is just a GraphQL toolkit for Elixir that does not enforce a particular websocket protocol.

The core absinthe project doesn't concern itself with any transport. Right now the only websocket based transport that exists is in absinthe_phoenix which expects the client to use the phoenix channels library. You can find an integration here https://github.com/absinthe-graphql/absinthe-socket/tree/master/packages/socket-apollo-link

If you don't want to use that package then you'll need to write a handler for the apollo specific protocol.

That said, Absinthe however uses Phoenix channels (that uses websockets under the hood) in order for it to manage GraphQL subscriptions.

Like queries and mutations, subscriptions are not intrinsically tied to any particular transport, and they're built within Absinthe itself to be able to operate on many different platforms. At the moment however the most common and fully featured platform that you can run them on with Elixir is via Phoenix channels

So I thought using Phoenix channels would be the easiest and fastest way to go with Apollo Kotlin since a client was already out there !

Concerning WsProtocol , I agree that the WebSocketConnection it takes is completely useless for my implementation as it uses a Phoenix Socket instead. I wanted to get this up and running quickly so implementing WsProtocol (which already has abstract functions designed for websocket connections) was somewhat easier in my mind. However, I agree that moving to NetworkTransport is certainly the way to go.

In the next few days/weeks, I'll try to move forward this roadmap:

ajacquierbret commented 2 years ago

Sorry @BoD, I didn't see your comment. What do you both think about this roadmap ? It seems to me that it could work for iOS/Android but given my recent knowledge of Kotlin & KMP I may be wrong !

martinbonnin commented 2 years ago

Forking and re-designing the JavaPhoenixClient in order to build a Multiplatform library (at least iOS & Android), using OkHttp for Android and NSUrlSession for iOS

🀩

Switching all the JavaPhoenixClient library callbacks to a coroutine based approach.

🀩

Publishing this brand new library as KotlinPhoenixClient on Github

🀩

Using KotlinPhoenixClient as a dependency of AbsintheWsProtocol (maybe AbsintheNetworkTransport ?)

I suspect AbsintheNetworkTransport will be a better fit but 🀩

What do you think about using Ktor for both platforms in order to reduce platform specific code ?

Last time I checked, there was no native websocket support in Ktor so that wouldn't be an option unless things changed recently?

You can reuse DefaultWebSocketEngine if you don't want to deal with platform specific code. This is the low level API that the current Apollo code is using. Right now it's bundled in apollo-runtime but if you want to reuse it, we could publish it as a separate module.

All in all, sounds like a great plan πŸ‘ . Let us know if extracting DefaultWebSocketEngine would be useful and we can look into it.

BoD commented 2 years ago

Sorry @BoD, I didn't see your comment.

No worries at all, we commented concurrently πŸ˜…

On my side, what you said looks good. Forking / reworking JavaPhoenixClient could be a big endeavour however (but an interesting one for sure!).

About using KTOR client, I think it would be nice to avoid the additional dependency. We also had a recent question about it here.

ajacquierbret commented 2 years ago

I'm glad you both like this plan!

No worries at all, we commented concurrently πŸ˜…

It looks like a race condition haha!

Joke aside, for shure reworking JavaPhoenixClient seems a bit challenging but nothing too difficult I guess. The lib already uses OkHttp3 as a networking client so I thought reusing its implementation for Android (and maybe switching to OkHttp4 ?) and adapting what's platform specific (i.e. modules using OkHttp and GSON) with NSUrlSession for iOS networking.

What JSON encoding/parsing utilities does the Apollo Client already uses ? I thought using kotlinx.serialisation for this work since it's already multiplatform, so no need to bridge platform specific libs.

I'll take a look at DefaultWebSocketEngine tonight when I'll move from the office and let you know what are my thoughts on it !

Thanks for all the inputs!

martinbonnin commented 2 years ago

What JSON encoding/parsing utilities does the Apollo Client already uses ?

Apollo uses its own Json parser borrowed from Moshi (itself borrowed from Gson): https://github.com/apollographql/apollo-kotlin/blob/31c873c5abbc4873c98e31ba17da7c16e90109ef/apollo-api/src/commonMain/kotlin/com/apollographql/apollo3/api/json/BufferedSourceJsonReader.kt#L31

You can use AnyAdapter to map a Json bufferedSource to/from a Map<String, Any?>. The usage is not as nice as kotlinx.serialization because you're dealing with an untyped Map but we found that avoiding extra libraries was worth it.

Using kotlinx.serialization is ok too if it saves some time.

Let us know how that goes!

ajacquierbret commented 2 years ago

I'd like to avoid as much as possible to depend on Apollo modules for the rewrite of JavaPhoenixClient.

I think that the new KotlinPhoenixClient must be agnostic and thus should not be aware of any GraphQL client or protocol that's used on top of it, because its only purpose is to provide a clear and robust API for dealing with Phoenix channels.

That said, using kotlinx.serialization may be the best solution in order to fulfill this requirement, instead of relying on a custom Json parser from Apollo, regardless of how suited for GraphQL and performant it is!

I'll keep you informed of the progress of the project in this thread!

ajacquierbret commented 2 years ago

Hi @martinbonnin and @BoD ! Some feedbacks about the project !

I just published a brand new Kotlin Phoenix library on Github and Maven Central !

It now uses Coroutines (with multithreading support for iOS) and is now mobile friendly (Android/iOS) ! I decided to go with native HTTP clients, OkHttp3 (v4) for Android, NSURLSession for iOS, and Kotlin Serialization for JSON parsing.

The majority of callbacks have been replaced with Kotlin's SharedFlow from which you can now collect. The old sendBuffer MutableList of callbacks have been refactored with a MutableList of lazily initiated Jobs. All Java's ThreadPoolExecutor schedulers uses have been refactored with delayed coroutines executed on the default dispatcher (background pool of threads).

I did my best to properly handle coroutines cancellation and suspending functions but I'd like to have some feedbacks about it.

There's still quite a lot of work to do on this project. I'm very new to the Kotlin realm and had to learn a lot, especially concerning coroutines ! Some implementations may not be adequate and may lack optimization or concurrency paradigms understanding, so experienced eyes are very much welcome !

Here's the updated roadmap for next weeks :

BoD commented 2 years ago

Hey! Thanks for the update, and congrats on phoenix-kotlin, it looks great 😁 On my side I don't have any feedback at this point but don't hesitate if you want eyes on specific parts!

ajacquierbret commented 2 years ago

Hi ! I'd like to have some advices from Apollo devs concerning my implementation of AbsintheNetworkTransport.

It's pretty simple:

First option: I keep AbsintheWsProtocol as is and create a slightly different version of WebSocketNetworkTransport called PhoenixNetworkTransport that handles a Phoenix socket lifecycle instead of a plain old websocket. Going with that, both Protocol and Transport follow the event passing architecture implemented by the new PhoenixNetworkTransport, including Transport's WsProtocol.Listener implementation that Protocol can call in response to server events.

Second option: On the other hand, I can just create a standalone AbsintheNetworkTransport that skips the message passing architecture that normally binds it with a Protocol. Doing so will force me to imperatively call startOperation, stopOperation, etc. instead of dispatching events to which Protocol reacts.

Both options involve gaining access to some Apollo's internal classes such as Event and Command and all classes that inherits from them.

I prefer the first option for these reasons :

@BoD @martinbonnin What are your thoughts on this ?

martinbonnin commented 2 years ago

Hi @ajacquierbret πŸ‘‹ Congrats on kotlin-phoenix, this is seriously cool πŸ‘ πŸ‘

To the question of how to use it in Apollo Kotlin, I'd prefer a standalone AbsintheNetworkTransport that defines its own Event and Command classes.

The current WebSocketNetworkTransport state machine and the Event/Command machinery is still pretty young and might require modifications as we get more feedbacks and more users. I'm afraid exposing more internal classes will make these modifications harder.

On the other hand, the NetworkTransport API is really small and battle-proven, I don't expect it to change in the foreseeable future.

The only reason there is WsProtocol in Apollo Kotlin is to handle AppSync and graphql-ws but as its name implies, it is completely bound to WebSockets. If you're using another kind of socket, I think you might as well not use a WsProtocol anymore, or maybe define another one (PhoenixProtocol maybe?)

Long story short, can you copy/paste WebSocketNetwork and modify the bits that you need to change? That might duplicate some code but I'd rather much have 2 working implementations living side by side the time we stabilize them. Once stabilized, we can always refactor to extract the common parts. The other way is not possible sadly. Once an API is made public, there's no coming back.

ajacquierbret commented 2 years ago

Hi @martinbonnin ! πŸ‘‹ Thanks a lot for your precious advices and congratulations !

Following your recommendations, I made a PhoenixNetworkTransport that is a tweaked version of WebSocketNetworkTransport, with minimal modifications. Actually it's much more a merge of the old AbsintheWsProtocol inside the newly created PhoenixNetworkTransport than a new implementation. But it works!

As you said, I think we could extract some parts of the code in order to avoid duplication, but for now it's completely functional.

If it doesn't bother you too much, I'd really like to have your opinion on the coroutine dispatcher disposal mechanism that is called right after the supervise call of the transport. I didn't know what to do with this internal implementation, so I skipped it while waiting to learn a bit more about it.

Anyway, I published the transport on the repo and updated the READMEs, the whole library should now be theoretically usable. I'll continue to post the project's progress on this thread on each update if you don't mind.

If this project satisfies Apollo's requirements and reaches some kind of stability, do you think you could recommend developers which uses Absinthe/Phoenix servers to use Kotlin Phoenix in conjunction with the Apollo Client in order to take advantage of subscriptions ? I was thinking about just adding a simple line to the docs.

The updated roadmap lies in the repo : https://github.com/ajacquierbret/kotlin-phoenix

martinbonnin commented 2 years ago

it's much more a merge of the old AbsintheWsProtocol inside the newly created PhoenixNetworkTransport than a new implementation. But it works!

Nice 🀩

I'd really like to have your opinion on the coroutine dispatcher disposal mechanism [...] I didn't know what to do with this internal implementation, so I skipped it while waiting to learn a bit more about it.

The websocket CoroutineScope uses a single thread dispatcher because:

  1. supervise() touches mutable state and touching it from multiple thread will fail with Kotlin native
  2. more generally, idleJob, connectionJob, etc... is accessed by multiple concurrent coroutines so using a single thread dispatcher ensures that no race condition happens there.

On the JVM, we need the dispose() to correctly shutdown the thread backing the dispatcher. Without this, the JVM leaks a thread and doesn't terminate. This is very often not an issue for Android app with a global ApolloClient but can become a real issue in backend or containerized environments.

Anyway, I published the transport on the repo and updated the READMEs, the whole library should now be theoretically usable. I'll continue to post the project's progress on this thread on each update if you don't mind.

Nice πŸ‘ Of course updates are always welcome πŸ’™

do you think you could recommend developers which uses Absinthe/Phoenix servers to use Kotlin Phoenix in conjunction with the Apollo Client in order to take advantage of subscriptions ?

Sure thing! I'll open a PR in a few minutes

ajacquierbret commented 2 years ago

supervise() touches mutable state and touching it from multiple thread will fail with Kotlin native

If I understand well, Kotlin Native through Coroutines version 1.6.0-native-mt now supports sharing mutable state between threads without requiring this state to be frozen. Does that mean using this specific version of Coroutines allows me to overlook this ?

more generally, idleJob, connectionJob, etc... is accessed by multiple concurrent coroutines so using a single thread dispatcher ensures that no race condition happens there.

I assume this is something that no kind of magical version of Coroutine adresses, so I think I'll have to use a single thread dispatcher anyway.

On the JVM, we need the dispose() to correctly shutdown the thread backing the dispatcher. Without this, the JVM leaks a thread and doesn't terminate. This is very often not an issue for Android app with a global ApolloClient but can become a real issue in backend or containerized environments.

Because JVM is a target I want Kotlin Phoenix to support pretty soon, I'll make sure the thread is properly shutdown !

Thanks a lot for your PR ! ✨

martinbonnin commented 2 years ago

The new memory model will indeed help lifting some restrictions but it's not 100% ready yet. Also we need single-threaded operation for other reasons (shared state and concurrent coroutines) so this will most likely stay like this in the foreseeable future.

ajacquierbret commented 2 years ago

Hi! Some news about the project!

I managed to handle single-threaded operations by making a copy of Apollo's BackgroundDispatcher. I would have liked to simply import the module but it is flagged as internal, and because I don't think it's useful to create something different just for the sake of authenticity, copy/paste was the most logical solution I came through. Let me know if you have any objections.

Also, I managed to handle socket reconnection by exposing a public reconnect function from the adapter that will run a reconnectWith param defined in the builder so that one can define his own reconnection strategy (e.g. based on user session changes). Sample usage can be found in the adapter's doc.

Does it sound like a good strategy to you?

martinbonnin commented 2 years ago

Hi @ajacquierbret. This sounds like a very good strategy to me πŸ‘. I know copy/pasting isn't the greatest feeling out there but in that specific case, I think it's ok and will allow us to keep evolving the public API without risking breaking too many things.

martinbonnin commented 2 years ago

Hi @ajacquierbret can we close this issue now that https://github.com/ajacquierbret/kotlin-phoenix/tree/main/kotlinphoenix-adapters is released?

martinbonnin commented 2 years ago

Released in 3.2.0. Thanks again!