ReactiveBayes / RxInfer.jl

Julia package for automated Bayesian inference on a factor graph with reactive message passing
MIT License

How to run rxinfer on infinite stream iteratively? #129

Closed schlichtanders closed 1 year ago

schlichtanders commented 1 year ago

Hello, thank you very much for this awesome package and that it supports infinite streams.

I would like to use it; however, my setup builds on Pluto.jl, where Pluto's reactivity is the source of the infinite stream: something changes, and a new data point arrives as a result.

The infinite stream example seems to fully rely on Rocket.jl for iteration.

Is there a way to make this iteration explicit? I.e., can I rewrite the rxinference call as a standard for loop over a possibly infinite iterator? (I would then run a single iteration of this for loop.)

EDIT: In Pluto, a plot is updated simply by returning it from a cell. Hence I really need to run a single iteration step independently, so that I can produce the plot and return it.

bartvanerp commented 1 year ago

Hi @schlichtanders!

Good to hear that you like our package! I am interested to learn more about the issue that you are running into, such that we can improve our package for future users. Personally, I have also been using RxInfer in combination with Pluto, but have not yet experienced any issues.

I am not exactly sure about your setup and about what you want to achieve, as this is not entirely clear to me yet. Are you trying to create a setup where the datastream depends on a variable, which might get updated by the user at any moment in time (in your case in a Pluto notebook)? e.g.

# cell 1:
a = 1

# cell 2:
datastream = ... # something that depends on a

# cell 3:
rxinference(datastream = datastream, ...)

or are you trying to achieve something else?

Nonetheless, I think you might be interested in the following (advanced) examples, where we use Makie.jl to create an interactive interface:

schlichtanders commented 1 year ago

Thank you for your answer. Yes, I also thought that Makie and its use of Observables fit the design of RxInfer pretty well.

However, my use case is really different. Think of something more like this:

# cell 1:
update = Dates.now()

# cell 2:
# do whatever rxinference does internally to get new posteriors
posterior = rxinference_internal(prior, update)

# cell 3:
plot(posterior)

This actually makes sense, as Pluto can trigger re-evaluation of cells automatically, so cell 1 could indeed run again and again whenever a new update is available.

bvdmitri commented 1 year ago

@schlichtanders Perhaps for your use case you can just use the inference function? Your logic would look like this:

# cell 1:
update = Dates.now()

# cell 2:
# I assume in your model specification you have a `datavar` named `y`
posterior = inference(model = my_model(prior), data = (y = update, )) 

# cell 3:
plot(posterior)

or something similar.

We use this kind of pattern in almost all of our examples in the documentation, and it works perfectly inside Pluto because Pluto itself handles all reactivity. The interface to the inference function is almost identical to rxinference, with the exception that it encapsulates all the reactivity magic inside, and simply returns the result, allowing you to write an explicit for loop. Based on your original question, it seems that this is precisely what you are looking for.

See more documentation for the inference function here: https://biaslab.github.io/RxInfer.jl/stable/manuals/inference/inference/
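
To illustrate the "explicit for loop" idea in isolation, here is a minimal sketch in plain Julia. It does not call RxInfer: `BetaBelief`, `update_posterior`, `posterior_mean`, and `run_stream` are hypothetical names, and the hand-written conjugate Beta-Bernoulli update only stands in for a single `inference(...)` call on a model of your choice. The point is the shape of the loop, where each posterior becomes the prior for the next data point.

```julia
# Hypothetical stand-in for one `inference(...)` call: a conjugate
# Beta-Bernoulli update done by hand, so this runs without RxInfer.
struct BetaBelief
    a::Float64
    b::Float64
end

# One explicit inference step: prior + a single observation -> posterior.
update_posterior(prior::BetaBelief, y::Bool) =
    BetaBelief(prior.a + (y ? 1.0 : 0.0), prior.b + (y ? 0.0 : 1.0))

posterior_mean(belief::BetaBelief) = belief.a / (belief.a + belief.b)

# Explicit loop over any iterator (possibly infinite); a finite one is used here.
function run_stream(prior::BetaBelief, stream)
    belief = prior
    for y in stream
        belief = update_posterior(belief, y)  # posterior becomes the next prior
    end
    return belief
end

belief = run_stream(BetaBelief(1.0, 1.0), (true, true, false, true))
println("posterior mean: ", posterior_mean(belief))
```

A single call to `update_posterior` corresponds to one iteration of the loop, which is the unit one would run per update.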

As @bartvanerp pointed out, you can also explicitly set up the datastream object yourself, update it manually with the next! function from Rocket, and use the rxinference function. The logic would look something like this:

# I assume in your model specification you have a `datavar` named `y`
subject = Subject(Int) # An observable

# Transform (or map) the subject of integers into a datastream of named tuples
# `rxinference` function expects a source of `NamedTuple`s
datastream = subject |> map(NamedTuple{(:y, ), Tuple{Int}}, (update) -> (y = update, ))

engine = rxinference(datastream = datastream, ...)

subscription_on_posterior = subscribe!(engine.posteriors[:x], (new_posterior) -> do_something_with(new_posterior))

And then you can push a new update to the subject explicitly (when a timer fires or you get a new update from the internet):

next!(subject, 1)

See more documentation here: https://biaslab.github.io/RxInfer.jl/stable/manuals/inference/rxinference/

The thing is, you must not forget to unsubscribe from subscription_on_posterior to avoid a memory leak. This kind of pattern doesn't work well in Pluto because if you have an unsubscribe!(subscription_on_posterior) somewhere in Pluto, it will be executed immediately and unsubscribe right away. If you don't unsubscribe, Pluto will create a new subscription every time something changes, resulting in multiple running inference processes in the background. Therefore, for your case, I would suggest using the inference function first.
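
The subscription lifecycle described above can be sketched with Rocket.jl alone, leaving the rxinference engine out. This is only an illustration under assumptions: `received` is a hypothetical buffer standing in for whatever is done with each posterior, and the subject is assumed to deliver values synchronously, as it does with Rocket's default scheduler.

```julia
using Rocket

# An observable that values can be pushed into manually
subject = Subject(Int)

# Same mapping as above: integers -> named tuples with a field `y`
datastream = subject |> map(NamedTuple{(:y,), Tuple{Int}}, (update) -> (y = update,))

# Hypothetical buffer that collects everything the subscriber sees
received = NamedTuple{(:y,), Tuple{Int}}[]
subscription = subscribe!(datastream, lambda(on_next = (value) -> push!(received, value)))

next!(subject, 1)
next!(subject, 2)

# After unsubscribing, further updates no longer reach the callback
unsubscribe!(subscription)
next!(subject, 3)

println(length(received), " values received before unsubscribing")
```

Keeping the `subscription` handle around and calling `unsubscribe!` exactly once, when the stream is no longer needed, is the part that is awkward to express in a reactive notebook.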