Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save oiooj/a6c54ac9d5473f7def63b6bd40ab899c to your computer and use it in GitHub Desktop.

Select an option

Save oiooj/a6c54ac9d5473f7def63b6bd40ab899c to your computer and use it in GitHub Desktop.
Raft Example
package main
import (
"bytes"
"fmt"
"math"
"strings"
"time"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"golang.org/x/net/context"
)
const hb = 1
type node struct {
id int
store *raft.MemoryStorage
pstore map[string]string
ctx context.Context
ctxCancel context.CancelFunc
cfg *raft.Config
node raft.Node
ticker <-chan time.Time
done <-chan struct{}
}
func newNode(id int, peers []raft.Peer) *node {
n := &node{}
ctx, cancel := context.WithCancel(context.Background())
n.ctx = ctx
n.ctxCancel = cancel
n.store = raft.NewMemoryStorage()
n.cfg = &raft.Config{
ID: uint64(id),
ElectionTick: 10 * hb,
HeartbeatTick: hb,
Storage: n.store,
MaxSizePerMsg: math.MaxUint16,
MaxInflightMsgs: 256,
}
n.pstore = make(map[string]string)
n.id = id
n.node = raft.StartNode(n.cfg, peers)
return n
}
func (n *node) run() {
n.ticker = time.Tick(time.Second)
for {
select {
case <-n.ticker:
n.node.Tick()
case rd := <-n.node.Ready():
n.saveToStorage(rd, rd.Entries, rd.Snapshot)
n.send(rd.Messages)
if !raft.IsEmptySnap(rd.Snapshot) {
n.processSnapshot(rd.Snapshot)
}
for _, entry := range rd.CommittedEntries {
n.process(entry)
if entry.Type == raftpb.EntryConfChange {
var cc raftpb.ConfChange
cc.Unmarshal(entry.Data)
n.node.ApplyConfChange(cc)
}
}
n.node.Advance()
case <-n.done:
return
}
}
}
func (n *node) saveToStorage(rd raft.Ready, entries []raftpb.Entry, snapshot raftpb.Snapshot) {
fmt.Println("Storing entries, node =", n.id)
n.store.Append(entries)
if !raft.IsEmptyHardState(rd.HardState) {
fmt.Println("Setting hard state, node =", n.id)
n.store.SetHardState(rd.HardState)
}
if !raft.IsEmptySnap(snapshot) {
fmt.Println("Applying snapshot, node =", n.id)
n.store.ApplySnapshot(snapshot)
}
}
func (n *node) send(messages []raftpb.Message) {
fmt.Println("*** Messages from NODE ", n.id, "***")
fmt.Println("Count:", len(messages))
for _, m := range messages {
fmt.Println(raft.DescribeMessage(m, nil))
// send message to other node
nodes[int(m.To)].receive(n.ctx, m)
}
fmt.Println("***************")
}
func (n *node) processSnapshot(snapshot raftpb.Snapshot) {
fmt.Println("Applying snapshot on", n.id, ":", snapshot)
n.store.ApplySnapshot(snapshot)
}
func (n *node) process(entry raftpb.Entry) {
fmt.Println("processing entry on ", n.id, ":", entry)
if entry.Type == raftpb.EntryNormal && entry.Data != nil {
fmt.Println("normal message:", string(entry.Data))
parts := bytes.SplitN(entry.Data, []byte(":"), 2)
fmt.Println(string(parts[0]), " = ", string(parts[1]))
n.pstore[string(parts[0])] = string(parts[1])
}
}
func (n *node) receive(ctx context.Context, message raftpb.Message) {
fmt.Println("Received message, node =", n.id)
n.node.Step(ctx, message)
}
var (
nodes = make(map[int]*node)
)
func main() {
nodes[1] = newNode(1, []raft.Peer{{ID: 1}, {ID: 2}, {ID: 3}})
nodes[1].node.Campaign(nodes[1].ctx)
go nodes[1].run()
nodes[2] = newNode(2, []raft.Peer{{ID: 1}, {ID: 2}, {ID: 3}})
go nodes[2].run()
nodes[3] = newNode(3, []raft.Peer{{ID: 1}, {ID: 2}, {ID: 3}})
go nodes[3].run()
// Wait for leader, is there a better way to do this
for nodes[1].node.Status().Lead != 1 {
time.Sleep(100 * time.Millisecond)
}
nodes[2].node.Propose(nodes[2].ctx, []byte("mykey:myvalue"))
// Wait for proposed entry to be commited in cluster
// Probably a better way to check this
time.Sleep(100 * time.Millisecond)
fmt.Println(strings.Repeat("#", 20))
for i, node := range nodes {
fmt.Println("Node", i)
for k, v := range node.pstore {
fmt.Println(k, " = ", v)
}
fmt.Println("")
}
fmt.Println(strings.Repeat("#", 20))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment