intelsdi-x / snap


[RFC] - Tribe Design #298

pittma closed 8 years ago

pittma commented 9 years ago

Abstract

Tribe implements a virally replicated, eventually consistent state machine that holds the data making up a global view of a collection of Pulse agents. It is inspired by SWIM[1], a gossip-like protocol developed at Cornell University to solve cluster membership for large gossip-based clusters. Tribe, however, must solve higher-level problems beyond membership: each node must be able to respond to queries about the state of the cluster or of another node, and each node must be able to change the state of the cluster or of other nodes.

Design Components

Each node in a Tribe cluster must have a global view of the whole system. This global view consists of two primary components: cluster membership and cluster state. Each of these components is described in detail below.

Cluster Membership

SWIM

In SWIM, membership changes are replicated out to the cluster on ticks of a configurable length. When the tick fires on a node that has a state change to apply, the node sends that change piggybacked on its regular ping message. This message is not multicast; it is sent to one node at a time. As nodes learn about the state change, they also forward it to their peers until a limit or timeout is reached for that change. This "infection" spreads exponentially, as each infected node continues to choose another member to "infect" until the timeout is reached.
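
The piggybacking described above can be sketched as a small queue of pending updates, each with a send budget. This is a minimal illustration, not code from SWIM or Tribe: the names `PendingUpdate`, `Disseminator`, `enqueue`, and `on_tick` are hypothetical, and a send-count limit stands in for the "limit or timeout" the text mentions.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical types for illustration; not taken from SWIM or Tribe source.
struct PendingUpdate {
    std::string payload;   // the state change being gossiped
    int remaining_sends;   // how many more pings may carry it
};

class Disseminator {
public:
    explicit Disseminator(int send_limit) : send_limit_(send_limit) {}

    // Queue a state change learned locally or from a peer.
    void enqueue(const std::string& payload) {
        pending_.push_back({payload, send_limit_});
    }

    // Called once per tick: returns the updates to piggyback on the next
    // ping (sent to a single peer) and retires any update whose send
    // budget is exhausted. Every queued update is sent at least once.
    std::vector<std::string> on_tick() {
        std::vector<std::string> piggyback;
        std::vector<PendingUpdate> still_pending;
        for (auto& u : pending_) {
            piggyback.push_back(u.payload);
            if (--u.remaining_sends > 0) {
                still_pending.push_back(u);
            }
        }
        pending_ = still_pending;
        return piggyback;
    }

    std::size_t pending_count() const { return pending_.size(); }

private:
    int send_limit_;
    std::vector<PendingUpdate> pending_;
};
```

A node would call `enqueue` whenever it learns of a change and `on_tick` each time its ping timer fires, attaching the returned payloads to the outgoing ping.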

Failure in SWIM

In order to prevent false positives, SWIM uses its Suspicion Mechanism. A node is marked suspicious and then evicted by the following protocol:

  1. Node i pings node j.
  2. If j does not reply, i marks j as suspect. It then sends a Suspect Member message to n random non-faulty members {x,y,z}.
  3. a. If any node in {x,y,z} is able to ping j, it unmarks j as a suspect and sends an Alive message out to the cluster.
     b. If none of {x,y,z} successfully pings j, then once a timeout is reached, i, x, y, or z may disseminate a Confirm message to the cluster, alerting other members that j is faulty and should be dropped from the member list.
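
The steps above amount to a three-state machine per member. The sketch below is one way to model it, under stated assumptions: the names `MemberState`, `Member`, `on_ping_result`, `on_alive`, and `on_tick` are hypothetical, the timeout is counted in ticks, and a member that has been confirmed faulty stays faulty (it would have to rejoin).

```cpp
// Hypothetical model of the suspicion mechanism; names are illustrative.
enum class MemberState { Alive, Suspect, Faulty };

struct Member {
    MemberState state = MemberState::Alive;
    int suspect_ticks = 0;  // ticks spent in the Suspect state
};

// Steps 1 and 2: node i pinged j directly and observed the outcome.
void on_ping_result(Member& j, bool replied) {
    if (j.state == MemberState::Faulty) return;  // Confirm is final here
    if (replied) {
        j.state = MemberState::Alive;
        j.suspect_ticks = 0;
    } else if (j.state == MemberState::Alive) {
        j.state = MemberState::Suspect;
        j.suspect_ticks = 0;
    }
}

// Step 3a: an Alive message about j arrived from some peer.
void on_alive(Member& j) {
    if (j.state == MemberState::Suspect) {
        j.state = MemberState::Alive;
        j.suspect_ticks = 0;
    }
}

// Step 3b: advance the suspicion timer; once the timeout is reached,
// j is confirmed faulty and can be dropped from the member list.
void on_tick(Member& j, int suspicion_timeout_ticks) {
    if (j.state != MemberState::Suspect) return;
    if (++j.suspect_ticks >= suspicion_timeout_ticks) {
        j.state = MemberState::Faulty;
    }
}
```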

Tribe applies this failure protocol not only to membership, but also to invariant violations inside the system.

Tribe's Approach

Health / Heartbeats / Pings

In Tribe, at every interval i, node j pings node x. Piggybacked on that ping is a data structure containing j's Aggregates and their logical clocks[2]. Upon receipt, x compares its own logical clocks with those received from j. Wherever j's logical clock for an Aggregate is greater than x's, x is behind, and x requests the Vector Clocks[3] for those Aggregates. If x's logical clocks are all greater than or equal to j's, x simply replies with an acknowledgement.
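
As a rough sketch of the comparison x performs on receipt, the function below returns the Aggregates for which x is behind j. The names `ClockMap` and `aggregates_behind` are hypothetical, the Aggregate identifiers are plain strings, and treating an Aggregate that x has never seen as having clock 0 is an assumption not stated in the text.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Aggregate id -> logical clock. A hypothetical stand-in for the data
// structure piggybacked on the ping.
using ClockMap = std::map<std::string, std::uint64_t>;

// Returns the ids of the Aggregates for which the receiver (local) is
// behind the sender (remote). An empty result means the receiver can
// reply with a plain acknowledgement.
std::vector<std::string> aggregates_behind(const ClockMap& local,
                                           const ClockMap& remote) {
    std::vector<std::string> behind;
    for (const auto& entry : remote) {
        auto it = local.find(entry.first);
        // Assumption: an Aggregate unknown locally counts as clock 0.
        std::uint64_t local_clock = (it == local.end()) ? 0 : it->second;
        if (entry.second > local_clock) {
            behind.push_back(entry.first);
        }
    }
    return behind;
}
```

The receiver would then request Vector Clocks only for the returned ids, which keeps the regular ping small.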

State Changes

Any node in a Tribe cluster can apply a state change to any Aggregate in the system. These changes are virally replicated out to the cluster, and are eventually consistent. As an example:

[Image: screen shot 2015-08-10 at 3 36 22 pm]

Rather than choosing a single node after receiving a state change from an external actor, Tribe uses a fanout mechanism which propagates changes into the system f nodes at a time. This allows every node to be reached in $\log_f(n)$ steps. However, because nodes are chosen at random, it is possible that not every node is reached if $n \log_f(n) = n$. To protect against this, the limit on the number of times a message should be sent is set to $\log_{f-1}(n)$. This heuristic increases the probability that every node will be reached by an order of magnitude.
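
To make the two step counts concrete, here is a small integer-only sketch. It reads the send limit above as the logarithm of n in base f - 1, which is an interpretation of the text and not something the source confirms; the function names `ceil_log`, `ideal_steps`, and `send_limit` are hypothetical.

```cpp
#include <stdexcept>

// Smallest r such that base^r >= n, i.e. ceil(log_base(n)), computed with
// integer arithmetic to avoid floating-point rounding at exact powers.
int ceil_log(int n, int base) {
    if (n < 1 || base < 2) {
        throw std::invalid_argument("need n >= 1 and base >= 2");
    }
    int rounds = 0;
    long long reach = 1;  // base^rounds
    while (reach < n) {
        reach *= base;
        ++rounds;
    }
    return rounds;
}

// Steps needed to cover n nodes with fanout f if no send were wasted.
int ideal_steps(int n, int fanout) { return ceil_log(n, fanout); }

// Per-message send limit, ASSUMING the limit is log base (f - 1) of n.
// Requires fanout >= 3 so that the base is at least 2.
int send_limit(int n, int fanout) { return ceil_log(n, fanout - 1); }
```

For a 1000-node cluster with a fanout of 4, `ideal_steps` gives 5 while `send_limit` gives 7, so under this reading each message is allowed two extra rounds to compensate for random peer selection.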

Cluster State

In Tribe, the cluster state consists of the member list, as well as the domain-specific data related to Pulse itself. This primarily consists of Tribes, their memberships, and their agreements.

Managing the State of a Cluster

A Tribe Cluster state can be broken up into components or objects called Aggregates. As described in CQRS[4] or DDD[5], an Aggregate is:

[A] cluster of associated objects that are treated as a unit for the purpose of data changes[4].

An Aggregate is described by an Aggregate Root. An Aggregate Root can be thought of as the class implementation, and an Aggregate an instance of that class:

// The Aggregate Root of a Thing.
class Thing {
public:
    Thing() = default;
    ~Thing() = default;

    // Applies a state change to this Aggregate.
    void update() {}
};

int main() {
    // The Aggregate t is an instance of the Aggregate Root Thing.
    // t has a lifetime, and is destroyed when this stack frame is unwound.
    Thing t;
    t.update();
    // t's destructor is called implicitly here, ending the short lifetime of t.
    return 0;
}

In Tribe, we use these Aggregates to define the disparate objects which may receive state changes. Being explicit about these Aggregates beforehand allows us to also be explicit about their invariants.

We use an Aggregate's invariants to detect a merge conflict. When a merge conflict is detected, SWIM's failure protocol is used as an attempt to reconcile the conflict. When a node j receives an event e which violates an Aggregate's invariants, it asks a random selection of nodes from its memberlist for their Vector Clocks[3] on the Aggregate which is in violation.

If replaying the events for this Aggregate along with event e does not resolve the violation, the node shoots itself in the head and disseminates a node failure message about itself. It then attempts to self-heal: it rejoins the cluster and is given the latest state by whichever node receives its join event.
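
Reconciliation depends on comparing Vector Clocks[3] to tell whether one history precedes another or the two are concurrent. The sketch below shows the standard comparison; the type and function names (`VectorClock`, `Order`, `compare`) are hypothetical, and a node id missing from a clock is treated as a counter of 0.

```cpp
#include <cstdint>
#include <map>
#include <string>

// Node id -> event counter for one Aggregate (illustrative representation).
using VectorClock = std::map<std::string, std::uint64_t>;

enum class Order { Equal, Before, After, Concurrent };

// Compares two vector clocks. Before means a happened before b, After
// means b happened before a, and Concurrent means neither history
// contains the other.
Order compare(const VectorClock& a, const VectorClock& b) {
    bool a_less = false;  // some component of a is smaller than in b
    bool b_less = false;  // some component of b is smaller than in a
    for (const auto& e : a) {
        auto it = b.find(e.first);
        std::uint64_t bv = (it == b.end()) ? 0 : it->second;
        if (e.second < bv) a_less = true;
        if (e.second > bv) b_less = true;
    }
    // Entries present only in b: a implicitly holds 0 for them.
    for (const auto& e : b) {
        if (a.find(e.first) == a.end() && e.second > 0) a_less = true;
    }
    if (a_less && b_less) return Order::Concurrent;
    if (a_less) return Order::Before;
    if (b_less) return Order::After;
    return Order::Equal;
}
```

A `Concurrent` result is the case where replaying events may be needed, since neither side's history can simply replace the other's.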

Tribe

In Tribe, there are 2 Aggregate Roots:

  1. Node - A Node in Tribe is an independent instance of a Pulse service which is participating in a Tribe cluster.
  2. Tribe - A Tribe is a unit which describes a subset of Nodes in a Tribe cluster. A Tribe has a member list of its own, as well as Agreements. An Agreement is an enumeration of rules which members of a Tribe must adhere to.

The Node Aggregate

The Node Aggregate is bookended by NodeAdded and NodeRemoved or NodeFailed. The other state changes which may affect a Node are TribeJoined, TribeLeft, or Agreement noncompliance events (e.g. failed to load plugin).

Invariants:

  1. If node x is not present in node y's member list and y receives a NodeRemoved or NodeFailed event for x, then y must begin the failure protocol and attempt to reconcile its state.
  2. If node x is not present in node y's member list and y receives a {TribeJoined node: x} event, then y must begin the failure protocol and attempt to reconcile its state.
  3. If node x is not present in node y's member list and y receives a {TribeLeft node: x} event, then y must begin the failure protocol and attempt to reconcile its state.
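
The three Node invariants share one shape: an event about a node that y does not know is a violation. A minimal sketch of that check follows, with hypothetical names (`NodeEventType`, `NodeEvent`, `violates_node_invariants`) and node ids represented as plain strings.

```cpp
#include <set>
#include <string>

// Event kinds that can affect the Node Aggregate, as named in the text.
enum class NodeEventType {
    NodeAdded, NodeRemoved, NodeFailed, TribeJoined, TribeLeft
};

struct NodeEvent {
    NodeEventType type;
    std::string node;  // the node the event is about
};

// Returns true when receiving `e` violates a Node invariant, given the
// receiver's member list. A true result means the receiver should begin
// the failure protocol and try to reconcile its state.
bool violates_node_invariants(const std::set<std::string>& member_list,
                              const NodeEvent& e) {
    if (e.type == NodeEventType::NodeAdded) {
        return false;  // NodeAdded is how an unknown node becomes known
    }
    return member_list.count(e.node) == 0;
}
```

For example, with a member list containing only `"x"`, a `TribeJoined` event for `"z"` would be reported as a violation, while the same event for `"x"` would not.
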

The Tribe Aggregate

The Tribe Aggregate is bookended by TribeCreated and TribeRemoved. The other state changes which may affect a Tribe are TribeJoined, TribeLeft, AgreementAdded, and AgreementRemoved.

Invariants:

  1. If node x receives an event {TribeJoined id: y} and Tribe y is not present in node x's state, then x must begin the failure protocol and attempt to reconcile its state.
  2. If node x receives an event {TribeLeft id: y} and Tribe y is not present in node x's state, then x must begin the failure protocol and attempt to reconcile its state.
  3. If node x receives an event {AgreementAdded tribe_id: y, "plugins"} and Tribe y is not present in node x's state, then x must begin the failure protocol and attempt to reconcile its state.
  4. If node x receives an event {AgreementRemoved tribe_id: y, "plugins"} and Tribe y is not present in node x's state, then x must begin the failure protocol and attempt to reconcile its state.
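
One way to enforce the Tribe invariants is to check them at the point where an event is applied to local state. The sketch below does that under several assumptions: all names (`TribeEvent`, `TribeState`, `Tribes`, `apply`) are hypothetical, members and agreements are stored as string sets, and a `TribeRemoved` for an unknown Tribe is treated as a harmless no-op because the list above does not name it as a violation.

```cpp
#include <map>
#include <set>
#include <string>

enum class TribeEventType {
    TribeCreated, TribeRemoved, TribeJoined, TribeLeft,
    AgreementAdded, AgreementRemoved
};

struct TribeEvent {
    TribeEventType type;
    std::string tribe_id;
    std::string subject;  // node name or agreement name, if any
};

struct TribeState {
    std::set<std::string> members;
    std::set<std::string> agreements;
};

using Tribes = std::map<std::string, TribeState>;

// Applies `e` to the local state. Returns false, leaving the state
// untouched, when the event refers to a Tribe that is not present; the
// caller should then begin the failure protocol.
bool apply(Tribes& tribes, const TribeEvent& e) {
    if (e.type == TribeEventType::TribeCreated) {
        tribes.emplace(e.tribe_id, TribeState{});
        return true;
    }
    auto it = tribes.find(e.tribe_id);
    if (it == tribes.end()) {
        // Assumption: removing an unknown Tribe is not a violation.
        return e.type == TribeEventType::TribeRemoved;
    }
    switch (e.type) {
        case TribeEventType::TribeRemoved:     tribes.erase(it); break;
        case TribeEventType::TribeJoined:      it->second.members.insert(e.subject); break;
        case TribeEventType::TribeLeft:        it->second.members.erase(e.subject); break;
        case TribeEventType::AgreementAdded:   it->second.agreements.insert(e.subject); break;
        case TribeEventType::AgreementRemoved: it->second.agreements.erase(e.subject); break;
        case TribeEventType::TribeCreated:     break;  // handled above
    }
    return true;
}
```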

Use cases

TODO - Describe the flow of node addition and removal, tribe creation and removal, and agreement creation and removal as they pertain to the algorithm described in the previous sections.

References

1. SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol: https://www.cs.cornell.edu/~asdas/research/dsn02-swim.pdf

2. Time, Clocks, and the Ordering of Events in a Distributed System: http://research.microsoft.com/en-us/um/people/lamport/pubs/time-clocks.pdf

3. Timestamps in Message-Passing Systems That Preserve the Partial Ordering: http://zoo.cs.yale.edu/classes/cs426/2012/lab/bib/fidge88timestamps.pdf

4. Command Query Responsibility Segregation: https://msdn.microsoft.com/en-us/library/dn568103.aspx

5. Domain-Driven Design: Tackling Complexity in the Heart of Software - ISBN 978-0321125217: http://www.amazon.com/Domain-Driven-Design-Tackling-Complexity-Software/dp/0321125215/ref=sr_1_1?ie=UTF8&s=books&qid=1238687848&sr=8-1

lynxbat commented 9 years ago

"tribe" is a clustering/management feature of Pulse. I feel we should change the language in the beginning

lynxbat commented 9 years ago

"Tribe implements a ..."

geauxvirtual commented 9 years ago

How did we go from a "ring" discussion to SWIM algorithm?

What is going to differentiate this from SERF (serfdom.io)?

jcooklin commented 9 years ago

In the several discussions that we had, we eventually agreed on selecting random nodes/peers.

We are headed down the path of creating a library that will provide behavior similar to that of memberlist, on which serf depends, and our integration of that library into pulse will deliver the tribe use cases. The model and how it is shared within the cluster is the differentiator. The fundamentals of gossiping membership are not in themselves a differentiator.

mbbroberg commented 8 years ago

Is this a go as of our latest work, @danielscottt @jcooklin @geauxvirtual?

pittma commented 8 years ago

Marking this as closed, since our implementation is close enough to this design in practice.