-
-
Save oiooj/a6c54ac9d5473f7def63b6bd40ab899c to your computer and use it in GitHub Desktop.
Raft Example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package main | |
| import ( | |
| "bytes" | |
| "fmt" | |
| "math" | |
| "strings" | |
| "time" | |
| "github.com/coreos/etcd/raft" | |
| "github.com/coreos/etcd/raft/raftpb" | |
| "golang.org/x/net/context" | |
| ) | |
| const hb = 1 | |
| type node struct { | |
| id int | |
| store *raft.MemoryStorage | |
| pstore map[string]string | |
| ctx context.Context | |
| ctxCancel context.CancelFunc | |
| cfg *raft.Config | |
| node raft.Node | |
| ticker <-chan time.Time | |
| done <-chan struct{} | |
| } | |
| func newNode(id int, peers []raft.Peer) *node { | |
| n := &node{} | |
| ctx, cancel := context.WithCancel(context.Background()) | |
| n.ctx = ctx | |
| n.ctxCancel = cancel | |
| n.store = raft.NewMemoryStorage() | |
| n.cfg = &raft.Config{ | |
| ID: uint64(id), | |
| ElectionTick: 10 * hb, | |
| HeartbeatTick: hb, | |
| Storage: n.store, | |
| MaxSizePerMsg: math.MaxUint16, | |
| MaxInflightMsgs: 256, | |
| } | |
| n.pstore = make(map[string]string) | |
| n.id = id | |
| n.node = raft.StartNode(n.cfg, peers) | |
| return n | |
| } | |
| func (n *node) run() { | |
| n.ticker = time.Tick(time.Second) | |
| for { | |
| select { | |
| case <-n.ticker: | |
| n.node.Tick() | |
| case rd := <-n.node.Ready(): | |
| n.saveToStorage(rd, rd.Entries, rd.Snapshot) | |
| n.send(rd.Messages) | |
| if !raft.IsEmptySnap(rd.Snapshot) { | |
| n.processSnapshot(rd.Snapshot) | |
| } | |
| for _, entry := range rd.CommittedEntries { | |
| n.process(entry) | |
| if entry.Type == raftpb.EntryConfChange { | |
| var cc raftpb.ConfChange | |
| cc.Unmarshal(entry.Data) | |
| n.node.ApplyConfChange(cc) | |
| } | |
| } | |
| n.node.Advance() | |
| case <-n.done: | |
| return | |
| } | |
| } | |
| } | |
| func (n *node) saveToStorage(rd raft.Ready, entries []raftpb.Entry, snapshot raftpb.Snapshot) { | |
| fmt.Println("Storing entries, node =", n.id) | |
| n.store.Append(entries) | |
| if !raft.IsEmptyHardState(rd.HardState) { | |
| fmt.Println("Setting hard state, node =", n.id) | |
| n.store.SetHardState(rd.HardState) | |
| } | |
| if !raft.IsEmptySnap(snapshot) { | |
| fmt.Println("Applying snapshot, node =", n.id) | |
| n.store.ApplySnapshot(snapshot) | |
| } | |
| } | |
| func (n *node) send(messages []raftpb.Message) { | |
| fmt.Println("*** Messages from NODE ", n.id, "***") | |
| fmt.Println("Count:", len(messages)) | |
| for _, m := range messages { | |
| fmt.Println(raft.DescribeMessage(m, nil)) | |
| // send message to other node | |
| nodes[int(m.To)].receive(n.ctx, m) | |
| } | |
| fmt.Println("***************") | |
| } | |
| func (n *node) processSnapshot(snapshot raftpb.Snapshot) { | |
| fmt.Println("Applying snapshot on", n.id, ":", snapshot) | |
| n.store.ApplySnapshot(snapshot) | |
| } | |
| func (n *node) process(entry raftpb.Entry) { | |
| fmt.Println("processing entry on ", n.id, ":", entry) | |
| if entry.Type == raftpb.EntryNormal && entry.Data != nil { | |
| fmt.Println("normal message:", string(entry.Data)) | |
| parts := bytes.SplitN(entry.Data, []byte(":"), 2) | |
| fmt.Println(string(parts[0]), " = ", string(parts[1])) | |
| n.pstore[string(parts[0])] = string(parts[1]) | |
| } | |
| } | |
| func (n *node) receive(ctx context.Context, message raftpb.Message) { | |
| fmt.Println("Received message, node =", n.id) | |
| n.node.Step(ctx, message) | |
| } | |
| var ( | |
| nodes = make(map[int]*node) | |
| ) | |
| func main() { | |
| nodes[1] = newNode(1, []raft.Peer{{ID: 1}, {ID: 2}, {ID: 3}}) | |
| nodes[1].node.Campaign(nodes[1].ctx) | |
| go nodes[1].run() | |
| nodes[2] = newNode(2, []raft.Peer{{ID: 1}, {ID: 2}, {ID: 3}}) | |
| go nodes[2].run() | |
| nodes[3] = newNode(3, []raft.Peer{{ID: 1}, {ID: 2}, {ID: 3}}) | |
| go nodes[3].run() | |
| // Wait for leader, is there a better way to do this | |
| for nodes[1].node.Status().Lead != 1 { | |
| time.Sleep(100 * time.Millisecond) | |
| } | |
| nodes[2].node.Propose(nodes[2].ctx, []byte("mykey:myvalue")) | |
| // Wait for proposed entry to be commited in cluster | |
| // Probably a better way to check this | |
| time.Sleep(100 * time.Millisecond) | |
| fmt.Println(strings.Repeat("#", 20)) | |
| for i, node := range nodes { | |
| fmt.Println("Node", i) | |
| for k, v := range node.pstore { | |
| fmt.Println(k, " = ", v) | |
| } | |
| fmt.Println("") | |
| } | |
| fmt.Println(strings.Repeat("#", 20)) | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment