package main

import (
	"encoding/binary"
	"encoding/json"
	"fmt"
	"net/http"
	_ "net/http/pprof"
	"os"
	"os/signal"
	"path/filepath"
	"time"

	"github.com/dgraph-io/badger"
	"github.com/dgraph-io/badger/options"
	"github.com/dgraph-io/badger/y"
	"github.com/sirupsen/logrus"
)

type writable struct {
	key   []byte
	value []byte
}

var (
	// Store the first key we've seen so we can query it later and observe
	// when its TTL expires it from view.
	//
	// NOTE: firstKey/firstKeyStored are written by the writer goroutine and
	// read by the readKey goroutine without synchronization. With a single
	// writer this race is tolerable for a test program, but don't copy the
	// pattern into production code.
	firstKey       []byte
	firstKeyStored bool
	// stopWriting = false
)

func main() {
	go func() {
		// With net/http/pprof imported, you can profile via:
		//   go tool pprof http://localhost:8001/debug/pprof/heap
		//   go tool pprof http://localhost:8001/debug/pprof/profile
		logrus.Infof("starting debug web server....")
		logrus.Info(http.ListenAndServe("localhost:8001", nil))
	}()

	done := make(chan struct{})
	bt := NewBadgerTest()
	bt.Start()
	<-done // block forever; shutdown is driven by the signal handler in Start
}

type BadgerTest struct {
	db *badger.DB
}

func NewBadgerTest() *BadgerTest {
	dir := "/home/mrjn/badgertest"
	opts := badger.DefaultOptions
	opts.Dir = dir
	opts.ValueDir = dir
	opts.ValueLogLoadingMode = options.FileIO
	opts.TableLoadingMode = options.FileIO
	// opts.NumCompactors = 20
	// opts.MaxTableSize = .25 * 1073741824 // .25GB
	opts.NumMemtables = 2
	opts.ValueLogFileSize = 1 * 1073741824 // 1GB
	opts.SyncWrites = false

	bytes, _ := json.Marshal(&opts)
	logrus.Infof("BADGER OPTIONS=%s", string(bytes))

	db, err := badger.Open(opts)
	if err != nil {
		panic(fmt.Sprintf("unable to open badger db; %s", err))
	}
	bt := &BadgerTest{
		db: db,
	}
	go bt.filecounts(dir)
	return bt
}

func (b *BadgerTest) Start() {
	// The closer count must match the number of goroutines that call a.Done():
	// the writer, the GC loop, and the readKey ticker below.
	a := y.NewCloser(3)

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	go func() {
		for range c {
			fmt.Println("Signaling...")
			a.SignalAndWait()
			fmt.Printf("Closing, error: %v\n", b.db.Close())
			os.Exit(0)
		}
	}()

	workers := 1
	for i := 0; i < workers; i++ {
		go b.write(a)
	}
	//go b.write()
	go b.badgerGC(a)

	go func() {
		defer a.Done()
		tick := time.NewTicker(1 * time.Minute)
		for {
			select {
			case <-tick.C:
				b.readKey()
			case <-a.HasBeenClosed():
				fmt.Println("Returning from readKey")
				return
			}
		}
	}()

	//go func() {
	//	tick := time.NewTicker(15 * time.Minute)
	//	for range tick.C {
	//		stopWriting = true
	//		return
	//	}
	//}()
}

func (b *BadgerTest) Stop() {
	b.db.Close()
	logrus.Infof("%s shut down badger test", time.Now())
	time.Sleep(1 * time.Second)
}

// filecounts periodically walks the data directory and reports how many
// .sst and .vlog files badger currently has on disk.
func (b *BadgerTest) filecounts(dir string) {
	ticker := time.NewTicker(60 * time.Second)
	for range ticker.C {
		logFiles := int64(0)
		sstFiles := int64(0)
		_ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			switch filepath.Ext(path) {
			case ".sst":
				sstFiles++
			case ".vlog":
				logFiles++
			}
			return nil
		})
		logrus.Infof("%s updated gauges vlog=%d sst=%d", time.Now(), logFiles, sstFiles)
	}
}

func (b *BadgerTest) write(a *y.Closer) {
	defer a.Done()

	data := `{"randomstring":"6446D58D6DFAFD58586D3EA85A53F4A6B3CC057F933A22BB58E188A74AC8F663","refID":12495494,"testfield1234":"foo bar baz","date":"2018-01-01 12:00:00"}`
	batchSize := 1
	rows := []writable{}
	var cnt uint64
	for {
		select {
		case <-a.HasBeenClosed():
			logrus.Infof("%s stopping writing, let it catch up", time.Now())
			return
		default:
		}

		// cnt = (cnt + 1) % 5
		cnt++
		// ts := time.Now().UnixNano()
		buf := make([]byte, 8)
		// binary.BigEndian.PutUint64(buf[0:], uint64(ts))
		binary.BigEndian.PutUint64(buf, cnt)
		// fmt.Printf("Key: %x\n", buf)

		if !firstKeyStored {
			firstKey = buf
			firstKeyStored = true
			logrus.Infof("%s firstkey stored %x", time.Now(), firstKey)
		}
		w := writable{key: buf, value: []byte(data)}
		rows = append(rows, w)
		// Flush once a full batch has accumulated.
		if len(rows) >= batchSize {
			b.saveRows(rows)
			rows = []writable{}
		}
	}
}

func (b *BadgerTest) saveRows(rows []writable) {
	// ttl := 30 * time.Minute
	ttl := 90 * time.Second
	txn := b.db.NewTransaction(true)
	for _, row := range rows {
		err := txn.SetWithTTL(row.key, row.value, ttl)
		if err == badger.ErrTxnTooBig {
			// The open transaction is full: commit it and retry the set in a
			// fresh transaction.
			logrus.Infof("%s TX too big, committing...", time.Now())
			if cerr := txn.Commit(nil); cerr != nil {
				logrus.Errorf("unable to commit transaction; %s", cerr)
			}
			txn = b.db.NewTransaction(true)
			err = txn.SetWithTTL(row.key, row.value, ttl)
		}
		if err != nil {
			logrus.Errorf("unable to set key-val in transaction; %s", err)
		}
	}
	if err := txn.Commit(nil); err != nil {
		logrus.Errorf("unable to commit transaction; %s", err)
	}
}

func (b *BadgerTest) readKey() {
	// At some point our first key should have expired via its TTL, and this
	// read should start returning ErrKeyNotFound.
	err := b.db.View(func(txn *badger.Txn) error {
		item, err := txn.Get(firstKey)
		if err != nil {
			return err
		}
		val, err := item.Value()
		if err != nil {
			return err
		}
		logrus.Infof("%s FIRSTKEY VALUE=%s", time.Now(), val)
		return nil
	})
	if err != nil {
		logrus.Errorf("%s FIRSTKEY unable to read first key from badger; %s", time.Now(), err)
	}
}

func (b *BadgerTest) badgerGC(a *y.Closer) {
	defer a.Done()

	_, lastVlogSize := b.db.Size()
	logrus.Infof("lastVLOGSize=%v", lastVlogSize)

	ticker := time.NewTicker(2 * time.Minute)
	const GB = int64(1 << 30)
	for {
		select {
		case <-a.HasBeenClosed():
			fmt.Println("Returning from GC")
			return
		case <-ticker.C:
			// Only run GC once the value log has grown by at least 1GB since
			// the last pass.
			_, currentVlogSize := b.db.Size()
			if currentVlogSize < lastVlogSize+GB {
				continue
			}

			logrus.Infof("%s CLEANUP starting to purge keys", time.Now())
			if err := b.db.PurgeOlderVersions(); err != nil {
				logrus.Errorf("%s badgerOps unable to purge older versions; %s", time.Now(), err)
			}
			logrus.Infof("%s CLEANUP PurgeOlderVersions completed", time.Now())

			// Run the value log GC once per whole GB of growth; e.g. if the
			// size increased by 3.5GB, run it 3 times.
			numTimes := (currentVlogSize - lastVlogSize) / GB
			for i := 0; i < int(numTimes); i++ {
				if err := b.db.RunValueLogGC(0.5); err != nil {
					logrus.Errorf("%s badgerOps unable to RunValueLogGC; %s", time.Now(), err)
				}
				logrus.Infof("%s CLEANUP RunValueLogGC completed iteration=%d", time.Now(), i)
			}
			_, lastVlogSize = b.db.Size()
		}
	}
}