Skip to content

Instantly share code, notes, and snippets.

@craigfurman
Created March 31, 2023 09:59
Show Gist options
  • Save craigfurman/20ca2e8b6440e1591f64ef94f0b1ee90 to your computer and use it in GitHub Desktop.
Save craigfurman/20ca2e8b6440e1591f64ef94f0b1ee90 to your computer and use it in GitHub Desktop.

Revisions

  1. craigfurman created this gist Mar 31, 2023.
    86 changes: 86 additions & 0 deletions postgres-pgx-distributed-lock.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,86 @@
    package main

    import (
    "context"
    "fmt"
    "log"
    "math/rand"
    "os"
    "time"

    "github.com/jackc/pgx/v5/pgxpool"
    "golang.org/x/sync/errgroup"
    )

    const (
    workerCount = 10
    lockID = 42
    )

    var (
    connPool *pgxpool.Pool
    logger *log.Logger
    )

    func main() {
    ctx := context.Background()
    logger = log.New(os.Stdout, "", log.LstdFlags)
    var err error
    connPool, err = pgxpool.New(ctx, os.Getenv("POSTGRES_DSN"))
    must(err)

    grp, ctx := errgroup.WithContext(ctx)

    for i := 0; i < workerCount; i++ {
    i := i // infamous loop-closure workaround
    grp.Go(func() error {
    return worker(ctx, i)
    })
    }

    must(grp.Wait())
    }

    func worker(ctx context.Context, id int) error {
    logger.Printf("worker %d: starting\n", id)
    txn, err := connPool.Begin(ctx)
    must(err)

    const lockStatement = `SELECT pg_try_advisory_xact_lock($1)`

    jitter := time.Millisecond * time.Duration(rand.Intn(1000)-500)
    tick := time.NewTicker((time.Second * 4) + jitter)
    defer tick.Stop()
    for {
    select {
    case <-tick.C:
    row := txn.QueryRow(ctx, lockStatement, lockID)
    var hasLock bool
    if err := row.Scan(&hasLock); err != nil {
    return err
    }
    if hasLock {
    logger.Printf("worker %d: I have the lock\n", id)
    } else {
    logger.Printf("worker %d: I do not have the lock\n", id)
    }

    randomShouldYield := rand.Float64() > 0.7
    if hasLock && randomShouldYield {
    log.Printf("worker %d: releasing lock and restarting", id)
    if err := txn.Commit(ctx); err != nil {
    return err
    }
    return worker(ctx, id)
    }
    case <-ctx.Done():
    return fmt.Errorf("stopping worker %d: %w", id, ctx.Err())
    }
    }
    }

    func must(err error) {
    if err != nil {
    panic(err)
    }
    }