@mudler, @restevens402 and I added this to a future release as part of refactoring and cleaning up the P2P layer.
This is tangentially related to https://github.com/masa-finance/masa-oracle/issues/427. That issue is about who can and cannot be a validator; here, instead, we want to make sure that only validators can write nodeData.
However, when I spoke with @jdutchak it sounded like nodeData isn't part of the ledger but is only constructed by the node - isn't it just an internal view of the node state?
This also seems to depend on having a consensus algorithm first (https://github.com/masa-finance/masa-oracle/issues/496).
Implement a hybrid solution for a distributed data storage and synchronization system using BadgerDB for local storage, libp2p DHT for distributed data access, a custom Data Submission Protocol for submitting nodeData, and Raft consensus for coordinating writes among validator nodes, with a focus on maintaining consistent nodeData across the network.
import "github.com/dgraph-io/badger/v3"
func initBadgerDB(path string) (*badger.DB, error) {
opts := badger.DefaultOptions(path)
return badger.Open(opts)
}
import (
	"context"

	dht "github.com/libp2p/go-libp2p-kad-dht"
	"github.com/libp2p/go-libp2p/core/host"
)
func setupDHT(ctx context.Context, host host.Host) (*dht.IpfsDHT, error) {
kdht, err := dht.New(ctx, host)
if err != nil {
return nil, err
}
if err = kdht.Bootstrap(ctx); err != nil {
return nil, err
}
return kdht, nil
}
const DataSubmissionProtocol = "/masa/data-submission/1.0.0"
func (vn *ValidatorNode) setupDataSubmissionProtocol() {
vn.host.SetStreamHandler(protocol.ID(DataSubmissionProtocol), vn.handleDataSubmission)
}
func (vn *ValidatorNode) handleDataSubmission(stream network.Stream) {
// Read data from stream
// Validate data
// Process data through Raft consensus
}
import (
	"github.com/hashicorp/raft"
	"github.com/sirupsen/logrus"
)
func setupRaftCluster(nodeID string, fsm raft.FSM, transport raft.Transport, peers []string) (*raft.Raft, error) {
config := raft.DefaultConfig()
config.LocalID = raft.ServerID(nodeID)
// Calculate the number of peers needed for quorum
peerCount := len(peers) + 1 // Include self
quorum := (peerCount / 2) + 1
logrus.Infof("Raft cluster size: %d, Quorum required: %d", peerCount, quorum)
// Configure the cluster
configuration := raft.Configuration{}
for _, peerID := range peers {
server := raft.Server{
ID: raft.ServerID(peerID),
Address: raft.ServerAddress(peerID),
}
configuration.Servers = append(configuration.Servers, server)
}
// Add self to the configuration
configuration.Servers = append(configuration.Servers, raft.Server{
ID: raft.ServerID(nodeID),
Address: raft.ServerAddress(nodeID),
})
	// Create the Raft instance (in-memory stores are used here for illustration;
	// a real deployment would use persistent log, stable, and snapshot stores)
	logStore := raft.NewInmemStore()
	stableStore := raft.NewInmemStore()
	snapshotStore := raft.NewInmemSnapshotStore()
	r, err := raft.NewRaft(config, fsm, logStore, stableStore, snapshotStore, transport)
if err != nil {
return nil, err
}
// Bootstrap the cluster if this is the first run
future := r.BootstrapCluster(configuration)
if err := future.Error(); err != nil && err != raft.ErrCantBootstrap {
return nil, err
}
return r, nil
}
// In ValidatorNode initialization
peers := []string{"peer1ID", "peer2ID", "peer3ID"} // Get this list dynamically
raftInstance, err := setupRaftCluster(vn.nodeData.PeerId.String(), fsm, transport, peers)
if err != nil {
return nil, fmt.Errorf("failed to set up Raft cluster: %v", err)
}
vn.raft = raftInstance
type ValidatorNode struct {
host host.Host
dht *dht.IpfsDHT
db *badger.DB
raft *raft.Raft
raftConfig *raft.Config
nodeData *pubsub.NodeData
}
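The Raft log entries used throughout this sketch assume a small command envelope along these lines (the field names are assumptions for illustration, not an existing type):
type Command struct {
	Op    string `json:"op"`    // e.g. "set" or "updateNodeData"
	Key   string `json:"key"`
	Value []byte `json:"value"`
}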
func (vn *ValidatorNode) handleDataSubmission(stream network.Stream) {
// Read and validate data
// ...
// Prepare Raft command
cmd := Command{
Op: "set",
Key: getKey(data),
Value: data,
}
cmdData, _ := json.Marshal(cmd)
// Propose to Raft cluster
future := vn.raft.Apply(cmdData, 5*time.Second)
if err := future.Error(); err != nil {
// Handle error
return
}
// Publish to DHT
vn.dht.PutValue(context.Background(), getKey(data), data)
}
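getKey is not defined in this spike; a minimal sketch of what it could look like, assuming the DHT key is simply derived from a hash of the submitted payload:
import (
	"crypto/sha256"
	"encoding/hex"
)

// getKey derives a deterministic DHT key from the submitted payload (assumption for this sketch)
func getKey(data []byte) string {
	sum := sha256.Sum256(data)
	return "/masa/nodedata/" + hex.EncodeToString(sum[:])
}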
func (vn *ValidatorNode) startPeriodicSync() {
ticker := time.NewTicker(5 * time.Minute)
for range ticker.C {
vn.syncWithDHT()
}
}
func (vn *ValidatorNode) syncWithDHT() {
// Iterate through BadgerDB and update DHT
err := vn.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
it := txn.NewIterator(opts)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
k := item.Key()
			err := item.Value(func(v []byte) error {
				return vn.dht.PutValue(context.Background(), string(k), v)
			})
if err != nil {
return err
}
}
return nil
})
if err != nil {
logrus.Errorf("Error syncing with DHT: %v", err)
}
}
import "github.com/sirupsen/logrus"
func init() {
logrus.SetFormatter(&logrus.TextFormatter{
FullTimestamp: true,
})
logrus.SetLevel(logrus.InfoLevel)
}
// Use throughout the code
logrus.Infof("Handling node data for: %s", data.PeerId)
logrus.Errorf("Error updating node data: %v", err)
func TestValidatorNodeSync(t *testing.T) {
// Set up test ValidatorNode
// ...
// Add test data
testData := []byte("test data")
vn.handleDataSubmission(testData)
// Force sync
vn.syncWithDHT()
// Verify data in DHT
value, err := vn.dht.GetValue(context.Background(), getKey(testData))
assert.NoError(t, err)
assert.Equal(t, testData, value)
}
func (vn *ValidatorNode) setupLeaderElection() {
go func() {
for {
select {
case isLeader := <-vn.raft.LeaderCh():
if isLeader {
logrus.Info("This node has been elected leader")
vn.becomeLeader()
} else {
logrus.Info("This node is no longer the leader")
vn.stepDownAsLeader()
}
}
}
}()
}
func (vn *ValidatorNode) becomeLeader() {
// Implement leader-specific logic
}
func (vn *ValidatorNode) stepDownAsLeader() {
// Implement follower-specific logic
}
import "github.com/prometheus/client_golang/prometheus"
var (
raftApplyDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "raft_apply_duration_seconds",
Help: "Duration of Raft apply operations",
})
)
func init() {
prometheus.MustRegister(raftApplyDuration)
}
func (vn *ValidatorNode) handleDataSubmission(stream network.Stream) {
// ... existing code ...
timer := prometheus.NewTimer(raftApplyDuration)
future := vn.raft.Apply(cmdData, 5*time.Second)
timer.ObserveDuration()
// ... rest of the code ...
}
func (vn *ValidatorNode) discoveryHandler(peer peer.AddrInfo) {
if vn.shouldAddPeer(peer) {
vn.raft.AddVoter(raft.ServerID(peer.ID.String()), raft.ServerAddress(peer.Addrs[0].String()), 0, 0)
}
}
func (vn *ValidatorNode) shouldAddPeer(peer peer.AddrInfo) bool {
	// Implement logic to decide if a peer should be added to the Raft cluster
	return false // placeholder
}
import "github.com/libp2p/go-libp2p/p2p/security/tls"
func setupSecureHost() (host.Host, error) {
priv, _, err := crypto.GenerateKeyPair(crypto.Ed25519, -1)
if err != nil {
return nil, err
}
tlsTransport, err := tls.New(priv)
if err != nil {
return nil, err
}
return libp2p.New(
libp2p.Identity(priv),
libp2p.Security(tls.ID, tlsTransport),
)
}
type DataMigrator struct {
currentVersion int
migrations map[int]func(*badger.Txn) error
}
func (dm *DataMigrator) Migrate(db *badger.DB) error {
return db.Update(func(txn *badger.Txn) error {
for v := dm.currentVersion + 1; v <= len(dm.migrations); v++ {
if err := dm.migrations[v](txn); err != nil {
return err
}
dm.currentVersion = v
}
return nil
})
}
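A hypothetical usage sketch, assuming two versioned migrations registered against the BadgerDB instance:
migrator := &DataMigrator{
	currentVersion: 0,
	migrations: map[int]func(*badger.Txn) error{
		1: func(txn *badger.Txn) error {
			// v1: seed a schema-version marker (example migration)
			return txn.Set([]byte("schema_version"), []byte("1"))
		},
		2: func(txn *badger.Txn) error {
			// v2: rewrite keys under a new prefix, backfill fields, etc. (example migration)
			return nil
		},
	},
}
if err := migrator.Migrate(db); err != nil {
	logrus.Fatalf("migration failed: %v", err)
}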
import "go.opentelemetry.io/otel"
func (vn *ValidatorNode) handleDataSubmission(stream network.Stream) {
ctx, span := otel.Tracer("validator").Start(context.Background(), "handle_data_submission")
defer span.End()
// ... existing code with added tracing ...
}
func (vn *ValidatorNode) healthCheck(w http.ResponseWriter, r *http.Request) {
status := struct {
BadgerDB string `json:"badgerdb"`
Raft string `json:"raft"`
DHT string `json:"dht"`
}{
BadgerDB: vn.checkBadgerDB(),
Raft: vn.checkRaft(),
DHT: vn.checkDHT(),
}
json.NewEncoder(w).Encode(status)
}
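The check helpers referenced above are not defined; minimal sketches of what they might return (the status strings are an assumption):
func (vn *ValidatorNode) checkBadgerDB() string {
	if err := vn.db.View(func(txn *badger.Txn) error { return nil }); err != nil {
		return err.Error()
	}
	return "ok"
}

func (vn *ValidatorNode) checkRaft() string {
	// Reports the current Raft state: Leader, Follower, Candidate, or Shutdown
	return vn.raft.State().String()
}

func (vn *ValidatorNode) checkDHT() string {
	if vn.dht.RoutingTable().Size() == 0 {
		return "no peers in routing table"
	}
	return "ok"
}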
func (vn *ValidatorNode) adjustClusterSize(newSize int) error {
currentSize := len(vn.raft.GetConfiguration().Configuration().Servers)
if newSize == currentSize {
return nil
}
if newSize > currentSize {
// Add new nodes
for i := currentSize; i < newSize; i++ {
// Logic to add a new node
}
} else {
// Remove nodes
for i := currentSize; i > newSize; i-- {
// Logic to remove a node
}
}
return nil
}
Implement a solution to maintain consistent nodeData across the network using LevelDB for local storage, libp2p DHT for distributed data access, and Raft consensus for coordinating writes among validator nodes, while maintaining a clear distinction between validator nodes and peers.
An example version of the LevelDB wrapper that removes caching and focuses on direct storage:
import (
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
)
type LevelDBWrapper struct {
db *leveldb.DB
}
func NewLevelDBWrapper(path string) (*LevelDBWrapper, error) {
opts := &opt.Options{
WriteBuffer: 64 * 1024 * 1024, // 64MB
CompactionTableSize: 2 * 1024 * 1024, // 2MB
CompactionTotalSize: 10 * 1024 * 1024, // 10MB
MaxOpenFiles: 1000,
DisableSeeksCompaction: true,
}
db, err := leveldb.OpenFile(path, opts)
if err != nil {
return nil, err
}
return &LevelDBWrapper{db: db}, nil
}
func (l *LevelDBWrapper) Put(key []byte, value []byte) error {
return l.db.Put(key, value, nil)
}
func (l *LevelDBWrapper) Get(key []byte) ([]byte, error) {
return l.db.Get(key, nil)
}
func (l *LevelDBWrapper) Delete(key []byte) error {
return l.db.Delete(key, nil)
}
func (l *LevelDBWrapper) Close() error {
return l.db.Close()
}
The same wrapper configuration with a block cache enabled (the only change relative to the version above is the BlockCacheCapacity option):
import (
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
)
func NewLevelDBWrapper(path string) (*LevelDBWrapper, error) {
opts := &opt.Options{
BlockCacheCapacity: 32 * 1024 * 1024, // 32MB
WriteBuffer: 64 * 1024 * 1024, // 64MB
CompactionTableSize: 2 * 1024 * 1024, // 2MB
CompactionTotalSize: 10 * 1024 * 1024, // 10MB
MaxOpenFiles: 1000,
DisableSeeksCompaction: true,
}
db, err := leveldb.OpenFile(path, opts)
if err != nil {
return nil, err
}
return &LevelDBWrapper{db: db}, nil
}
import (
"github.com/hashicorp/raft"
)
type LevelDBFSM struct {
db *LevelDBWrapper
nodeData *pubsub.NodeData
}
func (l *LevelDBFSM) Apply(log *raft.Log) interface{} {
	var c Command
	if err := json.Unmarshal(log.Data, &c); err != nil {
		return err
	}
	switch c.Op {
	case "set":
		if err := l.db.Put([]byte(c.Key), c.Value); err != nil {
			return err
		}
		return l.updateNodeData(c.Key, c.Value)
	case "updateNodeData":
		var nodeData pubsub.NodeData
		if err := json.Unmarshal(c.Value, &nodeData); err != nil {
			return err
		}
		return l.updateNodeData(c.Key, c.Value)
	// ... other operations ...
	default:
		return fmt.Errorf("unknown command op: %s", c.Op)
	}
}
func (l *LevelDBFSM) updateNodeData(key string, value []byte) error {
// Update nodeData based on the key and value
return nil
}
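To satisfy hashicorp/raft's FSM interface, LevelDBFSM also needs Snapshot and Restore methods; placeholder sketches:
// Snapshot returns a point-in-time view of the FSM state (a real implementation would
// iterate the LevelDB keyspace into an FSMSnapshot)
func (l *LevelDBFSM) Snapshot() (raft.FSMSnapshot, error) {
	return nil, fmt.Errorf("snapshot not implemented")
}

// Restore replaces the FSM state from a snapshot reader (a real implementation would
// clear the store and replay the snapshot contents)
func (l *LevelDBFSM) Restore(rc io.ReadCloser) error {
	defer rc.Close()
	return fmt.Errorf("restore not implemented")
}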
type ValidatorNode struct {
host host.Host
dht *dht.IpfsDHT
levelDB *LevelDBWrapper
raft *raft.Raft
raftConfig *raft.Config
nodeData *pubsub.NodeData
isValidator bool
}
const DataSubmissionProtocol = "/masa/data-submission/1.0.0"
func (vn *ValidatorNode) setupDataSubmissionProtocol() {
vn.host.SetStreamHandler(protocol.ID(DataSubmissionProtocol), vn.handleDataSubmission)
}
func (vn *ValidatorNode) handleDataSubmission(stream network.Stream) {
defer stream.Close()
// Authenticate the peer
if !vn.authenticatePeer(stream.Conn().RemotePeer()) {
logrus.Errorf("Unauthorized data submission attempt from %s", stream.Conn().RemotePeer())
return
}
var data pubsub.NodeData
if err := json.NewDecoder(stream).Decode(&data); err != nil {
logrus.Errorf("Error decoding submitted data: %v", err)
return
}
if err := vn.validateSubmittedData(&data); err != nil {
logrus.Errorf("Invalid data submission: %v", err)
return
}
if err := vn.processNodeDataUpdate(&data); err != nil {
logrus.Errorf("Error processing node data update: %v", err)
return
}
if err := json.NewEncoder(stream).Encode("ACK"); err != nil {
logrus.Errorf("Error sending ACK: %v", err)
}
}
func (vn *ValidatorNode) authenticatePeer(peerID peer.ID) bool {
// Implement authentication logic
return true
}
func (vn *ValidatorNode) validateSubmittedData(data *pubsub.NodeData) error {
// Implement validation logic
return nil
}
func (vn *ValidatorNode) processNodeDataUpdate(data *pubsub.NodeData) error {
	value, err := json.Marshal(data)
	if err != nil {
		return fmt.Errorf("failed to marshal node data: %w", err)
	}
	cmd := Command{
		Op:    "updateNodeData",
		Key:   data.PeerId.String(),
		Value: value,
	}
	cmdData, err := json.Marshal(cmd)
	if err != nil {
		return err
	}
	future := vn.raft.Apply(cmdData, 5*time.Second)
	return future.Error()
}
type PeerNode struct {
host host.Host
dht *dht.IpfsDHT
nodeData *pubsub.NodeData
}
func (pn *PeerNode) submitNodeData(ctx context.Context) error {
validatorPeer, err := pn.findValidator()
if err != nil {
return fmt.Errorf("failed to find a validator: %w", err)
}
stream, err := pn.host.NewStream(ctx, validatorPeer, protocol.ID(DataSubmissionProtocol))
if err != nil {
return fmt.Errorf("failed to open stream to validator: %w", err)
}
defer stream.Close()
if err := json.NewEncoder(stream).Encode(pn.nodeData); err != nil {
return fmt.Errorf("failed to send node data: %w", err)
}
var ack string
if err := json.NewDecoder(stream).Decode(&ack); err != nil {
return fmt.Errorf("failed to receive ACK: %w", err)
}
if ack != "ACK" {
return fmt.Errorf("unexpected response from validator: %s", ack)
}
return nil
}
func (pn *PeerNode) findValidator() (peer.ID, error) {
	// Query the DHT for validator records (GetValues also takes a maximum record count)
	validatorKey := "/masa/validators"
	records, err := pn.dht.GetValues(context.Background(), validatorKey, 16)
	if err != nil {
		return "", fmt.Errorf("failed to find validators: %w", err)
	}
	if len(records) == 0 {
		return "", fmt.Errorf("no validators found")
	}
	// Select a random validator from the returned records
	record := records[rand.Intn(len(records))]
	return peer.Decode(string(record.Val))
}
func WriteData(node *ValidatorNode, key string, value []byte) error {
cmd := Command{
Op: "set",
Key: key,
Value: value,
}
cmdData, _ := json.Marshal(cmd)
future := node.raft.Apply(cmdData, 5*time.Second)
if err := future.Error(); err != nil {
return fmt.Errorf("failed to apply Raft log: %w", err)
}
return nil
}
func iterateAndPublish(ctx context.Context, node *ValidatorNode) {
	// Snapshot returns a future; Open yields a reader over the snapshot contents
	future := node.raft.Snapshot()
	if err := future.Error(); err != nil {
		logrus.Errorf("Error taking Raft snapshot: %v", err)
		return
	}
	_, rc, err := future.Open()
	if err != nil {
		logrus.Errorf("Error opening Raft snapshot: %v", err)
		return
	}
	defer rc.Close()
	snapshotData, err := io.ReadAll(rc)
	if err != nil {
		logrus.Errorf("Error reading Raft snapshot: %v", err)
		return
	}
var records []Record
if err := json.Unmarshal(snapshotData, &records); err != nil {
logrus.Errorf("Error unmarshalling snapshot data: %v", err)
return
}
for _, record := range records {
if err := node.dht.PutValue(ctx, record.Key, record.Value); err != nil {
logrus.Debugf("Error publishing to DHT: %v", err)
}
}
}
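Record is not defined above; the assumed shape for this sketch is:
type Record struct {
	Key   string `json:"key"`
	Value []byte `json:"value"`
}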
func monitorNodeData(ctx context.Context, node *ValidatorNode) {
syncInterval := time.Second * 60
ticker := time.NewTicker(syncInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
			nodeData := node.nodeData
			jsonData, err := json.Marshal(nodeData)
			if err != nil {
				logrus.Errorf("Error marshalling node data: %v", err)
				continue
			}
			if err := WriteData(node, node.host.ID().String(), jsonData); err != nil {
				logrus.Errorf("Error writing node data: %v", err)
			}
			if err := node.dht.PutValue(ctx, node.host.ID().String(), jsonData); err != nil {
				logrus.Errorf("Error publishing node data to DHT: %v", err)
			}
case <-ctx.Done():
return
}
}
}
func (vn *ValidatorNode) addNodeToCluster(nodeID string, addr string) error {
return vn.raft.AddVoter(raft.ServerID(nodeID), raft.ServerAddress(addr), 0, 0).Error()
}
func (vn *ValidatorNode) removeNodeFromCluster(nodeID string) error {
return vn.raft.RemoveServer(raft.ServerID(nodeID), 0, 0).Error()
}
func (vn *ValidatorNode) registerAsValidator() error {
validatorKey := "/masa/validators"
return vn.dht.PutValue(context.Background(), validatorKey, []byte(vn.host.ID().String()))
}
func (pn *PeerNode) findValidators() ([]peer.ID, error) {
	validatorKey := "/masa/validators"
	records, err := pn.dht.GetValues(context.Background(), validatorKey, 16)
	if err != nil {
		return nil, fmt.Errorf("failed to find validators: %w", err)
	}
	var validators []peer.ID
	for _, record := range records {
		id, err := peer.Decode(string(record.Val))
		if err != nil {
			continue
		}
		validators = append(validators, id)
	}
	return validators, nil
}
func (vn *ValidatorNode) authenticatePeer(peerID peer.ID) bool {
// Implement authentication logic, e.g., check against a whitelist or verify a signature
return true // Placeholder
}
func (pn *PeerNode) encryptData(data []byte) ([]byte, error) {
// Implement encryption logic
return data, nil // Placeholder
}
func (vn *ValidatorNode) decryptData(data []byte) ([]byte, error) {
// Implement decryption logic
return data, nil // Placeholder
}
This spike outlines the process of transitioning our decentralized data protocol from Raft consensus to HotStuff consensus. The goal is to improve scalability, performance, and Byzantine fault tolerance while maintaining the existing functionality and integration with LevelDB and libp2p DHT.
import (
"crypto"
"github.com/gohotstuff/hotstuff" // Hypothetical HotStuff library
)
type HotStuffEngine struct {
privateKey crypto.PrivateKey
publicKey crypto.PublicKey
validators map[string]*Validator
currentView uint64
highestQC *QuorumCertificate
pendingBlock *Block
committedBlocks []*Block
fsm *LevelDBFSM
}
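Block, Transaction, and Validator are not defined here; assumed minimal shapes for this sketch (the fixed-size hash keeps blocks directly comparable):
import (
	"crypto/sha256"
	"encoding/json"
)

type Transaction struct {
	Key   string
	Value []byte
}

type Block struct {
	View         uint64
	ParentHash   [32]byte
	Transactions []Transaction
}

// Hash returns a content hash of the block (sketch: JSON encoding plus SHA-256)
func (b *Block) Hash() [32]byte {
	data, _ := json.Marshal(b)
	return sha256.Sum256(data)
}

type Validator struct {
	ID        string
	PublicKey crypto.PublicKey
}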
func NewHotStuffEngine(privateKey crypto.PrivateKey, validators map[string]*Validator, fsm *LevelDBFSM) *HotStuffEngine {
	// Initialize HotStuff engine
	return &HotStuffEngine{
		privateKey: privateKey,
		validators: validators,
		fsm:        fsm,
	}
}
func (e *HotStuffEngine) ProposeBlock(transactions []Transaction) (*Block, error) {
	// Leader creates and proposes a new block
	return nil, nil // placeholder
}
func (e *HotStuffEngine) ValidateBlock(block *Block) bool {
	// Validators check the validity of the proposed block
	return false // placeholder
}
type QuorumCertificate struct {
BlockHash []byte
View uint64
Signatures map[string][]byte
}
func (e *HotStuffEngine) CreateQC(block *Block) (*QuorumCertificate, error) {
	// Create and gather signatures for a QC
	return nil, nil // placeholder
}
type ValidatorNode struct {
host host.Host
dht *dht.IpfsDHT
levelDB *LevelDBWrapper
hotStuff *HotStuffEngine
nodeData *pubsub.NodeData
isValidator bool
}
func (vn *ValidatorNode) StartConsensus() error {
	// Initialize and start HotStuff consensus
	return nil // placeholder
}
type LevelDBFSM struct {
db *LevelDBWrapper
nodeData *pubsub.NodeData
}
func (l *LevelDBFSM) ApplyBlock(block *Block) error {
// Apply the transactions in the committed block to LevelDB
for _, tx := range block.Transactions {
		if err := l.db.Put([]byte(tx.Key), tx.Value); err != nil {
return err
}
}
return l.updateNodeData(block)
}
func (l *LevelDBFSM) updateNodeData(block *Block) error {
// Update nodeData based on the transactions in the block
return nil
}
func WriteData(node *ValidatorNode, key string, value []byte) error {
transaction := Transaction{Key: key, Value: value}
block, err := node.hotStuff.ProposeBlock([]Transaction{transaction})
if err != nil {
return fmt.Errorf("failed to propose block: %w", err)
}
// Wait for the block to be committed
committed := <-node.hotStuff.CommitChan
if committed.Hash() != block.Hash() {
return fmt.Errorf("proposed block was not committed")
}
return nil
}
func (vn *ValidatorNode) SyncView() error {
// Implement view synchronization logic
return nil
}
func iterateAndPublish(ctx context.Context, node *ValidatorNode) {
latestBlock := node.hotStuff.GetLatestCommittedBlock()
for _, tx := range latestBlock.Transactions {
if err := node.dht.PutValue(ctx, tx.Key, tx.Value); err != nil {
logrus.Debugf("Error publishing to DHT: %v", err)
}
}
}
func (e *HotStuffEngine) DetectByzantineBehavior(node *ValidatorNode) bool {
// Implement Byzantine behavior detection
return false
}
func (e *HotStuffEngine) InitiateViewChange() error {
// Implement view-change protocol
return nil
}
func (vn *ValidatorNode) AddNodeToCluster(nodeID string, pubKey crypto.PublicKey) error {
return vn.hotStuff.AddValidator(nodeID, pubKey)
}
func (vn *ValidatorNode) RemoveNodeFromCluster(nodeID string) error {
return vn.hotStuff.RemoveValidator(nodeID)
}
type ConsensusEngine interface {
ProposeBlock(transactions []Transaction) (*Block, error)
ValidateBlock(block *Block) bool
// Other common methods
}
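A sketch of programming against this interface so the consensus backend stays swappable between Raft and HotStuff (the helper below is an assumption, not existing code):
// writeThroughConsensus proposes a single key/value write via whichever engine the node holds
func writeThroughConsensus(engine ConsensusEngine, key string, value []byte) error {
	block, err := engine.ProposeBlock([]Transaction{{Key: key, Value: value}})
	if err != nil {
		return fmt.Errorf("failed to propose block: %w", err)
	}
	if !engine.ValidateBlock(block) {
		return fmt.Errorf("proposed block failed validation")
	}
	return nil
}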
func (vn *ValidatorNode) NegotiateConsensusVersion(peer peer.ID) (string, error) {
// Implement version negotiation logic
return "hotstuff", nil
}
@teslashibe I'm a bit confused by the comments - this card, as it is formulated, would be an implementation one; it can't be picked up until we groom https://github.com/masa-finance/masa-oracle/issues/496 and agree on what it should look like, no?
For instance, I found good reads about HotStuff here: https://decentralizedthoughts.github.io/2021-07-17-simplifying-raft-with-chaining/ but there seems to be no implementation done on top of libp2p yet (while for Raft there is) - that would require its own spike after we have a basic form of consensus in place.
Update: re Raft/libp2p, I've pushed https://github.com/mudler/go-libp2p-simple-raft as an example of how to integrate the raft library directly with libp2p, and updated the consensus ticket accordingly.
Yeah, agreed with your comments @mudler - we do need to groom #496 first and decide there what we will use for consensus.
output from planning:
I think we have already covered the spike with @teslashibe's investigations above. I'm closing this one, with follow-ups tracked in https://github.com/masa-finance/masa-oracle/issues/496 and https://github.com/masa-finance/masa-oracle/issues/427.
Problem
nodeData is currently updated from goroutines in the node and should be limited to trusted gossip and connections from a validator. This means that only a validator can update nodeData on the network. It is important to note that the only time node flags should be updated is when the node joins the network. As it stands, any node can spoof an update by gossiping adversarial nodeData to the network.
Note
Currently it appears that the validator is constantly refreshing node data to all peers in a broadcast-like process. Instead, the validator should broadcast once with a signed and timestamped (when it was sent by the validator - sign plus send) nodeData that is then gossiped between nodes.
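A minimal sketch of the signed, timestamped envelope described above, assuming the validator signs with its libp2p private key and receivers check freshness and the signature against the validator's known public key (all names here are illustrative assumptions):
import (
	"fmt"
	"time"

	"github.com/libp2p/go-libp2p/core/crypto"
)

type SignedNodeData struct {
	Payload   []byte `json:"payload"`   // JSON-encoded nodeData
	Timestamp int64  `json:"timestamp"` // when the validator signed and sent it
	Signature []byte `json:"signature"` // validator signature over payload || timestamp
}

func signNodeData(priv crypto.PrivKey, payload []byte) (*SignedNodeData, error) {
	ts := time.Now().Unix()
	msg := append(append([]byte{}, payload...), []byte(fmt.Sprintf("|%d", ts))...)
	sig, err := priv.Sign(msg)
	if err != nil {
		return nil, err
	}
	return &SignedNodeData{Payload: payload, Timestamp: ts, Signature: sig}, nil
}

func verifyNodeData(pub crypto.PubKey, s *SignedNodeData, maxAge time.Duration) (bool, error) {
	if time.Since(time.Unix(s.Timestamp, 0)) > maxAge {
		return false, nil // reject stale or replayed broadcasts
	}
	msg := append(append([]byte{}, s.Payload...), []byte(fmt.Sprintf("|%d", s.Timestamp))...)
	return pub.Verify(msg, s.Signature)
}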