Last active
June 11, 2025 13:27
-
-
Save c4pt0r/0cf957c606268cf212cce3a2edaf6607 to your computer and use it in GitHub Desktop.
Revisions
-
c4pt0r revised this gist
May 9, 2022 . 1 changed file with 20 additions and 21 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -90,11 +90,14 @@ func createJob(db *sql.DB, job *Job) error { ?, ? )` res, err := txn.Exec(sql, job.CreatedAt, job.UpdatedAt, job.Status, job.WorkerID, job.FinishedAt, job.Data) if err != nil { return err } jobID, err := res.LastInsertId() if err != nil { return err } err = txn.Commit() if err != nil { txn.Rollback() @@ -110,17 +113,23 @@ func createWorker(db *sql.DB, worker *Worker) error { if err != nil { return err } stmt := ` INSERT INTO workers ( name ) VALUES ( ? )` res, err := txn.Exec(stmt, worker.Name) if err != nil { return err } workerID, err := res.LastInsertId() if err != nil { return err } err = txn.Commit() if err != nil { txn.Rollback() @@ -130,23 +139,13 @@ func createWorker(db *sql.DB, worker *Worker) error { return nil } func workerFetchJob(db *sql.DB, worker *Worker) (*Job, error) { /* Here's the magic. TiDB supports pessimistic transactions, And in TiDB 6.0 locks for pessimistic transactions are distributed and in-memory, which means that concurrent transactions become less costly to acquire locks. for more details: https://github.com/tikv/rfcs/blob/master/text/0077-in-memory-pessimistic-locks.md */ stmt := ` SELECT @@ -159,7 +158,7 @@ func workerFetchJob(db *sql.DB, worker *Worker) (*Job, error) { data FROM jobs WHERE status = ? ORDER BY created_at DESC LIMIT 1 FOR UPDATE` txn, err := db.Begin() -
c4pt0r revised this gist
May 8, 2022 . 1 changed file with 7 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -141,6 +141,13 @@ func getLastInsertID(txn *sql.Tx) int64 { } func workerFetchJob(db *sql.DB, worker *Worker) (*Job, error) { /* Here's the magic. TiDB supports pessimistic transactions, And in TiDB 6.0 locks for pessimistic transactions are distributed and in-memory, which means that concurrent transactions become less costly to acquire locks. for more details: https://github.com/tikv/rfcs/blob/master/text/0077-in-memory-pessimistic-locks.md */ stmt := ` SELECT id, -
c4pt0r revised this gist
May 8, 2022 . 1 changed file with 5 additions and 4 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -3,9 +3,11 @@ package main import ( "database/sql" "fmt" "log" "math/rand" "time" "github.com/fatih/color" _ "github.com/go-sql-driver/mysql" ) @@ -220,7 +222,6 @@ func workLoop(db *sql.DB, n int) { if err != nil { panic(err) } go func() { for { job, err := workerFetchJob(db, &worker) @@ -231,7 +232,7 @@ func workLoop(db *sql.DB, n int) { time.Sleep(10 * time.Millisecond) continue } log.Printf("%s %s get job id: %d\n", color.YellowString("[FETCH]"), worker.Name, job.ID) // simulate work time.Sleep(500 * time.Millisecond) job.Status = JobStatusFinished @@ -241,6 +242,7 @@ func workLoop(db *sql.DB, n int) { if err != nil { panic(err) } log.Printf("%s %s finish job id: %d\n", color.GreenString("[DONE]"), worker.Name, job.ID) } }() } @@ -263,7 +265,6 @@ func main() { // create 100 workers workLoop(db, 100) // create random jobs, every 100ms for { job := Job{ @@ -278,7 +279,7 @@ func main() { if err != nil { panic(err) } log.Printf("%s create job id: %d\n", color.RedString("[NEWJOB]"), job.ID) time.Sleep(100 * time.Millisecond) } } -
c4pt0r created this gist
May 8, 2022 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,284 @@ package main import ( "database/sql" "fmt" "math/rand" "time" _ "github.com/go-sql-driver/mysql" ) type JobStatus int const ( JobStatusInitialized JobStatus = iota JobStatusRunning JobStatusFinished JobStatusFailed ) type Job struct { ID int64 CreatedAt int64 UpdatedAt int64 FinishedAt int64 Status JobStatus WorkerID int64 Data string } type Worker struct { ID int64 Name string } func createDB(db *sql.DB) error { stmt := ` CREATE TABLE IF NOT EXISTS jobs ( id BIGINT AUTO_RANDOM, created_at BIGINT, updated_at BIGINT, status BIGINT, worker_id BIGINT, finished_at BIGINT, data TEXT, PRIMARY KEY (id), KEY(finished_at), KEY(created_at), KEY(status), KEY(worker_id) );` _, err := db.Exec(stmt) if err != nil { return err } stmt = ` CREATE TABLE IF NOT EXISTS workers ( id BIGINT AUTO_RANDOM, name TEXT, PRIMARY KEY (id) );` _, err = db.Exec(stmt) if err != nil { return err } return nil } func createJob(db *sql.DB, job *Job) error { txn, err := db.Begin() defer txn.Rollback() if err != nil { return err } sql := ` INSERT INTO jobs ( created_at, updated_at, status, worker_id, finished_at, data ) VALUES ( ?, ?, ?, ?, ?, ? )` _, err = txn.Exec(sql, job.CreatedAt, job.UpdatedAt, job.Status, job.WorkerID, job.FinishedAt, job.Data) if err != nil { return err } jobID := getLastInsertID(txn) err = txn.Commit() if err != nil { txn.Rollback() return err } job.ID = jobID return nil } func createWorker(db *sql.DB, worker *Worker) error { txn, err := db.Begin() defer txn.Rollback() if err != nil { return err } sql := ` INSERT INTO workers ( name ) VALUES ( ? )` _, err = txn.Exec(sql, worker.Name) if err != nil { return err } workerID := getLastInsertID(txn) err = txn.Commit() if err != nil { txn.Rollback() return err } worker.ID = workerID return nil } func getLastInsertID(txn *sql.Tx) int64 { var id int64 sql := `SELECT LAST_INSERT_ID()` err := txn.QueryRow(sql).Scan(&id) if err != nil { return 0 } return id } func workerFetchJob(db *sql.DB, worker *Worker) (*Job, error) { stmt := ` SELECT id, created_at, updated_at, status, worker_id, finished_at, data FROM jobs WHERE status = ? ORDER BY created_at ASC LIMIT 1 FOR UPDATE` txn, err := db.Begin() defer txn.Rollback() if err != nil { return nil, err } var job Job err = txn.QueryRow(stmt, JobStatusInitialized).Scan( &job.ID, &job.CreatedAt, &job.UpdatedAt, &job.Status, &job.WorkerID, &job.FinishedAt, &job.Data, ) if err != nil && err != sql.ErrNoRows { return nil, err } if job.ID == 0 { return nil, nil } stmt = ` UPDATE jobs SET worker_id = ?, updated_at = ?, status = ? WHERE id = ?` _, err = txn.Exec(stmt, worker.ID, time.Now().Unix(), JobStatusRunning, job.ID) if err != nil { return nil, err } err = txn.Commit() if err != nil { return nil, err } return &job, nil } func updateJob(db *sql.DB, job *Job) error { stmt := ` UPDATE jobs SET updated_at = ?, status = ?, finished_at = ?, data = ? WHERE id = ?` _, err := db.Exec(stmt, job.UpdatedAt, job.Status, job.FinishedAt, job.Data, job.ID) if err != nil { return err } return nil } func randomString(length int) string { var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") b := make([]rune, length) for i := range b { b[i] = letters[rand.Intn(len(letters))] } return string(b) } func workLoop(db *sql.DB, n int) { for i := 0; i < n; i++ { worker := Worker{ Name: fmt.Sprintf("worker-%d", i), } err := createWorker(db, &worker) if err != nil { panic(err) } fmt.Println("worker id:", worker.ID) go func() { for { job, err := workerFetchJob(db, &worker) if err != nil { panic(err) } if job == nil { time.Sleep(10 * time.Millisecond) continue } fmt.Printf("%s get job id: %d\n", worker.Name, job.ID) // simulate work time.Sleep(500 * time.Millisecond) job.Status = JobStatusFinished job.FinishedAt = time.Now().Unix() job.WorkerID = 0 err = updateJob(db, job) if err != nil { panic(err) } } }() } } func main() { db, err := sql.Open("mysql", "root:@tcp(localhost:4000)/test") if err != nil { panic(err) } db.SetConnMaxLifetime(time.Minute * 3) db.SetMaxOpenConns(50) db.SetMaxIdleConns(50) err = createDB(db) if err != nil { panic(err) } // create 100 workers workLoop(db, 100) // create random jobs, every 100ms for { job := Job{ CreatedAt: time.Now().Unix(), UpdatedAt: time.Now().Unix(), Status: JobStatusInitialized, WorkerID: 0, FinishedAt: 0, Data: randomString(1024), } err = createJob(db, &job) if err != nil { panic(err) } fmt.Printf("create job id: %d\n", job.ID) time.Sleep(100 * time.Millisecond) } }