Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines
Apache License 2.0

Provide abstraction for cold streams #254

Closed elizarov closed 5 years ago

elizarov commented 6 years ago

All the currently provided channel abstractions in kotlinx.coroutines are hot. The data is produced regardless of the presence of a subscriber. This is good for data sources and applications that are inherently hot, like incoming network and UI events.

However, hot streams are not an ideal solution for cases where a data stream is produced on demand. Consider, for example, the following simple code that produces a ReceiveChannel<Int>:

produce<Int> { 
    while (true) {
        val x = computeNextValue()
        send(x)
    } 
}

One obvious downside is that computeNextValue() is invoked before send, so even when the receiver is not ready, the next value gets computed. Of course, the producer gets suspended in send if there is no receiver, but it is not as lazy as what you get with a cold reactive Publisher/Observable/Flowable/Flux/Flow.
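A minimal sketch of that eagerness, assuming a current kotlinx.coroutines version where produce is an extension on CoroutineScope (the issue's snippet predates that). The counter stands in for computeNextValue(), and the function name computedBeforeFirstReceive is made up for illustration:

```kotlin
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Returns how many values the producer had computed before anyone called receive().
fun computedBeforeFirstReceive(): Int = runBlocking {
    var computed = 0
    val channel = produce<Int> {
        while (true) {
            val x = ++computed   // stands in for computeNextValue()
            send(x)              // suspends here until a receiver shows up
        }
    }
    yield()                      // let the producer coroutine run; nobody receives yet
    val seen = computed
    channel.cancel()             // stop the otherwise endless producer
    seen
}

fun main() {
    println(computedBeforeFirstReceive())
}
```

The producer has already done work for a value that nobody asked for, which is the laziness gap described above.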

We need an abstraction for cold streams in kotlinx.coroutines that is going to be just as lazy, computing data in "push" mode versus the "pull" mode of the hot channels that we have now.

There are the following related discussions:

elizarov commented 6 years ago

Let me add here that we are looking at an asynchronous analogue of the standard Kotlin Sequence. We already can write the following code in Kotlin:

val s = buildSequence<Int> { 
    while (true) {
        val x = computeNextValue()
        yield(x)
    } 
}

and this code is perfectly lazy in the sense that computeNextValue is not invoked until the sequence is iterated, and only the values that are actually requested are ever computed. However, we cannot do arbitrary suspension from inside of buildSequence. We cannot delay, we cannot do asynchronous network requests, etc.
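The laziness claim can be checked with a small sketch. It uses sequence { }, the name buildSequence received in Kotlin 1.3, and a counter in place of computeNextValue(); lazyDemo is a made-up name:

```kotlin
// Returns (computations before iterating, the three values taken, computations afterwards).
fun lazyDemo(): Triple<Int, List<Int>, Int> {
    var invocations = 0
    val s = sequence<Int> {
        while (true) {
            val x = ++invocations   // stands in for computeNextValue()
            yield(x)
        }
    }
    val before = invocations        // building the sequence computes nothing
    val values = s.take(3).toList() // only three values are requested
    return Triple(before, values, invocations)
}

fun main() {
    println(lazyDemo())
}
```

Calling delay or any other suspending function inside the sequence block does not compile, because the builder's scope is restricted to its own yield functions. That is the limitation the comment refers to.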

pull-vert commented 6 years ago

Hi, I was writing a kotlinx-coroutines reactive (= cold) library based on reactive-streams. But following Elizarov's advice on #201, I switched to a pure Kotlin suspend version, strongly inspired by your StreamBenchmarks project. Here is the library: Reactivity

It provides 2 cold producers (Reactor inspiration) :

As they are reactive objects, they start producing items only when a client calls a Terminal (final/consuming) operation function.

Reactivity is a multiplatform project, with common and platform specific tests and extensions. Providing for example platform specific Solo.toPromise in JS, Solo.toCompletableFuture or Stream.toMulti in JVM (in JDK8 project). There are only a few operators right now (map, filter, delay) but they can be easily added as they are very simple.

I would be really happy if Reactivity can save time, or serve as a source of inspiration for this issue.

ZakTaccardi commented 6 years ago

The biggest issue with Channel<T> in my experience is that .openSubscription() has to be called before operators can be applied. I personally really liked LiveData<T>'s approach.

It shipped with LiveData<T>, MutableLiveData<T>, and MediatorLiveData<T>, and two transformations, .map() and .switchMap{ }.

Implementing custom operators is easy, just add an extension function.

Unfortunately, LiveData<T> falls completely flat because it's not thread safe and highly coupled to the main thread.

Ultimately, my use case is that I need a lightweight RxJava for scenarios where I can't use RxJava but want a reactive-state based architecture, and I am hoping coroutines can solve this problem.

elizarov commented 6 years ago

@ScottPierce We can open a separate issue about LinkedListBroadcastChannel (that's a broadcast channel implementation with UNLIMITED capacity) if that helps you. What's your specific use-case about it? Go ahead, create a separate issue and describe your use-case for it.

elizarov commented 6 years ago

@pull-vert I'm not a big fan of a Single-like abstraction. Solo is a great name, btw, but if we include it in the library, then we'll have three slightly different ways to asynchronously perform an operation that returns a value:

They are all different ways, but do we really need an abstraction for that last one? Consider this definition:

typealias Solo<T> = suspend () -> T

Isn't this functional type the Solo we are looking for? Do we really need to give it some other name like Solo? We can always declare all the extensions we might need directly on top of that suspend () -> T functional type.

Let me also quote Erik Meijer's tweet here: https://twitter.com/headinthebox/status/971492821151580160

JakeWharton commented 6 years ago

I'd prefer a real type for a few reasons.

The typealias trick can go really far for lots of abstractions but that doesn't always mean it's appropriate. Would you do the same for a multi-value source in suspend (Consumer<T>) -> Unit or would it get a proper type?

These types can and will leak into Java and I'd much prefer a semantically named type than an arbitrary complex function type that only the Kotlin metadata can disambiguate. Ignoring the fact that there's a strong possibility these types will be partially usable from Java, the ability to do simple things like instanceof and isAssignableFrom checks against a real type will help this abstraction to be wired into libraries like Retrofit, JAX RS, etc. Even just having them able to be received by Java code and converted into something else (like an RxJava type) goes a long way.

A Solo<T> is a specialization of a Many<T> (or whatever you want to call it) and thus should be polymorphic with respect to it. It's really convenient being able to return a Solo<T> from a call to a Many<T>.flatMap( T -> Many<T> ) function without requiring overloads, for example.

Erik's tweet does not make sense. Single is a factory of a coroutine/async+awaitable. Your typealias is evidence of this. And since the context of the tweet was Java, when we rasterize the typealias into a proper type (as the Java language requires) and desugar coroutines into their underlying form of callbacks (as the Java language requires) we're left with the exact shape of Rx's Single, so they're actually exactly the same thing.

elizarov commented 6 years ago

I question the very need of Single/Solo. The typealias digression was just to demonstrate what this concept actually does. I question the very use-cases for Single/Solo. Why would you ever use it? Why would you need a dedicated type to denote a reference to a computation that reruns every time you ask for it? Why would you need this level of indirection? Your code will be much easier to understand if you use suspending functions directly.

To illustrate. Instead of:

interface A { fun doSomething(): Single<T> } 

write

interface A { suspend fun doSomething(): T }

Instead of:

fun foo() = // return Single<R>
    doSomething().map { it.transform() }

do

suspend fun foo() = // return R
    doSomething().transform()  

You can continue this example with flatMap, etc, etc. The code that does not use Single/Solo will be invariably more direct and easier to understand.

On the other hand, we need a dedicated abstraction for asynchronous streams of values simply because there is no other way to represent it in Kotlin, but via a separate type.

JakeWharton commented 6 years ago

Fair point. I tend to think in a world where strong interop is a must. For the core coroutines library I'd be happy with a single abstraction on a multi-value suspending source.

akarnokd commented 6 years ago

Isn't this

suspend fun foo() = // return R
    doSomething().transform()  

eagerly executing doSomething and then calling a method on the returned value T?

I'd think that given a suspend () -> T, you'd want to create a new function suspend () -> R that, only when invoked, calls the previous suspend function, transforms its result and returns that as the value R, right?
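A sketch of the lazy composition described here, written directly on the functional type. mapResult is a hypothetical operator name, not something kotlinx.coroutines provides, and the counter only exists to show when the original function runs:

```kotlin
import kotlinx.coroutines.runBlocking

// Hypothetical operator: builds a new suspend () -> R without running the original function.
fun <T, R> (suspend () -> T).mapResult(transform: suspend (T) -> R): suspend () -> R {
    val original = this
    return { transform(original()) }
}

// Returns (calls before invoking, transformed result, calls after invoking).
fun mapResultDemo(): List<Int> = runBlocking {
    var calls = 0
    val doSomething: suspend () -> String = { calls++; "hello" }
    val foo = doSomething.mapResult { it.length }  // nothing executes here
    val before = calls
    val result = foo()                             // doSomething runs only now
    listOf(before, result, calls)
}

fun main() {
    println(mapResultDemo())
}
```

Invoking foo a second time would run doSomething again, which is the "reruns every time you ask for it" behaviour discussed earlier in the thread.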

JakeWharton commented 6 years ago

Yes, but you wouldn't invoke the method until you needed the value (i.e., when you'd otherwise subscribe). When it's suspend functions all the way down you don't need the abstraction because you can just call the suspending function over and over.

pull-vert commented 6 years ago

I like the Solo = Single value cold emitter because it can provide specific Operators, and some intermediate (= cold/not consuming) Operators on Multi can return a Solo:

fun <E> Solo<E>.merge(vararg others: Solo<E>): Multi<E>

inline fun <E, R> Multi<E>.reduce(initial: R, crossinline operation: (acc: R, E) -> R): Solo<R>

fun <E> Multi<E>.first(): Solo<E>

See https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html to see some Operators of Flux that return Mono.

After that some Terminal (final/consuming) Solo specific extensions I mentioned before are useful :

fun <T> Solo<T>.toPromise(): Promise<T>

fun <E> Solo<E>.toCompletableFuture(
  coroutineContext: CoroutineContext = DefaultDispatcher
): CompletableFuture<E>

JakeWharton commented 6 years ago

The point above is that operators on a single async value and operators which narrow a multi-async stream into a single value can be simple, imperative suspending functions.

Conversion to a Promise or CF is straightforward when all you have is suspending functions.

The most interesting case is multiple singles to a multi. It's conceptually easy (we already do concatMap all the time often without realizing it) but I'm curious how terse it can be made syntactically.

fvasco commented 6 years ago

Hi, I tried to picture some code to understand better the final shape.

I played with Solo, using a type alias can lead to some interesting collateral effects.

typealias AsyncCallable<T> = suspend () -> T

fun <E> AsyncCallable<E>.toCompletableFuture(
        coroutineContext: CoroutineContext = DefaultDispatcher
): CompletableFuture<E> = future(coroutineContext) { invoke() }

fun main(vararg args: String) = runBlocking {
    val deferred = async { "Hello" }
    val deferredFuture = deferred::await.toCompletableFuture()
}

Then I tried to implement the concatMap, but I don't know if it is possible to do better.

interface AsyncSource<E> {
    suspend fun consumeEach(consumer: suspend (E) -> Unit)
}

fun <E> buildAsyncSource(builder: suspend AsyncSourceBuilder<E>.() -> Unit): AsyncSource<E> = TODO()

interface AsyncSourceBuilder<E> {
    suspend fun yield(item: E)
    suspend fun yieldAll(items: AsyncSource<E>) = items.consumeEach { yield(it) }
}

suspend fun <E> AsyncSource<E>.concatMap(block: suspend (E) -> AsyncSource<E>): AsyncSource<E> =
        buildAsyncSource {
            this@concatMap.consumeEach { item ->
                yieldAll(block(item))
            }
        }

elizarov commented 6 years ago

@pull-vert In kotlinx.coroutines, the terminal operations on cold reactive streams (let's call the type Multi for this example) that you mention will have the following signatures:

inline suspend fun <E, R> Multi<E>.reduce(initial: R, crossinline operation: (acc: R, E) -> R): R
suspend fun <E> Multi<E>.first(): E

@JakeWharton I don't think we need any kind of "multiple singles to a multi" (merge) at all. Where can those "multiple singles" come from? They can appear in a situation like this:

suspend fun listUsers(): List<User>
suspend fun getUserProfile(userId: UserId): UserProfile

Here we want to get a list of users, then get a profile for each user. In reactive Java world we'd define getUserProfile(user: UserId): Single<UserProfile> and we'd have to use some kind of merging operation on those Single types. With coroutines all we need is a plain map:

val profiles: List<UserProfile> = listUsers().map { getUserProfile(it.userId) } 

Note, that sometimes you'd want to perform this map "in parallel" (but be careful that unlimited parallelism can exhaust your local resources and/or DOS your server unless you take appropriate precautions). We have separate issues about that (see #172 and #180).
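One possible shape for a bounded parallel map, as a sketch only. It uses async, awaitAll and Semaphore from current kotlinx.coroutines; mapBounded is a hypothetical helper, and User, UserProfile, listUsers and getUserProfile are stand-ins with made-up bodies for the API in the comment above:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Hypothetical stand-ins for the business API.
data class User(val userId: Int)
data class UserProfile(val userId: Int, val name: String)

suspend fun listUsers(): List<User> = (1..10).map { User(it) }

suspend fun getUserProfile(userId: Int): UserProfile {
    delay(10) // pretend this is a network call
    return UserProfile(userId, "user-$userId")
}

// Hypothetical helper: maps concurrently, with at most maxConcurrency transforms in flight.
suspend fun <T, R> List<T>.mapBounded(
    maxConcurrency: Int,
    transform: suspend (T) -> R
): List<R> = coroutineScope {
    val permits = Semaphore(maxConcurrency)
    map { item -> async { permits.withPermit { transform(item) } } }.awaitAll()
}

fun loadProfiles(): List<UserProfile> = runBlocking {
    listUsers().mapBounded(maxConcurrency = 3) { getUserProfile(it.userId) }
}

fun main() {
    println(loadProfiles())
}
```

The result keeps the order of the input list because awaitAll returns results in the order of the deferred values, not in completion order.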

pull-vert commented 6 years ago

@elizarov I understand and agree with your example for terminal/consuming Operations. But for me the "first" Operator must be Intermediate: it will suspend and send the Single first value of a Multi cold Source, but should not be terminal/consuming.

I would prefer the resulting Solo/Single value to be cold and not consuming, to be able to chain Operators until the first terminal/consuming Operator (can have multiple consumers, each will receive all elements).

For example, a stupid Reactive Operator chain:

Multi
                .fromValues("one", "two", "three")
                .filter { it.startsWith("t") } // Intermediate cold = returns Multi with "two" and "three"
                .first() // Intermediate cold = returns a Mono with "two"
                .concatMap { it.splitAndDelay(15)} // Intermediate cold = returns a Multi with "t", "w" and "o" (adding a 15ms delay between each send)
                .consumeEach { println(it)} // terminal/consuming operation printing "t", "w" then "o"

elizarov commented 6 years ago

@pull-vert In a world of channels and cold streams we might have the following function defined in some API:

suspend fun subscribeToSomeStrings(): ReceiveChannel<String> 

It performs an operation to subscribe to the data stream (establishing a network connection, etc.) and returns a channel of strings flowing from the server. It is hot. There is not much reason to defer the actual subscription until the terminal operation. It is better to "fail fast" (return an error immediately) if we have a problem establishing a communication channel to the server.

The actual processing of this hot data stream would use cold channels in intermediate steps, like this:

subscribeToSomeStrings() // establishes connection to datasource & returns a hot stream
    .filter { it.startsWith("t") } // intermediate - returns cold stream (no coroutine to filter it yet)
    .first() // terminal, starts filtering incoming stream, returns a String
    .split() // a regular operator on String, no need to wrap this operator
    .delayEach(15) // intermediate - returns cold stream with delays between items
    .consumeEach { ... } // terminal

There is not much need for special "glue" operators that work on Single/Solo. You can apply regular functions (like String.split()) directly on the values that you have. Interestingly, if we replace the first line with a function that returns List<String> instead of ReceiveChannel<String>, then the code will compile and continue to work. It would just become more eager in processing the data.

And that is the whole beauty. You can take a whole synchronous data processing pipeline with intermediate operators like filter and map and terminal operators like first, and introduce asynchrony into it (maybe make the source asynchronous, or maybe introduce some asynchronous request inside one of the map or filter operators), and it would continue to work without you having to change a single line of code. The IDE will helpfully highlight, with suspension marks in the gutter, which pieces of your pipeline have become asynchronous.

pull-vert commented 6 years ago

@elizarov Thanks for this clarification. I now understand your vision for introducing cold intermediate operators to channels: not using a new coroutine for each operator, with much better performance than the current implementation.

I am still a bit confused by this chaining of mixed hot and cold operators. It will require good documentation to be sure which state we are in at every step of the chain (hot and instantly usable, or cold and requiring a terminal consuming operator). I wonder how this will interact with existing pure cold reactive libraries (rxjs, rxjava, reactor, java 9 Flow...), for example if we want to use kotlinx-coroutines between a cold Observable from an rxjava reactive Couchbase driver as input and a cold Mono returned as Spring 5 output?

If I understand your example, a cold intermediate operator such as delayEach or map will have to be declared as an extension on kotlinx ReceiveChannel, java 8 Stream, kotlin Sequence, kotlin Collection, kotlin Array, maybe more.

elizarov commented 6 years ago

@pull-vert The mental model is simple. There are no cold operators. You just "setup your pipeline" (that is what cold operators do) that is later executed by the terminal operator. Since every chain of operators ultimately ends in a terminal operator, you don't really care what happens in between. It just works.

With respect to the kind of operators we need to provide, we have huge advantage over reactive libraries because we only need to define basic ones like map and filter. For example, there is no need to have ready-to-use delayEach(time) in the library, because it can be trivially implemented with .map { delay(time); it }
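The point about delayEach can be tried with the Flow API that current kotlinx.coroutines provides. Flow did not exist when this comment was written, so the snippet is an illustration of the idea rather than the API under discussion; delayedLetters is a made-up name:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// A per-element delay needs no dedicated operator: map accepts a suspending lambda.
fun delayedLetters(): List<String> = runBlocking {
    flowOf("t", "w", "o")
        .map { delay(15); it }   // suspend for 15 ms, then pass the element through
        .toList()                // terminal operator: the pipeline runs only here
}

fun main() {
    println(delayedLetters())
}
```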

pull-vert commented 6 years ago

@elizarov Thanks again, now I see and understand exactly the direction for this cold stream operators topic for kotlinx-coroutines.

I will continue to enrich my pure cold reactive library for fun (strongly based on kotlinx-coroutines), and of course follow what is going on here!

elizarov commented 6 years ago

Let me give you an update on the current concept that is on the table for the cold streams to solicit feedback.

The plan is to introduce an interface (tentatively called) Source<T> that represents an asynchronous source of elements of type T that can be consumed in a "push" fashion (you specify a suspending function to be sequentially invoked on each element). The contract for the Source interface would not specify how many times it can be consumed, so it seems logical that there will be different implementations:

The latter, however, is the main problem of this design. If some business function is declared like this:

fun requestElementsFromServer(): Source<Element>

then it is not clear from its signature what requestElementsFromServer actually does. Does it actually perform an operation and return a channel (which is a resource that must be consumed to release its resources), or does it return a cold source that makes a request to the server each time it is consumed?
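To make the cold reading of that signature concrete, here is a minimal sketch of what such a Source could look like. Everything in it is an assumption made for illustration: the interface shape, the SourceScope name, and the source, filter and map functions are not an actual kotlinx.coroutines API.

```kotlin
import kotlinx.coroutines.runBlocking

// Hypothetical push-style interface: the consumer passes a suspending action.
interface Source<out T> {
    suspend fun consumeEach(action: suspend (T) -> Unit)
}

// Hypothetical scope that the producer block runs in.
interface SourceScope<in T> {
    suspend fun send(value: T)
}

// Cold builder: the block is run from scratch for every consumer.
fun <T> source(block: suspend SourceScope<T>.() -> Unit): Source<T> = object : Source<T> {
    override suspend fun consumeEach(action: suspend (T) -> Unit) {
        val scope = object : SourceScope<T> {
            override suspend fun send(value: T) = action(value)
        }
        scope.block()
    }
}

// Operators are plain extensions that only set up the pipeline.
fun <T> Source<T>.filter(predicate: suspend (T) -> Boolean): Source<T> = source {
    this@filter.consumeEach { if (predicate(it)) send(it) }
}

fun <T, R> Source<T>.map(transform: suspend (T) -> R): Source<R> = source {
    this@map.consumeEach { send(transform(it)) }
}

// Returns (requests made before consuming, requests made after two consumers).
fun coldSourceDemo(): List<Int> = runBlocking {
    var requests = 0
    val elements = source<String> {
        requests++                  // stands in for "make a request to the server"
        send("a")
        send("b")
    }
    val before = requests           // building the source does nothing
    elements.consumeEach { }        // first consumer triggers one request
    elements.filter { it != "a" }.consumeEach { }  // second consumer, via an operator
    listOf(before, requests)
}

fun main() {
    println(coldSourceDemo())
}
```

With this shape, the return type alone says nothing about whether a request is issued per consumer (as here) or once up front, which is the ambiguity the comment points out.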

Now, if the function was declared like this:

fun requestElementsFromServer2(): ReceiveChannel<Element>

Then it would have been perfectly clear that requestElementsFromServer2 opens a communication channel with the server over the network and returns it, so that it is now the responsibility of the caller to consume it or to close it.

The worst incarnation of this problem is that if you now do requestElementsFromServer2().filter { ... }, then the type of the result is Source<Element> and the fact that it must be consumed or cancelled is completely lost.

This seems to derail the idea of unification of channels and cold sources to avoid duplication of filter, map operators. To salvage this unification we need to adjust this plan somehow:

sdeleuze commented 6 years ago

Thanks for the detailed feedback.

I am not sure using Source<Element> + extensions for cold streams would be very nice from an API perspective. It does not sound like "first class" support to me.

As you said, it is not clear from a signature returning Source<Element> what the function actually does.

Also I tend to consider using extensions for filter and map could be considered as an overuse of extensions. Extensions are awesome, but I don't think they should be used to provide such basic functionality (even if IDEA does a good job to propose them, but Kotlin is not just IDEA, especially in the long term). I think I would prefer if filter and map would be part of a regular class or interface, with extensions only used to add additional capabilities if needed.

While I agree cold streams don't need to be closed, I am not sure it should be an empty implementation. Isn't it possible to cancel a cold stream after getting just a few elements?

So even if that seems not in the 2 solutions you proposed, why not just introducing a type dedicated for cold streams (that could implement Source<T> as well) that would implement map and filter with the right return type? From a signature perspective, the behavior would be explicit, that would avoid the need of a full extension based API, and ReceiveChannel, BroadcastChannel and this new type design would be more consistent.

jcornaz commented 6 years ago

Also I tend to consider using extensions for filter and map could be considered as an overuse of extensions

This is a bit of a side topic, but I'd like to say that extension functions cannot really be overused, IMO. If something can be written as an extension function, then do it. It will in most cases (if not always) provide better separation of concerns and decoupling, and help to fulfill many SOLID principles such as the interface segregation principle and the single responsibility principle. I have "overused" them for two years now and I'm not able to see any drawback, only benefits. The only limitation is that you could end up with many extension functions with clashing signatures.

Having filter, map and all other operators part of the interface would make the interface big, break the "interface segregation principle" and it'll be painful for the implementors.

Or declare that all sources are resource-like and must be consumed or cancelled and add an empty implementation of cancel operation for cold sources that result from source { ... } coroutine builder.

I would go for that personally. It's not a burden for the client code to ensure the source is consumed (thanks to onCompletion = consumes(), consume and consumeEach).

fvasco commented 6 years ago

I wish to provide an example use case to make my concerns explicit.

I want to provide an API to parse a file. I use a non blocking file access, so coroutine works well. I parse next block only when it is effectively requested, so it is a cold stream. I keep the file open while reading, so I have to close it. It is possible to consume this source to dispose it, but it is really expensive.

What should I use?

pull-vert commented 6 years ago

I agree with @elizarov and @jcornaz about map, filter and other operators as extension functions, leaving the base API pure and simple.

I also agree with @sdeleuze: cold extensions always returning Source is a problem for me. I can understand that the internal implementations of Source.map and Source.filter return a new Source object, but IMO when the user uses ReceiveChannel.map, it must return a ReceiveChannel. The same goes for the new cold channel.

ZakTaccardi commented 6 years ago

Does writing operators for an interface (like ReceiveChannel<T>) create limitations compared to writing operators over a class (like Observable<T>)? I'd imagine the class can provide some nice encapsulation benefits with access to privately scoped variables.

How does implementing more complex operators like .switchMap work when the operators are written over extension functions on an interface? Is it even possible?

ZakTaccardi commented 6 years ago

then it is not clear from its signature what requestElementsFromServer actually does. Does it actually perform an operation and return a channel (which is a resource that must be consumed to release its resources), or does it return a cold source that makes a request to the server each time it is consumed?

The issue is that a stream being hot or cold would not be known by simply looking at the type? I don't feel that was ever a problem with Rx (in fact, I feel that this abstraction is an advantage of Rx), so why would it be a problem here?

JakeWharton commented 6 years ago

Does writing operators for an interface (like ReceiveChannel<T>) create limitations compared to writing operators over a class (like Observable<T>)? I'd imagine the class can provide some nice encapsulation benefits with access to privately scoped variables.

Your operator is free to be a class. All of the RxJava operators are classes which deal with the ObservableSource interface and not Observable. The latter only exists because of limitations of the Java language.

ZakTaccardi commented 6 years ago

Your operator is free to be a class.

I wasn't referring to the operator being a class, but to ReceiveChannel<T> being a class and not an interface.

All of the RxJava operators are classes which deal with the ObservableSource interface and not Observable.

👍 . My confusion here stems from .switchMap not existing in the official coroutines lib and not knowing how to implement it myself. Assuming it's possible with the current API, then my complaint about classes vs interfaces is invalid and doesn't add to this discussion.

hannesstruss commented 6 years ago

It is possible to write ReceiveChannel<T>.switchMap using an extension: https://gist.github.com/hannesstruss/927ec8120d7cb312d80685f230d50c6e

(don't use this, it's probably broken, but the general idea works)

elizarov commented 6 years ago

@fvasco

I want to provide an API to parse a file. I use a non blocking file access, so coroutine works well. I parse next block only when it is effectively requested, so it is a cold stream. I keep the file open while reading, so I have to close it. It is possible to consume this source to dispose it, but it is really expensive. What should I use?

If you return a cold stream from your file-parsing API, then you should open the file only when there is a consumer. You should be able to write the corresponding file-parsing code like this:

fun parseFile(file: File): Source<Data> = source {
    file.bufferedReader().use { 
        while (true) {
            val line = it.readLine() ?: break
            send(parseLineToData(line))
        }
    } 
}

In this case you get a properly cold source. Invoking parseFile(f) does nothing and its result can be safely ignored (no need to close it to release resources).

elizarov commented 6 years ago

@pull-vert

I can understand that internal implementation of Source.map and Source.filter return a new Source Object, but IMO when user will use ReceiveChannel.map, it must return a ReceiveChannel.

Returning a new channel on each map, filter, etc on another channel is extremely expensive. Channels are expensive multi-threaded abstractions. They should not be overused.

One of the explicit design goals of this proposal is to make sure that the result of ReceiveChannel.map is NOT a channel. But what is it? It is some kind of source that is in between hot and cold. Just like a resource, it must be consumed or closed (otherwise the coroutine sending data on the other side is lost forever), but it is NOT a channel. It is a weaker abstraction than a channel, since you cannot consume it from multiple coroutines in a fan-out fashion. You can convert it to a channel, but you don't have to if all you plan to do is just to consume it once.

fvasco commented 6 years ago

Hi @elizarov, you are right, I wish to explain better my concerns adding a bit of code:

val data = parseFile(dataFile).first { it.id == requiredDataId }

I suppose that we should consider a mechanism to dispose the source in the first function.

Reading your example led me to reconsider:

source { .... } coroutine builder returns a cold implementation of the Source. The producer coroutine is started for each consumer. It is not resource and need not be closed.

elizarov commented 6 years ago

@sdeleuze

As you said, it is not clear from a signature returning Source what the function actually does.

I indeed consider it a weak point, yet people in the Rx/Reactive world are actually used to that. They represent both hot and cold data sources by the same type in the type system, and they are used to manually managing the resources of hot sources if needed.

Also I tend to consider using extensions for filter and map could be considered as an overuse of extensions. Extensions are awesome, but I don't think they should be used to provide such basic functionality (even if IDEA does a good job to propose them, but Kotlin is not just IDEA, especially in the long term). I think I would prefer if filter and map would be part of a regular class or interface, with extensions only used to add additional capabilities if needed.

Interface methods don't scale. We cannot and don't want to provide every conceivable operator that one might need, so we should design around the fact that other people are going to add their own extensions, and those extensions should fit as naturally as if they were provided by the core library.

While I agree cold streams don't need to be closed, I am not sure it should be an empty implementation. Isn't it possible to cancel a cold stream after getting just a few elements?

Cancelling a cold source after receiving a few elements would indeed be possible. The current idea is that you'll have a dedicated cancel() function in the scope of the consumeEach operator for that purpose. In this case, the producer block in source { ... } will get a cancellation exception and will run its finally sections to close all the resources it was using.

So even if that seems not in the 2 solutions you proposed, why not just introducing a type dedicated for cold streams (that could implement Source as well) that would implement map and filter with the right return type? From a signature perspective, the behavior would be explicit, that would avoid the need of a full extension based API, and ReceiveChannel, BroadcastChannel and this new type design would be more consistent.

We can introduce a dedicated ColdSource<T> : Source<T> and define that source { ... } returns a ColdSource<T>. But what would filter and map be declared to return? It cannot be ColdSource, since you can apply filter to a ReceiveChannel and the result is not really cold....

elizarov commented 6 years ago

@fvasco

I suppose that we should consider a mechanism to dispose the source in the first function.

The first function will cause the send inside the source { .... } block to throw a cancellation exception, so the block exits and closes the file.
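A minimal sketch of that mechanism, using a synchronous, non-suspending model. Every name here (`Sink`, `ColdSource`, `source`, `first`, `StopConsuming`, `firstOfNaturals`) is a hypothetical stand-in rather than the proposed kotlinx.coroutines API; the only point is that stopping consumption surfaces as an exception at `send`, so the producer's `finally` block runs.

```kotlin
import java.util.concurrent.CancellationException

// Hypothetical stand-ins for illustration only; not the kotlinx.coroutines API.
fun interface Sink<T> {
    fun send(value: T)
}

class ColdSource<T>(private val block: (Sink<T>) -> Unit) {
    // Nothing runs until a consumer calls consume(): the source is cold.
    fun consume(sink: Sink<T>) = block(sink)
}

fun <T> source(block: (Sink<T>) -> Unit): ColdSource<T> = ColdSource(block)

// Thrown from send() to tell the producer that the consumer is done.
private class StopConsuming : CancellationException("consumer needs no more values")

// Terminal operator: keeps the first value, then cancels the producer.
fun <T> ColdSource<T>.first(): T {
    val received = ArrayList<T>(1)
    try {
        consume { value ->
            received.add(value)
            throw StopConsuming()
        }
    } catch (e: StopConsuming) {
        // Expected: the producer has unwound through its finally blocks.
    }
    if (received.isEmpty()) throw NoSuchElementException("source produced no values")
    return received[0]
}

// An endless producer whose "file" is modelled by entries in an event log.
fun firstOfNaturals(events: MutableList<String>): Int {
    val naturals = source<Int> { sink ->
        events += "open"
        try {
            var n = 1
            while (true) sink.send(n++) // send() throws once the consumer stops
        } finally {
            events += "close" // stands in for closing the file
        }
    }
    return naturals.first()
}
```

Calling `firstOfNaturals(events)` with an empty list returns the first value and leaves an "open" entry followed by a "close" entry in `events`, even though the producer loop itself never terminates.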

elizarov commented 6 years ago

@ZakTaccardi

The issue is that a stream being hot or cold would not be known by simply looking at the type? I don't feel that was ever a problem with Rx (in fact, I feel that this abstraction is an advantage of Rx), so why would it be a problem here?

In Rx the whole design is usually centered around lazy cold streams and lazy cold values (singles), so you only rarely get to interact with hot sources that are backed by something that needs to be closed.

However, we plan to integrate these new cold streams with channels (which do not really exist in the Rx world), and that is why we might get this problem.

akarnokd commented 6 years ago

Generally it doesn't matter in Rx if a source is hot or cold as applying most operators will establish a chain of cold sources anyway. For example, given this setup:

PublishSubject<Integer> subject = PublishSubject.create();

Observable<Integer> observable = subject.filter(v -> v % 2 == 0).map(v -> v + 1);

subject.onNext(1);
subject.onNext(2);

none of the user functions get executed without subscribing to observable and the two onNexts get lost as subject doesn't have a consumer subscribed to it at all.
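The same deferred execution of operator lambdas can be observed with Kotlin's standard `Sequence`. This is only an analogue and not RxJava: the head of the chain here is a cold `sequenceOf(...)` rather than a hot subject, and `lazyChain` and its `log` parameter are hypothetical names introduced for the sketch.

```kotlin
// Builds the same filter/map chain and records every lambda invocation in `log`.
fun lazyChain(log: MutableList<String>): Sequence<Int> =
    sequenceOf(1, 2, 3, 4)
        .filter { log += "filter($it)"; it % 2 == 0 }
        .map { log += "map($it)"; it + 1 }
```

Building the chain leaves `log` empty; the lambdas only run once a terminal operation such as `toList()` iterates the sequence.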

In a sense, you can know in Rx if a source is hot if the type name includes Subject or Processor, otherwise it's cold/doesn't matter. Hot sources are things that exist outside of your local context anyway and they run/execute likely beyond your control.

So I'm not sure why you'd want to differentiate between them in the type system in a way that somehow retains the hotness property over applied operators that don't change temperature (I can't imagine such an operator btw at the moment). Distinction based on cardinality does work (i.e., Single), trying to capture the exception type does not (everything ends up as Observable<T, Throwable> after combinations).

elizarov commented 6 years ago

@akarnokd Thanks for the useful perspective on Rx.

The chief reason that I'm asking this question is that coroutines provide an easy way to create hot sources (that are outside of your control). The hot source with coroutines that is doing some kind of channel.send is being suspended until somebody receives (subscribes to) on the other side, as opposed to being lost with subject.onNext. However, this creates a problem that now you've effectively created a resource that must be consumed. There does not seem to be a problem like that in the Rx world, so the absence of the corresponding distinction is quite natural for Rx.

jcornaz commented 6 years ago

However, this creates a problem that now you've effectively created a resource that must be consumed. There does not seem to be a problem like that in the Rx world, so the absence of the corresponding distinction is quite natural for Rx.

In the Rx world subscriptions may be (and should be) disposed when done using them. Doing so will release underlying resources if any.

Except for the terminology ("cancel" for channel and "dispose" for observable), I don't see the difference...

akarnokd commented 6 years ago

is being suspended until somebody receives (subscribes to) on the other side

So practically you want to handle the cases where

  1. don't generate/send data until there is somebody to consume it and
  2. allow to generate/send data but then drop/latest/buffer some or all of the data until there is somebody to consume it.

The first case is a form of deferred (lazy) execution which you can emulate by flatMapping over a consumer object stream. So the input of such a generator is a queue of consumers, each of which takes a queue of items. The generator enumerates this queue-of-consumers and, once a consumer arrives, creates the queue-of-items, calls the consumer with it and then starts a producer routine which feeds the queue-of-items:

Queue<Consumer<Queue<T>>> consumerQueue = ...

// generator
consumerQueue.forEachAsync(consumer -> {
    Queue<T> itemQueue = ...

    consumer.accept(itemQueue);
    async {
        for (T item : someItems) {
            itemQueue.send(item);
        }
    }
});

consumerQueue.send(queue -> {
   for (T item : queue) {
       whatEver(item);
   }
});

For the second case, I believe you have to actually design the queue itself to not suspend on the send() side but suspend on the poll() side. Also for efficiency, you may want to design different queues for different retention modes. In addition, the queue could check if there was ever a poll() call and if so, allow suspension on the send() side, depending on the requirements of course.
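One of the retention modes mentioned above (keep the latest items, drop the oldest) can be sketched with a bounded deque from `java.util.concurrent`. `DropOldestQueue` is a hypothetical name, and the consumer side blocks a thread instead of suspending a coroutine, so this only approximates the idea.

```kotlin
import java.util.concurrent.LinkedBlockingDeque

// Hypothetical helper: send() never waits; when the buffer is full,
// the oldest buffered item is discarded to make room for the new one.
class DropOldestQueue<T : Any>(capacity: Int) {
    private val buffer = LinkedBlockingDeque<T>(capacity)

    // Producer side: returns immediately, regardless of consumers.
    fun send(item: T) {
        while (!buffer.offerLast(item)) {
            buffer.pollFirst() // drop the oldest item and retry
        }
    }

    // Consumer side: blocks the calling thread until an item is available.
    fun poll(): T = buffer.takeFirst()
}
```

With a capacity of 2, sending 1, 2 and 3 before any consumer arrives leaves 2 and 3 to be polled. Other retention modes (keep only the latest, unbounded buffering) would need a different buffer policy, which is why separate queue types may be worthwhile.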

elizarov commented 6 years ago

@jcornaz In the Rx world subscriptions may be (and should be) disposed when done using them. Doing so will release underlying resources if any.

Same here. Subscriptions are always resources. The conundrum we are having here revolves around the source/observables/streams themselves. A cold stream is typically a purely garbage-collected entity and does not hold any resources until subscribed to. However, a hot stream (channel) can be backed by an actual resource. Imagine a network server that accepts a connection from a client, represents it as a hot stream of bytes and passes it down to your code for processing. Now you have to ultimately close it even if you decide not to subscribe to it.

elizarov commented 6 years ago

@akarnokd The primary use-case we have in mind is a "connected application" that has incoming and outgoing network connections. You can model an outgoing connection (client socket) with a cold stream (that actually opens the connection on subscription), but incoming connections (server sockets) are inherently resources and they are more naturally modelled with hot streams (channels). If you program a connected application like that on top of an abstraction of channels, then lots of things like data flow control (backpressure) become easy. You don't usually need this "flatMapping over a consumer object stream", since a typical relation between a producer and consumer in such an application is one-to-one.

Now, the design challenge we have, if you let me rephrase it, is how to reuse implementation of all the operators like map, filter, etc both for those hot streams (channels) and for fully lazy Rx-like cold streams alike, given the fact that hot streams must be closed even if they are never subscribed to.

jmfayard commented 6 years ago

Would it perhaps work to annotate the Source type in both cases?

fun parseFile(file: File): @Cold Source<Data> = source {
    file.bufferedReader().use {
        //...
    }
}

fun postComment(comment: String): @Hot @MustBeClosed Source<NetworkResult> {
    return api.networkCall().whatEver()
}

JakeWharton commented 6 years ago

RxJava didn't split hot vs. cold in 2.x.

Maybe, Single, and Completable are just T?, T, and Unit suspend functions. No need for types.
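A small sketch of that mapping, using only the Kotlin standard library. `User`, `UserRepository` and `runImmediately` are hypothetical names, and the runner works here only because none of these functions actually suspends; real code would call them from a coroutine.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

data class User(val id: Int, val name: String)

// Hypothetical in-memory repository, for illustration only.
class UserRepository {
    private val users = mutableMapOf(1 to User(1, "Ada"))

    // Counterpart of Maybe<User>: zero or one value.
    suspend fun find(id: Int): User? = users[id]

    // Counterpart of Single<User>: exactly one value, or an error.
    suspend fun get(id: Int): User =
        users[id] ?: throw NoSuchElementException("no user $id")

    // Counterpart of Completable: no value, only completion or an error.
    suspend fun save(user: User) {
        users[user.id] = user
    }
}

// Minimal runner for suspend blocks that complete without suspending.
fun <T> runImmediately(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation<T>(EmptyCoroutineContext) { outcome = it })
    return checkNotNull(outcome) { "block suspended; this runner cannot wait" }.getOrThrow()
}
```

For example, `runImmediately { repo.find(2) }` yields `null` for a missing user, while `runImmediately { repo.get(2) }` throws instead.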

On Sun, Aug 5, 2018, 2:20 PM Scott Pierce notifications@github.com wrote:

@elizarov A common type to bridge between hot / cold abstractions seems like the wrong way to go. That sounds like RxJava1 Observable, which was problematic. I understand this isn't exactly an apples to apples comparison, but it was a significant improvement when RxJava2 separated its hot and cold sources into distinct types. I understand that this would be potentially more code duplication, but from a pure api / usability perspective, distinct types is the way to go in my opinion.

You mentioned above:

I'm not a big fan of Single-like abstraction

My team loves Singles, and also Completables, and even found some uses for Maybes. We used them heavily in our applications and found them to be an improvement over when we used only Observables everywhere. Our feeling is that the library would be incomplete without these.

Will you guys be providing something similar to Subject as well? That would also be useful.

Channels seem somewhat similar to Processors (although I've never actually found a use for one). Is there a plan to add something that would fill the role of Flowable?

SolomonSun2010 commented 6 years ago

unification of channels and cold sources to avoid duplication of filter, map operators

I'd like this to be based on Clojure 1.7 Transducers. Java 8 has some transducers, called Collectors, such as Comparator.comparing, java.util.stream.Collectors.mapping, reducing, groupingBy, partitioningBy, summing, Collectors.flatMapping (in Java 9), etc. So a Java 8 Collector is a "reducing function": its supplier fn is arity-0, its accumulator fn is arity-2, and its arity-1 fn is the finisher or completion. And Stream.collect() is similar to Clojure's into function. See: https://github.com/matthiasn/talk-transcripts/blob/master/Hickey_Rich/InsideTransducers.md

Kotlinx channels, Kotlin Sequence, Kotlin Collection, Kotlin Array, and maybe more, could provide an extension collect(Collector) like the one in Java: https://docs.oracle.com/javase/10/docs/api/java/util/stream/Stream.html#collect(java.util.stream.Collector) Hence, in Source, we could write (reusing filtering, mapping, etc.):

val evens = Source.of(1, 2, 3, 4).collect(Collectors.filtering({ it % 2 == 0 }, Collectors.toSet()))

Transducers / Collectors are composable algorithmic transformations, separate the algorithm and the seq-like things. https://clojure.org/reference/transducers

jcornaz commented 6 years ago

Kotlinx channels, Kotlin Sequence, Kotlin Collection, Kotlin Array, and maybe more, could provide an extension collect(Collector) like the one in Java

Besides the fact that it is not related to the topic of cold streams IMO, a generic collector is not necessary, because we can write such terminal operators as extension functions. And actually, many are already provided by the standard library (for Sequence and Iterable) and in kotlinx.coroutines (for ReceiveChannel).

@SolomonSun2010, here's your example rewritten using Sequence:


// here `toSet()` is equivalent to `collect(Collectors.toSet())`, but in a more readable fashion.
val evens = sequenceOf(1, 2, 3, 4).filter { it % 2 == 0 }.toSet()

reduce, fold, groupBy, partition and others are also available. So most of the time, you already have the needed collector. And if one wants to define a custom one, they can simply write it as an extension function.
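As a minimal sketch of such a custom terminal operator, here is a hypothetical `toFrequencyMap()` extension on `Sequence` (the name is made up for this example; the standard library's `groupingBy { it }.eachCount()` already covers this particular case).

```kotlin
// Hypothetical custom "collector": counts how often each element occurs.
fun <T> Sequence<T>.toFrequencyMap(): Map<T, Int> {
    val counts = LinkedHashMap<T, Int>()
    for (item in this) {
        counts[item] = (counts[item] ?: 0) + 1
    }
    return counts
}
```

It is then used like any built-in operator, for example `sequenceOf("a", "b", "a").toFrequencyMap()`.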

SolomonSun2010 commented 6 years ago

Thanks. I mean: separate the algorithm from the seq-like things and write these operators in only one place, such as in Collectors, to eliminate duplication across all seq-like things. Perhaps some advanced type system (@raulraja: https://github.com/Kotlin/KEEP/pull/87) could eliminate the duplication, or a refactoring technique like the one in Scala 2.13: https://scala-lang.org/blog/2018/06/13/scala-213-collections.html

sdeleuze commented 6 years ago

@elizarov I guess you are super busy with the upcoming 1.3 release, but I would be curious to know your plans about that (IMO) key issue. Do you plan to fix it before Coroutines 1.0? After? Isn't there a risk that it could imply major changes in other parts of the Coroutines design?

elizarov commented 6 years ago

@sdeleuze

I guess you are super busy with the upcoming 1.3 release, but I would be curious to know your plans about that (IMO) key issue. Do you plan to fix it before Coroutines 1.0? After? Isn't there a risk that it could imply major changes in other parts of the Coroutines design?

This will go after the 1.0 release. It will be one of the first things we'll be working on. We've ruled out all the risks to the core abstractions (like jobs, etc.) that we'll be finalizing in 1.0, but it will definitely affect Channel APIs, so for the 1.0 release we'll label all the parts of Channel APIs that will be changing as a result of introducing lazy streams as Experimental, which would mean "you can use it, but note that it will change in one of the upcoming major updates".

sdeleuze commented 6 years ago

FYI we have scheduled official Coroutines support in Spring Framework for the upcoming 5.2 release, expected in the first half of next year, and a cold stream abstraction seems mandatory for our use case since we will have to translate Flux cold streams to Coroutines.

So any early progress after 1.0 on that topic would be appreciated in order to allow us to expose the right APIs. For now Coroutines support incubates in Spring Fu.

angusholder commented 5 years ago

@jcornaz

Also I tend to consider using extensions for filter and map could be considered as an overuse of extensions

This is a bit of a side topic, but I'd like to say that extension functions cannot really be overused IMO. If something can be written as an extension function then do it. It will in most cases (if not always) provide better separation of concerns, decoupling and help to fulfill many SOLID principles like the interface segregation principle and the single responsibility principle. I have "overused" them for two years now and I'm not able to see any drawback, only benefits. The only limitation is when you could end up with many extension functions with clashing signatures.

Having filter, map and all other operators part of the interface would make the interface big, break the "interface segregation principle" and it'll be painful for the implementors.

Something Rust does in lots of its standard library traits (basically interfaces) is implement operators as default methods. One extra thing that allows is for a default operator implementation to be overridden when you know it can be implemented more efficiently for the specific type. It also still allows for adding operators while maintaining backwards compatibility.

So in Rust's Iterator<T> type, the only method you have to implement is next(), which provides the next element of the iterator, and by default all other methods are implemented in terms of that method. But any concrete type that implements Iterator<T> could also choose to manually implement any other operators when it makes sense, for example if the type supports random access, then skip(5) can be overridden like self.offset += 5 instead of for i in 0..5 { next(); }.
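Kotlin interfaces can express the same pattern with default method bodies. The sketch below is an assumption-laden illustration: `Cursor`, `CountingCursor` and `ArrayCursor` are hypothetical types, not part of any library.

```kotlin
// Hypothetical minimal iterator-like interface; only next() is mandatory.
interface Cursor<T : Any> {
    fun next(): T? // null signals exhaustion

    // Default operator written in terms of next(); implementations may override it.
    fun skip(n: Int) {
        repeat(n) { next() }
    }
}

// Sequential source: relies on the default skip(), which calls next() n times.
class CountingCursor(private var current: Int = 0) : Cursor<Int> {
    var nextCalls = 0
        private set

    override fun next(): Int? {
        nextCalls++
        return current++
    }
}

// Random-access source: overrides skip() with a constant-time offset bump.
class ArrayCursor<T : Any>(private val items: List<T>) : Cursor<T> {
    private var offset = 0

    override fun next(): T? = items.getOrNull(offset++)

    override fun skip(n: Int) {
        offset += n
    }
}
```

The trade-off compared with extension functions is that a member like `skip` dispatches on the runtime type, so `ArrayCursor` gets its faster implementation even when accessed through a `Cursor` reference.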

One reason I wanted to suggest default methods instead of extension functions is that I find extension functions hurt discoverability. A problem I've encountered with Okio is that they've deprecated lots of its static methods in favour of extension functions, but when using ByteArray.toByteString() instead of ByteString.of() you have to know what you're looking for, the extension function isn't available unless ByteString.Companion.* is imported in scope.