ReactiveBayes / Rocket.jl

Functional reactive programming extensions library for Julia
https://reactivebayes.github.io/Rocket.jl/
MIT License

Is there a way to wait explicitly for the next value, like `take!` is doing it on Channels #56

Open schlichtanders opened 1 month ago

schlichtanders commented 1 month ago

Hi there,

It would be nice to make `take!` also work on Rocket.jl observables, something like the following:

# Extend Base.take! so the inner call on the Channel still dispatches to Base
function Base.take!(datastream::Subscribable)
    c = Channel(1)
    subscribe!(datastream, lambda(on_next = d -> put!(c, d)))
    result = take!(c)
    close(c)
    result
end

I am not sure whether the performance of this can be improved. Hence the wish to have it as part of Rocket.jl itself, so that the most performant implementation is readily available to everyone.

bvdmitri commented 1 month ago

Hey @schlichtanders!

> Is there a way to wait explicitly for the next value

Reactive programming is non-blocking by design, so it is better to avoid this as much as possible. For testing purposes you can use the `sync` actor. It combines nicely with the `take` operator, as follows:

julia> actor = sync(logger());

julia> stream = from([ 1, 2, 3 ]) |> delay(5000) |> take(1)
ProxyObservable(Int64, TakeProxy())

julia> subscribe!(stream, actor)
DelaySubscription()

julia> wait(actor) # block until failure or completion, `|> take(1)` forces to complete after the first value
[LogActor] Data: 1
[LogActor] Completed
SyncActor[]

schlichtanders commented 1 month ago

Hi @bvdmitri ;-) What is the disadvantage of blocking? In Julia, blocking is not really expensive, is it?

I am still figuring out the best way to combine Rocket.jl with Pluto's reactivity. I guess something like a `take!` on a subscribable, or a transformation to a `Channel`, would be easiest. But maybe there is a more efficient low-level way to switch between the tasks...
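
For illustration, the "transformation to a Channel" idea could be sketched roughly as below. This is only a sketch under assumptions: `to_channel` and its `buffer` keyword are hypothetical names and not part of Rocket.jl, and it assumes the source emits on subscription, as `from` does by default. If the source emits more values than the buffer holds before anyone reads, `put!` will block the producer.

```julia
using Rocket

# Hypothetical helper (not part of Rocket.jl): forward every value of `source`
# into a buffered Channel and close the channel once the source finishes.
function to_channel(source; buffer = 32)
    ch = Channel{Any}(buffer)
    actor = lambda(
        on_next     = d -> put!(ch, d),                          # forward each value
        on_error    = e -> close(ch, ErrorException(string(e))), # readers see the error
        on_complete = () -> close(ch),                           # signal end of stream
    )
    subscribe!(source, actor)
    return ch
end

ch = to_channel(from([1, 2, 3]))
first_value = take!(ch)   # waits until a value is available
rest = collect(ch)        # drains whatever is left until the channel is closed
```

A channel like this can then be consumed with the usual `take!` or `for x in ch` patterns, which might be easier to plug into task-based code than a one-shot `take!` on the observable.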

bvdmitri commented 1 month ago

> what is the disadvantage of blocking?

Nothing is inherently wrong with it, but it goes against reactive programming principles, and the reactive API is designed to be non-blocking (it's not the best article though). It's usually better to avoid blocking as much as possible, but I do understand that sometimes it might be useful.