Closed gotamafandy closed 5 years ago
Hello! Good question. We recently added a new module coroutines-interop
but it's not released yet. The next release will be this week. The current implementation might be a bit unstable, I will review it before release. Thanks.
hi @arkivanov, i've tried coroutinesinterop from your master branch and use it in my view model:
subject
.subscribeOn(ioScheduler)
.flatMapSingle { singleFromCoroutine { service.getUser() } }
.observeOn(mainScheduler)
on Android it runs fine, however on ios, i'm getting error:
Uncaught Kotlin exception: kotlin.native.concurrent.InvalidMutabilityException: mutation attempt of frozen kotlinx.atomicfu.AtomicRef@59ddc8
it looks like the error is caught everytime i called httpclient.get in my api class
override suspend fun execute(request: String?): Result<String> {
val client = HttpClient {
install(JsonFeature) {
serializer = KotlinxSerializer()
}
install(Logging) {
logger = Logger.DEFAULT
level = LogLevel.ALL
}
}
val httpResponse = client.get<HttpResponse> {
url {
protocol = URLProtocol.HTTPS
host = hostUrl
parameter("s", request)
parameter("apiKey", key)
}
}
val json = httpResponse.readText()
return Result.success(json)
}
Yeah, coroutines do not like to be frozen in K/N because they are single threaded ATM. This is the known transitive problem in coroutines-interop
that's why it is not stable, I'm thinking of how I can improve it.
Currently you can write the following:
subject
.observeOn(mainScheduler)
.threadLocal()
.flatMapSingle { singleFromCoroutine { service.getUser() } }
It should work given you are subscribing on the main thread.
hi @arkivanov yes its working now, i need to look more into this K/N concurrency things.
thanks and nice library, will definitely use it 👍
Thanks for the feedback! If your are working for a company, would you mind to mention its name?
hi @arkinov, upon further checking, i'm still getting this error
Uncaught Kotlin exception: kotlin.native.concurrent.InvalidMutabilityException: mutation attempt of frozen kotlinx.atomicfu.AtomicRef@25a628
after i did 2nd onnext (pull to refresh on ios)
subject
.observeOn(mainScheduler)
.threadLocal()
.flatMapSingle { singleFromCoroutine { service.getUser() } }
subject.onNext() <-- working
subject.onNext() <-- getting error
and yes, i'm from Indonesia, i run my own consultant and software company with other 2 friends :), here's our website:
thanks
Could you please post a full stack trace?
And subscription part as well please (where you are subscribing to this flatMapSingle
).
sure, ill post it first thing tomorrow
hi @arkivanov , this is the full stack
Uncaught Kotlin exception: kotlin.native.concurrent.InvalidMutabilityException: mutation attempt of frozen kotlinx.atomicfu.AtomicRef@aa59c8 at 0 Common 0x0000000104a545b5 kfun:kotlin.Exception.<init>(kotlin.String?)kotlin.Exception + 85 at 1 Common 0x0000000104a535b5 kfun:kotlin.RuntimeException.<init>(kotlin.String?)kotlin.RuntimeException + 85 at 2 Common 0x0000000104a880b5 kfun:kotlin.native.concurrent.InvalidMutabilityException.<init>(kotlin.String)kotlin.native.concurrent.InvalidMutabilityException + 85 at 3 Common 0x0000000104a88748 ThrowInvalidMutabilityException + 680 at 4 Common 0x0000000104e3bba8 MutationCheck + 104 at 5 Common 0x0000000104b37b30 kfun:kotlinx.atomicfu.AtomicRef.<set-value>(#GENERIC) + 96 at 6 Common 0x0000000104b37c68 kfun:kotlinx.atomicfu.AtomicRef.compareAndSet(#GENERIC;#GENERIC)ValueType + 264 at 7 Common 0x0000000104bc3757 kfun:kotlinx.coroutines.JobSupport.invokeOnCompletion(kotlin.Boolean;kotlin.Boolean;kotlin.Function1<kotlin.Throwable?,kotlin.Unit>)kotlinx.coroutines.DisposableHandle + 1367 at 8 Common 0x0000000104bbb9b5 kfun:kotlinx.coroutines.Job.invokeOnCompletion$default(kotlin.Boolean;kotlin.Boolean;kotlin.Function1<kotlin.Throwable?,kotlin.Unit>;kotlin.Int)kotlinx.coroutines.DisposableHandle + 469 at 9 Common 0x0000000104bcafb7 kfun:kotlinx.coroutines.JobSupport.attachChild(kotlinx.coroutines.ChildJob)kotlinx.coroutines.ChildHandle + 359 at 10 Common 0x0000000104bbddf5 kfun:kotlinx.coroutines.JobSupport.initParentJobInternal$kotlinx-coroutines-core(kotlinx.coroutines.Job?) + 373 at 11 Common 0x0000000104b9d9c9 kfun:kotlinx.coroutines.AbstractCoroutine.initParentJob$kotlinx-coroutines-core() + 297 at 12 Common 0x0000000104b9e3df kfun:kotlinx.coroutines.AbstractCoroutine.start(kotlinx.coroutines.CoroutineStart;#GENERIC;kotlin.coroutines.SuspendFunction1<#GENERIC,#GENERIC>)Generic + 127 at 13 Common 0x0000000104ba0f4d kfun:kotlinx.coroutines.launch@kotlinx.coroutines.CoroutineScope.(kotlin.coroutines.CoroutineContext;kotlinx.coroutines.CoroutineStart;kotlin.coroutines.SuspendFunction1<kotlinx.coroutines.CoroutineScope,kotlin.Unit>)kotlinx.coroutines.Job + 637 at 14 Common 0x0000000104ba1300 kfun:kotlinx.coroutines.launch$default@kotlinx.coroutines.CoroutineScope.(kotlin.coroutines.CoroutineContext;kotlinx.coroutines.CoroutineStart;kotlin.coroutines.SuspendFunction1<kotlinx.coroutines.CoroutineScope,kotlin.Unit>;kotlin.Int)kotlinx.coroutines.Job + 704 at 15 Common 0x0000000104c213a8 kfun:kotlinx.coroutines.io.launchChannel#internal + 616 at 16 Common 0x0000000104c1f678 kfun:kotlinx.coroutines.io.writer@kotlinx.coroutines.CoroutineScope.(kotlin.coroutines.CoroutineContext;kotlin.Boolean;kotlin.coroutines.SuspendFunction1<kotlinx.coroutines.io.WriterScope,kotlin.Unit>)kotlinx.coroutines.io.WriterJob + 360 at 17 Common 0x0000000104d1e77b kfun:io.ktor.client.engine.ios.IosClientEngine.<no name provided>_3.URLSession_6#internal + 1915 at 18 Common 0x0000000104d2080b _knbridge26 + 379 at 19 CFNetwork 0x00007fff22ea3313 _CFNetworkHTTPConnectionCacheSetLimit + 206635 at 20 Foundation 0x00007fff25654583 __NSBLOCKOPERATION_IS_CALLING_OUT_TO_A_BLOCK__ + 7 at 21 Foundation 0x00007fff2565447b -[NSBlockOperation main] + 98 at 22 Foundation 0x00007fff25657408 __NSOPERATION_IS_INVOKING_MAIN__ + 17 at 23 Foundation 0x00007fff2565367b -[NSOperation start] + 731 at 24 Foundation 0x00007fff25657d62 __NSOPERATIONQUEUE_IS_STARTING_AN_OPERATION__ + 17 at 25 Foundation 0x00007fff2565786c __NSOQSchedule_f + 182 at 26 libdispatch.dylib 0x000000010607a3ff _dispatch_block_async_invoke2 + 83 at 27 libdispatch.dylib 0x000000010606bd64 _dispatch_client_callout + 8 at 28 libdispatch.dylib 0x0000000106079e1c _dispatch_main_queue_callback_4CF + 1500
This is where i implemented subscription:
private lazy var _binding: ViewModelBinding = {
let binding = ViewModelBinding()
return binding
}()
override func viewDidLoad() {
super.viewDidLoad()
binding()
...
_viewModel.inputs.start(request: "avenger")
}
// MARK - Selector
@objc func refresh() {
_viewModel.inputs.start(request: "avenger") <-- crash after this onNext
}
// MARK - Private
private func binding() {
_binding.subscribe(observable: _viewModel.outputs.result) { [weak self] result in
guard let strongSelf = self, let result = result as? DataList<Movie>, let list = result.list as? [Movie] else { return }
strongSelf._movies = list
strongSelf.collectionView.reloadData()
}
}
this is the viewmodel class
class ViewModelImpl<R, T>(useCase: UseCase<R, T>): ViewModel<R, T>, ViewModelInput<R>, ViewModelOutput<T> {
override val inputs: ViewModelInput<R> = this
override val outputs: ViewModelOutput<T> = this
override val loading: Observable<Boolean>
override val result: Observable<T>
private val mStartProperty = publishSubject<R?>()
init {
val loadingProperty = publishSubject<Boolean>()
loading = loadingProperty
result = mStartProperty
.doOnBeforeNext { loadingProperty.onNext(true) }
.subscribeOn(ioScheduler)
.threadLocal()
.flatMapSingle {
singleFromCoroutine { useCase.execute(it) }
}
.observeOn(mainScheduler)
.flatMap {
when (it) {
is Result.Success -> {
observableOf(it.response)
}
else -> observableOfEmpty()
}
}
.doOnBeforeNext { loadingProperty.onNext(false) }
}
override fun start(request: R?) {
mStartProperty.onNext(request)
}
}
and this is the code in kotlin multiplatform for ViewModelBinding
class ViewModelBinding {
private val disposables = CompositeDisposable()
fun <T> subscribe(observable: Observable<T>,
isThreadLocal: Boolean = true,
onSubscribe: ((Disposable) -> Unit)? = null,
onError: ((Throwable) -> Unit)? = null,
onComplete: (() -> Unit)? = null,
onNext: ((T) -> Unit)? = null) {
disposables.add(observable.subscribe(isThreadLocal, onSubscribe, onError, onComplete, onNext))
}
fun <T> subscribe(observable: Observable<T>, onError: ((Throwable) -> Unit)? = null, onNext: ((T) -> Unit)? = null) {
disposables.add(observable.subscribe(true, onError = onError, onNext = onNext))
}
fun <T> subscribe(observable: Observable<T>, onNext: ((T) -> Unit)? = null) {
disposables.add(observable.subscribe(true, onNext = onNext))
}
fun dispose() {
disposables.dispose()
}
}
This is the link to my example project if you want to see the full flow :) https://www.dropbox.com/s/hcl1ypjl53w1wv1/CleanArchitecture.zip?dl=0
There are two points:
subscribeOn for subjects does not make sense, it's affects only subscription phase, values will emitted on whatever thread you call onNext. So I believe it's main thread. In this particular case you can remove subscribeOn, threadLocal and observe on operators. It's ok to call ktor on main thread, it will suspend and resume when needed.
I do see some circumstances when it would be nice to execute coroutines in background. I will think of how we can improve the interop module.
hi @arkivanov, yes i've tried that before but i'm getting this exception on java and working fine on K/N
FATAL EXCEPTION: Thread-2
Process: com.adrena.helloworld, PID: 8222
java.lang.IllegalStateException: Accessing ThreadLocalStorage from another threads is prohibited. Original thread was (2, main), actual thread is (396, Thread-2).
it turned out i have to use isThreadLocal = false in java and isThreadLocal = true in K/N when subscribing, is that the correct approach? because then i have to use runOnUiThread on my android view:
runOnUiThread {
adapter.setList(result.list)
}
I have checked you project, let's try the following:
result = mStartProperty
.doOnBeforeNext { loadingProperty.onNext(true) }
.flatMapSingle {
singleFromCoroutine(context = Dispatchers.Main) { useCase.execute(it) }
}
.observeOn(mainScheduler)
.flatMap {
when (it) {
is Result.Success -> {
observableOf(it.response)
}
else -> observableOfEmpty()
}
}
.doOnBeforeNext {
loadingProperty.onNext(false)
}
The crucial part is to use main context for corotuine: singleFromCoroutine(context = Dispatchers.Main)
. It works just fine for Android, but please check how it works for iOS and let me know.
PS: you can keep isThreadLocal=true
hi @arkivanov it's crash on iOS with the following error:
kotlin.IllegalStateException: There is no event loop. Use runBlocking { ... } to start one.
i ended up using observeOn
on android view and keep isThreadLocal = true
, and now it is running on both android and ios:
private fun binding() {
mBinding.subscribe(viewModel.outputs.loading.observeOn(mainScheduler), onNext = ::loading)
mBinding.subscribe(viewModel.outputs.result.observeOn(mainScheduler), onNext = ::result)
}
Makes sense. I will try to overcome this limitation in the upcoming release. Let's keep this issue open.
Dispatchers.Main
is not supported on iOS. See https://github.com/Kotlin/kotlinx.coroutines/issues/470.
Yep, also Job and CoroutineScope can not be frozen. Almost finished with a workaround.
@gotamafandy After #221 you should be able to write in the following normal way:
result = mStartProperty
.doOnBeforeNext { loadingProperty.onNext(true) }
.flatMapSingle {
singleFromCoroutine { useCase.execute(it) }
.susbcribeOn(ioScheduler)
}
.flatMap {
when (it) {
is Result.Success -> {
observableOf(it.response)
}
else -> observableOfEmpty()
}
}
.observeOn(mainScheduler)
.doOnBeforeNext {
loadingProperty.onNext(false)
}
And keep isThreadLocal=true
in the bindings.
hi @arkivanov i've checked your latest 1.0.0 release version, i see that you did modification in singleFromCoroutine
method by removing: context: CoroutineContext = Dispatchers.Unconfined
Checking on android its working as expected, however on iOS i'm not getting http response 200 ok from ktor. I tried changing back to old one:
fun <T> singleFromCoroutine(context: CoroutineContext = Dispatchers.Unconfined, block: suspend () -> T): Single<T> =
single { emitter ->
launchCoroutine(
context = context,
onSuccess = emitter::onSuccess,
onError = emitter::onError,
block = block
)
.also(emitter::setDisposable)
}
fun <T> (suspend () -> T).asSingle(context: CoroutineContext = Dispatchers.Unconfined): Single<T> =
singleFromCoroutine(context, this)
and i'm getting 200 OK response from ktor, any thoughts?
Please check whether the coroutine is actually executed or not. Also please try version 1.0.1 and let me know the result.
@arkivanov i'm still getting the same result with ver 1.0.1, on other topic, i just published a tutorial article on medium for kotlin multiplatform + reactive using this library, here's the link if you want to check it out: https://medium.com/@fandygotama/kotlin-multiplatform-reactive-a45263e1fd7a
Could you please elaborate? Is coroutine actually executed? What about http request, does it execute?
@CherryPerry I would appreciate your help as well with this issue. Would be nice if you check coroutine-interop with ktor on iOS.
hi @arkivanov, yes the both coroutine and http request is executed, this is the log using the old launchCoroutines
method:
**Run executeHttpClient: REQUEST: https://www.omdbapi.com/?apiKey=b445ca0b&s=avenger**
HttpClient: METHOD: HttpMethod(value=GET)
HttpClient: COMMON HEADERS
HttpClient: -> Cache-Control: no-cache
HttpClient: -> Accept: application/json
HttpClient: -> Accept-Charset: UTF-8
HttpClient: CONTENT HEADERS
HttpClient: BODY Content-Type: null
HttpClient: BODY START
HttpClient: BODY END
**Got http responseHttpClient: RESPONSE: 200 OK**
as you can see, i'm able to get http response 200 OK
now changing back to version 1.0.1 by using:
implementation "com.badoo.reaktive:coroutines-interop:$reactive_version"
Run executeHttpClient: REQUEST: https://www.omdbapi.com/?apiKey=b445ca0b&s=avenger
HttpClient: METHOD: HttpMethod(value=GET)
HttpClient: COMMON HEADERS
HttpClient: -> Cache-Control: no-cache
HttpClient: -> Accept: application/json
HttpClient: -> Accept-Charset: UTF-8
HttpClient: CONTENT HEADERS
HttpClient: BODY Content-Type: null
HttpClient: BODY START
HttpClient: BODY END
The process stop there. (i will going to check on this deeper later)
This is the part of the code to get http response
@UseExperimental(UnstableDefault::class)
override suspend fun execute(request: String?): List<Movie> {
print("Run execute")
val httpResponse = client.get<HttpResponse> {
apiUrl()
parameter("s", request)
}
print("Got http response")
val json = httpResponse.readText()
val response = Json.nonstrict.parse(MoviesResponse.serializer(), json)
return mapper.transform(response)
}
This is my view model
mListProperty
.doOnBeforeNext { loadingProperty.onNext(true) }
.flatMapSingle { request ->
singleFromCoroutine { useCase.execute(request) }
}
.doOnBeforeNext {
loadingProperty.onNext(false)
}
Note: adding .subscribeOn and observeOn will cause exception on iOS
Run executeUncaught exception: kotlin.native.concurrent.InvalidMutabilityException: mutation attempt of frozen io.ktor.client.request.HttpRequestPipeline@a63488kotlin.native.concurrent.InvalidMutabilityException: mutation attempt of frozen io.ktor.client.request.HttpRequestPipeline@a63488
Thanks, we will check it as well. What I can tell now is: you should add subscribeOn and observe on, because on iOS it uses runBlocking {} which will block the UI thread. Perhaps this this why your can't get the response. Try to create HttpClient inside coroutine to avoid its freezing.
hi @arkivanov moving httpclient to suspend fun, give me a different error:
kotlin.native.IncorrectDereferenceException: Trying to access top level value not marked as @ThreadLocal or @SharedImmutable from non-main threadkotlin.native.IncorrectDereferenceException: Trying to access top level value not marked as @ThreadLocal or @SharedImmutable from non-main thread
i will take a closer look into ktor code to check it out, thanks for the help
Make sure you have observeOn(mainThread), right after flatMap
As I posted earlier, try the code as follows:
result = mStartProperty
.doOnBeforeNext { loadingProperty.onNext(true) }
.flatMapSingle {
// useCase and any objects captured by it will be frozen, create HttpClient directly in execute() function
singleFromCoroutine { useCase.execute(it) }
.susbcribeOn(ioScheduler) // This is required to avoid blocking of UI thread
}
.flatMap {
when (it) {
is Result.Success -> {
observableOf(it.response)
}
else -> observableOfEmpty()
}
}
.observeOn(mainScheduler) // This is also required because of ThreadLocal
.doOnBeforeNext {
loadingProperty.onNext(false)
}
hi @arkivanov yes, i've updated the viewmodel as well before moving httpclient to coroutines
@gotamafandy could you please provide a full stack trace of IncorrectDereferenceException
?
hi @arkivanov , this is the full stack trace
Uncaught exception: kotlin.native.IncorrectDereferenceException: Trying to access top level value not marked as @ThreadLocal or @SharedImmutable from non-main threadkotlin.native.IncorrectDereferenceException: Trying to access top level value not marked as @ThreadLocal or @SharedImmutable from non-main thread
at 0 Core 0x0000000104587a95 kfun:kotlin.Exception.<init>(kotlin.String?)kotlin.Exception + 85
at 1 Core 0x0000000104587cb5 kfun:kotlin.RuntimeException.<init>(kotlin.String?)kotlin.RuntimeException + 85
at 2 Core 0x00000001045bab65 kfun:kotlin.native.IncorrectDereferenceException.<init>(kotlin.String)kotlin.native.IncorrectDereferenceException + 85
at 3 Core 0x00000001045d84d9 ThrowIncorrectDereferenceException + 137
at 4 Core 0x000000010496c5e9 CheckIsMainThread + 25
at 5 Core 0x00000001047c67e3 kfun:io.ktor.http.<get-URL_ALPHABET>#internal + 35
at 6 Core 0x00000001047ca6a8 kfun:io.ktor.http.encodeURLParameter$<anonymous>_2#internal + 360
at 7 Core 0x00000001047cadb0 kfun:io.ktor.http.$encodeURLParameter$<anonymous>_2$FUNCTION_REFERENCE$96.invoke#internal + 128
at 8 Core 0x00000001047cae29 kfun:io.ktor.http.$encodeURLParameter$<anonymous>_2$FUNCTION_REFERENCE$96.$<bridge-UNNN>invoke(#GENERIC)#internal + 89
at 9 Core 0x00000001047c784e kfun:io.ktor.http.forEach#internal + 670
at 10 Core 0x00000001047c7323 kfun:io.ktor.http.encodeURLParameter@kotlin.String.(kotlin.Boolean)kotlin.String + 707
at 11 Core 0x00000001047e4436 kfun:io.ktor.http.formUrlEncodeTo$lambda-0#internal + 470
at 12 Core 0x00000001047e47fd kfun:io.ktor.http.$formUrlEncodeTo$lambda-0$FUNCTION_REFERENCE$101.invoke#internal + 173
at 13 Core 0x0000000104573224 kfun:kotlin.text.appendElement$stdlib@kotlin.text.Appendable.(#GENERIC;kotlin.Function1<#GENERIC,kotlin.CharSequence>?)Generic + 532
at 14 Core 0x000000010454fc54 kfun:kotlin.collections.joinTo@kotlin.collections.Iterable<#GENERIC>.(#GENERIC_kotlin.text.Appendable;kotlin.CharSequence;kotlin.CharSequence;kotlin.CharSequence;kotlin.Int;kotlin.CharSequence;kotlin.Function1<#GENERIC,kotlin.CharSequence>?)Generic + 868
at 15 Core 0x000000010455026e kfun:kotlin.collections.joinTo$default@kotlin.collections.Iterable<#GENERIC>.(#GENERIC_kotlin.text.Appendable;kotlin.CharSequence;kotlin.CharSequence;kotlin.CharSequence;kotlin.Int;kotlin.CharSequence;kotlin.Function1<#GENERIC,kotlin.CharSequence>?;kotlin.Int)Generic + 974
at 16 Core 0x00000001047e41f9 kfun:io.ktor.http.formUrlEncodeTo@kotlin.collections.List<kotlin.Pair<kotlin.String,kotlin.String?>>.(kotlin.text.Appendable) + 329
at 17 Core 0x00000001047e404b kfun:io.ktor.http.formUrlEncodeTo@io.ktor.http.Parameters.(kotlin.text.Appendable) + 2299
at 18 Core 0x00000001047f017d kfun:io.ktor.http.appendUrlFullPath$ktor-http@kotlin.text.Appendable.(kotlin.String;io.ktor.http.Parameters;kotlin.Boolean) + 605
at 19 Core 0x00000001047e8a45 kfun:io.ktor.http.URLBuilder.appendTo#internal + 1429
at 20 Core 0x00000001047e9044 kfun:io.ktor.http.URLBuilder.buildString()kotlin.String + 244
at 21 Core 0x00000001048893e9 kfun:io.ktor.client.engine.ios.IosClientEngine.execute(io.ktor.client.request.HttpRequestData)io.ktor.client.request.HttpResponseData + 3689
at 22 Core 0x0000000104809846 kfun:io.ktor.client.engine.HttpClientEngine.$install$lambda-1COROUTINE$21.invokeSuspend#internal + 1702
at 23 Core 0x000000010480a398 kfun:io.ktor.client.engine.HttpClientEngine.$install$lambda-1COROUTINE$21.invoke#internal + 312
at 24 Core 0x00000001047c28f6 kfun:io.ktor.util.pipeline.SuspendFunctionGun.loop#internal + 1126
at 25 Core 0x00000001047c1f4b kfun:io.ktor.util.pipeline.SuspendFunctionGun.proceed#internal + 395
at 26 Core 0x00000001047c23aa kfun:io.ktor.util.pipeline.SuspendFunctionGun.execute#internal + 474
at 27 Core 0x00000001047bcc59 kfun:io.ktor.util.pipeline.Pipeline.execute(#GENERIC_kotlin.Any;#GENERIC_kotlin.Any)#GENERIC_kotlin.Any + 361
at 28 Core 0x00000001048218fd kfun:io.ktor.client.features.HttpSend.DefaultSender.$executeCOROUTINE$34.invokeSuspend#internal + 1021
Alright, seems like the issue is inside ktor
(here). Those top level val
's should be annotated as @SharedImmutable
. I would suggest to report the issue to them. We will try to reproduce the crash and find a workaround.
yes, thats what i thought, havent gotten chance to check it deeper, thanks @arkivanov
We are getting exactly the same crash
did you find any workaround? i'm currently using the former approach (without observeOn/subscribeOn, and using the old singleFromCoroutine)
Not really. We ended up with expect/actual with custom simple implementations for Android and iOS. And we are not using coroutines at the moment. Still using kotlinx serialization for JSON parsing.
hi guys, this is more of a question rather than an issue, i'm trying this library to connect to API using ktor which depend on coroutines.
Let say i have http request
any suggestion how to convert into observable?
thanks guys