wlandau / crew

A distributed worker launcher
https://wlandau.github.io/crew/

Use condition variables instead of nanonext::stat() to query NNG connections #57

Closed wlandau closed 1 year ago

wlandau commented 1 year ago

This should improve performance at scale. cc @shikokuchuo.

wlandau commented 1 year ago

To be clear, this will affect the custom bus sockets that {crew} creates, not the data req/rep sockets internal to {mirai}.

shikokuchuo commented 1 year ago

I hope the example below shows clearly how condition variables can be signalled by pipe events and how to read them. You are not doing any waits, so it is slightly more efficient not to set the flag in pipe_notify().

As long as you never reset the cv, you can track the state by taking cv_value(cv) %% 2L: 1 means connected, 0 means not connected.

For your case you'd need a cv for each socket.

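library(nanonext)

# Two bus sockets: s will listen, s1 will dial
s <- socket("bus")
s1 <- socket("bus")

# Pair a condition variable with s before calling listen().
# add and remove are set explicitly in case defaults differ across nanonext versions.
cv <- cv()
pipe_notify(s, cv = cv, add = TRUE, remove = TRUE, flag = FALSE)
listen(s)
cv_value(cv) # no connection yet

dial(s1) # connection established
cv_value(cv) # incremented by the pipe add event

close(s1) # connection dropped
cv_value(cv) # incremented again by the pipe remove event

cv_reset(cv) # reset the counter to zero
cv_value(cv)

wlandau commented 1 year ago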

Thanks, this is really helpful! So the condition variable can live on the client and it gets updated based on connection events?

shikokuchuo commented 1 year ago

Yes, you just pair a cv with your bus socket on the client. Call pipe_notify() before calling listen() with the URL to make sure you capture the initial connect.

Then you don't need to call stat() for "accept" or "pipes". You just call cv_value(cv): an odd value means you are connected, an even value means you are not. It also keeps a record of cumulative connects and disconnects for you.
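
A minimal sketch of the parity rule above, written as plain R helpers so that it runs without any sockets. The names is_connected() and connection_counts() are hypothetical and are not part of {nanonext} or {crew}. They assume the counter starts at 0, is never reset with cv_reset(), and goes up by exactly 1 on every pipe add or remove event.

```r
# Hypothetical helpers, not part of nanonext or crew.
# `value` stands for the integer returned by cv_value(cv).

# An odd counter means the most recent pipe event was a connect.
is_connected <- function(value) {
  value %% 2L == 1L
}

# Split the cumulative counter into connects and disconnects.
# Every disconnect is preceded by a connect, so connects are
# either equal to disconnects or ahead by one.
connection_counts <- function(value) {
  list(
    connects = (value + 1L) %/% 2L,
    disconnects = value %/% 2L
  )
}

is_connected(0L) # FALSE: nothing has connected yet
is_connected(1L) # TRUE: one connect, no disconnect
is_connected(2L) # FALSE: connected once, then dropped
connection_counts(3L) # 2 connects, 1 disconnect
```

In real use the argument would be cv_value(cv) for the cv paired with each bus socket.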

wlandau commented 1 year ago

Awesome!

Just to be safe, I think it might also be nice to check $state on the listener to make sure an accidental closure of the listener socket does not interfere with output of cv_value(). Is $state fast for hundreds of calls?

shikokuchuo commented 1 year ago

No need to check $state - it's just an attribute, not an active function.

It doesn't matter if your listener dies or the other side, it's the same - your cv is notified on pipe events - a pipe requires both sides!

wlandau commented 1 year ago

That's really neat. I do wonder though, suppose I connect the listener and the dialer, but then something goes wrong with the listener and it accidentally disconnects. cv_value() is then 2, but yet the dialer is still connected. Do you see what I mean?

wlandau commented 1 year ago

And should I worry about a scenario where the listener disconnects and reconnects behind the scenes, silently incrementing the cv by 2?

shikokuchuo commented 1 year ago

> That's really neat. I do wonder though, suppose I connect the listener and the dialer, but then something goes wrong with the listener and it accidentally disconnects. cv_value() is then 2, but yet the dialer is still connected. Do you see what I mean?

Nice try! I like how you think these through, but no listener, no connection. It is not possible for the dialer to still be connected. Connection requires both endpoints.

shikokuchuo commented 1 year ago

> And should I worry about a scenario where the listener disconnects and reconnects behind the scenes, silently incrementing the cv by 2?

I am pretty sure we're safe here. If the listener drops there is no reconnect mechanism. It is only a dialer which has the 'async' dial capability.

wlandau commented 1 year ago

> Nice try! I like how you think these through, but no listener, no connection. It is not possible for the dialer to still be connected. Connection requires both endpoints.

Awesome! Then I think I can entirely rely on the condition variable as you say. If I set it up correctly, 0 means the worker has not yet connected, 1 means it is currently connected, and 2 means it connected and then disconnected.

> I am pretty sure we're safe here. If the listener drops there is no reconnect mechanism. It is only a dialer which has the 'async' dial capability.

Ah, that makes me even more glad I disabled asyncdial for the servers in crew.
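
A rough sketch of the 0/1/2 reading described above, as a plain R helper. The name worker_status() and its labels are hypothetical and are not taken from {crew}. It assumes the cv is never reset and that each worker connects at most once (no asynchronous redial), so any value above 2 is treated as unexpected.

```r
# Hypothetical helper, not part of nanonext or crew.
# `value` stands for the integer returned by cv_value(cv)
# for the cv paired with one worker's bus socket.
worker_status <- function(value) {
  if (value == 0L) {
    "not yet connected"
  } else if (value == 1L) {
    "connected"
  } else if (value == 2L) {
    "disconnected"
  } else {
    # Would imply a reconnect, which this sketch assumes cannot happen.
    stop("unexpected counter value: ", value)
  }
}

worker_status(0L) # "not yet connected"
worker_status(1L) # "connected"
worker_status(2L) # "disconnected"
```

Failing loudly on other values is a design choice for this sketch: a silent reconnect would surface as an error instead of being misread as a fresh connection.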