// Command main runs a single memberlist discovery node over a QUIC server.
package main

import (
	"context"
	"net/url"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"github.com/alecthomas/kong"
	"github.com/spikeekips/mitum/base"
	"github.com/spikeekips/mitum/base/key"
	"github.com/spikeekips/mitum/base/node"
	"github.com/spikeekips/mitum/launch"
	mitumcmds "github.com/spikeekips/mitum/launch/cmds"
	"github.com/spikeekips/mitum/network"
	"github.com/spikeekips/mitum/network/discovery"
	"github.com/spikeekips/mitum/network/discovery/memberlist"
	quicnetwork "github.com/spikeekips/mitum/network/quic"
	"github.com/spikeekips/mitum/util"
	"github.com/spikeekips/mitum/util/cache"
	"github.com/spikeekips/mitum/util/encoder"
	jsonenc "github.com/spikeekips/mitum/util/encoder/json"
	"github.com/spikeekips/mitum/util/logging"
	"golang.org/x/xerrors"
)

var (
	// encs is the encoder registry populated in init.
	encs *encoder.Encoders
	// enc is the JSON encoder used for decoding flags and by the network layers.
	enc encoder.Encoder
	// log is the process-wide logger; it is assigned in parseFlags.
	log logging.Logger
)

// init builds the encoder registry: it registers the JSON encoder, then every
// entry of launch.EncoderTypes and launch.EncoderHinters, and finally
// initializes the registry. Any failure panics.
func init() {
	enc = jsonenc.NewEncoder()
	encs = encoder.NewEncoders()

	for _, e := range []encoder.Encoder{enc} {
		if err := encs.AddEncoder(e); err != nil {
			panic(err)
		}
	}

	for i := range launch.EncoderTypes {
		if err := encs.AddType(launch.EncoderTypes[i]); err != nil {
			panic(err)
		}
	}

	for i := range launch.EncoderHinters {
		if err := encs.AddHinter(launch.EncoderHinters[i]); err != nil {
			panic(err)
		}
	}

	if err := encs.Initialize(); err != nil {
		panic(err)
	}
}

// Commands is the root command set handed to kong; it exposes the single
// "run" sub-command.
type Commands struct {
	Run RunNodeCommand `cmd:"" name:"run"`
}

// main parses the command line with kong and runs the selected command,
// exiting with an error message if it fails.
func main() {
	kctx := kong.Parse(&Commands{
		Run: NewRunNodeCommand(),
	},
		mitumcmds.LogVars,
	)

	kctx.FatalIfErrorf(kctx.Run())
}

// RunNodeCommand holds the "run" command's positional arguments and flags
// (exported fields) together with the values parseFlags derives from them
// (unexported fields).
type RunNodeCommand struct {
	Node       string   `arg:""`
	Privatekey string   `arg:""`
	Bind       string   `arg:""`
	Network    string   `arg:""`
	NetworkID  string   `arg:""`
	Nodes      []string `name:"node"`
	NoLeave    bool     `name:"no-leave"`
	*mitumcmds.LogFlags
	local     *node.Local           // built from Node and Privatekey
	connInfo  network.ConnInfo      // normalized form of Network
	network   *url.URL              // URL of connInfo
	nodes     []memberlist.ConnInfo // normalized, de-duplicated Nodes
	networkID base.NetworkID        // trimmed NetworkID as bytes
}

// NewRunNodeCommand returns a RunNodeCommand with an allocated LogFlags so
// the embedded pointer is non-nil.
func NewRunNodeCommand() RunNodeCommand {
	return RunNodeCommand{
		LogFlags: &mitumcmds.LogFlags{},
	}
}

// Run parses the flags, starts the QUIC server and the discovery, joins the
// configured nodes and then blocks until SIGINT, SIGTERM, SIGQUIT or SIGHUP is
// received, at which point the discovery is stopped.
func (cmd *RunNodeCommand) Run() error {
	if err := cmd.parseFlags(); err != nil {
		return err
	}

	log.Info().Msg("memberlist discovery started")
	defer log.Info().Msg("memberlist discovery stopped")

	nt, err := cmd.quicServer()
	if err != nil {
		return err
	}

	dis, err := cmd.discovery()
	if err != nil {
		return err
	}

	// Route discovery traffic on the server to the discovery handler; incoming
	// node messages are accepted without further processing.
	_ = nt.Handler(memberlist.DefaultDiscoveryPath).Handler(dis.Handler(func(memberlist.NodeMessage) error {
		return nil
	}))

	if err = nt.Start(); err != nil {
		return err
	}

	defer func() {
		_ = nt.Stop()
	}()

	if err := dis.Start(); err != nil {
		return err
	}

	if err := dis.Join(cmd.nodes, 3); err != nil {
		if !xerrors.Is(err, memberlist.JoiningCanceledError) {
			return err
		}

		// The bounded attempt was canceled: keep retrying in the background.
		go keepTryingJoining(dis, cmd.nodes)
	}

	go hookJoinedNodes(dis)

	sctx, stopfunc := signal.NotifyContext(context.Background(),
		syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGHUP,
	)
	defer stopfunc()

	<-sctx.Done()

	// NOTE(review): --no-leave selects GracefuleStop while the default path
	// calls Stop; this looks like it may be inverted -- confirm against the
	// memberlist.Discovery API before changing.
	if cmd.NoLeave {
		return dis.GracefuleStop(time.Second * 10)
	}

	return dis.Stop()
}

// parseFlags sets up logging and converts the raw command-line values into
// the typed fields of cmd. It returns an error if the node address, the
// private key, the network URL or any discovery node URL cannot be
// decoded, or if NetworkID is blank.
func (cmd *RunNodeCommand) parseFlags() error {
	l, err := mitumcmds.SetupLoggingFromFlags(cmd.LogFlags, os.Stdout)
	if err != nil {
		return err
	}

	log = l

	no, err := base.DecodeAddressFromString(cmd.Node, enc)
	if err != nil {
		return xerrors.Errorf("failed to decode node: %w", err)
	}

	privatekey, err := key.DecodePrivatekey(enc, cmd.Privatekey)
	if err != nil {
		return xerrors.Errorf("failed to decode privatekey: %w", err)
	}

	connInfo, err := network.NormalizeNodeURL(cmd.Network)
	if err != nil {
		return xerrors.Errorf("wrong network url, %q: %w", cmd.Network, err)
	}

	cmd.connInfo = connInfo
	cmd.network = connInfo.URL()
	cmd.local = node.NewLocal(no, privatekey)

	// Collect the discovery nodes, skipping URLs that normalize to one that was
	// already seen.
	founds := map[string]struct{}{}

	for i := range cmd.Nodes {
		ci, err := network.NormalizeNodeURL(cmd.Nodes[i])
		if err != nil {
			return xerrors.Errorf("invalid discovery node, %q: %w", cmd.Nodes[i], err)
		}

		u := ci.URL().String()
		if _, found := founds[u]; found {
			continue
		}

		// Record the URL; without this the duplicate check above never fires.
		founds[u] = struct{}{}

		cmd.nodes = append(cmd.nodes, memberlist.NewConnInfoWithConnInfo("", ci))
	}

	n := strings.TrimSpace(cmd.NetworkID)
	if n == "" {
		return xerrors.Errorf("empty NetworkID")
	}

	cmd.networkID = base.NetworkID([]byte(n))

	// NOTE(review): this writes the raw private key to the debug log.
	log.Debug().
		Str("node", cmd.Node).
		Str("privatekey", cmd.Privatekey).
		Str("network_id", string(cmd.networkID)).
		Interface("nodes", cmd.nodes).
		Str("network", cmd.Network).
		Str("network_parsed", cmd.network.String()).
		Msg("flags")

	return nil
}

// discovery builds the memberlist discovery for the local node: it loads a
// node channel for cmd.connInfo, wraps the local node in a nodepool, wires the
// nodepool delegate's join/leave/update callbacks into the discovery and
// returns the discovery together with the result of its Initialize call.
func (cmd *RunNodeCommand) discovery() (*memberlist.Discovery, error) {
	ch, err := discovery.LoadNodeChannel(cmd.connInfo, encs, time.Second*2)
	if err != nil {
		return nil, err
	}

	np := network.NewNodepool(cmd.local, ch)

	dg := discovery.NewNodepoolDelegate(np, encs, time.Second*2)
	_ = dg.SetLogger(log)

	dis := memberlist.NewDiscovery(cmd.local, cmd.connInfo, cmd.networkID, enc)
	_ = dis.SetLogger(log)
	_ = dis.SetNotifyJoin(dg.NotifyJoin).
		SetNotifyLeave(dg.NotifyLeave).
		SetNotifyUpdate(dg.NotifyUpdate)

	return dis, dis.Initialize()
}

// quicServer creates and initializes the QUIC server bound to cmd.Bind. TLS
// certificates are generated on the fly from a fresh ED25519 key for the host
// of the parsed network URL.
func (cmd *RunNodeCommand) quicServer() (*quicnetwork.Server, error) {
	ca, err := cache.NewCacheFromURI("gcache://")
	if err != nil {
		return nil, err
	}

	priv, err := util.GenerateED25519Privatekey()
	if err != nil {
		return nil, err
	}

	certs, err := util.GenerateTLSCerts(cmd.network.Host, priv)
	if err != nil {
		return nil, err
	}

	qs, err := quicnetwork.NewPrimitiveQuicServer(cmd.Bind, certs)
	if err != nil {
		return nil, err
	}

	nqs, err := quicnetwork.NewServer(qs, encs, enc, ca)
	if err != nil {
		return nil, err
	}

	if err := nqs.Initialize(); err != nil {
		return nil, err
	}

	return nqs, nil
}

// hookJoinedNodes polls dis.Nodes() once per second and logs the set of
// joined nodes, keyed by address, whenever a previously unknown address
// appears, the number of nodes changes, or the tick counter reaches a multiple
// of 30. It never returns.
func hookJoinedNodes(dis *memberlist.Discovery) {
	ticker := time.NewTicker(time.Second * 1)
	defer ticker.Stop()

	known := map[string]memberlist.ConnInfo{}

	// Starts at -1 so the very first tick (i == 0) always logs.
	i := -1

	for range ticker.C {
		i++

		newMap := map[string]memberlist.ConnInfo{}

		var updated bool

		ms := dis.Nodes()
		for j := range ms {
			n := ms[j].(memberlist.NodeConnInfo).ConnInfo
			newMap[n.Address] = n

			if _, found := known[n.Address]; !found {
				updated = true
			}
		}

		if i%30 == 0 || updated || len(newMap) != len(known) {
			known = newMap

			log.Debug().Interface("nodes", newMap).Msg("joined nodes")

			// The counter restarts at 1, so with no changes the next periodic
			// dump happens 29 ticks later.
			i = 1
		}
	}
}

// keepTryingJoining calls dis.Join with a retry count of -1 and logs the
// error if the call eventually fails.
func keepTryingJoining(dis *memberlist.Discovery, nodes []memberlist.ConnInfo) {
	if err := dis.Join(nodes, -1); err != nil {
		log.Error().Err(err).Msg("failed to join")
	}
}