Skip to content

Instantly share code, notes, and snippets.

@c4pt0r
Last active June 11, 2025 13:27
Show Gist options
  • Select an option

  • Save c4pt0r/0cf957c606268cf212cce3a2edaf6607 to your computer and use it in GitHub Desktop.

Select an option

Save c4pt0r/0cf957c606268cf212cce3a2edaf6607 to your computer and use it in GitHub Desktop.

Revisions

  1. c4pt0r revised this gist May 9, 2022. 1 changed file with 20 additions and 21 deletions.
    41 changes: 20 additions & 21 deletions jq.go
    Original file line number Diff line number Diff line change
    @@ -90,11 +90,14 @@ func createJob(db *sql.DB, job *Job) error {
    ?,
    ?
    )`
    _, err = txn.Exec(sql, job.CreatedAt, job.UpdatedAt, job.Status, job.WorkerID, job.FinishedAt, job.Data)
    res, err := txn.Exec(sql, job.CreatedAt, job.UpdatedAt, job.Status, job.WorkerID, job.FinishedAt, job.Data)
    if err != nil {
    return err
    }
    jobID, err := res.LastInsertId()
    if err != nil {
    return err
    }
    jobID := getLastInsertID(txn)
    err = txn.Commit()
    if err != nil {
    txn.Rollback()
    @@ -110,17 +113,23 @@ func createWorker(db *sql.DB, worker *Worker) error {
    if err != nil {
    return err
    }
    sql := `
    stmt := `
    INSERT INTO workers (
    name
    ) VALUES (
    ?
    )`
    _, err = txn.Exec(sql, worker.Name)

    res, err := txn.Exec(stmt, worker.Name)
    if err != nil {
    return err
    }
    workerID := getLastInsertID(txn)

    workerID, err := res.LastInsertId()
    if err != nil {
    return err
    }

    err = txn.Commit()
    if err != nil {
    txn.Rollback()
    @@ -130,23 +139,13 @@ func createWorker(db *sql.DB, worker *Worker) error {
    return nil
    }

    func getLastInsertID(txn *sql.Tx) int64 {
    var id int64
    sql := `SELECT LAST_INSERT_ID()`
    err := txn.QueryRow(sql).Scan(&id)
    if err != nil {
    return 0
    }
    return id
    }

    func workerFetchJob(db *sql.DB, worker *Worker) (*Job, error) {
    /*
    Here's the magic.
    TiDB supports pessimistic transactions,
    And in TiDB 6.0 locks for pessimistic transactions are distributed and in-memory,
    which means that concurrent transactions become less costly to acquire locks.
    for more details: https://github.com/tikv/rfcs/blob/master/text/0077-in-memory-pessimistic-locks.md
    Here's the magic.
    TiDB supports pessimistic transactions,
    And in TiDB 6.0 locks for pessimistic transactions are distributed and in-memory,
    which means that concurrent transactions become less costly to acquire locks.
    for more details: https://github.com/tikv/rfcs/blob/master/text/0077-in-memory-pessimistic-locks.md
    */
    stmt := `
    SELECT
    @@ -159,7 +158,7 @@ func workerFetchJob(db *sql.DB, worker *Worker) (*Job, error) {
    data
    FROM jobs
    WHERE status = ?
    ORDER BY created_at ASC
    ORDER BY created_at DESC
    LIMIT 1
    FOR UPDATE`
    txn, err := db.Begin()
  2. c4pt0r revised this gist May 8, 2022. 1 changed file with 7 additions and 0 deletions.
    7 changes: 7 additions & 0 deletions jq.go
    Original file line number Diff line number Diff line change
    @@ -141,6 +141,13 @@ func getLastInsertID(txn *sql.Tx) int64 {
    }

    func workerFetchJob(db *sql.DB, worker *Worker) (*Job, error) {
    /*
    Here's the magic.
    TiDB supports pessimistic transactions,
    And in TiDB 6.0 locks for pessimistic transactions are distributed and in-memory,
    which means that concurrent transactions become less costly to acquire locks.
    for more details: https://github.com/tikv/rfcs/blob/master/text/0077-in-memory-pessimistic-locks.md
    */
    stmt := `
    SELECT
    id,
  3. c4pt0r revised this gist May 8, 2022. 1 changed file with 5 additions and 4 deletions.
    9 changes: 5 additions & 4 deletions jq.go
    Original file line number Diff line number Diff line change
    @@ -3,9 +3,11 @@ package main
    import (
    "database/sql"
    "fmt"
    "log"
    "math/rand"
    "time"

    "github.com/fatih/color"
    _ "github.com/go-sql-driver/mysql"
    )

    @@ -220,7 +222,6 @@ func workLoop(db *sql.DB, n int) {
    if err != nil {
    panic(err)
    }
    fmt.Println("worker id:", worker.ID)
    go func() {
    for {
    job, err := workerFetchJob(db, &worker)
    @@ -231,7 +232,7 @@ func workLoop(db *sql.DB, n int) {
    time.Sleep(10 * time.Millisecond)
    continue
    }
    fmt.Printf("%s get job id: %d\n", worker.Name, job.ID)
    log.Printf("%s %s get job id: %d\n", color.YellowString("[FETCH]"), worker.Name, job.ID)
    // simulate work
    time.Sleep(500 * time.Millisecond)
    job.Status = JobStatusFinished
    @@ -241,6 +242,7 @@ func workLoop(db *sql.DB, n int) {
    if err != nil {
    panic(err)
    }
    log.Printf("%s %s finish job id: %d\n", color.GreenString("[DONE]"), worker.Name, job.ID)
    }
    }()
    }
    @@ -263,7 +265,6 @@ func main() {

    // create 100 workers
    workLoop(db, 100)

    // create random jobs, every 100ms
    for {
    job := Job{
    @@ -278,7 +279,7 @@ func main() {
    if err != nil {
    panic(err)
    }
    fmt.Printf("create job id: %d\n", job.ID)
    log.Printf("%s create job id: %d\n", color.RedString("[NEWJOB]"), job.ID)
    time.Sleep(100 * time.Millisecond)
    }
    }
  4. c4pt0r created this gist May 8, 2022.
    284 changes: 284 additions & 0 deletions jq.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,284 @@
    package main

    import (
    "database/sql"
    "fmt"
    "math/rand"
    "time"

    _ "github.com/go-sql-driver/mysql"
    )

    type JobStatus int

    const (
    JobStatusInitialized JobStatus = iota
    JobStatusRunning
    JobStatusFinished
    JobStatusFailed
    )

    type Job struct {
    ID int64
    CreatedAt int64
    UpdatedAt int64
    FinishedAt int64
    Status JobStatus
    WorkerID int64
    Data string
    }

    type Worker struct {
    ID int64
    Name string
    }

    func createDB(db *sql.DB) error {
    stmt := `
    CREATE TABLE IF NOT EXISTS jobs (
    id BIGINT AUTO_RANDOM,
    created_at BIGINT,
    updated_at BIGINT,
    status BIGINT,
    worker_id BIGINT,
    finished_at BIGINT,
    data TEXT,
    PRIMARY KEY (id),
    KEY(finished_at),
    KEY(created_at),
    KEY(status),
    KEY(worker_id)
    );`
    _, err := db.Exec(stmt)
    if err != nil {
    return err
    }
    stmt = `
    CREATE TABLE IF NOT EXISTS workers (
    id BIGINT AUTO_RANDOM,
    name TEXT,
    PRIMARY KEY (id)
    );`
    _, err = db.Exec(stmt)
    if err != nil {
    return err
    }
    return nil
    }

    func createJob(db *sql.DB, job *Job) error {
    txn, err := db.Begin()
    defer txn.Rollback()
    if err != nil {
    return err
    }
    sql := `
    INSERT INTO jobs (
    created_at,
    updated_at,
    status,
    worker_id,
    finished_at,
    data
    ) VALUES (
    ?,
    ?,
    ?,
    ?,
    ?,
    ?
    )`
    _, err = txn.Exec(sql, job.CreatedAt, job.UpdatedAt, job.Status, job.WorkerID, job.FinishedAt, job.Data)
    if err != nil {
    return err
    }
    jobID := getLastInsertID(txn)
    err = txn.Commit()
    if err != nil {
    txn.Rollback()
    return err
    }
    job.ID = jobID
    return nil
    }

    func createWorker(db *sql.DB, worker *Worker) error {
    txn, err := db.Begin()
    defer txn.Rollback()
    if err != nil {
    return err
    }
    sql := `
    INSERT INTO workers (
    name
    ) VALUES (
    ?
    )`
    _, err = txn.Exec(sql, worker.Name)
    if err != nil {
    return err
    }
    workerID := getLastInsertID(txn)
    err = txn.Commit()
    if err != nil {
    txn.Rollback()
    return err
    }
    worker.ID = workerID
    return nil
    }

    func getLastInsertID(txn *sql.Tx) int64 {
    var id int64
    sql := `SELECT LAST_INSERT_ID()`
    err := txn.QueryRow(sql).Scan(&id)
    if err != nil {
    return 0
    }
    return id
    }

    func workerFetchJob(db *sql.DB, worker *Worker) (*Job, error) {
    stmt := `
    SELECT
    id,
    created_at,
    updated_at,
    status,
    worker_id,
    finished_at,
    data
    FROM jobs
    WHERE status = ?
    ORDER BY created_at ASC
    LIMIT 1
    FOR UPDATE`
    txn, err := db.Begin()
    defer txn.Rollback()
    if err != nil {
    return nil, err
    }

    var job Job
    err = txn.QueryRow(stmt, JobStatusInitialized).Scan(
    &job.ID,
    &job.CreatedAt,
    &job.UpdatedAt,
    &job.Status,
    &job.WorkerID,
    &job.FinishedAt,
    &job.Data,
    )
    if err != nil && err != sql.ErrNoRows {
    return nil, err
    }
    if job.ID == 0 {
    return nil, nil
    }
    stmt = `
    UPDATE jobs
    SET worker_id = ?, updated_at = ?, status = ?
    WHERE id = ?`
    _, err = txn.Exec(stmt, worker.ID, time.Now().Unix(), JobStatusRunning, job.ID)
    if err != nil {
    return nil, err
    }
    err = txn.Commit()
    if err != nil {
    return nil, err
    }
    return &job, nil
    }

    func updateJob(db *sql.DB, job *Job) error {
    stmt := `
    UPDATE jobs
    SET updated_at = ?, status = ?, finished_at = ?, data = ?
    WHERE id = ?`
    _, err := db.Exec(stmt, job.UpdatedAt, job.Status, job.FinishedAt, job.Data, job.ID)
    if err != nil {
    return err
    }
    return nil
    }

    func randomString(length int) string {
    var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
    b := make([]rune, length)
    for i := range b {
    b[i] = letters[rand.Intn(len(letters))]
    }
    return string(b)
    }

    func workLoop(db *sql.DB, n int) {
    for i := 0; i < n; i++ {
    worker := Worker{
    Name: fmt.Sprintf("worker-%d", i),
    }
    err := createWorker(db, &worker)
    if err != nil {
    panic(err)
    }
    fmt.Println("worker id:", worker.ID)
    go func() {
    for {
    job, err := workerFetchJob(db, &worker)
    if err != nil {
    panic(err)
    }
    if job == nil {
    time.Sleep(10 * time.Millisecond)
    continue
    }
    fmt.Printf("%s get job id: %d\n", worker.Name, job.ID)
    // simulate work
    time.Sleep(500 * time.Millisecond)
    job.Status = JobStatusFinished
    job.FinishedAt = time.Now().Unix()
    job.WorkerID = 0
    err = updateJob(db, job)
    if err != nil {
    panic(err)
    }
    }
    }()
    }
    }

    func main() {
    db, err := sql.Open("mysql", "root:@tcp(localhost:4000)/test")
    if err != nil {
    panic(err)
    }

    db.SetConnMaxLifetime(time.Minute * 3)
    db.SetMaxOpenConns(50)
    db.SetMaxIdleConns(50)

    err = createDB(db)
    if err != nil {
    panic(err)
    }

    // create 100 workers
    workLoop(db, 100)

    // create random jobs, every 100ms
    for {
    job := Job{
    CreatedAt: time.Now().Unix(),
    UpdatedAt: time.Now().Unix(),
    Status: JobStatusInitialized,
    WorkerID: 0,
    FinishedAt: 0,
    Data: randomString(1024),
    }
    err = createJob(db, &job)
    if err != nil {
    panic(err)
    }
    fmt.Printf("create job id: %d\n", job.ID)
    time.Sleep(100 * time.Millisecond)
    }
    }