Autodesk / coroutineworker

Kotlin Coroutine-based workers for native
Apache License 2.0

Ktor-client inside CoroutineWorker.execute - Uncaught Kotlin exception: kotlin.native.IncorrectDereferenceException: Trying to access top level value not marked as @ThreadLocal or @SharedImmutable from non-main thread #39

Closed khairilushan closed 4 years ago

khairilushan commented 4 years ago

I just watched the talk that @benasher44 gave at droidCon NYC about Concurrency in Kotlin/Native and decided to try this CoroutineWorker library.
What I am trying to do is consume a backend REST API using Ktor inside CoroutineWorker.execute. I am still not sure whether I am doing this correctly, but here is the code.

So I have a NetworkProvider class:

internal class NetworkProvider<T : Endpoint>(private val engine: HttpClientEngine) {

    private val client: HttpClient by lazy {
        HttpClient(engine) {
            install(JsonFeature) {
                serializer = KotlinxSerializer()
            }
        }
    }

    @UnstableDefault
    internal suspend fun <R> request(endpoint: T, strategy: DeserializationStrategy<R>): R {

        val response: String = client.request {
            method = endpoint.method
            url.apply {
                protocol = URLProtocol.HTTPS
                host = endpoint.baseUrl

                val query = endpoint.parameters
                    .map { "${it.key}=${it.value}" }
                    .joinToString("&")

                encodedPath = if (query.isNotEmpty()) {
                    "${endpoint.path}?$query"
                } else {
                    endpoint.path
                }
            }
        }

        return Json.nonstrict.parse(strategy, response)
    }

}

And then I use this NetworkProvider inside my Repository class like this:

internal class MovieRepositoryImpl(
    private val provider: NetworkProvider<MovieEndpoint>
) : MovieRepository {

    override fun getMovieList(
        year: Int,
        page: Int,
        sort: String,
        completion: (Result<MovieListEntity>) -> Unit
    ) {

        CoroutineWorker.execute {

            val result = try {
                val response = provider.request(
                    endpoint = MovieListEndpoint(year, page, sort),
                    strategy = MovieListEntity.serializer()
                )
                Result.Success(response)
            } catch (error: Throwable) {
                Result.Failure(error)
            }

            CoroutineWorker.withContext(MainDispatcher) {
                completion(result)
            }
        }
    }

}

But it ends up giving me this error message:

Uncaught Kotlin exception: kotlin.native.IncorrectDereferenceException: Trying to access top level value not marked as @ThreadLocal or @SharedImmutable from non-main thread
        at 0   MovieCore                           0x0000000109964fe7 kfun:kotlin.Throwable.<init>(kotlin.String?)kotlin.Throwable + 87
        at 1   MovieCore                           0x000000010995e355 kfun:kotlin.Exception.<init>(kotlin.String?)kotlin.Exception + 85
        at 2   MovieCore                           0x000000010995df15 kfun:kotlin.RuntimeException.<init>(kotlin.String?)kotlin.RuntimeException + 85
        at 3   MovieCore                           0x0000000109990c95 kfun:kotlin.native.IncorrectDereferenceException.<init>(kotlin.String)kotlin.native.IncorrectDereferenceException + 85
        at 4   MovieCore                           0x00000001099add29 ThrowIncorrectDereferenceException + 137
        at 5   MovieCore                           0x0000000109cd3799 CheckIsMainThread + 25
        at 6   MovieCore                           0x0000000109c5a893 kfun:utils.<get-MainDispatcher>$core()kotlinx.coroutines.CoroutineDispatcher + 35
        at 7   MovieCore                           0x0000000109c3f7e4 kfun:repository.MovieRepositoryImpl.$getMovieList$lambda-1COROUTINE$1.invokeSuspend#internal + 1892
        at 8   MovieCore                           0x0000000109c40020 kfun:repository.MovieRepositoryImpl.$getMovieList$lambda-1COROUTINE$1.invoke#internal + 256
        at 9   MovieCore                           0x0000000109b069f3 kfun:kotlinx.coroutines.intrinsics.startUndispatchedOrReturn$kotlinx-coroutines-core@kotlinx.coroutines.internal.ScopeCoroutine<#GENERIC>.(#GENERIC;kotlin.coroutines.SuspendFunction1<#GENERIC,#GENERIC>)Generic + 899
        at 10  MovieCore                           0x0000000109ab11bd kfun:kotlinx.coroutines.coroutineScope(kotlin.coroutines.SuspendFunction1<kotlinx.coroutines.CoroutineScope,#GENERIC>)Generic + 429
        at 11  MovieCore                           0x0000000109c37f70 kfun:com.autodesk.coroutineworker.CoroutineWorker.Companion.WorkItem.$<init>$lambda-1COROUTINE$11.invokeSuspend#internal + 1520
        at 12  MovieCore                           0x0000000109c386a0 kfun:com.autodesk.coroutineworker.CoroutineWorker.Companion.WorkItem.$<init>$lambda-1COROUTINE$11.invoke#internal + 256
        at 13  MovieCore                           0x0000000109c33226 kfun:com.autodesk.coroutineworker.BackgroundCoroutineWorkQueueExecutor.$processWorkItems$lambda-0COROUTINE$7.invokeSuspend#internal + 726
        at 14  MovieCore                           0x0000000109986618 kfun:kotlin.coroutines.native.internal.BaseContinuationImpl.resumeWith(kotlin.Result<kotlin.Any?>) + 712
        at 15  MovieCore                           0x0000000109afb06c kfun:kotlinx.coroutines.DispatchedTask.run() + 2732
        at 16  MovieCore                           0x0000000109ab606d kfun:kotlinx.coroutines.EventLoopImplBase.processNextEvent()ValueType + 813
        at 17  MovieCore                           0x0000000109b097c6 kfun:kotlinx.coroutines.BlockingCoroutine.joinBlocking#internal + 1958
        at 18  MovieCore                           0x0000000109b0886e kfun:kotlinx.coroutines.runBlocking(kotlin.coroutines.CoroutineContext;kotlin.coroutines.SuspendFunction1<kotlinx.coroutines.CoroutineScope,#GENERIC>)Generic + 1246
        at 19  MovieCore                           0x0000000109b08de4 kfun:kotlinx.coroutines.runBlocking$default(kotlin.coroutines.CoroutineContext?;kotlin.coroutines.SuspendFunction1<kotlinx.coroutines.CoroutineScope,#GENERIC>;kotlin.Int)Generic + 372
        at 20  MovieCore                           0x0000000109c32da9 kfun:com.autodesk.coroutineworker.BackgroundCoroutineWorkQueueExecutor.enqueueWork$<anonymous>_3#internal + 217
        at 21  MovieCore                           0x0000000109c340fb kfun:com.autodesk.coroutineworker.BackgroundCoroutineWorkQueueExecutor.$enqueueWork$<anonymous>_3$FUNCTION_REFERENCE$7.invoke#internal + 59
        at 22  MovieCore                           0x0000000109c3415b kfun:com.autodesk.coroutineworker.BackgroundCoroutineWorkQueueExecutor.$enqueueWork$<anonymous>_3$FUNCTION_REFERENCE$7.$<bridge-UNN>invoke()#internal + 59
        at 23  MovieCore                           0x0000000109c3cca4 kfun:com.autodesk.coroutineworker.WorkerPool.performWork$lambda-1#internal + 308
        at 24  MovieCore                           0x0000000109c3d0df kfun:com.autodesk.coroutineworker.WorkerPool.$performWork$lambda-1$FUNCTION_REFERENCE$17.invoke#internal + 63
        at 25  MovieCore                           0x0000000109c3d13b kfun:com.autodesk.coroutineworker.WorkerPool.$performWork$lambda-1$FUNCTION_REFERENCE$17.$<bridge-UNN>invoke()#internal + 59
        at 26  MovieCore                           0x00000001099927c1 WorkerLaunchpad + 177
        at 27  MovieCore                           0x0000000109cd8069 _ZN6Worker19processQueueElementEb + 2569
        at 28  MovieCore                           0x0000000109cd8616 _ZN12_GLOBAL__N_113workerRoutineEPv + 54
        at 29  libsystem_pthread.dylib             0x00007fff51c04d36 _pthread_start + 125

Does anyone know how to work around this error?

benasher44 commented 4 years ago

In general, ktor is not "freeze safe." You have to be careful to only touch ktor objects on the threads where you created them. There are sort of 2 versions of improving this that can be done with different degrees of effort.

Easy-ish

As long as there aren't globals that have to be mutated, they should all be marked as @SharedImmutable. This ensures that clients doing multi-threaded work won't hit exceptions like this on native when the Ktor objects they create touch internal global state.

It looks like ktor already takes some care to do this to globals, so this particular issue is probably just a bug/mistake.

Harder (requires more refactoring by ktor devs)

Make sure all ktor objects are frozen after init and use frozen structures. This will ensure native consumers have no issues using ktor from multiple threads. This is probably more of a medium/long-term fix, and it'd require more care/maintenance/strategy from the core devs.

khairilushan commented 4 years ago

Thank you so much for your explanation. From my code above, which part should I annotate as @SharedImmutable ?

benasher44 commented 4 years ago

Ah sorry so I misread this in the slack DM and confused myself. I thought you were originally asking me to comment on a ktor issue, but now I realize this is coroutineworker.

There are a few issues going on here:

  1. The crash is because accessing MainDispatcher from a background thread ends up calling some function called CheckIsMainThread, which touches some internal global state that was originally initialized on some other thread and wasn't marked as @SharedImmutable, so it throws an exception.

  2. withContext doesn't actually use the dispatcher arg on native in this library. It just switches to another thread in the thread pool. It only uses it on JVM, which I did to facilitate using Dispatchers.IO on JVM.

If you can tell me more about where MainDispatcher comes from and how it's implemented, I might be able to help more.

khairilushan commented 4 years ago

Here are my actual dispatchers for both JVM and Native.

JVM

internal actual val ApplicationDispatcher: CoroutineDispatcher = Dispatchers.IO

internal actual val MainDispatcher: CoroutineDispatcher = Dispatchers.Main

Native

internal actual val MainDispatcher: CoroutineDispatcher = NsQueueDispatcher(
    dispatch_get_main_queue()
)

internal actual val ApplicationDispatcher: CoroutineDispatcher = NsQueueDispatcher(
    dispatch_get_main_queue()
)

internal class NsQueueDispatcher(
    private val dispatchQueue: dispatch_queue_t
) : CoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        dispatch_async(dispatchQueue) {
            block.run()
        }
    }
}

benasher44 commented 4 years ago

Those two dispatcher vals need to be marked as @SharedImmutable.

CoroutineWorker.withContext ignores the context arg on native though. For this use case, it'd prob be easier to instead just make an expect fun called performOnMain or something like that which uses the MainDispatcher on JVM and on iOS just calls out to dispatch_get_main_queue directly.
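
A minimal sketch of the first suggestion, assuming the legacy Kotlin/Native memory model and an Apple target with kotlinx-coroutines available. In the multiplatform project above these would be the native `actual` declarations; the `actual` modifier is dropped here only so the snippet stands alone. Putting the annotation on the native declarations themselves is an assumption, and this only addresses the top-level access check, not how blocks are handed to the main queue.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlin.native.concurrent.SharedImmutable
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Runnable
import platform.darwin.dispatch_async
import platform.darwin.dispatch_get_main_queue
import platform.darwin.dispatch_queue_t

// Same dispatcher as in the comment above: forwards work to a GCD queue.
internal class NsQueueDispatcher(
    private val dispatchQueue: dispatch_queue_t
) : CoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        dispatch_async(dispatchQueue) {
            block.run()
        }
    }
}

// @SharedImmutable freezes the top-level value, so reading it from a
// background thread no longer trips the top-level access check.
// Assumption: in the real project this line is the native `actual val`.
@SharedImmutable
internal val MainDispatcher: CoroutineDispatcher = NsQueueDispatcher(
    dispatch_get_main_queue()
)
```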

khairilushan commented 4 years ago

OK. I tried marking the expect declarations of the dispatchers as @SharedImmutable, but I still get the same error as above.

I also tried creating a performOnMainThread function as you mentioned. I made it an extension of CoroutineScope so I can access the coroutine builders on the JVM side, but I ended up with another error.

Here is what I did:

expect fun CoroutineScope.performOnMainThread(block: () -> Unit)

// JVM
actual fun CoroutineScope.performOnMainThread(block: () -> Unit) {

    launch(MainDispatcher) {
        block()
    }
}

// Native
actual fun CoroutineScope.performOnMainThread(block: () -> Unit) {

    dispatch_async(dispatch_get_main_queue()) {
        block()
    }
}

This is how I use it inside the repository:

   override fun getMovieList(
            year: Int,
            page: Int,
            sort: String,
            completion: (Result<MovieListEntity>) -> Unit
    ) {

        CoroutineWorker.execute {

            val result = try {
                val response = provider.request(
                        endpoint = MovieListEndpoint(year, page, sort),
                        strategy = MovieListEntity.serializer()
                )
                Result.Success(response)
            } catch (error: Throwable) {
                Result.Failure(error)
            }

            performOnMainThread {
                completion(result)
            }
        }
    }

Here is the error message:

Uncaught Kotlin exception: kotlin.native.IncorrectDereferenceException: illegal attempt to access non-shared utils.$performOnMainThread$lambda-0$FUNCTION_REFERENCE$2@2d6ca48 from other thread
        at 0   MovieCore                           0x0000000104b8e007 kfun:kotlin.Throwable.<init>(kotlin.String?)kotlin.Throwable + 87
        at 1   MovieCore                           0x0000000104b87375 kfun:kotlin.Exception.<init>(kotlin.String?)kotlin.Exception + 85
        at 2   MovieCore                           0x0000000104b86f35 kfun:kotlin.RuntimeException.<init>(kotlin.String?)kotlin.RuntimeException + 85
        at 3   MovieCore                           0x0000000104bb9cb5 kfun:kotlin.native.IncorrectDereferenceException.<init>(kotlin.String)kotlin.native.IncorrectDereferenceException + 85
        at 4   MovieCore                           0x0000000104bbbf48 ThrowIllegalObjectSharingException + 744
        at 5   MovieCore                           0x0000000104ed25b0 _ZNK16KRefSharedHolder3refEv + 80
        at 6   MovieCore                           0x0000000104ef170a Kotlin_Interop_unwrapKotlinObjectHolder + 42
        at 7   MovieCore                           0x0000000104e78687 _knbridge15 + 135
        at 8   libdispatch.dylib                   0x0000000105581dd4 _dispatch_call_block_and_release + 12
        at 9   libdispatch.dylib                   0x0000000105582d48 _dispatch_client_callout + 8
        at 10  libdispatch.dylib                   0x0000000105590de6 _dispatch_main_queue_callback_4CF + 1500
        at 11  CoreFoundation                      0x00007fff23bd4049 __CFRUNLOOP_IS_SERVICING_THE_MAIN_DISPATCH_QUEUE__ + 9
        at 12  CoreFoundation                      0x00007fff23bceca9 __CFRunLoopRun + 2329
        at 13  CoreFoundation                      0x00007fff23bce066 CFRunLoopRunSpecific + 438
        at 14  GraphicsServices                    0x00007fff384c0bb0 GSEventRunModal + 65
        at 15  UIKitCore                           0x00007fff48092d4d UIApplicationMain + 1621
        at 16  Movie App                           0x000000010480fc7b main + 75 (/Users/khairilushan/Development/Android/KU-Area/Kotlin/MovieMPP/iOS/Movie App/Movie App/AppDelegate.swift:12:7)
        at 17  libdyld.dylib                       0x00007fff5227ec25 start + 1
        at 18  ???                                 0x0000000000000001 0x0 + 1

benasher44 commented 4 years ago

You'll need to freeze the lambda that you pass to the main queue before you pass it on native (calling freeze on it on the first line of performOnMainThread should do it).

khairilushan commented 4 years ago

I still end up with the same error. Here is my Native implementation of performOnMainThread.

actual fun CoroutineScope.performOnMainThread(block: () -> Unit) {

    val frozenBlock = block.freeze()

    dispatch_async(dispatch_get_main_queue()) {
        frozenBlock()
    }
}

benasher44 commented 4 years ago

That code still technically creates a lambda on one thread, and then it's used on another. Try dispatch_async(dispatch_get_main_queue(), frozenBlock)
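
A minimal native-only sketch of that suggestion, assuming the legacy Kotlin/Native memory model and an Apple target. It is written as a plain function rather than the `actual` CoroutineScope extension above so that it stands alone. Note that freezing the lambda also freezes everything it captures.

```kotlin
import kotlin.native.concurrent.freeze
import platform.darwin.dispatch_async
import platform.darwin.dispatch_get_main_queue

fun performOnMainThread(block: () -> Unit) {
    // Freeze the lambda so it may legally be shared across threads, then hand
    // the frozen object itself to GCD. Wrapping it in a new `{ frozenBlock() }`
    // lambda would create a fresh, unfrozen object on the calling thread.
    dispatch_async(dispatch_get_main_queue(), block.freeze())
}
```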

khairilushan commented 4 years ago

I see... now it's a different error:

kotlin.native.concurrent.InvalidMutabilityException: mutation attempt of frozen io.ktor.client.request.HttpRequestPipeline@2b47308

benasher44 commented 4 years ago

Yep, and now you've hit an error in ktor. As I mentioned earlier, it's not freeze-safe, so you'll need to avoid referencing ktor from other threads and avoid freezing it, so that it can freely mutate its internals.

benasher44 commented 4 years ago

Since there's no follow-up to do with CoroutineWorker, I'm going to close this. Hopefully this helped you understand a bit more of what's going on. Further discussion / resolution should probably be redirected to the ktor and Kotlin-native channels in Kotlin slack.

Thanks for using CoroutineWorker! If you disagree with my assessment, feel free to reopen.

khairilushan commented 4 years ago

Thank you so much for patiently guiding me. For now, I will read more resources about all of this.

benasher44 commented 4 years ago

No problem sounds good! Best of luck :)