org-arl / fjage

Framework for Java and Groovy Agents
https://fjage.readthedocs.io/en/latest/

"Freshness" constraint on the `receive` APIs #288

Open notthetup opened 8 months ago

notthetup commented 8 months ago

The various Agent/Gateway receive APIs share a default semantic: if a message that matches the filter is already in the receive queue, it is returned immediately.

While this is a great default in light of all the async and co-operative multi-threading-based processing that is done in Fjåge, in certain situations it takes a lot of complicated code to receive a new message that matches a given filter.

For example, a common use case is to send a Req to an Agent and wait for an Ntf that the Agent sends out after some internal processing. Just calling agentid.receive(MyNtf) may return a stale notification that was already in the receive queue. Calling agent.queue.flush() (supported in some Gateways) is 1. not standard and 2. can cause other behaviours or parts of the Agent to lose messages.
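To make the stale-notification problem concrete, here is a minimal sketch in plain Java that models a receive queue with the default semantic described above. It does not use the fjåge API: `Msg`, `ToyQueue`, and the `"MyNtf"` type string are hypothetical stand-ins, chosen only for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.function.Predicate;

final class StaleReceiveDemo {
    // Hypothetical stand-in for a message; NOT the fjåge Message class.
    static final class Msg {
        final String type;     // e.g. "MyNtf"
        final String payload;
        Msg(String type, String payload) { this.type = type; this.payload = payload; }
    }

    // Toy receive queue with the default semantic: the first queued message
    // that matches the filter is removed and returned immediately.
    static final class ToyQueue {
        private final Deque<Msg> queue = new ArrayDeque<>();

        void deliver(Msg m) { queue.addLast(m); }

        Msg receive(Predicate<Msg> filter) {
            for (Iterator<Msg> it = queue.iterator(); it.hasNext(); ) {
                Msg m = it.next();
                if (filter.test(m)) { it.remove(); return m; }
            }
            return null;  // nothing matching is queued
        }
    }

    // Returns the payload that a naive receive picks up after a new request.
    static String run() {
        ToyQueue q = new ToyQueue();
        q.deliver(new Msg("MyNtf", "old result"));  // left over from earlier activity
        // ... the Req would be sent here; the agent answers some time later ...
        q.deliver(new Msg("MyNtf", "new result"));  // the notification we actually want
        Msg got = q.receive(m -> m.type.equals("MyNtf"));
        return got.payload;
    }

    public static void main(String[] args) {
        System.out.println(run());  // prints "old result"
    }
}
```

The filter has no way to tell the two notifications apart, so the older one wins simply because it was queued first.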

So the question is: do we need some mechanism to simply tell the receive API call that we are only interested in newer messages, received after we invoked the API?

@ettersi tagging you since you have brought this up a few times.

ettersi commented 8 months ago

> @ettersi tagging you since you have brought this up a few times.

Thanks for bringing this up, @notthetup!

For context, off the top of my head I remember this particular instance where message staleness was an issue.

In an older version of the SWIS UI, we used FileListChangeNtfs to monitor whether a preview creation request has completed. I assume this must have worked fine at some point, but by the time I started getting involved in SWIS, whatever it was that cleared the FileListChangeNtf queue before we started listening for preview completion must have been removed. Instead, we had lots of stale FileListChangeNtfs, which triggered the preview completion monitor prematurely.

Providing a convenient API for listening only for notifications that came in after the receive was submitted would have made this bug somewhat easier to fix, but it wouldn't necessarily have prevented it. For that, the original programmer would have had to be very careful to use exactly the right API, and if they got it wrong, the feedback would have come in only after they had moved on to making other changes. Therefore, while I agree that there's some discussion to be had around triggering on stale messages, to me the much bigger issue is that the Fjåge message queue represents global mutable state.

mchitre commented 8 months ago

Without any API change, one could easily filter based on the sentAt field, if the platforms can be assumed to be on the same hardware or time-synchronized.

```groovy
t0 = currentTimeMillis()                 // time at which we start listening
msg = receive({ m -> m.sentAt >= t0 })   // accept only messages sent at or after t0
```

notthetup commented 8 months ago

> platforms can be assumed to be on the same hardware or time-synchronized

I think that's not the case with the example that @ettersi talked about.

But this makes me think of a useful way to solve this: what about a receivedAt timestamp on each message, set by the Gateway or Agent when the message is received?
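A rough sketch of how the receivedAt idea could work, again in plain Java rather than against the real fjåge classes. Everything here is hypothetical: `Stamped`, `StampingQueue`, and `receiveSince` are illustrative names, and the injected clock exists only to make the example deterministic. The key assumption is that the timestamp is taken from the receiver's own clock on arrival, so no time synchronization between platforms is needed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.LongSupplier;

final class ReceivedAtSketch {
    // Hypothetical message record; receivedAt is stamped locally on arrival.
    static final class Stamped {
        final String type;
        final String payload;
        final long receivedAt;
        Stamped(String type, String payload, long receivedAt) {
            this.type = type; this.payload = payload; this.receivedAt = receivedAt;
        }
    }

    // Toy queue that stamps each message with the receiver's clock.
    static final class StampingQueue {
        private final List<Stamped> queue = new ArrayList<>();
        private final LongSupplier clock;

        StampingQueue(LongSupplier clock) { this.clock = clock; }

        void deliver(String type, String payload) {
            queue.add(new Stamped(type, payload, clock.getAsLong()));
        }

        // Removes and returns the first message of the given type that was
        // received at or after t0; older messages are left in the queue.
        Stamped receiveSince(String type, long t0) {
            for (Iterator<Stamped> it = queue.iterator(); it.hasNext(); ) {
                Stamped m = it.next();
                if (m.type.equals(type) && m.receivedAt >= t0) {
                    it.remove();
                    return m;
                }
            }
            return null;  // nothing fresh enough is queued
        }

        int size() { return queue.size(); }
    }

    static String run() {
        long[] now = {0};  // fake clock, advanced by hand
        StampingQueue q = new StampingQueue(() -> now[0]);
        now[0] = 100;
        q.deliver("MyNtf", "old result");  // stale notification
        now[0] = 200;
        long t0 = now[0];                  // we start listening here
        now[0] = 250;
        q.deliver("MyNtf", "new result");  // arrives after t0
        Stamped got = q.receiveSince("MyNtf", t0);
        return got.payload + "@" + got.receivedAt + ", still queued: " + q.size();
    }

    public static void main(String[] args) {
        System.out.println(run());  // prints "new result@250, still queued: 1"
    }
}
```

In this sketch the stale notification stays in the queue for whoever else may want it, rather than being flushed. A message that arrives in the same clock tick as t0 would still be accepted, so a real implementation might prefer a monotonic sequence number over a wall-clock timestamp.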

mchitre commented 8 months ago

I was thinking along the same lines...

ettersi commented 8 months ago

We can add a message-specific flush:

```julia
function flush(gw::Gateway, ::Type{T}) where {T <: Message}
    while true
        if isnothing(receive(gw, T))
            break
        end
    end
end
```

Whenever we're receiving a notification, we're already potentially stealing it from someone else, so it's probably fine to steal all of them while we're at it...