Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

RxJava doOnError analog (to support intercepting errors) #1827

Closed krossovochkin closed 4 years ago

krossovochkin commented 4 years ago

In RxJava, the doOnError operator lets you intercept exceptions without handling them.

Example of usage:

observeChanges()
    .doOnError { error -> println(error) } // error is not caught here
    .subscribe(
        { ... },
        { error -> println(error) } // error is handled here
    )

The main use of this operator is to log errors somewhere in the middle of a stream chain.

Kotlin Flow seems to lack this operator, although it does have the onEach and onCompletion operators.

I propose adding an onError operator analogous to RxJava's doOnError.

We could get similar behavior with:

flow()
    .catch { error ->
        // intercept error
        throw error
    }
    ...

but that feels too verbose.
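
For reference, the catch-and-rethrow approach above can be sketched as a runnable program. This is a minimal sketch, assuming a recent kotlinx-coroutines-core on the classpath; failingFlow and interceptWithCatch are hypothetical names used only for illustration. Note that catch only sees exceptions from the upstream part of the chain.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical source: emits one value, then fails.
fun failingFlow(): Flow<Int> = flow {
    emit(1)
    throw IllegalStateException("boom")
}

// Collects failingFlow() and records what happened, in order.
fun interceptWithCatch(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    try {
        failingFlow()
            .catch { error ->
                log += "intercepted: ${error.message}" // side effect only
                throw error // rethrow so the error keeps propagating downstream
            }
            .collect { value -> log += "value: $value" }
    } catch (e: IllegalStateException) {
        log += "handled: ${e.message}" // the collector side still receives the error
    }
    log
}

fun main() {
    println(interceptWithCatch())
}
```

The error is logged once in the middle of the chain and is still delivered to the code that collects the flow.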

We could also (sometimes) get similar behavior with onCompletion:

flow()
    .onCompletion { error: Throwable? ->
        if (error != null) {
            // intercept error
        }
    }
    ...

But this covers only the limited case where we want to intercept errors on completion.

In general, it would be good to have such an operator built into the standard set, so that the implementation is the same across projects and nobody has to reimplement it each time. It would also fit naturally alongside the onEach and onCompletion operators.

The proposed implementation is:

fun <T> Flow<T>.onError(block: (Throwable) -> Unit): Flow<T> {
    return flow {
        try {
            collect { value ->
                emit(value)
            }
        } catch (e: Exception) {
            block(e)
            throw e
        }
    }
}

Alternatively, it can be written as:

fun <T> Flow<T>.onError(block: (Throwable) -> Unit): Flow<T> {
    return catch { error ->
        block(error)
        throw error
    }
}

krossovochkin commented 4 years ago

Hi,

I just want to ask whether this proposal is something that could potentially fit, so that work on a PR can be started. Or is it considered optional and unlikely to be added to the list of common operators?

Thank you!

elizarov commented 4 years ago

I think that Flow.onCompletion is the operator you are looking for. You can write:

flow.onCompletion { cause -> 
    if (cause != null) .... // there was an error
}

However, note that this operator is still experimental and its behavior will be slightly changed in the next major release, see #1732.

Does it work for you?
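
A minimal sketch of the onCompletion suggestion, assuming a recent kotlinx-coroutines-core on the classpath; observeCompletion is a hypothetical helper introduced only for illustration. It returns both the cause seen by onCompletion and the exception that still reached the collecting code.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.runBlocking

// Returns (cause observed by onCompletion, exception that reached the collector side).
fun observeCompletion(source: Flow<Int>): Pair<Throwable?, Throwable?> = runBlocking {
    var observed: Throwable? = null
    var propagated: Throwable? = null
    try {
        source
            .onCompletion { cause -> observed = cause } // observes the cause, does not handle it
            .collect { }
    } catch (e: IllegalStateException) {
        propagated = e // the failure still propagates past onCompletion
    }
    observed to propagated
}

fun main() {
    // Failing flow: both values should be the IllegalStateException.
    println(observeCompletion(flow { emit(1); throw IllegalStateException("boom") }))
    // Successful flow: both values should be null.
    println(observeCompletion(flowOf(1, 2)))
}
```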

krossovochkin commented 4 years ago

Hi @elizarov

Thank you for your answer. Yes, if I create something like:

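// crossinline is required because onError is called from inside the onCompletion lambda
inline fun <T> Flow<T>.doOnError(crossinline onError: (Throwable) -> Unit): Flow<T> {
    return this.onCompletion {
        if (it != null) {
            onError(it)
        }
    }
}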

Then it works as I expect (and the same as in RxJava). Initially I misunderstood how onCompletion works, which is why my description above was not quite correct.

I'm not a fan of having to add if (cause != null) every time doOnError is needed. But I guess that an onError operator that receives a non-null exception and is triggered only when there was an error is not being considered for the set of standard operators?

So I think this issue can be closed unless a strong need for this operator comes up?

elizarov commented 4 years ago

Thanks. Yes, we are trying to minimize the number of built-in operators for now.

Andrew0000 commented 4 weeks ago

@krossovochkin Thank you for the extension. It looks like we also need to check for CancellationException, because the documentation says onCompletion can receive one as the cause.

fun <T> Flow<T>.doOnError(onError: (Throwable) -> Unit): Flow<T> =
    this.onCompletion {
        if (it != null && it !is CancellationException) {
            onError(it)
        }
    }
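
A small sketch of why the CancellationException check matters, assuming a recent kotlinx-coroutines-core on the classpath. The names doOnAnyCause, doOnErrorFiltered and countCallbacks are hypothetical, chosen to keep both variants side by side. A downstream take(1) stops the upstream flow by throwing a CancellationException, which onCompletion reports as a non-null cause.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking

// Unfiltered variant: fires for any non-null cause, including cancellation.
fun <T> Flow<T>.doOnAnyCause(onError: (Throwable) -> Unit): Flow<T> =
    onCompletion { if (it != null) onError(it) }

// Filtered variant: same logic as the extension above, renamed to avoid a clash.
fun <T> Flow<T>.doOnErrorFiltered(onError: (Throwable) -> Unit): Flow<T> =
    onCompletion { if (it != null && it !is CancellationException) onError(it) }

// Counts how often the callback fires when take(1) cuts the flow short.
fun countCallbacks(filtered: Boolean): Int = runBlocking {
    var calls = 0
    val source = flowOf(1, 2, 3)
    val observed =
        if (filtered) source.doOnErrorFiltered { calls++ }
        else source.doOnAnyCause { calls++ }
    observed.take(1).collect { }
    calls
}

fun main() {
    println("unfiltered: ${countCallbacks(filtered = false)}")
    println("filtered: ${countCallbacks(filtered = true)}")
}
```

With the unfiltered variant the callback should fire once even though nothing actually failed; with the filter it should not fire at all.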