Skip to content

Instantly share code, notes, and snippets.

@spikeekips
Last active July 21, 2021 03:22
Show Gist options
  • Save spikeekips/ac490c4802fc90d2c988b87d24e92d27 to your computer and use it in GitHub Desktop.
Save spikeekips/ac490c4802fc90d2c988b87d24e92d27 to your computer and use it in GitHub Desktop.
mitum discovery command
package main
import (
"context"
"discovery/memberlist"
"net/url"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/alecthomas/kong"
"github.com/spikeekips/mitum/base"
"github.com/spikeekips/mitum/base/key"
"github.com/spikeekips/mitum/base/node"
"github.com/spikeekips/mitum/launch"
mitumcmds "github.com/spikeekips/mitum/launch/cmds"
"github.com/spikeekips/mitum/network"
quicnetwork "github.com/spikeekips/mitum/network/quic"
"github.com/spikeekips/mitum/util"
"github.com/spikeekips/mitum/util/cache"
"github.com/spikeekips/mitum/util/encoder"
jsonenc "github.com/spikeekips/mitum/util/encoder/json"
"github.com/spikeekips/mitum/util/logging"
"golang.org/x/xerrors"
)
var (
encs *encoder.Encoders
enc encoder.Encoder
log logging.Logger
)
func init() {
enc = jsonenc.NewEncoder()
encs = encoder.NewEncoders()
for _, e := range []encoder.Encoder{enc} {
if err := encs.AddEncoder(e); err != nil {
panic(err)
}
}
for i := range launch.EncoderTypes {
if err := encs.AddType(launch.EncoderTypes[i]); err != nil {
panic(err)
}
}
for i := range launch.EncoderHinters {
if err := encs.AddHinter(launch.EncoderHinters[i]); err != nil {
panic(err)
}
}
if err := encs.Initialize(); err != nil {
panic(err)
}
}
type Commands struct {
Run RunNodeCommand `cmd:"" name:"run"`
}
func main() {
kctx := kong.Parse(&Commands{
Run: NewRunNodeCommand(),
},
mitumcmds.LogVars,
)
kctx.FatalIfErrorf(kctx.Run())
}
type RunNodeCommand struct {
Node string `arg:""`
Privatekey string `arg:""`
Bind string `arg:""`
Network string `arg:""`
NetworkID string `arg:""`
Nodes []string `name:"node"`
NoLeave bool `name:"no-leave"`
*mitumcmds.LogFlags
local *node.Local
connInfo network.ConnInfo
network *url.URL
nodes []memberlist.ConnInfo
networkID base.NetworkID
}
func NewRunNodeCommand() RunNodeCommand {
return RunNodeCommand{
LogFlags: &mitumcmds.LogFlags{},
}
}
func (cmd *RunNodeCommand) Run() error {
if err := cmd.parseFlags(); err != nil {
return err
}
log.Info().Msg("memberlist discovery started")
defer log.Info().Msg("memberlist discovery stopped")
nt, err := cmd.quicServer()
if err != nil {
return err
}
dis, err := cmd.discovery()
if err != nil {
return err
}
_ = nt.Handler(memberlist.DefaultDiscoveryPath).Handler(dis.Handler(func(memberlist.NodeMessage) error {
return nil
}))
if err = nt.Start(); err != nil {
return err
}
defer func() {
_ = nt.Stop()
}()
if err := dis.Start(); err != nil {
return err
}
if err := dis.Join(cmd.nodes, 3); err != nil {
return err
}
go hookJoinedNodes(dis)
sctx, stopfunc := signal.NotifyContext(context.Background(),
syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGHUP,
)
defer stopfunc()
<-sctx.Done()
if cmd.NoLeave {
return dis.GracefuleStop(time.Second * 10)
}
return dis.Stop()
}
func (cmd *RunNodeCommand) parseFlags() error {
i, err := mitumcmds.SetupLoggingFromFlags(cmd.LogFlags, os.Stdout)
if err != nil {
return err
}
log = i
no, err := base.DecodeAddressFromString(cmd.Node, enc)
if err != nil {
return xerrors.Errorf("failed to decode node: %w", err)
}
privatekey, err := key.DecodePrivatekey(enc, cmd.Privatekey)
if err != nil {
return xerrors.Errorf("failed to decode privatekey: %w", err)
}
connInfo, err := network.NormalizeNodeURL(cmd.Network)
if err != nil {
return xerrors.Errorf("wrong network url, %q: %w", cmd.Network, err)
}
cmd.connInfo = connInfo
cmd.network = connInfo.URL()
cmd.local = node.NewLocal(no, privatekey)
founds := map[string]bool{}
for i := range cmd.Nodes {
connInfo, err := network.NormalizeNodeURL(cmd.Nodes[i])
if err != nil {
return xerrors.Errorf("invalid discovery node, %q: %w", cmd.Nodes[i], err)
} else if _, found := founds[connInfo.URL().String()]; found {
continue
}
cmd.nodes = append(cmd.nodes, memberlist.NewConnInfoWithConnInfo("", connInfo))
}
n := strings.TrimSpace(cmd.NetworkID)
if n == "" {
return xerrors.Errorf("empty NetworkID")
}
cmd.networkID = base.NetworkID([]byte(n))
log.Debug().
Str("node", cmd.Node).
Str("privatekey", cmd.Privatekey).
Str("network_id", string(cmd.networkID)).
Interface("nodes", cmd.nodes).
Str("network", cmd.Network).
Str("network_parsed", cmd.network.String()).
Msg("flags")
return nil
}
func (cmd *RunNodeCommand) discovery() (*memberlist.Discovery, error) {
dis := memberlist.NewDiscovery(cmd.local, cmd.connInfo, cmd.networkID, enc)
_ = dis.SetLogger(log)
return dis, dis.Initialize()
}
func (cmd *RunNodeCommand) quicServer() (*quicnetwork.Server, error) {
ca, err := cache.NewCacheFromURI("gcache://")
if err != nil {
return nil, err
}
priv, err := util.GenerateED25519Privatekey()
if err != nil {
return nil, err
}
certs, err := util.GenerateTLSCerts(cmd.network.Host, priv)
if err != nil {
return nil, err
}
qs, err := quicnetwork.NewPrimitiveQuicServer(cmd.Bind, certs)
if err != nil {
return nil, err
}
nqs, err := quicnetwork.NewServer(qs, encs, enc, ca)
if err != nil {
return nil, err
}
if err := nqs.Initialize(); err != nil {
return nil, err
}
return nqs, nil
}
func hookJoinedNodes(dis *memberlist.Discovery) {
ticker := time.NewTicker(time.Second * 1)
defer ticker.Stop()
known := map[string]memberlist.ConnInfo{}
i := -1
for range ticker.C {
i++
newMap := map[string]memberlist.ConnInfo{}
var updated bool
ms := dis.Nodes()
for j := range ms {
n := ms[j].(memberlist.NodeConnInfo).ConnInfo
newMap[n.Address] = n
if _, found := known[n.Address]; !found {
updated = true
}
}
if i%30 == 0 || updated || len(newMap) != len(known) {
known = newMap
log.Debug().Interface("nodes", newMap).Msg("joined nodes")
i = 1
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment