ktoso / akka-raft

A toy project implementing RAFT on top of Akka Cluster (not prod ready)
http://blog.project13.pl
Apache License 2.0

Minor review comments #13

Closed patriknw closed 10 years ago

patriknw commented 10 years ago

Great work Konrad! I was curious to see how you had used Akka in this interesting project. Wrote a few minor comments. Let me know if you have any questions.

--- a/README.md
+++ b/README.md
@@ -7,6 +7,8 @@ This is an akka based implementation of the Raft consensus algorithm.

 It is akka-cluster (which is _experimental_) aware, and supports the additional features of raft such as: mambership changes and (_not yet_) snapshotting.

+PN: akka-cluster is not experimental
+
 **This is still work in progress and has not been stress tested (athough it is tested on multiple nodes already)**

 Basic info
@@ -24,6 +26,8 @@ class WordConcatRaftActor extends RaftActor {

   var words = ListBuffer[String]()

+PN: val + mutable collection, or better var + immutable collection. I would use var Vector here  
+
   /** 
    * Called when a command is determined by Raft to be safe to apply; 
    * Application results are sent back to the client issuing the command.
@@ -38,6 +42,7 @@ class WordConcatRaftActor extends RaftActor {
     case GetWords =>
       log.info("Replying with {}", words.toList)
       words.toList
+      
   }
 }

@@ -51,6 +56,9 @@ members foreach { _ ! ChangeConfiguration(clusterConfiguration)
 // todo implement re-routing if you send to a non-leader
 // then send messages to it; the state machine will only be applied when consensus has been reached about a value
 leader ! ClientRequest(AppendWord("I"))
+
+PN: perhaps illustrate how the leader ref is created/retrieved?
+
 leader ! ClientRequest(AppendWord("like"))
 leader ! ClientRequest(AppendWord("capybaras"))

diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf
index d6ddc00..8a4f1bb 100644
--- a/src/main/resources/application.conf
+++ b/src/main/resources/application.conf
@@ -1,3 +1,4 @@
+# I think it's dangerous to include an application.conf in a library like this
 akka {
   loglevel = "INFO"
   stdout-loglevel = "INFO"
diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf
index df79cf7..cabdb27 100644
--- a/src/main/resources/reference.conf
+++ b/src/main/resources/reference.conf
@@ -19,6 +19,7 @@ akka {

     # When propagating entries among members, AppendEntries can carry multiple log entries.
     # Use this valud to tweak this number as it depends on the characteristics of your Commands.
+#PN: s/valud/value/    
     default-append-entries-batch-size = 5

     # When turned on, will push events like "entry 1 committed" onto the eventStream, mostly designed for testing,
diff --git a/src/main/scala/pl/project13/scala/akka/raft/Candidate.scala b/src/main/scala/pl/project13/scala/akka/raft/Candidate.scala
index 9c74cf1..c2a5819 100644
--- a/src/main/scala/pl/project13/scala/akka/raft/Candidate.scala
+++ b/src/main/scala/pl/project13/scala/akka/raft/Candidate.scala
@@ -18,6 +18,7 @@ private[raft] trait Candidate {
         goto(Follower) using m.forFollower
       } else {
         log.info(s"Initializing election (among ${m.config.members.size} nodes) for ${m.currentTerm}")
+        // PN: log.info("Initializing election (among {} nodes) for {}", m.config.members.size, m.currentTerm) 

         val request = RequestVote(m.currentTerm, self, replicatedLog.lastTerm, replicatedLog.lastIndex)
         m.membersExceptSelf foreach { _ ! request }
diff --git a/src/main/scala/pl/project13/scala/akka/raft/RaftActor.scala b/src/main/scala/pl/project13/scala/akka/raft/RaftActor.scala
index 3f9eed3..3d3a08b 100644
--- a/src/main/scala/pl/project13/scala/akka/raft/RaftActor.scala
+++ b/src/main/scala/pl/project13/scala/akka/raft/RaftActor.scala
@@ -134,10 +134,13 @@ abstract class RaftActor extends Actor with LoggingFSM[RaftState, Metadata]
     require(toMs > fromMs, s"to ($to) must be greater than from ($from) in order to create valid election timeout.")

     (fromMs + Random.nextInt(toMs.toInt - fromMs.toInt)).millis
+    //PN: scala.concurrent.forkjoin.ThreadLocalRandom
   }

   @inline private[raft] def electionTimeoutStillValid(since: Long) = {
     val stillValid = electionTimeoutDieOn < System.currentTimeMillis()
+    //PN: I would use System.nanoTime for measuring durations (currentTimeMillis may jump)
+    //    Here you might find scala.concurrent.duration.Deadline useful

     if (stillValid)
       log.info(s"Timeout reached (since: $since, ago: ${System.currentTimeMillis() - since})")
diff --git a/src/main/scala/pl/project13/scala/akka/raft/cluster/ClusterRaftActor.scala b/src/main/scala/pl/project13/scala/akka/raft/cluster/ClusterRaftActor.scala
index 9eb375d..cbc37ba 100644
--- a/src/main/scala/pl/project13/scala/akka/raft/cluster/ClusterRaftActor.scala
+++ b/src/main/scala/pl/project13/scala/akka/raft/cluster/ClusterRaftActor.scala
@@ -38,6 +38,7 @@ trait ClusterRaftActor extends RaftActor {
     case MemberUp(member) =>
         log.info("Node is Up: {}, selecting and adding actors to Raft cluster..", member.address)
         val memberSelection = context.actorSelection(RootActorPath(member.address) / "user" / "member-*")
+        //PN: make the name configurable?
         memberSelection ! Identify(member.address)

     case ActorIdentity(address, Some(raftActorRef)) =>
@@ -51,10 +52,12 @@ trait ClusterRaftActor extends RaftActor {
     case UnreachableMember(member) =>
       log.info("Node detected as unreachable: {}", member)
       // todo remove from raft ???
+      // PN: perhaps, but note that it can become reachable again

     case MemberRemoved(member, previousStatus) =>
       log.info("Member is Removed: {} after {}", member.address, previousStatus)
       // todo remove from raft ???
+      // PN: yes, but note that an ActorIdentity for a removed member might come in after MemberRemoved

     case _: MemberEvent =>
       // ignore
diff --git a/src/main/scala/pl/project13/scala/akka/raft/example/cluster/WordConcatClusterApp.scala b/src/main/scala/pl/project13/scala/akka/raft/example/cluster/WordConcatClusterApp.scala
index bda9dfd..129b097 100644
--- a/src/main/scala/pl/project13/scala/akka/raft/example/cluster/WordConcatClusterApp.scala
+++ b/src/main/scala/pl/project13/scala/akka/raft/example/cluster/WordConcatClusterApp.scala
@@ -14,7 +14,9 @@ object WordConcatClusterApp extends App {
   val system = ActorSystem("RaftSystem", config)

   val member = system.actorOf(Props(classOf[WordConcatClusterRaftActor]))
+  //PN: wasn't the expected name "member-*" ?

   Cluster(system).subscribe(member)
+  //PN: remove, subscribe is done inside ClusterRaftActor

 }
diff --git a/src/main/scala/pl/project13/scala/akka/raft/example/cluster/WordConcatClusterRaftActor.scala b/src/main/scala/pl/project13/scala/akka/raft/example/cluster/WordConcatClusterRaftActor.scala
index eff52f6..cfbb1e6 100644
--- a/src/main/scala/pl/project13/scala/akka/raft/example/cluster/WordConcatClusterRaftActor.scala
+++ b/src/main/scala/pl/project13/scala/akka/raft/example/cluster/WordConcatClusterRaftActor.scala
@@ -10,6 +10,7 @@ class WordConcatClusterRaftActor extends ClusterRaftActor {
    type Command = Cmnd

    var words = ListBuffer[String]()
+   // PN: I would use a Vector

    /** Called when a command is determined by Raft to be safe to apply */
    def apply = {
diff --git a/src/multi-jvm/scala/pl/project13/scala/akka/raft/cluster/ClusterWithManyMembersOnEachNodeElectionSpec.scala b/src/multi-jvm/scala/pl/project13/scala/akka/raft/cluster/ClusterWithManyMembersOnEachNodeElectionSpec.scala
index f4fda3d..e99a608 100644
--- a/src/multi-jvm/scala/pl/project13/scala/akka/raft/cluster/ClusterWithManyMembersOnEachNodeElectionSpec.scala
+++ b/src/multi-jvm/scala/pl/project13/scala/akka/raft/cluster/ClusterWithManyMembersOnEachNodeElectionSpec.scala
@@ -42,6 +42,9 @@ abstract class ClusterWithManyMembersOnEachNodeElectionSpec extends RaftClusterS
         system.actorOf(Props[WordConcatClusterRaftActor], s"member-$idx")
       }
     }
+    //PN: I think you should start the raft actors first, and then have a barrier,
+    //    otherwise there is a risk that some node receives MemberUp and sends Identify
+    //    before the target actor is started

     // start additional members
     runOn(first) {
-- 
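The two timing comments on RaftActor.scala (use ThreadLocalRandom, and measure with a monotonic clock via Deadline) could be combined roughly as follows. This is only a sketch under assumptions: the object and method names (`ElectionTimeouts`, `randomElectionTimeout`, `nextElectionDeadline`) are hypothetical and not part of akka-raft, and it uses the JDK's `java.util.concurrent.ThreadLocalRandom` rather than the `scala.concurrent.forkjoin` variant named in the review.

```scala
import java.util.concurrent.ThreadLocalRandom
import scala.concurrent.duration._

// Hypothetical helper, not taken from akka-raft.
object ElectionTimeouts {

  /** Picks a random timeout in the half-open range [from, to). */
  def randomElectionTimeout(from: FiniteDuration, to: FiniteDuration): FiniteDuration = {
    val fromMs = from.toMillis
    val toMs = to.toMillis
    require(toMs > fromMs, s"to ($to) must be greater than from ($from)")
    // ThreadLocalRandom avoids contention on a shared Random instance.
    ThreadLocalRandom.current().nextLong(fromMs, toMs).millis
  }

  /** Deadline is based on System.nanoTime, so wall-clock jumps do not affect it. */
  def nextElectionDeadline(from: FiniteDuration, to: FiniteDuration): Deadline =
    Deadline.now + randomElectionTimeout(from, to)
}
```

With a `Deadline` stored instead of a millisecond timestamp, the check in `electionTimeoutStillValid` might reduce to `deadline.isOverdue()` or `deadline.hasTimeLeft()`, and `deadline.timeLeft` is available for logging.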
ktoso commented 10 years ago

Awesome, thanks a lot for the comments Patrik!

Some of these I totally missed, so I'm super grateful for the feedback. I'll address those today after work :-)

Q1: While we're at it, I did have a question about style of passing data around in an FSM.

So my replicatedLog is an actor field (same with nextIndex and matchIndex): https://github.com/ktoso/akka-raft/blob/master/src/main/scala/pl/project13/scala/akka/raft/RaftActor.scala#L28 On one hand it feels normal to have it there, since it's the member's state. On the other hand, I use the FSM state data (Meta, LeaderMeta etc.) for passing around "some" parts of the state (where "some" is not clearly defined yet). Do you think it's worth moving everything (replicatedLog and friends) into the Meta class and keeping the actor's body emptier? Testing-wise, I do access replicatedLog directly via underlyingActor in a few "single state" tests.

Q2: I'm wondering whether it's worthwhile to provide a RaftClientActor, which would take care of tracking "who is the leader right now", so you could just raft ! "test" without having (as a user of this pattern) to worry about reacting to responses like "oh but the leader isn't me anymore, it's member-3". Might be a nice bonus to make this lib more usable I guess, thoughts?

Something like:

trait RaftClientActor {
  var leader: Option[ActorRef] = None
  // pseudocode
  def aroundReceive(...) = msg match {
    case LeaderChanged(newLeader, inReplyToThisMsg) => // we sent the message to a follower / candidate, which told us about the new leader
      leader = Some(newLeader)
      newLeader ! inReplyToThisMsg // redeliver the original message to the new leader
    case _ =>
      receive(msg)
  }
}

class Client extends RaftClientActor {
  // leader is an Option, so only send when a leader is known
  def receive = { case _ => leader foreach { _ ! "hello" } }
}

So it would hide leader changes from the end-user API, and try to redeliver the message to the right leader. Do you think it's a good idea to provide this?

Thanks a lot in advance :-)

patriknw commented 10 years ago

Q1: It's possible to put all state in the FSM, but if it's state that exists for all FSM states it can be more convenient to keep it in instance variables, as you have done.

Q2: RaftClientActor is a good idea. An alternative (not sure if it's better or not) to a trait is to provide RaftClientActor as a concrete actor that you send the messages via, like a proxy. It could also stash messages if there is no leader at the moment.
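The proxy idea (forward to the current leader, hold messages while there is none) can be modelled without Akka to make the intended behaviour concrete. This is a minimal sketch under assumptions: `LeaderProxy`, `onClientMessage` and `onLeaderChanged` are hypothetical names, `Ref` stands in for `ActorRef`, and the methods return the messages to send rather than sending them.

```scala
import scala.collection.immutable.Queue

/** Hypothetical, dependency-free model of the proxy's routing state.
  * Each method returns the next state plus the (target, message) pairs
  * that the proxy should send right now.
  */
final case class LeaderProxy[Ref, Msg](
    leader: Option[Ref] = None,
    pending: Queue[Msg] = Queue.empty[Msg]) {

  /** A client message arrives: forward it if a leader is known, otherwise buffer it. */
  def onClientMessage(msg: Msg): (LeaderProxy[Ref, Msg], Vector[(Ref, Msg)]) =
    leader match {
      case Some(ref) => (this, Vector(ref -> msg))
      case None      => (LeaderProxy[Ref, Msg](None, pending.enqueue(msg)), Vector.empty)
    }

  /** Leadership changes: remember the new leader and flush buffered messages in arrival order. */
  def onLeaderChanged(newLeader: Option[Ref]): (LeaderProxy[Ref, Msg], Vector[(Ref, Msg)]) =
    newLeader match {
      case Some(ref) =>
        (LeaderProxy[Ref, Msg](Some(ref), Queue.empty), pending.map(ref -> _).toVector)
      case None =>
        // Leader lost: keep whatever is buffered until a new leader appears.
        (LeaderProxy[Ref, Msg](None, pending), Vector.empty)
    }
}
```

In an actual actor, Akka's `Stash` trait (`stash()` / `unstashAll()`) could take the place of the explicit queue, with `context.become` switching between a "no leader" and a "forwarding" behaviour.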

ktoso commented 10 years ago

Yes, I considered a proxy actor too, hm. Let's see what looks nicer when implemented... Thanks for the feedback! :-)

ktoso commented 10 years ago

Thanks again for the comments, addressed most of them, some left for the weekend :-)