hashicorp / raft

Golang implementation of the Raft consensus protocol
Mozilla Public License 2.0

automatic Apply forward to leader? #128

Open rogpeppe opened 8 years ago

rogpeppe commented 8 years ago

It's not possible to call Raft.Apply on any node that isn't the leader. This forces implementations to forward the message to the leader if a non-leader wants to apply a log message.

Given that Raft already has connectivity to its peers, why not make this automatic, so Apply could be run on non-leaders too?
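
For concreteness, a minimal sketch of what callers are left to do today, assuming a *raft.Raft value r and an application-supplied forwardToLeader helper (which hashicorp/raft does not provide; that gap is exactly what this issue is about):

// Apply on a follower fails with ErrNotLeader, so the application has to
// forward the command to the leader itself, out of band.
f := r.Apply(cmd, 5*time.Second)
if err := f.Error(); errors.Is(err, raft.ErrNotLeader) {
    leaderAddr := r.Leader() // may be empty during elections
    if err := forwardToLeader(leaderAddr, cmd); err != nil {
        // handle the forwarding failure (retry, surface it to the client, ...)
    }
}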

fd commented 8 years ago

I believe that would hide too many important details in raft.

I find it easier to just forward those FSM operations to the leader over GRPC (as unary RPCs; with deadlines/retry/backoff...)
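
A rough sketch of that pattern, assuming a hypothetical application-defined gRPC service with a unary Apply method (the pb package and its types are placeholders, not anything provided by hashicorp/raft):

import (
    "context"
    "time"

    "google.golang.org/grpc"

    pb "example.com/yourapp/applypb" // hypothetical generated package
)

// forwardApply forwards an FSM command to the leader's application-level
// gRPC service with a per-call deadline; retry/backoff would wrap this call.
func forwardApply(ctx context.Context, leaderConn *grpc.ClientConn, cmd []byte) ([]byte, error) {
    ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
    defer cancel()
    resp, err := pb.NewForwarderClient(leaderConn).Apply(ctx, &pb.ApplyRequest{Command: cmd})
    if err != nil {
        return nil, err
    }
    return resp.Result, nil
}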

rogpeppe commented 8 years ago

Is there any way to use the existing channel to the leader? Otherwise there's always a need for some other way to find the leader's "alternative" address to send the forwarded request to, because the leader's Raft address isn't sufficient by itself. If you want to avoid static port numbers, that becomes awkward, because then the alternative addresses need to become part of the state itself. How do most implementations deal with this?

FWIW I'd suggest that if Apply did this, it wouldn't retry or backoff - just return an error if it fails to talk to the leader. There's already provision for a deadline in the Apply method.

Another reason this would be useful is that if you forward the request manually, you don't have access to the value returned by FSM.Apply AFAICS. If forwarding were automatic, that should be relatively easy to arrange.

fd commented 8 years ago

How do most implementations deal with this?

Some solutions:

1. Run the forwarding RPC (e.g. the GRPC service mentioned above) on a separate port, derived from a base port, alongside the Raft transport.
2. Multiplex the forwarding RPC and the Raft traffic over a single port/connection.

Currently I'm using solution 1 (with dynamic base ports). But I'm also investigating solution 2 as it would allow me to run everything on a single port.

FWIW I'd suggest that if Apply did this, it wouldn't retry or backoff - just return an error if it fails to talk to the leader. There's already provision for a deadline in the Apply method.

Agreed, although the deadline on Apply is not quite a deadline on the operation itself; it's a deadline on how long you want to wait for raft to start processing your message.
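
In code, the distinction looks like this (a small sketch, assuming a *raft.Raft value r):

// The timeout passed to Apply only bounds how long we wait for raft to start
// processing (enqueue) the command; it does not bound the whole operation.
f := r.Apply(cmd, 500*time.Millisecond)
// Error() then blocks until the entry is committed and applied, or fails.
if err := f.Error(); err != nil {
    log.Printf("apply failed: %v", err)
}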

Another reason this would be useful is that if you forward the request manually, you don't have access to the value returned by FSM.Apply AFAICS. If forwarding were automatic, that should be relatively easy to arrange.

That depends on how the leader responds to a client/follower. I have some code that effectively forwards a GRPC call to the leader; the leader then handles the FSM.Apply, marshals the response, and sends it back to the follower, which in turn sends it back to the client. The advantage here is that I have per-operation control over how retries/deadlines should be handled.

rogpeppe commented 8 years ago

OK, those are pretty much the solutions I'd expect, and they seem... unnecessarily heavyweight, given that all we need to do (as I understand it) under the hood is add another RPC call in the same transport. There would be no need for the response to contain the value from the FSM.Apply as that could be returned from the FSM.Apply call that happens when the log gets applied locally.

That depends on how the leader responds to a client/follower. I have some code that effectively forwards a GRPC call to the leader; the leader then handles the FSM.Apply, marshals the response, and sends it back to the follower, which in turn sends it back to the client. The advantage here is that I have per-operation control over how retries/deadlines should be handled.

That assumes that the value returned from Apply is always marshalable, whereas in fact it could contain arbitrary data structures that refer to pieces of the state or more interesting things.
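
For reference, on the leader that value is already exposed unserialized: ApplyFuture.Response() hands back whatever interface{} the FSM's Apply returned, which may well reference in-memory state (a small sketch, again assuming a *raft.Raft value r on the leader):

f := r.Apply(cmd, 5*time.Second)
if err := f.Error(); err == nil {
    // Response() is the raw interface{} returned by FSM.Apply for this entry;
    // it is never marshalled, so it can point at arbitrary in-memory structures.
    result := f.Response()
    _ = result
}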

fd commented 8 years ago

OK, those are pretty much the solutions I'd expect, and they seem... unnecessarily heavyweight, given that all we need to do (as I understand it) under the hood is add another RPC call in the same transport. There would be no need for the response to contain the value from the FSM.Apply as that could be returned from the FSM.Apply call that happens when the log gets applied locally.

OK, this sounds doable. My only concern at this time is any unexpected behavioural differences between FSM.Apply on a follower and FSM.Apply on a leader, especially in the face of leadership changes and network failures.

That assumes that the value returned from Apply is always marshalable, whereas in fact it could contain arbitrary data structures that refer to pieces of the state or more interesting things.

That's true, although I restricted myself to just marshalable results.

rogpeppe commented 8 years ago

@fd Are you officially involved with this repo or just speaking in a personal capacity? I'd like to propose a PR that implements this but I don't want to spend the time if it's not likely to be accepted.

fd commented 8 years ago

@rogpeppe I'm speaking in a personal capacity. I'm just raising my own thoughts/concerns based on how we use raft.

slackpad commented 8 years ago

Hi @rogpeppe sorry for the late reply on this.

We do leader forwarding for some reads as well as writes so I don't think we'd be able to take advantage of this for everything in Consul and Nomad. Another subtle thing here is that we forward the whole RPC to the leader and execute it there (the Apply() is part of a chunk of code that executes to fulfill the request), and the caller is returned the RPC's result. There are cases where the leader maintains some state (expiration timer kind of stuff) that's also required in order to fulfill an RPC, so there's no way we could run the main chunk of RPC handling code on a follower and just forward the Apply() call to the leader.

I think I'd hesitate to take this change because the projects we maintain that depend on Raft wouldn't use it in any way. It's also nice to keep the current traffic on the Raft TCP connections mostly in line with the Raft paper, which keeps things easier to reason about.

rogpeppe commented 8 years ago

[Finally noticed the reply :-)]

@slackpad

We do leader forwarding for some reads as well as writes so I don't think we'd be able to take advantage of this for everything in Consul and Nomad.

Automatic leader-forwarding on Apply doesn't imply that you can't do any leader-forwarding when desired AFAICS... or does it?

Another subtle thing here is that we forward the whole RPC to the leader and execute it there (the Apply() is part of a chunk of code that executes to fulfill the request), and the caller is returned the RPC's result. There are cases where the leader maintains some state (expiration timer kind of stuff) that's also required in order to fulfill an RPC, so there's no way we could run the main chunk of RPC handling code on a follower and just forward the Apply() call to the leader.

Again, isn't this something that applies to some specific cases (which could explicitly decide to leader-forward) and not necessarily all cases? (For expiration timer kind of stuff, isn't that potentially problematic anyway because it implies that application of a log entry isn't deterministic with respect to the state machine? I'm probably misunderstanding things though)

For myself, I come from a naive user's point of view - I see this nice Raft API (the hashicorp/raft API is way way nicer than the other Raft package I've looked at) but in order to use it for anything practical, I have to immediately invent some out-of-band communication mechanism for no apparent reason. If I forward an Apply to the leader, the entry will actually result in an FSM.Apply call locally, but the result of that is discarded. In fact, are we actually guaranteed that the local FSM.Apply call will have happened by the time we get the forwarded Apply response from the leader?

One possible alternative to my original proposal is to provide the capability to forward arbitrary RPC requests to a peer within the same communication channel. Unfortunately none of the arguments to NewRaft look immediately suitable for adding that functionality to. A new call could work though.

How about something like this?

// NewRaftParams holds parameters for a NewRaftWithParams call.
type NewRaftParams struct {
    Config    *Config
    FSM       FSM
    Logs      LogStore
    Stable    StableStore
    Snaps     SnapshotStore
    PeerStore PeerStore
    Transport Transport
    // RPC can optionally be provided to handle arbitrary communication
    // between peers. If this is not provided, a call to Raft.CallRPC will fail.
    RPC RPCExecutor
}

// NewRaftWithParams is similar to NewRaft but with parameters in a struct
// so they may be added to.
func NewRaftWithParams(p NewRaftParams) (*Raft, error)

type RPCExecutor interface {
    // RPC invokes an arbitrary RPC call on a peer. The first argument
    // holds the Raft instance that is doing the invocation.
    // The return data will be returned as the result of the Raft.CallRPC call.
    RPC(r *Raft, param []byte) ([]byte, error)
}

// CallRPC invokes an arbitrary RPC call on the given peer with the
// given parameter and returns its result.
func (r *Raft) CallRPC(peer string, param []byte) ([]byte, error)

This at least would avoid the need to use dubious out-of-band mechanisms to create a necessary part of any implementation.
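
Purely for illustration, a hypothetical caller of this proposed (never merged) API might look like the following sketch; none of these identifiers exist in hashicorp/raft:

// Construct raft with an application-defined handler for in-band RPCs...
r, err := NewRaftWithParams(NewRaftParams{
    Config:    config,
    FSM:       fsm,
    Logs:      logs,
    Stable:    stable,
    Snaps:     snaps,
    PeerStore: peers,
    Transport: trans,
    RPC:       myExecutor, // serves CallRPC requests arriving from peers
})
if err != nil {
    return err
}
// ...and a follower could then forward an encoded command to the leader in-band.
reply, err := r.CallRPC(leaderAddr, encodedCmd) // leaderAddr obtained from r.Leader()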

It's also nice to keep the current traffic on the Raft TCP connections mostly in line with the Raft paper, which keeps things easier to reason about.

I understand this, but is it really that awkward to exclude certain message types from consideration? The traffic is still there and important, even if it isn't in-band on the same TCP connection AFAICS.

slackpad commented 8 years ago

Hi @rogpeppe

For expiration timer kind of stuff, isn't that potentially problematic anyway because it implies that application of a log entry isn't deterministic with respect to the state machine?

That's exactly right. We forward the client's RPC to the leader which then makes the decision whether or not to call Apply() as part of that handler. I was just making the case that we probably wouldn't want to magically forward at the Apply() level, at least for some use cases.

One possible alternative to my original proposal is to provide the capability to forward arbitrary RPC requests to a peer within the same communication channel.

This could work in some basic cases, but I think it's still hard to make a robust general RPC forwarding solution without adding more complicated plumbing. Would we open a new connection for each of these RPCs, or mix it in with the long-running pipeline connection, etc.?

I think I'd hesitate to take this change because the projects we maintain that depend on Raft wouldn't use it in any way.

I think that's my main objection - this would be a not-so-robust basic feature that we wouldn't use ourselves, so it would become kind of a liability / extra complexity.

Thinking about this more, maybe the right thing to do is to make it easier to run Raft on top of a connection multiplexer like https://github.com/hashicorp/yamux. That would benefit Raft by sharing a single connection even internally (we have special transport support for pipelining) and by sharing the same multiplexer given to Raft with other parts of the application, you could use it for other things in a way that's completely unknown to Raft's interfaces and internals. We'd use this as well :-)
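
Roughly, that sharing would look like the following with hashicorp/yamux (a sketch only; how raft's transport would claim its own streams off a shared session is exactly the open question discussed in the next comment):

import (
    "net"

    "github.com/hashicorp/yamux"
)

// dialShared opens one TCP connection and layers a yamux session on it,
// so many logical streams can share that single connection.
func dialShared(addr string) (*yamux.Session, error) {
    conn, err := net.Dial("tcp", addr)
    if err != nil {
        return nil, err
    }
    return yamux.Client(conn, nil) // nil selects yamux's default config
}

// openStreams shows two independent streams over the shared session:
// one that could carry raft traffic, one for application RPCs.
func openStreams(sess *yamux.Session) (net.Conn, net.Conn, error) {
    raftStream, err := sess.Open()
    if err != nil {
        return nil, nil, err
    }
    appStream, err := sess.Open()
    if err != nil {
        return nil, nil, err
    }
    return raftStream, appStream, nil
}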

rogpeppe commented 8 years ago

Thinking about this more, maybe the right thing to do is to make it easier to run Raft on top of a connection multiplexer like https://github.com/hashicorp/yamux. That would benefit Raft by sharing a single connection even internally (we have special transport support for pipelining) and by sharing the same multiplexer given to Raft with other parts of the application, you could use it for other things in a way that's completely unknown to Raft's interfaces and internals. We'd use this as well :-)

Yes, this seems like a nice possibility - a Transport implementation that layered onto an interface compatible with or easily implemented by yamux.Session perhaps. Unfortunately I don't think it would be immediately straightforward to share an existing yamux.Session with Raft, because when doing an Accept, there's no way to know which component at the other end is doing the Open, so there'd be no way for the transport to accept only connections intended for Raft. I guess one could change yamux to provide support for sending a stream identifier with the Open (yamux could implement a Listen method to listen for connections on a particular identifier), layer something on top of yamux to do that, or the transport implementation could itself implement a similar interface to yamux.

Do you think there would be adverse implications from using a single TCP connection for all traffic (including independently streaming snapshots) rather than using a TCP connection for each concurrently running operation, as currently?

josephglanville commented 8 years ago

We multiplex connections between the Raft RPC layer and our client HTTP layer in Flynn's discoverd using a basic TCP multiplexer. The multiplexer is at https://github.com/flynn/flynn/blob/master/pkg/mux if it's something you want to take a look at. We do use a custom transport, though I am pretty sure even the standard transport would work, as you can just give it a net.Listener. Using a multiplexer is nice because it means all communication happens on a single port.
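
For context, the stock NetworkTransport takes a raft.StreamLayer (a net.Listener plus a Dial method) rather than a bare net.Listener, so wiring in one leg of a multiplexer looks roughly like this sketch against the current API (muxListener is assumed to come from whatever multiplexer is in use):

import (
    "net"
    "os"
    "time"

    "github.com/hashicorp/raft"
)

// muxStreamLayer adapts an existing net.Listener (e.g. one leg of a TCP
// multiplexer) to raft's StreamLayer interface.
type muxStreamLayer struct {
    net.Listener
}

func (m muxStreamLayer) Dial(addr raft.ServerAddress, timeout time.Duration) (net.Conn, error) {
    // A real implementation would dial out through the multiplexer as well.
    return net.DialTimeout("tcp", string(addr), timeout)
}

func newMuxTransport(muxListener net.Listener) *raft.NetworkTransport {
    return raft.NewNetworkTransport(muxStreamLayer{muxListener}, 3, 10*time.Second, os.Stderr)
}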

rogpeppe commented 8 years ago

This could work in some basic cases, but I think it's still hard to make a robust general RPC forwarding solution without adding more complicated plumbing. Would we open a new connection for each of these RPCs, or mix it in with the long-running pipeline connection, etc.?

From my naive understanding of the code, a Transport is already required to implement some kind of RPC interface (for example we have genericRPC in NetworkTransport) so it should be trivial to implement an RPC request in the same framework. As with the other RPCs, it would use a new connection if an existing connection wasn't available.

Thanks for the pointer to discoverd's implementation - that's interesting. I'll keep on investigating and playing around with this.

josephglanville commented 8 years ago

Thanks for the pointer to discoverd's implementation - that's interesting. I'll keep on investigating and playing around with this.

No problem, would be interested to hear about your findings.

skolodyazhnyy commented 7 years ago

May I ask how Consul does it? I guess it needs to forward messages to the leader from time to time, for example to add/remove a peer. Is it done over memberlist/serf? Is it just an RPC call?

Thanks in advance. The library seems to be very nice and I even made a little cluster myself, but some use cases are still a bit of a mystery :)

paralin commented 7 years ago

If you guys want, the way I've done this:

https://github.com/paralin/raft-grpc

gRPC for the Raft transport, and then I send an RPC over gRPC to do the apply. Using gRPC for all of the raft traffic has been working great. In my code I have a connection pool to the peers.

hsanjuan commented 7 years ago

We do leader forwarding in ipfs-cluster. All networking is done via libp2p, which provides multiplexing for free. Thus Raft and our internal RPC work over the same connection and we redirect things to the leader with "ease".

In my experience, I think it's hard to make a robust general RPC forwarding solution (in a context where the peerset/leadership changes). It's probably best for every application to decide how to retry, how many times, how long to wait, etc., but I don't know enough about raft internals to judge.

bklau commented 6 years ago

@slackpad @rogpeppe Folks: what is the status of this issue? From all the discussion above, I think it would perhaps be best to have some "utility" library written, plus a small doc showing end users how to Apply calls through the leader. It seems like a very common use case, and we don't want to force the end user to write boilerplate code over and over again. Just use it and get on with life.

kleijnweb commented 5 years ago

When dealing with this, I ran into the problem that nodes do not seem to know which node is the leader unless they are the leader themselves. The way I am dealing with that now is writing a log entry when a node becomes leader, so that peers can be informed. I suspect there is a better way; what am I missing?

freeekanayaka commented 5 years ago

@kleijnweb it's perfectly normal that there are periods of time when a node does not know who the leader is. Typically that happens during elections, when there is actually no leader at all (so there is no way a node could find out who the leader is, because there is none). There are also cases where a node thinks it knows who the leader is, but in fact the leader has changed. Committing an entry to the log doesn't help in any way, since all the considerations above still apply: e.g. a node might think it knows who the leader is because it sees a log entry, but in fact it hasn't yet received other log entries that contain information about a newer leader.

In short: the only sensible thing for clients to do is retry. The raft dissertation has a chapter dedicated to how clients should be implemented; it's very informative and basically covers every point raised in this issue.

The takeaway is that a core raft library like hashicorp/raft should probably not be opinionated about client implementation, because that's very application-specific and tied to the internals of the application's FSM.

The issue of how to tell hashicorp/raft to share TCP connections/listeners with some other RPC framework/handler is orthogonal to the implementation of application-specific clients, and could be solved with a solution along the lines of what @slackpad and others have proposed.

kleijnweb commented 5 years ago

I understand that there are plenty of caveats; what I'm asking is whether there is a better (not perfect) way to decide where to forward a message when a node thinks it is not the leader, other than sending an "I am the captain now" message to the cluster using the log.

freeekanayaka commented 5 years ago

@kleijnweb the "I'm the captain now" message is useless, because the node that won the election and became the leader has already sent an "I'm the captain now" message using the RequestVote RPC. What you have to understand is that this is an asynchronous system, so nodes will always have different opinions about what the state of affairs is, and they generally have contradicting opinions. That's the whole point of raft.

As said, your solution of committing an entry to the log is pretty much useless, as you fall back to the same problem you want to solve, which can't be solved in the first place. There's no better solution; you have to live with the pitfalls of distributed systems, at least if you want something correct that won't lose or corrupt your data.

The best reference you can have to write a raft client is Ongaro's dissertation, chapter 6. I think that is a must-read for anyone using a raft library: because the raft library should not be opinionated in that regard, as the application developer consuming raft and writing a client you'll have to understand what you're doing.

kleijnweb commented 5 years ago

I think you misunderstand, as what I'm trying to do is no different from what people say they have done just a couple messages above. But I completely overlooked that there is *Raft.Leader(), which will do just fine. Thanks for responding though.

freeekanayaka commented 5 years ago

Ah yes, I misunderstood then. I assumed you knew about Raft.Leader() but had found that it would occasionally return an empty string, as is expected.
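
For later readers, a common workaround is a small bounded retry around Raft.Leader(), accepting that the answer may be empty or stale at any moment (a sketch):

import (
    "errors"
    "time"

    "github.com/hashicorp/raft"
)

// waitForLeader polls Raft.Leader() until some leader address is known or the
// timeout expires. The answer can still be stale by the time it is used, so
// the forwarded operation itself must also be retried on failure.
func waitForLeader(r *raft.Raft, timeout time.Duration) (raft.ServerAddress, error) {
    deadline := time.Now().Add(timeout)
    for time.Now().Before(deadline) {
        if addr := r.Leader(); addr != "" {
            return addr, nil
        }
        time.Sleep(100 * time.Millisecond)
    }
    return "", errors.New("no known leader")
}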

waeljammal commented 5 years ago

If you guys want, the way I've done this:

https://github.com/paralin/raft-grpc

gRPC for the Raft transport, and then I send an RPC over gRPC to do the apply. Using gRPC for all of the raft traffic has been working great. In my code I have a connection pool to the peers.

This does not work with the latest version of raft; it's missing the TimeoutNow method.

Also, I'm finding it tedious myself to forward all requests to Apply. A util, or a simpler way of accomplishing this without writing another layer and exposing yet another port, would be appreciated. I can not piggyback on my public RPC port; I have to create a second server with a private port, which forces me to configure a REST port, an RPC port, a Raft RPC port, and a Raft port. Ports, ports everywhere haha.

tomqwpl commented 4 years ago

I am also researching Raft and looking into this Raft implementation, and the most obvious issue when trying to work out how to adopt it is how to forward things like Apply to the leader. I notice that the etcd Raft library does do this; it forwards Apply to the leader automatically. However, the etcd library doesn't look as nicely constructed as this one. From what I can see, Consul uses a separate gRPC connection and uses ALPN so that the same port is used for both.

The only issue I can see with raft-grpc would be the size of the messages for "install snapshot". My guess is that snapshots could be pretty large, but I suspect gRPC request streaming could be used for that.

Jille commented 4 years ago

I wrote some example code https://github.com/Jille/raft-grpc-example

It uses https://github.com/Jille/raft-grpc-transport to use gRPC for communication between nodes. It supports the v3 protocol, and is easier to use than https://github.com/paralin/raft-grpc.

It uses https://github.com/Jille/raft-grpc-leader-rpc to allow you to easily send RPCs to your leader. Clients connect to all Raft nodes and use health checks to figure out who the leader is. Add in some retries and you don't have to think about it anymore :)

tomqwpl commented 4 years ago

@paralin Really? I'd be interested to hear what you think the better alternative is.

paralin commented 4 years ago

@tomqwpl Apologies for my last message; it was intended for an issue posted on my raft-grpc repo, which I haven't updated in a long time (so I was saying that particular repo is obsolete).

JoanFM commented 2 years ago

Hello team, and thanks for the great library.

I was wondering if by any chance there has been any progress on this matter. Like @tomqwpl, I believe this would be a killer feature, so that from the client side one can treat the cluster as a single unit in a transparent way.

Is there clear documentation on how Consul or some other services handle these limitations? I could not find any.

Thanks in advance!

abovestratosphere commented 7 months ago

I'm also scoping out this library and this would be a great feature to add. I agree with tomqwpl above that while the etcd raft implementation supports proxying log append proposals from followers to the leader, it's overall not as easy to use as this one.