Tinder / Scarlet

A Retrofit inspired WebSocket client for Kotlin, Java, and Android
Other
3.24k stars 239 forks source link

How do we use coroutine adapter? I can't find an example for the life of me. #114

Closed Orpheus007 closed 5 years ago

Orpheus007 commented 5 years ago

Can someone please give me an example on how to use kotlin coroutines with scarlet? Like how to set it up and how to "Observe" the data etc.. Thank you

ilyaklyukin commented 5 years ago

@NaaleinGrohiik

For the Scarlet 2 dependencies, add the snapshots repository: maven { url "https://oss.sonatype.org/content/repositories/snapshots" }

// Networking
// versions.scarlet : '0.2.5-SNAPSHOT'
implementation "com.tinder.scarlet:scarlet:${versions.scarlet}"
implementation "com.tinder.scarlet:protocol-websocket-okhttp:${versions.scarlet}"
implementation "com.tinder.scarlet:message-adapter-gson:${versions.scarlet}"
implementation "com.tinder.scarlet:stream-adapter-coroutines:${versions.scarlet}"
implementation "com.tinder.scarlet:lifecycle-android:${versions.scarlet}"
implementation "com.squareup.okhttp3:okhttp:${versions.okhttp3}"
implementation "com.squareup.okhttp3:logging-interceptor:${versions.okhttp3}"
implementation "com.google.code.gson:gson:${googleVersions.gson}"

I'm using Dagger 2 to configure Scarlet 2:

    /**
     * Provides the WebSocket endpoint URL injected under the name "apiBaseUrl".
     *
     * The URL is built as "wss://" + the `R.string.api_url` resource + "/game" and is logged
     * at debug level before being returned.
     */
    @Provides
    @Singleton
    @JvmStatic
    @Named("apiBaseUrl")
    fun provideApiBaseUrl(app: Application): String =
        "wss://${app.getString(R.string.api_url)}/game".also { url ->
            Timber.d("${LogConfig.CONNECTION_TAG} using url: $url")
        }

    /**
     * Builds the singleton [Scarlet] instance.
     *
     * The OkHttp WebSocket protocol connects to [baseUrl] using [httpClient] and reports
     * [ShutdownReason.GRACEFUL] on shutdown. The configuration wires a linear backoff based on
     * `ClientConfig.RECONNECT_INTERVAL`, the Gson message adapter, the coroutines stream
     * adapter, and an application-foreground lifecycle derived from [app].
     */
    @Provides
    @Singleton
    @JvmStatic
    fun provideScarlet(
        app: Application,
        httpClient: OkHttpClient,
        @Named("apiBaseUrl") baseUrl: String
    ): Scarlet {
        val requestFactory = OkHttpWebSocket.SimpleRequestFactory(
            { Request.Builder().url(baseUrl).build() },
            { ShutdownReason.GRACEFUL }
        )
        val protocol = OkHttpWebSocket(httpClient, requestFactory)
        val configuration = Scarlet.Configuration(
            backoffStrategy = LinearBackoffStrategy(ClientConfig.RECONNECT_INTERVAL),
            messageAdapterFactories = listOf(GsonMessageAdapter.Factory()),
            streamAdapterFactories = listOf(CoroutinesStreamAdapterFactory()),
            lifecycle = AndroidLifecycle.ofApplicationForeground(app)
        )
        return Scarlet(protocol, configuration)
    }

/*
    @Provides
    @Singleton
    @JvmStatic
    fun createNoConnectionInterceptor(
        app: Application
    ): NoConnectionInterceptor = NoConnectionInterceptor(app)
*/
    /**
     * Builds the singleton [OkHttpClient] used for the WebSocket connection.
     *
     * Read, write and connect timeouts are all set to `ClientConfig.RECONNECT_INTERVAL`
     * milliseconds and redirects are not followed. In debug builds a body-level
     * [HttpLoggingInterceptor] is added.
     */
    @Provides
    @Singleton
    @JvmStatic
    fun createHttpClient(
   //     noConnectionInterceptor: NoConnectionInterceptor
    ): OkHttpClient {
        val timeoutMillis = ClientConfig.RECONNECT_INTERVAL
        val clientBuilder = OkHttpClient.Builder()
            //.addInterceptor(noConnectionInterceptor)
            .readTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
            .writeTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
            .followRedirects(false)
            .connectTimeout(timeoutMillis, TimeUnit.MILLISECONDS)

        // Release builds get the plain client without any logging.
        if (!BuildConfig.DEBUG) return clientBuilder.build()

        val bodyLogger = HttpLoggingInterceptor().apply {
            level = HttpLoggingInterceptor.Level.BODY
        }
        return clientBuilder.addInterceptor(bodyLogger).build()
    }

    /** Creates the singleton [RestApi] implementation from the given [Scarlet] instance. */
    @Provides
    @Singleton
    @JvmStatic
    fun createRestApi(scarlet: Scarlet): RestApi = scarlet.create<RestApi>()

    /** Provides the singleton [WebSocketService] that wraps the given [RestApi]. */
    @Provides
    @Singleton
    @JvmStatic
    fun provideWebSocketService(restApi: RestApi): WebSocketService {
        return WebSocketService(restApi)
    }

Custom interface:

/**
 * WebSocket service contract; an implementation is obtained through `scarlet.create()`.
 */
interface RestApi {
    /**
     * Sends [message] over the socket.
     *
     * @return a flag that callers in this example treat as "the message was sent".
     */
    @Send
    fun send(message: Any): Boolean

    /** Channel of [WebSocketEvent]s received from the socket. */
    @Receive
    fun observeEvents(): ReceiveChannel<WebSocketEvent>

    /** Channel of incoming [ApiMessage]s. */
    @Receive
    fun observeApiMessage(): ReceiveChannel<ApiMessage>
}
import com. ... .datasource.net.message.ApiMessage
import com.tinder.scarlet.websocket.WebSocketEvent
import kotlinx.coroutines.channels.ReceiveChannel
import javax.inject.Inject

/**
 * Injectable wrapper around [RestApi]; every method forwards directly to the same-named
 * method of the wrapped API.
 */
class WebSocketService @Inject constructor(private val restApi: RestApi) {

    /** Forwards [message] to [RestApi.send] and returns its result unchanged. */
    fun send(message: Any): Boolean = restApi.send(message)

    /** Returns the channel produced by [RestApi.observeEvents]. */
    fun observeEvents(): ReceiveChannel<WebSocketEvent> = restApi.observeEvents()

    /** Returns the channel produced by [RestApi.observeApiMessage]. */
    fun observeApiMessage(): ReceiveChannel<ApiMessage> = restApi.observeApiMessage()
}

ApiMessage is the base class for the different messages that are converted to JSON (abstract class ApiMessage(type: String)), for example:

/**
 * [ApiMessage] whose [type] defaults to "NEED_ALIAS".
 *
 * @property type discriminator passed to the [ApiMessage] base constructor.
 * @property playerId NOTE(review): presumably the id of the player that still needs an alias — confirm.
 */
data class NeedAlias(
    val type: String = "NEED_ALIAS",
    val playerId: Long
) : ApiMessage(type)

Our API pairs WebSocket requests with responses, so I've created a BaseRepository mechanism:

/**
 * Base class for repositories that talk to the server through [WebSocketService] using a
 * request/response pattern on top of the socket.
 */
abstract class BaseRepository(private val webSocketService: WebSocketService) {

    /**
     * Sends [request] over the socket.
     *
     * @return [Result.Success] when the service reports the message as sent, otherwise
     * [Result.Error] carrying [Failure.NetworkConnection].
     */
    private fun sendRequest(request: Any): Result<Unit> =
        if (webSocketService.send(request)) {
            Result.Success(Unit)
        } else {
            Result.Error(Failure.NetworkConnection)
        }

    /**
     * Sends [request] and converts the first [ApiMessage] received afterwards with [onResponse].
     *
     * The function always completes: a failed send returns the send error immediately, and any
     * failure while waiting for or mapping the response is logged and reported as
     * [Failure.NetworkConnection]. Cancellation of the caller is propagated, and the message
     * channel is cancelled on every exit path.
     *
     * @param request message to send.
     * @param onResponse maps the first incoming message to the result of the call.
     * @return the mapped response, or a [Result.Error] as described above.
     */
    @ExperimentalCoroutinesApi
    protected suspend fun processRequest(
        request: Request,
        onResponse: (message: ApiMessage) -> Result<Unit>
    ): Result<Unit> {
        // Subscribe before sending so the response cannot arrive while nobody is listening.
        val responses = webSocketService.observeApiMessage()
        return try {
            val sendResult = sendRequest(request)
            if (sendResult.isFailure) sendResult else onResponse(responses.receive())
        } catch (cancellation: kotlinx.coroutines.CancellationException) {
            // Never swallow cancellation; the caller's scope decides what happens next.
            throw cancellation
        } catch (throwable: Throwable) {
            Timber.e("SOCKET ERROR $throwable")
            Result.Error(Failure.NetworkConnection)
        } finally {
            // Only a single response is consumed per request; release the subscription.
            responses.cancel()
        }
    }

}
Example of a repository that sends a request:
/**
 * [LoginRepository] implementation that sends login requests through the request/response
 * helper inherited from [BaseRepository] and resolves user-facing texts from [resources].
 */
class LoginRepositoryImpl @Inject constructor(
    webSocketService: WebSocketService,
    val resources: Resources
) : BaseRepository(webSocketService), LoginRepository {

    /**
     * Sends a [StandardLogin] request and maps the server's answer to a result.
     *
     * [LoginOk] is the only success; every other message becomes a [Result.Error], with known
     * error codes translated to localized strings.
     */
    @ExperimentalCoroutinesApi
    override suspend fun login(
        email: String,
        password: String
    ): Result<Unit> {
        return processRequest(StandardLogin(email, password)) { message ->
            when (message) {
                is LoginOk -> Result.Success(Unit)
                is LoginFail ->
                    Result.Error(Failure.ServerError(resources.getString(R.string.wrong_password)))
                is NeedAlias -> Result.Error(Failure.ServerError(EMPTY_ALIAS_AND_EMAIL))
                is NeedVerification ->
                    Result.Error(Failure.ServerError(resources.getString(R.string.need_verification)))
                is Error -> {
                    val failure = when (val code = ServerError.getByCode(message.errorCode)) {
                        // Both codes are deliberately shown to the user as a wrong password.
                        NO_PLAYER_FOUND,
                        NOT_PERMITTED_FOR_TRIAL_PLAYER ->
                            Failure.ServerError(resources.getString(R.string.wrong_password))
                        INCOMPLETE_EMAIL_VALIDATION ->
                            Failure.ServerError(resources.getString(R.string.need_verification))
                        else -> Failure.ServerError(code, message.errorMessage)
                    }
                    Result.Error(failure)
                }
                else -> Result.Error(UnexpectedException(message.toString()))
            }
        }
    }
}

LoginRepository is just an interface to be used from the domain layer:

/**
 * Account-related operations; every call suspends and reports its outcome as a [Result]
 * instead of returning data.
 */
interface LoginRepository {

    /** Attempts to log in with [email] and [password]. */
    suspend fun login(email: String, password: String): Result<Unit>

    /** NOTE(review): presumably starts the password-recovery flow for [email] — confirm. */
    suspend fun forgotPassword(email: String): Result<Unit>

    /** NOTE(review): presumably sets the player's [alias] — confirm. */
    suspend fun chooseAlias(alias: String): Result<Unit>

    /** NOTE(review): presumably registers a new account with [email] and [password] — confirm. */
    suspend fun createEmailAccount(email: String, password: String): Result<Unit>
}
Orpheus007 commented 5 years ago

@ilyaklyukin Thank you so much my good sir! Not only did this answer my question it also gave me new ideas for wrapping results. Really appreciate it!

lambdatamer commented 4 years ago

If you prefer to use Kotlin Flows, here is the adapter implementation:

/**
 * [StreamAdapter] that exposes a Scarlet [Stream] as a cold [Flow].
 *
 * Each collection starts its own subscription on the stream and disposes it again when the
 * flow is closed or the collector is cancelled.
 */
class FlowStreamAdapter<T> : StreamAdapter<T, Flow<T>> {
    override fun adapt(stream: Stream<T>) = callbackFlow<T> {
        val subscription = stream.start(object : Stream.Observer<T> {
            override fun onComplete() {
                close()
            }

            override fun onError(throwable: Throwable) {
                close(cause = throwable)
            }

            override fun onNext(data: T) {
                // offer() does not suspend; its result is ignored here.
                if (!isClosedForSend) offer(data)
            }
        })
        // Stop observing the stream once nobody collects any more; otherwise the
        // observer stays registered for the lifetime of the stream.
        awaitClose { subscription.dispose() }
    }

    /** Creates a [FlowStreamAdapter] for service methods whose raw return type is [Flow]. */
    object Factory : StreamAdapter.Factory {
        /**
         * @throws IllegalArgumentException when [type] is not a [Flow].
         */
        override fun create(type: Type): StreamAdapter<Any, Any> {
            return when (type.getRawType()) {
                Flow::class.java -> FlowStreamAdapter()
                else -> throw IllegalArgumentException("$type is not supported by FlowStreamAdapter")
            }
        }
    }
}

Then call .addStreamAdapterFactory(FlowStreamAdapter.Factory) on your Scarlet.Builder. Now you can use Flow<T> return type in your services:

/** Example service whose received messages are exposed as a [Flow] of strings. */
interface ExampleService {
    /** Received messages as a flow; needs a stream adapter factory that supports [Flow]. */
    @Receive
    fun example(): Flow<String>
}
Orpheus007 commented 4 years ago

I actually just started working with flow. Thank you for sharing!

Amalip commented 3 years ago

@Orpheus007 @lambdatamer do you have an example using Flows?

ShubhamAgr commented 3 years ago

If you prefer to use Kotlin Flows, here is the adapter implementation:

/**
 * [StreamAdapter] that exposes a Scarlet [Stream] as a cold [Flow].
 *
 * Each collection starts its own subscription on the stream and disposes it again when the
 * flow is closed or the collector is cancelled.
 */
class FlowStreamAdapter<T> : StreamAdapter<T, Flow<T>> {
    override fun adapt(stream: Stream<T>) = callbackFlow<T> {
        val subscription = stream.start(object : Stream.Observer<T> {
            override fun onComplete() {
                close()
            }

            override fun onError(throwable: Throwable) {
                close(cause = throwable)
            }

            override fun onNext(data: T) {
                // offer() does not suspend; its result is ignored here.
                if (!isClosedForSend) offer(data)
            }
        })
        // Stop observing the stream once nobody collects any more; otherwise the
        // observer stays registered for the lifetime of the stream.
        awaitClose { subscription.dispose() }
    }

    /** Creates a [FlowStreamAdapter] for service methods whose raw return type is [Flow]. */
    object Factory : StreamAdapter.Factory {
        /**
         * @throws IllegalArgumentException when [type] is not a [Flow].
         */
        override fun create(type: Type): StreamAdapter<Any, Any> {
            return when (type.getRawType()) {
                Flow::class.java -> FlowStreamAdapter()
                else -> throw IllegalArgumentException("$type is not supported by FlowStreamAdapter")
            }
        }
    }
}

Then call .addStreamAdapterFactory(FlowStreamAdapter.Factory) on your Scarlet.Builder. Now you can use Flow<T> return type in your services:

/** Example service whose received messages are exposed as a [Flow] of strings. */
interface ExampleService {
    /** Received messages as a flow; needs a stream adapter factory that supports [Flow]. */
    @Receive
    fun example(): Flow<String>
}

I think this is a much better way: emit the response and then collect it in the ViewModel. https://github.com/kizok/tinder_scarlet_with_coroutine_adapter/blob/master/app/src/main/java/tech/kizok/sockettest/ScarletAdapter/ReceiveChannelStreamAdapter.kt

jemshit commented 3 years ago

Few important points:

BroadcastChannelStreamAdapter should be something like this:

import com.tinder.scarlet.Stream
import com.tinder.scarlet.StreamAdapter
import com.tinder.scarlet.utils.getRawType
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.BroadcastChannel
import java.lang.reflect.Type

/**
 * [StreamAdapter] that exposes a Scarlet [Stream] as a [BroadcastChannel] with a buffer of
 * 100 elements.
 */
class BroadcastChannelStreamAdapter<T> : StreamAdapter<T, BroadcastChannel<T>> {
    /**
     * Starts observing [stream] immediately and forwards every element into the returned channel.
     *
     * Completion closes the channel normally; an error closes it with the error as cause.
     */
    @ExperimentalCoroutinesApi
    override fun adapt(stream: Stream<T>): BroadcastChannel<T> {
        val channel = BroadcastChannel<T>(100)
        stream.start(object : Stream.Observer<T> {
            override fun onComplete() {
                // Close instead of cancel: cancelling would drop elements that are still
                // buffered and end subscribers with a cancellation rather than a normal close.
                channel.close()
            }

            override fun onError(throwable: Throwable) {
                channel.close(cause = throwable)
            }

            override fun onNext(data: T) {
                // offer() does not suspend; its result is ignored here.
                if (!channel.isClosedForSend) {
                    channel.offer(data)
                }
            }
        })
        return channel
    }

    /** Creates the adapter for service methods whose raw return type is [BroadcastChannel]. */
    object Factory : StreamAdapter.Factory {
        /**
         * @throws IllegalArgumentException when [type] is not a [BroadcastChannel].
         */
        override fun create(type: Type): StreamAdapter<Any, Any> {
            return when (type.getRawType()) {
                BroadcastChannel::class.java -> BroadcastChannelStreamAdapter()
                else -> throw IllegalArgumentException(
                    "$type is not supported by BroadcastChannelStreamAdapter"
                )
            }
        }
    }
}
taknikiniga commented 2 years ago

@NaaleinGrohiik

For the Scarlet 2 dependencies, add the snapshots repository: maven { url "https://oss.sonatype.org/content/repositories/snapshots" }

// Networking
// versions.scarlet : '0.2.5-SNAPSHOT'
implementation "com.tinder.scarlet:scarlet:${versions.scarlet}"
implementation "com.tinder.scarlet:protocol-websocket-okhttp:${versions.scarlet}"
implementation "com.tinder.scarlet:message-adapter-gson:${versions.scarlet}"
implementation "com.tinder.scarlet:stream-adapter-coroutines:${versions.scarlet}"
implementation "com.tinder.scarlet:lifecycle-android:${versions.scarlet}"
implementation "com.squareup.okhttp3:okhttp:${versions.okhttp3}"
implementation "com.squareup.okhttp3:logging-interceptor:${versions.okhttp3}"
implementation "com.google.code.gson:gson:${googleVersions.gson}"

I'm using Dagger 2 to configure Scarlet 2:

    /**
     * Provides the WebSocket endpoint URL injected under the name "apiBaseUrl".
     *
     * The URL is built as "wss://" + the `R.string.api_url` resource + "/game" and is logged
     * at debug level before being returned.
     */
    @Provides
    @Singleton
    @JvmStatic
    @Named("apiBaseUrl")
    fun provideApiBaseUrl(app: Application): String =
        "wss://${app.getString(R.string.api_url)}/game".also { url ->
            Timber.d("${LogConfig.CONNECTION_TAG} using url: $url")
        }

    /**
     * Builds the singleton [Scarlet] instance.
     *
     * The OkHttp WebSocket protocol connects to [baseUrl] using [httpClient] and reports
     * [ShutdownReason.GRACEFUL] on shutdown. The configuration wires a linear backoff based on
     * `ClientConfig.RECONNECT_INTERVAL`, the Gson message adapter, the coroutines stream
     * adapter, and an application-foreground lifecycle derived from [app].
     */
    @Provides
    @Singleton
    @JvmStatic
    fun provideScarlet(
        app: Application,
        httpClient: OkHttpClient,
        @Named("apiBaseUrl") baseUrl: String
    ): Scarlet {
        val requestFactory = OkHttpWebSocket.SimpleRequestFactory(
            { Request.Builder().url(baseUrl).build() },
            { ShutdownReason.GRACEFUL }
        )
        val protocol = OkHttpWebSocket(httpClient, requestFactory)
        val configuration = Scarlet.Configuration(
            backoffStrategy = LinearBackoffStrategy(ClientConfig.RECONNECT_INTERVAL),
            messageAdapterFactories = listOf(GsonMessageAdapter.Factory()),
            streamAdapterFactories = listOf(CoroutinesStreamAdapterFactory()),
            lifecycle = AndroidLifecycle.ofApplicationForeground(app)
        )
        return Scarlet(protocol, configuration)
    }

/*
    @Provides
    @Singleton
    @JvmStatic
    fun createNoConnectionInterceptor(
        app: Application
    ): NoConnectionInterceptor = NoConnectionInterceptor(app)
*/
    /**
     * Builds the singleton [OkHttpClient] used for the WebSocket connection.
     *
     * Read, write and connect timeouts are all set to `ClientConfig.RECONNECT_INTERVAL`
     * milliseconds and redirects are not followed. In debug builds a body-level
     * [HttpLoggingInterceptor] is added.
     */
    @Provides
    @Singleton
    @JvmStatic
    fun createHttpClient(
   //     noConnectionInterceptor: NoConnectionInterceptor
    ): OkHttpClient {
        val timeoutMillis = ClientConfig.RECONNECT_INTERVAL
        val clientBuilder = OkHttpClient.Builder()
            //.addInterceptor(noConnectionInterceptor)
            .readTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
            .writeTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
            .followRedirects(false)
            .connectTimeout(timeoutMillis, TimeUnit.MILLISECONDS)

        // Release builds get the plain client without any logging.
        if (!BuildConfig.DEBUG) return clientBuilder.build()

        val bodyLogger = HttpLoggingInterceptor().apply {
            level = HttpLoggingInterceptor.Level.BODY
        }
        return clientBuilder.addInterceptor(bodyLogger).build()
    }

    /** Creates the singleton [RestApi] implementation from the given [Scarlet] instance. */
    @Provides
    @Singleton
    @JvmStatic
    fun createRestApi(scarlet: Scarlet): RestApi = scarlet.create<RestApi>()

    /** Provides the singleton [WebSocketService] that wraps the given [RestApi]. */
    @Provides
    @Singleton
    @JvmStatic
    fun provideWebSocketService(restApi: RestApi): WebSocketService {
        return WebSocketService(restApi)
    }

Custom interface:

/**
 * WebSocket service contract; an implementation is obtained through `scarlet.create()`.
 */
interface RestApi {
    /**
     * Sends [message] over the socket.
     *
     * @return a flag that callers in this example treat as "the message was sent".
     */
    @Send
    fun send(message: Any): Boolean

    /** Channel of [WebSocketEvent]s received from the socket. */
    @Receive
    fun observeEvents(): ReceiveChannel<WebSocketEvent>

    /** Channel of incoming [ApiMessage]s. */
    @Receive
    fun observeApiMessage(): ReceiveChannel<ApiMessage>
}
import com. ... .datasource.net.message.ApiMessage
import com.tinder.scarlet.websocket.WebSocketEvent
import kotlinx.coroutines.channels.ReceiveChannel
import javax.inject.Inject

/**
 * Injectable wrapper around [RestApi]; every method forwards directly to the same-named
 * method of the wrapped API.
 */
class WebSocketService @Inject constructor(private val restApi: RestApi) {

    /** Forwards [message] to [RestApi.send] and returns its result unchanged. */
    fun send(message: Any): Boolean = restApi.send(message)

    /** Returns the channel produced by [RestApi.observeEvents]. */
    fun observeEvents(): ReceiveChannel<WebSocketEvent> = restApi.observeEvents()

    /** Returns the channel produced by [RestApi.observeApiMessage]. */
    fun observeApiMessage(): ReceiveChannel<ApiMessage> = restApi.observeApiMessage()
}

ApiMessage is the base class for the different messages that are converted to JSON (abstract class ApiMessage(type: String)), for example:

/**
 * [ApiMessage] whose [type] defaults to "NEED_ALIAS".
 *
 * @property type discriminator passed to the [ApiMessage] base constructor.
 * @property playerId NOTE(review): presumably the id of the player that still needs an alias — confirm.
 */
data class NeedAlias(
    val type: String = "NEED_ALIAS",
    val playerId: Long
) : ApiMessage(type)

Our API pairs WebSocket requests with responses, so I've created a BaseRepository mechanism:

/**
 * Base class for repositories that talk to the server through [WebSocketService] using a
 * request/response pattern on top of the socket.
 */
abstract class BaseRepository(private val webSocketService: WebSocketService) {

    /**
     * Sends [request] over the socket.
     *
     * @return [Result.Success] when the service reports the message as sent, otherwise
     * [Result.Error] carrying [Failure.NetworkConnection].
     */
    private fun sendRequest(request: Any): Result<Unit> =
        if (webSocketService.send(request)) {
            Result.Success(Unit)
        } else {
            Result.Error(Failure.NetworkConnection)
        }

    /**
     * Sends [request] and converts the first [ApiMessage] received afterwards with [onResponse].
     *
     * The function always completes: a failed send returns the send error immediately, and any
     * failure while waiting for or mapping the response is logged and reported as
     * [Failure.NetworkConnection]. Cancellation of the caller is propagated, and the message
     * channel is cancelled on every exit path.
     *
     * @param request message to send.
     * @param onResponse maps the first incoming message to the result of the call.
     * @return the mapped response, or a [Result.Error] as described above.
     */
    @ExperimentalCoroutinesApi
    protected suspend fun processRequest(
        request: Request,
        onResponse: (message: ApiMessage) -> Result<Unit>
    ): Result<Unit> {
        // Subscribe before sending so the response cannot arrive while nobody is listening.
        val responses = webSocketService.observeApiMessage()
        return try {
            val sendResult = sendRequest(request)
            if (sendResult.isFailure) sendResult else onResponse(responses.receive())
        } catch (cancellation: kotlinx.coroutines.CancellationException) {
            // Never swallow cancellation; the caller's scope decides what happens next.
            throw cancellation
        } catch (throwable: Throwable) {
            Timber.e("SOCKET ERROR $throwable")
            Result.Error(Failure.NetworkConnection)
        } finally {
            // Only a single response is consumed per request; release the subscription.
            responses.cancel()
        }
    }

}
Example of a repository that sends a request:
/**
 * [LoginRepository] implementation that sends login requests through the request/response
 * helper inherited from [BaseRepository] and resolves user-facing texts from [resources].
 */
class LoginRepositoryImpl @Inject constructor(
    webSocketService: WebSocketService,
    val resources: Resources
) : BaseRepository(webSocketService), LoginRepository {

    /**
     * Sends a [StandardLogin] request and maps the server's answer to a result.
     *
     * [LoginOk] is the only success; every other message becomes a [Result.Error], with known
     * error codes translated to localized strings.
     */
    @ExperimentalCoroutinesApi
    override suspend fun login(
        email: String,
        password: String
    ): Result<Unit> {
        return processRequest(StandardLogin(email, password)) { message ->
            when (message) {
                is LoginOk -> Result.Success(Unit)
                is LoginFail ->
                    Result.Error(Failure.ServerError(resources.getString(R.string.wrong_password)))
                is NeedAlias -> Result.Error(Failure.ServerError(EMPTY_ALIAS_AND_EMAIL))
                is NeedVerification ->
                    Result.Error(Failure.ServerError(resources.getString(R.string.need_verification)))
                is Error -> {
                    val failure = when (val code = ServerError.getByCode(message.errorCode)) {
                        // Both codes are deliberately shown to the user as a wrong password.
                        NO_PLAYER_FOUND,
                        NOT_PERMITTED_FOR_TRIAL_PLAYER ->
                            Failure.ServerError(resources.getString(R.string.wrong_password))
                        INCOMPLETE_EMAIL_VALIDATION ->
                            Failure.ServerError(resources.getString(R.string.need_verification))
                        else -> Failure.ServerError(code, message.errorMessage)
                    }
                    Result.Error(failure)
                }
                else -> Result.Error(UnexpectedException(message.toString()))
            }
        }
    }
}

LoginRepository is just an interface to be used from the domain layer:

/**
 * Account-related operations; every call suspends and reports its outcome as a [Result]
 * instead of returning data.
 */
interface LoginRepository {

    /** Attempts to log in with [email] and [password]. */
    suspend fun login(email: String, password: String): Result<Unit>

    /** NOTE(review): presumably starts the password-recovery flow for [email] — confirm. */
    suspend fun forgotPassword(email: String): Result<Unit>

    /** NOTE(review): presumably sets the player's [alias] — confirm. */
    suspend fun chooseAlias(alias: String): Result<Unit>

    /** NOTE(review): presumably registers a new account with [email] and [password] — confirm. */
    suspend fun createEmailAccount(email: String, password: String): Result<Unit>
}