package bigetc import ( "database/sql" "time" "github.com/c4pt0r/log" _ "github.com/go-sql-driver/mysql" ) // BigEtc, a PoC implementation of Etcd's important APIs: Watch, Get, Set // The core idea is: // 1. TiDB is a scalable database with **SQL** semantics. // 2. TiDB supports secondary indexes. very fast lookup. // 3. TiDB supports transactions, with pessimistic concurrency control // 4. TiDB's pessimistic concurrency control is based on MVCC and pessimistic lock is in-memory. (https://docs.pingcap.com/tidb/dev/pessimistic-transaction#in-memory-pessimistic-lock) // 5. TiDB's lock is row-level, totally scalable. // 6. TiDB uses multi-raft architecture to achieve strong consistency and auto-failover (https://tikv.org/blog/building-distributed-storage-system-on-raft/) type BigEtc struct { dsn string db *sql.DB watchers map[string]chan string versions map[string]int64 } func New(dsn string) *BigEtc { return &BigEtc{ dsn: dsn, watchers: make(map[string]chan string), versions: make(map[string]int64), } } func (b *BigEtc) Open() error { var err error b.db, err = sql.Open("mysql", b.dsn) if err != nil { return err } return b.createTables() } func (b *BigEtc) createTables() error { _, err := b.db.Exec(` CREATE TABLE IF NOT EXISTS _bigetc_store ( k VARCHAR(255) NOT NULL, v VARCHAR(255) NOT NULL, version BIGINT NOT NULL DEFAULT 0, PRIMARY KEY (k) ) `) if err != nil { return err } return nil } func (b *BigEtc) Close() error { return b.db.Close() } func (b *BigEtc) Get(key string) (string, bool, error) { var value string err := b.db.QueryRow(` SELECT v FROM _bigetc_store WHERE k = ? ORDER BY version DESC LIMIT 1 `, key).Scan(&value) if err != nil { if err == sql.ErrNoRows { return "", false, nil } return "", false, err } return value, true, nil } func (b *BigEtc) Set(key string, value string) error { txn, err := b.db.Begin() if err != nil { return err } defer txn.Rollback() _, err = txn.Exec(` SELECT k FROM _bigetc_store WHERE k = ? FOR UPDATE `, key) if err != nil { return err } // if using INSERT here instead of UPSERT, we can keep change history feed _, err = txn.Exec(` INSERT INTO _bigetc_store (k, v, version) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE v = VALUES(v), version = version + 1 `, key, value, 0) if err != nil { return err } return txn.Commit() } func (b *BigEtc) getMaxVersion(key string) (int64, error) { var version int64 err := b.db.QueryRow(` SELECT MAX(version) FROM _bigetc_store WHERE k = ? `, key).Scan(&version) if err != nil { return 0, err } return version, nil } func (b *BigEtc) Watch(key string) <-chan string { ch := make(chan string) go func() { for { var err error // get local version version, ok := b.versions[key] if !ok { version, err = b.getMaxVersion(key) if err != nil { if err == sql.ErrNoRows { b.Set(key, "") } else { log.Error(err) } } b.versions[key] = version } // get remote version remoteVersion, err := b.getMaxVersion(key) if err != nil { log.Error(err) continue } // if remote version is greater than local version, get value if remoteVersion > version { value, _, err := b.Get(key) if err != nil { log.Error(err) continue } ch <- value b.versions[key] = remoteVersion } else { // if remote version is less than or equal to local version, sleep time.Sleep(time.Millisecond * 100) } } }() return ch }