// Command main starts workerCount goroutines that compete for a single
// PostgreSQL transaction-level advisory lock. Each worker polls the lock on a
// jittered ticker and, once it holds the lock, randomly decides to give it up
// by committing its transaction and starting a fresh session.
package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"os"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	"golang.org/x/sync/errgroup"
)

const (
	// workerCount is the number of worker goroutines started by main.
	workerCount = 10
	// lockID is the advisory-lock key that every worker tries to acquire.
	lockID = 42
	// rollbackTimeout bounds the best-effort rollback performed when a lock
	// session ends, so cleanup cannot hang indefinitely.
	rollbackTimeout = 5 * time.Second
)

var (
	// connPool is the shared connection pool; it is initialised in main.
	connPool *pgxpool.Pool
	// logger writes worker status lines to stdout; it is initialised in main.
	logger *log.Logger
)

// main connects to the database named by the POSTGRES_DSN environment
// variable, runs workerCount workers under an errgroup, and panics with the
// first error any of them returns.
func main() {
	ctx := context.Background()
	logger = log.New(os.Stdout, "", log.LstdFlags)

	var err error
	connPool, err = pgxpool.New(ctx, os.Getenv("POSTGRES_DSN"))
	must(err)
	// Release pooled connections on every exit path, including a panic from
	// the must call below (deferred calls still run while panicking).
	defer connPool.Close()

	// NOTE(review): every worker keeps one pooled connection checked out for
	// the whole lifetime of its transaction — confirm the pool's maximum size
	// is at least workerCount, otherwise some workers block in Begin.
	grp, ctx := errgroup.WithContext(ctx)
	for i := 0; i < workerCount; i++ {
		i := i // infamous loop-closure workaround
		grp.Go(func() error {
			return worker(ctx, i)
		})
	}
	must(grp.Wait())
}

// worker runs lock sessions for worker id back to back until one of them
// fails or ctx is cancelled. It always returns a non-nil error: either the
// failure reported by a session or the wrapped context error.
//
// Sessions are restarted with a loop rather than by recursion, so a
// long-running worker does not accumulate stack frames and pending defers
// every time it yields the lock.
func worker(ctx context.Context, id int) error {
	for {
		if err := runLockSession(ctx, id); err != nil {
			return err
		}
	}
}

// runLockSession opens one transaction and, on every tick of a jittered
// ticker, tries to take the advisory lock lockID inside it. It returns nil
// after the worker held the lock and voluntarily released it by committing
// (the caller then starts a new session); it returns an error when a database
// call fails or ctx is done.
func runLockSession(ctx context.Context, id int) error {
	logger.Printf("worker %d: starting\n", id)

	txn, err := connPool.Begin(ctx)
	if err != nil {
		// Report through the errgroup instead of panicking inside a goroutine.
		return fmt.Errorf("worker %d: beginning transaction: %w", id, err)
	}
	defer func() {
		// Use a fresh context: ctx may already be cancelled when we get here,
		// and the transaction still has to be closed so its connection goes
		// back to the pool.
		rollbackCtx, cancel := context.WithTimeout(context.Background(), rollbackTimeout)
		defer cancel()
		// Best-effort cleanup: after a successful Commit the transaction is
		// already closed and Rollback merely reports that, so the error is
		// deliberately ignored.
		_ = txn.Rollback(rollbackCtx)
	}()

	const lockStatement = `SELECT pg_try_advisory_xact_lock($1)`

	// Spread the poll interval over roughly 3.5s–4.5s so the workers do not
	// all hit the database at the same instant.
	jitter := time.Millisecond * time.Duration(rand.Intn(1000)-500)
	tick := time.NewTicker((time.Second * 4) + jitter)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			var hasLock bool
			if err := txn.QueryRow(ctx, lockStatement, lockID).Scan(&hasLock); err != nil {
				return fmt.Errorf("worker %d: trying advisory lock: %w", id, err)
			}

			if hasLock {
				logger.Printf("worker %d: I have the lock\n", id)
			} else {
				logger.Printf("worker %d: I do not have the lock\n", id)
			}

			// Draw the random number on every tick, as before, and yield only
			// when the lock is actually held.
			randomShouldYield := rand.Float64() > 0.7
			if hasLock && randomShouldYield {
				logger.Printf("worker %d: releasing lock and restarting", id)
				// Transaction-level advisory locks are released when the
				// transaction ends, so committing hands the lock back.
				if err := txn.Commit(ctx); err != nil {
					return fmt.Errorf("worker %d: committing transaction: %w", id, err)
				}
				return nil
			}
		case <-ctx.Done():
			return fmt.Errorf("stopping worker %d: %w", id, ctx.Err())
		}
	}
}

// must panics with err when err is non-nil and does nothing otherwise. It is
// meant for unrecoverable start-up and shutdown failures in main only.
func must(err error) {
	if err != nil {
		panic(err)
	}
}