kofalt opened 5 years ago
In our case, we just exit when we detect a connection loss and rely on the container runtime to restart our app.
Hah - makes sense :)
I ended up modifying this logic using `sync/atomic.Value`, such that a watchdog-like goroutine can kick off a new connection without waiting for all uses of `Session.Call` etc. to exit. Previously I used a `sync.RWMutex`, but `Value` seems better suited since it doesn't block the reconnect.
I've been debating whether to ask for a `Client.Reconnect` method. On one hand, `Reconnect` might make concurrent use easier when dealing with disconnects; on the other, requiring a new `ConnectNet` call enforces the idea that any session state and teardown must be handled robustly.
I am not sure how to use GitHub wikis, but since this ticket is searchable, we can probably just close it.
@kofalt In a commercial product that uses nexus, there is code which is very similar to what you have above. So, this is a useful pattern and at least worth documenting. I am trying to decide if adding something like this to the client library is correct, or if this should only be part of the application implementing a client.
For my own product I ended up wrapping the client entirely within my own, with an internal structure that looks a bit like this:
```go
type Client struct {
	mu     sync.RWMutex
	done   chan struct{}
	closed bool
	logger *log.Logger
	debug  bool

	realmName cw2.RealmName // realm to join

	// These track topic subscriptions and procedure registrations that we wish
	// to have applied each time the underlying client is created, either due to
	// initialization or a socket close -> open cycle.
	resilientSubscriptions   map[string]TopicSubscription // topics to automatically subscribe to
	resilientSubscriptionsMu sync.Mutex
	resilientRegistrations   map[string]ProcedureRegistration // procedures to automatically register
	resilientRegistrationsMu sync.Mutex

	// Underlying client configuration.
	tlsConfig         *tls.Config       // optional tls configuration to use
	connectionType    ConnectionType    // what type of client to maintain
	serializationType SerializationType // what type of serialization to use
	bootstrapFailMode BootstrapFailMode // how we should handle bootstrap errors
	outboundQueueSize int               // size of outbound ws frame buffer

	nexusClient       *nclient.Client // nexus client
	nexusClientClosed chan struct{}   // if there is a value here, the nexus client was closed for some external reason
}
```
This provided me with a few benefits:

- … you can `Close()` it, at which time it will become defunct.

My implementation has been running very successfully for a few months now, and this topic discussion has me of a mind to possibly contribute a `ResilientClient` implementation. If there is interest I can start on that this weekend.
That looks great @dcarbone. Have a gander at `atomic.Value` when you get a chance; I swapped to that over `sync.RWMutex` when replacing the `Client` and like that a lot better. I would review a `ResilientClient` patch with interest.
For reference, here's roughly what I ended up with after some improvements:
```go
// Type changed
Session atomic.Value

// start() same as before

// resilientConnect() same as before, but calls Session.Store

func watchdog() {
	// same as before, except now calls Load:
	<-Session.Load().(*client.Client).Done()
	// ...
}
```
I then added a new function:
```go
// GetClient blocks until an allegedly-connected client session is available.
func GetClient() *client.Client {
	for {
		maybeSession := Session.Load()
		if maybeSession != nil {
			session := maybeSession.(*client.Client)
			open := true
			// Check session status.
			select {
			case <-session.Done():
				open = false
			default:
			}
			if open {
				return session
			}
		}
		// Wait before retrying so this loop does not spin hot.
		// The interval is arbitrary.
		time.Sleep(250 * time.Millisecond)
	}
}
```
Then, any WAMP interaction first calls `GetClient` and receives a probably-valid session. This has the advantage of allowing you to renegotiate new connections while the old one is still failing; functions involving `Session.Call` etc. can shut down concurrently with a new connection being made.

I would like to pass a context to `GetClient` so you could give up after some period.
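A hedged sketch of what that might look like; `GetClientContext` and the retry interval are my inventions here, not existing API:

```go
// GetClientContext is a hypothetical variant of GetClient that gives up
// when the supplied context is canceled or times out.
func GetClientContext(ctx context.Context) (*client.Client, error) {
	retry := time.NewTicker(250 * time.Millisecond) // interval is arbitrary
	defer retry.Stop()
	for {
		if maybeSession := Session.Load(); maybeSession != nil {
			session := maybeSession.(*client.Client)
			select {
			case <-session.Done():
				// Stale session; wait for the watchdog to replace it.
			default:
				return session, nil
			}
		}
		select {
		case <-retry.C:
			// Re-check Session on the next pass.
		case <-ctx.Done():
			return nil, ctx.Err() // caller gave up
		}
	}
}
```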
Maybe something like this could also be integrated into our high-level 'service' library: service...
@kofalt @martin31821: alright, I will have something by Saturday. Looks like it's going to rain here all weekend, so perfect timing :)
I have time this weekend to collaborate on design discussions, code reviews, and help with development here, so count me in!
Previously I mentioned a resilient client wrapper that wraps the nexus client, so I will share a stripped-down likeness of the code to give some ideas of what was done for a production system. The wrapper maintains a connected nexus client; if the client is disconnected, then the wrapper creates a new connected client. The wrapper also presents a channel-based interface. For example, a `Publish()` method returns a channel that the caller can use for publishing messages, and similar for others like `Subscribe()`, `Register()`, etc. The channel is serviced by the same goroutine that handles reconnection, which allows the wrapper to avoid race conditions between connecting a new client and handling communications.
Here is some skeleton code to provide ideas:
```go
import (
	"context"
	"log"
	"net/url"
	"sync"
	"time"

	"github.com/gammazero/nexus/client"
	"github.com/gammazero/nexus/wamp"
)

// Message carries the arguments for a single publication.
type Message struct {
	Topic   string
	Options wamp.Dict
	Args    wamp.List
	KwArgs  wamp.Dict
}

type ResilientClient struct {
	realm     string
	routerURI url.URL
	logger    *log.Logger
	session   *client.Client
	acts      chan func()
	once      sync.Once
}

func (c *ResilientClient) Run(ctx context.Context) {
	c.once.Do(func() {
		go c.run(ctx)
	})
}

func (c *ResilientClient) run(ctx context.Context) {
	var (
		newClientTimer *time.Timer
		err            error
	)
	defer func() {
		if newClientTimer != nil {
			newClientTimer.Stop()
		}
		if c.session != nil {
			c.session.Close()
		}
	}()
	for {
		var (
			currClientDied   <-chan struct{}
			newClientTimeout <-chan time.Time
		)
		// Create a new client if first time or if old one died.
		if c.session == nil {
			cfg := &client.Config{
				Realm:  c.realm,
				Logger: c.logger,
				// Config other session details
				//HelloDetails: ...
			}
			c.session, err = client.ConnectNet(c.routerURI.String(), cfg)
			if err != nil {
				log.Println(err)
			} else {
				// Setup publishers, subscribers, etc.
				//...
			}
		}
		if c.session != nil {
			// Current client: monitor for premature death.
			currClientDied = c.session.Done()
		} else {
			// No client: set timer to create one after a short delay.
			if newClientTimer == nil {
				newClientTimer = time.NewTimer(time.Second)
			} else {
				newClientTimer.Reset(time.Second)
			}
			newClientTimeout = newClientTimer.C
		}
	Loop:
		for {
			select {
			case act := <-c.acts:
				// Invoke an action delegate: Publish, Subscribe, etc.
				act()
			case <-currClientDied:
				// The current client has died; force creation of a new one.
				if c.session != nil {
					c.session.Close()
					c.session = nil
				}
				break Loop
			case <-newClientTimeout:
				// It is time to retry new client creation.
				break Loop
			case <-ctx.Done():
				// This goroutine is done.
				return
			}
		}
	}
}

// Publish returns a channel for the caller to use for publishing events.
func (c *ResilientClient) Publish() chan *Message {
	messages := make(chan *Message)
	go func() {
		for message := range messages {
			message := message // capture per-iteration value for the closure (pre-Go 1.22)
			c.acts <- func() {
				c.publish(message)
			}
		}
	}()
	return messages
}

func (c *ResilientClient) publish(msg *Message) {
	if c.session == nil {
		return
	}
	err := c.session.Publish(msg.Topic, msg.Options, msg.Args, msg.KwArgs)
	if err != nil {
		log.Printf("Publish failure (router=%s) (realm=%s) (topic=%s): %s",
			c.routerURI.String(), c.realm, msg.Topic, err)
		// Close the dead session; the run loop will notice Done and reconnect.
		c.session.Close()
		c.session = nil
	}
}
```
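For illustration, a caller would wire it up something like this; the address, realm, and topic are placeholders, and the snippet assumes the skeleton's imports plus `os`:

```go
// Illustrative usage only.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	u, _ := url.Parse("ws://localhost:8080/ws") // router address is made up
	rc := &ResilientClient{
		realm:     "realm1",
		routerURI: *u,
		logger:    log.New(os.Stderr, "", log.LstdFlags),
		acts:      make(chan func()),
	}
	rc.Run(ctx)

	// The returned channel survives reconnects; sends are serviced by the
	// same goroutine that manages the client, so there is no extra locking.
	events := rc.Publish()
	events <- &Message{Topic: "com.example.event", Args: wamp.List{"hello"}}
}
```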
I have a branch in the works: https://github.com/dcarbone/nexus/tree/feature/resilient-client

Let me know what you think. One big thing for me is to not block while the client is down; implementors should be allowed to determine how they want to handle things, I believe.
https://github.com/dcarbone/nexus/blob/feature/resilient-client/client/resilient_client.go contains the code.
There are a number of things I would like to clean up and address, namely using determinable errors to differentiate `Closed` from `Disconnected`, and some other misc cleanup, but something very similar to this is what I wrote for use in my own projects and it works quite well.
Input welcome @gammazero @kofalt @martin31821
@dcarbone Yes, many similarities. Some implementation diffs like RWLock vs goroutine for synchronization, but basically the same idea.
I agree that having determinable errors is beneficial. Similar to a `Context`, the nexus client already has a `Done()` method. I will add an `Err()` method to describe why the done signal is set, and that will return a specific error value, again like `Context` (https://golang.org/src/context/context.go#L155). The same should be done for cancelable calls.
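To make that concrete, here is a rough sketch of the `Done`/`Err` contract being described; the error values and internals are my assumptions, not the eventual nexus API:

```go
import (
	"errors"
	"sync"
)

// Hypothetical error values, mirroring context.Context's Err contract.
var (
	ErrClosed       = errors.New("client closed")       // Close() was called locally
	ErrDisconnected = errors.New("client disconnected") // transport dropped unexpectedly
)

type session struct {
	mu   sync.Mutex
	done chan struct{}
	err  error
}

func newSession() *session {
	return &session{done: make(chan struct{})}
}

// Done returns a channel that is closed once the session has ended.
func (s *session) Done() <-chan struct{} { return s.done }

// Err returns nil while the session is live; after Done is closed it
// reports ErrClosed or ErrDisconnected to say why.
func (s *session) Err() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.err
}

// end records the reason exactly once and signals Done.
func (s *session) end(reason error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.err == nil {
		s.err = reason
		close(s.done)
	}
}
```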
Tonight, I will put together a list of design goals, which seem shared by our client implementations, that we can discuss. We should also consider what kind of overall interface we want, like calling methods, or writing/reading messages over channels.
What's your idea or plan with regard to automatically re-establishing previously used registrations and subscriptions? Should those be re-established? And what is the case with dynamically established subscriptions/registrations?
@martin31821: I retain a separate list of what I've coined "Resilient" procedures and topics. Upon underlying client connection, I call this method, which will attempt to re-subscribe / re-register as necessary. One thing that still needs some work is how to handle errors during these calls when they are being executed asynchronously post-initialization.
Calling `Unsubscribe` and `Unregister` will remove any resilient subs / regs from their respective lists.
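A simplified sketch of that bookkeeping, using the field names from the struct I shared earlier; the `TopicSubscription` shape and the nexus call signatures here are assumptions, not the branch's actual code:

```go
import (
	nclient "github.com/gammazero/nexus/client"
	"github.com/gammazero/nexus/wamp"
)

// TopicSubscription is a hypothetical record of one resilient subscription.
type TopicSubscription struct {
	Topic   string
	Handler nclient.EventHandler
	Options wamp.Dict
}

func (c *Client) Subscribe(sub TopicSubscription) error {
	c.resilientSubscriptionsMu.Lock()
	c.resilientSubscriptions[sub.Topic] = sub // re-applied on every reconnect
	c.resilientSubscriptionsMu.Unlock()
	return c.nexusClient.Subscribe(sub.Topic, sub.Handler, sub.Options)
}

func (c *Client) Unsubscribe(topic string) error {
	c.resilientSubscriptionsMu.Lock()
	delete(c.resilientSubscriptions, topic) // dropped from the resilient list
	c.resilientSubscriptionsMu.Unlock()
	return c.nexusClient.Unsubscribe(topic)
}

// reapplyResilient runs after each successful (re)connect.
func (c *Client) reapplyResilient() {
	c.resilientSubscriptionsMu.Lock()
	defer c.resilientSubscriptionsMu.Unlock()
	for _, sub := range c.resilientSubscriptions {
		if err := c.nexusClient.Subscribe(sub.Topic, sub.Handler, sub.Options); err != nil {
			c.logger.Printf("re-subscribe to %q failed: %s", sub.Topic, err)
		}
	}
}
```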
@dcarbone that seems like a fairly good approach to me. I'd also like to see a callback to clear local state when the connection is lost. How are publications handled when the client is in the reconnecting state? And how are errors handled when the service has another auth role after it reconnects?
@martin31821 at the moment anything done during reconnect is blocked by virtue of the reconnect happening within a `Lock()`, which I could see an argument for modifying. Any actions taken when the client is nil will result in an error return.
A callback is a good idea; I will work on one.
@dcarbone Regarding "Any actions taken when the client is nil will result in an error return": I am thinking it is preferable to block the caller until the reconnection happens. Otherwise, this forces the caller to implement resiliency in the form of retries, which defeats the purpose of a resilient client.
@gammazero I disagree. WAMP messages should be ephemeral, and blocking the entire process without at least providing the implementor some way to cancel the attempt after a period of time is not a good model. There are a litany of reasons the connection might fail to be established (network issues, an external auth service being down, etc.), and there is no guarantee that a connection will be re-established while the message still has relevancy.
The primary benefit of this client is reducing the amount of boilerplate each individual implementor has to write when utilizing WAMP in their product.
The overall goal is to provide a client that recovers from temporary network outages, as invisibly as possible. A secondary goal is to reduce the amount of boilerplate code needed to implement a client.

- … `ErrCanceled`).
- … `ErrClosed`, and the client will stop trying to reconnect.

@dcarbone I was thinking that blocking would be handled by having the caller provide a context to be able to cancel or time out a blocked call.
@gammazero I could see that as a course of action; however, I think it may be difficult to pass the context all the way down to the socket transmission layer in an elegant manner. We would have to either turn each message into a `context.Context` or modify quite a bit of the codebase.

I looked into this quite some time ago and came to the conclusion that it was more work than I felt made sense at the time. I was also less familiar with Go then, so it's possible I am misremembering the amount of work that would be involved.
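One hypothetical way around the plumbing problem (nothing like this exists in the codebase) would be to carry the context alongside each queued message rather than rewriting every signature at once:

```go
// Hypothetical: pair each outbound message with its caller's context so the
// write path can observe per-message cancelation.
type outbound struct {
	ctx context.Context
	msg *Message
}

// send assumes a hypothetical c.out chan outbound serviced by the writer goroutine.
func (c *ResilientClient) send(ctx context.Context, m *Message) error {
	select {
	case c.out <- outbound{ctx: ctx, msg: m}:
		return nil // queued; the writer should check ctx again before transmitting
	case <-ctx.Done():
		return ctx.Err() // caller canceled or timed out before the message was queued
	}
}
```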
It may be worth another look (I don't have an answer either), since a context "carries deadlines, cancelation signals, and other request-scoped values across API boundaries". If it is the right thing to do then we can always do a major API revision.
Catching up on this thread.
1: The "Design Ideas" comment LGTM; I would probably want to add something about custom re-subscription behavior.
Consider the use case of watching an append-only log, like a chatroom. A chat client probably wants to subscribe to chat events, wait for the subscription to be acknowledged - is there a way to do that in WAMP? - then make an RPC fetching recent chat history. The client then de-duplicates any chat events it receives that overlap with the chat history, and displays the chat. Otherwise, you'd have a race condition where you could miss some chat lines.
For a resilient WAMP client, you would want to make the same chat-history RPC to see if you missed any chatter while you were disconnected. Maybe callbacks are enough to handle this scenario?
```go
type ResilientClient struct {
	// ...

	// Fires when the session fails and resilient reconnection is about to begin.
	// Return false to cancel the reconnect.
	OnDisconnect func() bool

	// Fires when the session is healthy again.
	OnReconnect func(client ResilientClient)
}
```
I made the func signatures up - maybe they'd need to take some other / useful parameters. And they probably wouldn't be exported on the struct itself, but instead be part of `ClientConfig` or something. You get the idea.
If something like this pattern is used, you wouldn't need to re-subscribe based on a remembered topic list. That would be left to the callback to call `client.Subscribe()` - or not, as business logic dictates.
Maybe you don't want to re-subscribe to a topic if your connection has failed five times in the last hour, or some other graceful-degradation / circuit-breaker behavior, etc...
2: Seconded on the context approach; I'm using contexts all over the place to manage a highly-concurrent server task and it is exceedingly useful to coordinate semantic chunks of logic that do not acknowledge or understand each other.
These are all excellent points. I agree, let's work on a context-focused model.
@gammazero, ideas on where to begin?
A literal shower thought after vegetating on this more - my proposed `OnReconnect` handler would be more appropriately used as `OnConnect` instead, explicitly firing on the first and every subsequent connection.

Then, it's trivial to get @dcarbone's `resilientSubscriptions` behavior (just pass a closure with some `Subscribe()` calls), while still meeting the use case in my previous comment.
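As a usage sketch - the `ClientConfig` shape, the wrapper's `Subscribe` signature, and the handler are all made up here:

```go
// Illustrative only: OnConnect fires on the first and every subsequent
// connection, so re-subscription is plain business logic in a closure.
func onChatEvent(args wamp.List, kwargs wamp.Dict, details wamp.Dict) {
	// render the chat line...
}

func newConfig() ClientConfig {
	failures := 0
	return ClientConfig{
		OnDisconnect: func() bool {
			failures++
			return failures < 5 // crude circuit breaker: stop reconnecting after 5 drops
		},
		OnConnect: func(c ResilientClient) {
			failures = 0
			// Subscribe (or not) as business logic dictates, then backfill any
			// missed messages, per the chat-history example above.
			c.Subscribe("com.example.chat", onChatEvent, nil)
		},
	}
}
```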
@kofalt Regarding your comment on "chat history": WAMP does have an advanced profile feature called Event History (https://wamp-proto.org/_static/gen/wamp_latest.html#event-history) that would allow the recovery of missed events. This requires the router to support it, though. If there is router support, then the resilient client should be able to handle automatic recovery of missed events.
As far as the `OnConnect` callback goes, that can certainly be provided so that the application can do something if a reconnect happens. However, I do not think that is the place to handle re-subscribe and re-register. I think the resilient client should always do that on reconnect. If the application wants to end a subscription or registration, then it should explicitly unsubscribe or unregister. In other words, the reconnection should be invisible unless the disconnect caused the application to timeout/cancel some blocked action.
@dcarbone Where to begin... Interface design. That is the most critical part of all of this. If the interfaces support all of the use cases we agree on, in the safest and simplest way, then that will determine how all the rest works.
I say we edit wiki pages to propose resilient client interfaces, and then we can pick what we like best from them all and come up with a final version, or we can submit links to code in our repos, gists, etc. I say try to come up with a fairly complete set of function signatures, but do not provide implementation beyond them.
@gammazero resubscribe and re-register should be handled by the client itself. The `OnConnect` callback should only be used to change the internal state of your application.
I was wondering if this pattern matches your usage for a WAMP client that should try to always maintain a healthy connection. Do you do something differently? If so, would it make sense to stick something like the below in the wiki?
The rough approach I use: