Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

Unable to access Repository's suspending function scope launched from ViewModel #1618

Open adam-hurwitz opened 4 years ago

adam-hurwitz commented 4 years ago

Overview

Expected

Observed

Implementation

ViewModel

  1. The ViewModel uses viewModelScope to launch getContentList().
  2. getContentList() is a suspending function that calls the Repository with another suspending function getMainFeedList().
class ContentViewModel : ViewModel() {
    fun processEvent(...) {
        ...
        viewModelScope.launch {
            _feedViewState.value = getContentList(...)
        }
        ...
    }

    private suspend fun getContentList(...): LiveData<PagedList<Content>> =
        switchMap(getMainFeedList(isRealtime, timeframe)) { lce ->
            // Do something with result.
        }
}

Repository

  1. getMainFeedList() is a suspending function that uses withContext(Dispatchers.Default) in order to get the coroutine scope.
  2. getMainFeedList() returns LiveData with the result from a Firebase Firestore collection request, contentEnCollection.get().addOnCompleteListener.
  3. The Firestore result is saved to a Room DB with insertContentList(), from within the nested coroutine launch { ... }. This nested coroutine does not appear to run: the main feed of the app is empty on load.
object ContentRepository {
    suspend fun getMainFeedList(...): MutableLiveData<Lce<PagedListResult>> = withContext(Dispatchers.Default) {
        MutableLiveData<Lce<PagedListResult>>().also { lce ->
            val newContentList = arrayListOf<Content?>()
            contentEnCollection.get().addOnCompleteListener {
                arrayListOf<Content?>().also { contentList ->
                    it.result!!.documents.all { document ->
                        contentList.add(document.toObject(Content::class.java))
                        true
                    }
                    newContentList.addAll(contentList)
                }
                launch {
                    try {
                        database.contentDao().insertContentList(newContentList)
                    } catch (e: Exception) {
                        this.cancel()
                    }
                }.invokeOnCompletion { throwable ->
                    if (throwable == null) {
                        lce.postValue(...)
                    } else {
                        // Log Room error here.
                    }
                }
            }.addOnFailureListener {
                // Log Firestore error here.
                lce.postValue(...)
            }
        }
    }
}

Dao

@Dao
interface ContentDao {
    @Insert(onConflict = OnConflictStrategy.REPLACE)
    suspend fun insertContentList(users: ArrayList<Content?>)
}

Attempted Solutions

Attempt #3 works as the main feed loads with data. However, manually launching a coroutine is not ideal.

  1. Initiating suspending coroutine scope outside of the asynchronous Firestore API call contentEnCollection.get() and using that scope inside the Firebase call.
object ContentRepository {
    suspend fun getMainFeedList(...): MutableLiveData<Lce<PagedListResult>> = withContext(Dispatchers.Default) {
        val scope: CoroutineScope = this
        MutableLiveData<Lce<PagedListResult>>().also { lce ->
            ...
                scope.launch { ... }
        }
    }
}
  2. Launch coroutine with CoroutineScope() and use the suspending function's coroutine scope context.
object ContentRepository {
    suspend fun getMainFeedList(...): MutableLiveData<Lce<PagedListResult>> = withContext(Dispatchers.Default) {
        val scope: CoroutineScope = this
        MutableLiveData<Lce<PagedListResult>>().also { lce ->
            ...
              CoroutineScope(scope.coroutineContext).launch { ... }
        }
    }
}
  3. Launch new coroutine without the suspending function's scope and manage cancelling the Job if there is an error.
CoroutineScope(Dispatchers.Default).launch {
    try {
        database.contentDao().insertContentList(newContentList)
    } catch (e: Exception) {
        this.cancel()
    }
}.invokeOnCompletion { throwable ->
    if (throwable == null) {
        // Do more things.
    } else {
        // Log Room error.
    }
}
zach-klippenstein commented 4 years ago

I'm not sure I understand exactly what the problem is – your insertContentList call is not being executed, or your MutableLiveData.postValue is never being executed?

I've got a couple questions about the rest of this code too:

  1. Why is your return type a MutableLiveData instead of LiveData? Do your callers also need to post their own values? If not, you can just use the liveData coroutine builder.
  2. Why are you adding values to contentList and then copying to newContentList, instead of just adding directly to newContentList?
  3. Why use invokeOnCompletion at all? You can just post the value or log errors directly from your launch body and catch block.
  4. This would be easier to read if you turned contentEnCollection into a suspend function or Flow first, so you can eliminate some callback nesting.
  5. It looks like getMainFeedList is only suspending to access the scope. In general, functions that return asynchronous types (like LiveData) should not be suspending: their return value is already asynchronous, so they presumably don't actually suspend the caller. This function would be more idiomatic if you just passed the scope in as a parameter.
adam-hurwitz commented 4 years ago

It is now working as expected. Thanks @zach-klippenstein!

Questions

Addressing the suggestions above:

  1. liveData coroutine builder - The MutableLiveData return type has been replaced with a liveData coroutine builder. Using the also extension function, the builder's scope is given the name lce. A value is then emitted to lce inside the nested Firebase callback.
fun getMainFeedList(scope: CoroutineScope, ...) = liveData<Lce<PagedListResult>> {
    this.also { lce ->
        contentEnCollection.orderBy(TIMESTAMP, DESCENDING)
            .whereGreaterThanOrEqualTo(TIMESTAMP, timeframe)
            .get()
            .addOnCompleteListener {
                arrayListOf<Content?>().also { contentList ->
                    it.result!!.documents.all { document ->
                        contentList.add(document.toObject(Content::class.java))
                        true
                    }
                    scope.launch {
                        database.contentDao().insertContentList(contentList)
                    }
                    scope.launch {
                        lce.emit(Lce.Content(PagedListResult(...)))
                    }
                }
            }
    }
}
  2. Firebase results - Saving the Firebase results has been simplified.

  3. Remove invokeOnCompletion - This has been removed.

  4. Refactor contentEnCollection into a suspend function - Would this still be recommended after implementing item 5? The scope is now passed in as a parameter to the getMainFeedList method. The Room functions are launched from a coroutine in the passed-in scope.

  5. Pass in scope as a parameter - This fixed the issue with insertContentList not being executed as expected.

object ContentRepository {
    fun getMainFeedList(scope: CoroutineScope, ...) = liveData<Lce<PagedListResult>> {
        this.also { lce ->
            contentEnCollection.get().addOnCompleteListener {
                arrayListOf<Content?>().also { contentList ->
                    it.result!!.documents.all { document ->
                        contentList.add(document.toObject(Content::class.java))
                        true
                    }
                    scope.launch {
                        database.contentDao().insertContentList(contentList)
                    }
                    ...
               }
            }
       }
    }
}

I'm not sure I understand exactly what the problem is – your insertContentList call is not being executed, or your MutableLiveData.postValue is never being executed?

@zach-klippenstein, Both. insertContentList was not being executed. Because insertContentList was not saving data to the database, there was no data to be observed from the query. This has been fixed with your recommendation item 5. above.
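
A possible explanation for why the original launch never ran, offered as an assumption rather than something confirmed in this thread: the Firestore callback fires after withContext has already returned, so the scope captured inside it belongs to a Job that has completed, and a coroutine launched into a completed Job is cancelled before its body runs. The sketch below reproduces that with a hypothetical FakeTask standing in for the Firestore Task; nothing here uses Firebase or Room.

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-in for a callback API such as Task.addOnCompleteListener:
// it stores the listener so that it can fire after the caller has returned.
class FakeTask {
    private var listener: (() -> Unit)? = null
    fun addOnCompleteListener(l: () -> Unit) { listener = l }
    fun complete() { listener?.invoke() }
}

// Launches from the callback using the scope of a withContext block that has
// already finished. Returns true only if the launched body actually ran.
fun launchFromLateCallback(): Boolean = runBlocking {
    val task = FakeTask()
    var ran = false
    var job: Job? = null
    withContext(Dispatchers.Default) {
        val scope = this
        task.addOnCompleteListener {
            job = scope.launch { ran = true }
        }
    } // withContext returns here, so its Job is now complete
    task.complete() // the callback fires too late for that scope
    job?.join()
    ran
}

// Same callback, but the scope is owned by the caller and is still active.
fun launchFromCallerScope(): Boolean = runBlocking {
    val task = FakeTask()
    var ran = false
    var job: Job? = null
    val scope = this
    task.addOnCompleteListener {
        job = scope.launch { ran = true }
    }
    task.complete()
    job?.join()
    ran
}
```

Under these assumptions the first function reports that the body did not run and the second reports that it did, which matches the observation that passing in a longer-lived scope fixed the insert.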

adam-hurwitz commented 4 years ago

For item 5, is it a bad practice to pass CoroutineScope into a Repository, the same way Context should not be passed into a ViewModel or Repository?

@zach-klippenstein, I described the method above, passing CoroutineScope as a parameter to the Repository, to someone on the Kotlin integration team at Google this past week during the Android Developer Summit. They confirmed that passing the scope is acceptable and safe.

Why use invokeOnCompletion...

Taking a look at the documentation for invokeOnCompletion, it lets you know when the job launched has finished.

  • Registers handler that is synchronously invoked once on completion of this job. When the job is already complete, then the handler is immediately invoked with the job's exception or cancellation cause or null. Otherwise, the handler will be invoked once when this job is complete.
  • The meaning of cause that is passed to the handler:
    • Cause is null when the job has completed normally.
In this use case it signals when the data has finished being written to the database, so that the data can then be queried and passed down to the ViewModel.

scope.launch {
    database.contentDao().insertContentList(...)
}.invokeOnCompletion { cause ->
    // cause is null only when the job completed normally.
    if (cause == null) {
        scope.launch {
            lce.emit(...)
        }
    }
}
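
For comparison, a minimal sketch of the alternative raised earlier in this thread: doing the follow-up work in the launch body itself. Code inside a coroutine runs sequentially, so a statement placed after a suspending call only executes once that call has returned. The insertContentList function and the events list below are hypothetical stand-ins for the Room DAO call and the LiveData emission.

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-in for the suspending Room DAO insert.
suspend fun insertContentList(items: List<String>, sink: MutableList<String>) {
    delay(10) // pretend to do some I/O
    sink.addAll(items)
}

// Records the order in which the insert and the "emit" step happen.
fun insertThenEmit(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    launch {
        try {
            insertContentList(listOf("row"), events)
            events.add("emit") // reached only after the insert has returned
        } catch (e: CancellationException) {
            throw e // let cancellation propagate
        } catch (e: Exception) {
            events.add("error") // log the database error here
        }
    }.join()
    events
}
```

In this sketch the success and error paths sit next to the call they belong to, and no completion handler is needed to order them.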
zach-klippenstein commented 4 years ago

Sorry for the delayed response, I was on vacation.

Passing CoroutineScope to a function that returns some sort of asynchronous type is idiomatic – e.g. Flow.launchIn, etc. The return value indicates "this function will start some asynchronous work, return immediately, and communicate back to you via the returned handle" (in this case a LiveData). The scope parameter indicates where and how to execute that asynchronous work. Contrast to a suspend function, which indicates "this function will perform some long-running work in the current scope and won't return until it's finished". This is why passing scope into a suspending function is confusing.
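
A minimal sketch of the two styles contrasted above, using Deferred in place of LiveData so that it runs without Android. The names loadFeed and loadFeedAsync are hypothetical.

```kotlin
import kotlinx.coroutines.*

// Suspending style: runs in the caller's coroutine and returns only when done.
suspend fun loadFeed(): List<String> {
    delay(10) // stands in for real work
    return listOf("first", "second")
}

// Scope-parameter style: starts the work in the given scope and returns a
// handle immediately. It is deliberately not marked suspend.
fun loadFeedAsync(scope: CoroutineScope): Deferred<List<String>> =
    scope.async { loadFeed() }

// Both styles yield the same data; they differ in when the caller gets control back.
fun bothStylesAgree(): Boolean = runBlocking {
    val handle = loadFeedAsync(this) // returns right away
    val direct = loadFeed()          // suspends until the list is ready
    handle.await() == direct
}
```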

    this.also { lce ->

This is just a style preference, but I usually think this makes code harder to read – just do val lce = this. It's much more idiomatic inside JetBrains-written Kotlin code, and saves you a level of nesting. Same with arrayListOf<>().let { … – I would just use a regular variable assignment. But again, just a style preference.

it.result!!.documents.all { document ->
  contentList.add(document.toObject(Content::class.java))
  true
}

Unless I'm missing something here, this should just be a map.
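
A small sketch of that equivalence, with a hypothetical FakeDocument type in place of the Firestore document:

```kotlin
// Hypothetical stand-in for a Firestore document.
data class FakeDocument(val id: String)

// The pattern from the snippet above: all { ... true } used only for its side effect.
fun idsWithAll(documents: List<FakeDocument>): List<String> {
    val ids = arrayListOf<String>()
    documents.all { document ->
        ids.add(document.id)
        true
    }
    return ids
}

// The same result expressed as a transformation.
fun idsWithMap(documents: List<FakeDocument>): List<String> =
    documents.map { document -> document.id }
```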

scope.launch {
  lce.emit(Lce.Content(PagedListResult(...)))
}

Why are you launching a coroutine just to call emit? The liveData block is itself a coroutine, so once the callback is out of the way you can call emit directly from it.

        contentEnCollection.orderBy(TIMESTAMP, DESCENDING)
                        .whereGreaterThanOrEqualTo(TIMESTAMP, timeframe)
                        .get()
                        .addOnCompleteListener {

You could write an extension function (if one doesn't exist already) to turn .get().addOnCompleteListener { … } into something like .awaitGet() (or .await()), which would get that callback out of there, potentially handle cancellation, errors, and save you another level of nesting. In other words, coroutines are syntactic sugar for writing cleaner callback-based asynchronous code, so it doesn't really make sense to have coroutines that also use async callbacks directly.

A simple implementation might look like this, where QueryResult is whatever type whereGreaterThanOrEqualTo returns:

suspend fun <T> QueryResult<T>.await(): T = suspendCancellableCoroutine { continuation ->
  get().addOnCompleteListener { result ->
    continuation.resume(result)
  }
}

A production-ready implementation should also handle errors and cancellation.
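
As a rough sketch of what handling errors and cancellation could look like, here is the same pattern against a hypothetical FakeRequest callback API (not the Firestore API): failures are forwarded with resumeWithException, and invokeOnCancellation cancels the underlying request when the waiting coroutine is cancelled.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical callback-based request. It reports a value or an error when
// started, or never completes if completes is false.
class FakeRequest<T>(
    private val value: T,
    private val error: Throwable? = null,
    private val completes: Boolean = true
) {
    var cancelled = false
        private set

    fun start(onSuccess: (T) -> Unit, onFailure: (Throwable) -> Unit) {
        if (!completes) return
        if (error == null) onSuccess(value) else onFailure(error)
    }

    fun cancel() { cancelled = true }
}

// Bridges the callback API to a suspend function.
suspend fun <T> FakeRequest<T>.await(): T = suspendCancellableCoroutine { continuation ->
    // If the waiting coroutine is cancelled, cancel the underlying request too.
    continuation.invokeOnCancellation { cancel() }
    start(
        onSuccess = { result -> continuation.resume(result) },
        onFailure = { failure -> continuation.resumeWithException(failure) }
    )
}
```

With this shape, callers can use an ordinary try/catch around await() instead of a separate failure listener.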


So with all this, you'd end up with something like:

fun getMainFeedList(scope: CoroutineScope, ...) =
  liveData<Lce<PagedListResult>>(scope.coroutineContext) {
    val lceLiveData = this
    val results = contentEnCollection.orderBy(TIMESTAMP, DESCENDING)
        .whereGreaterThanOrEqualTo(TIMESTAMP, timeframe)
        .await()
    val contentList = results.result!!.documents.map { document ->
      document.toObject(Content::class.java)
    }
    scope.launch {
      database.contentDao()
          .insertContentList(contentList)
    }
    lceLiveData.emit(Lce.Content(PagedListResult(...)))
  }
adam-hurwitz commented 4 years ago

Extension function to remove callbacks

Thanks for sharing the suspendCancellableCoroutine pattern @zach-klippenstein! I used this to build a custom extension for Firebase's realtime Firestore calls.

For Firebase's Firestore database there are two types of calls.

  1. One time requests - addOnCompleteListener
  2. Realtime updates - addSnapshotListener

One time requests

For one time requests there is an await extension function provided by the library org.jetbrains.kotlinx:kotlinx-coroutines-play-services:X.X.X. The function returns results from addOnCompleteListener.

Resources

Realtime updates

The extension function awaitRealtime checks that the continuation is still active (isActive) before resuming it. This is important because the function is called whenever the user's main feed of content is updated, whether by a lifecycle event, a manual refresh of the feed, or the removal of content from the feed. Without this check there will be a crash.

ExtensionFunction.kt

data class QueryResponse(val packet: QuerySnapshot?, val error: FirebaseFirestoreException?)

suspend fun Query.awaitRealtime() = suspendCancellableCoroutine<QueryResponse> { continuation ->
    addSnapshotListener({ value, error ->
        if (error == null && continuation.isActive)
            continuation.resume(QueryResponse(value, null))
        else if (error != null && continuation.isActive)
            continuation.resume(QueryResponse(null, error))
    })
}
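
One thing to keep in mind with this approach is that a continuation can only be resumed once, so only the first snapshot is delivered to the caller. If every update is needed, one option (an assumption on my part, not something tested against Firestore) is to expose the listener as a Flow with callbackFlow. The sketch below uses a hypothetical FakeSnapshotSource in place of a Firestore Query.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical stand-in for a listener-based source such as a snapshot listener.
class FakeSnapshotSource {
    private val listeners = mutableListOf<(Int) -> Unit>()
    val listenerCount: Int get() = listeners.size

    // Registers a listener and returns a function that removes it again.
    fun addListener(listener: (Int) -> Unit): () -> Unit {
        listeners.add(listener)
        return { listeners.remove(listener) }
    }

    fun push(value: Int) = listeners.toList().forEach { it(value) }
}

// Every update becomes a flow element; the listener is removed when collection stops.
fun FakeSnapshotSource.updates(): Flow<Int> = callbackFlow {
    val removeListener = addListener { value -> trySend(value) }
    awaitClose { removeListener() }
}

// Collects two updates and reports them with the number of listeners left behind.
fun collectTwoUpdates(): Pair<List<Int>, Int> = runBlocking {
    val source = FakeSnapshotSource()
    val collected = mutableListOf<Int>()
    val job = launch { source.updates().take(2).toList(collected) }
    while (source.listenerCount == 0) yield() // wait for the listener to be registered
    source.push(1)
    source.push(2)
    job.join()
    collected to source.listenerCount
}
```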

In order to handle errors the try/catch pattern is used.

Repository.kt

object ContentRepository {
    fun getMainFeedList(isRealtime: Boolean, timeframe: Timestamp) = flow<Lce<PagedListResult>> {
        emit(Loading())
        val labeledSet = HashSet<String>()
        val user = usersDocument.collection(getInstance().currentUser!!.uid)
        syncLabeledContent(user, timeframe, labeledSet, SAVE_COLLECTION, this)
        getLoggedInNonRealtimeContent(timeframe, labeledSet, this)        
    }
    // Realtime updates with 'awaitRealtime' used
    private suspend fun syncLabeledContent(user: CollectionReference, timeframe: Timestamp,
                                       labeledSet: HashSet<String>, collection: String,
                                       lce: FlowCollector<Lce<PagedListResult>>) {
        val response = user.document(COLLECTIONS_DOCUMENT)
            .collection(collection)
            .orderBy(TIMESTAMP, DESCENDING)
            .whereGreaterThanOrEqualTo(TIMESTAMP, timeframe)
            .awaitRealtime()
        if (response.error == null) {
            val contentList = response.packet?.documentChanges?.map { doc ->
                doc.document.toObject(Content::class.java).also { content ->
                    labeledSet.add(content.id)
                }
            }
            database.contentDao().insertContentList(contentList)
        } else lce.emit(Error(PagedListResult(null,
            "Error retrieving user save_collection: ${response.error?.localizedMessage}")))
    }
    // One time updates with 'await' used
    private suspend fun getLoggedInNonRealtimeContent(timeframe: Timestamp,
                                                      labeledSet: HashSet<String>,
                                                      lce: FlowCollector<Lce<PagedListResult>>) =
            try {
                database.contentDao().insertContentList(
                        contentEnCollection.orderBy(TIMESTAMP, DESCENDING)
                                .whereGreaterThanOrEqualTo(TIMESTAMP, timeframe).get().await()
                                .documentChanges
                                ?.map { change -> change.document.toObject(Content::class.java) }
                                ?.filter { content -> !labeledSet.contains(content.id) })
                lce.emit(Lce.Content(PagedListResult(queryMainContentList(timeframe), "")))
            } catch (error: FirebaseFirestoreException) {
                lce.emit(Error(PagedListResult(
                        null,
                        CONTENT_LOGGED_IN_NON_REALTIME_ERROR + "${error.localizedMessage}")))
            }
}

Style changes

I've applied the recommended style changes.

  1. val lce = this
  2. Refactoring ArrayList extension function
  3. Refactoring all to map
val contentList = ArrayList<Content?>()
value!!.documentChanges.map { document ->
    document.document.toObject(Content::class.java).let { dismissedContent ->
        contentList.add(dismissedContent)
        labeledSet.add(dismissedContent.id)
    }
}
insertContentListToDb(scope, contentList)