Created
November 14, 2018 04:32
-
-
Save seafooler/0b134bc2858e963597b7bee3c2115a2a to your computer and use it in GitHub Desktop.
Concurrency Pattern in "Go in Action": pool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // This sample program demonstrates how to use the pool package | |
| // to share a simulated set of database connections. | |
| package main | |
| import ( | |
| "io" | |
| "log" | |
| "math/rand" | |
| "sync" | |
| "sync/atomic" | |
| "time" | |
| "github.com/goinaction/code/chapter7/patterns/pool" | |
| ) | |
// Sizing knobs for the demo.
const (
	// maxGoroutines is how many goroutines main launches, one per
	// simulated query.
	maxGoroutines = 25

	// pooledResources is the size handed to pool.New.
	pooledResources = 2
)
// dbConnection is the stand-in resource shared through the pool. It
// carries nothing but the identifier it was created with.
type dbConnection struct {
	ID int32
}
| // Close implements the io.Closer interface so dbConnection | |
| // can be managed by the pool. Close performs any resource | |
| // release management. | |
| func (dbConn *dbConnection) Close() error { | |
| log.Println("Close: Connection", dbConn.ID) | |
| return nil | |
| } | |
// idCounter is the source of connection ids. createConnection
// increments it atomically for every connection it builds.
var idCounter int32
| // createConnection is a factory method that will be called by | |
| // the pool when a new connection is needed. | |
| func createConnection() (io.Closer, error) { | |
| id := atomic.AddInt32(&idCounter, 1) | |
| log.Println("Create: New Connection", id) | |
| return &dbConnection{id}, nil | |
| } | |
| // main is the entry point for all Go programs. | |
| func main() { | |
| var wg sync.WaitGroup | |
| wg.Add(maxGoroutines) | |
| // Create the pool to manage our connections. | |
| p, err := pool.New(createConnection, pooledResources) | |
| if err != nil { | |
| log.Println(err) | |
| } | |
| // Perform queries using connections from the pool. | |
| for query := 0; query < maxGoroutines; query++ { | |
| // Each goroutine needs its own copy of the query | |
| // value else they will all be sharing the same query | |
| // variable. | |
| go func(q int) { | |
| performQueries(q, p) | |
| wg.Done() | |
| }(query) | |
| } | |
| // Wait for the goroutines to finish. | |
| wg.Wait() | |
| // Close the pool. | |
| log.Println("Shutdown Program.") | |
| p.Close() | |
| } | |
| // performQueries tests the resource pool of connections. | |
| func performQueries(query int, p *pool.Pool) { | |
| // Acquire a connection from the pool. | |
| conn, err := p.Acquire() | |
| if err != nil { | |
| log.Println(err) | |
| return | |
| } | |
| // Release the connection back to the pool. | |
| defer p.Release(conn) | |
| // Wait to simulate a query response. | |
| time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) | |
| log.Printf("Query: QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID) | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Example provided with help from Fatih Arslan and Gabriel Aszalos. | |
| // Package pool manages a user defined set of resources. | |
| package pool | |
| import ( | |
| "errors" | |
| "io" | |
| "log" | |
| "sync" | |
| ) | |
// Pool shares a set of resources among any number of goroutines. Every
// resource it manages must implement io.Closer.
type Pool struct {
	m         sync.Mutex                // serializes Release and Close
	resources chan io.Closer            // buffered queue of idle resources
	factory   func() (io.Closer, error) // builds a resource when none is idle
	closed    bool                      // set by Close, checked by Release
}
// ErrPoolClosed is the error Acquire reports when it is called on a
// pool that has been closed.
var ErrPoolClosed = errors.New("Pool has been closed.")
| // New creates a pool that manages resources. A pool requires a | |
| // function that can allocate a new resource and the size of | |
| // the pool. | |
| func New(fn func() (io.Closer, error), size uint) (*Pool, error) { | |
| if size <= 0 { | |
| return nil, errors.New("Size value too small.") | |
| } | |
| return &Pool{ | |
| factory: fn, | |
| resources: make(chan io.Closer, size), | |
| }, nil | |
| } | |
| // Acquire retrieves a resource from the pool. | |
| func (p *Pool) Acquire() (io.Closer, error) { | |
| select { | |
| // Check for a free resource. | |
| case r, ok := <-p.resources: | |
| log.Println("Acquire:", "Shared Resource") | |
| if !ok { | |
| return nil, ErrPoolClosed | |
| } | |
| return r, nil | |
| // Provide a new resource since there are none available. | |
| default: | |
| log.Println("Acquire:", "New Resource") | |
| return p.factory() | |
| } | |
| } | |
| // Release places a new resource onto the pool. | |
| func (p *Pool) Release(r io.Closer) { | |
| // Secure this operation with the Close operation. | |
| p.m.Lock() | |
| defer p.m.Unlock() | |
| // If the pool is closed, discard the resource. | |
| if p.closed { | |
| r.Close() | |
| return | |
| } | |
| select { | |
| // Attempt to place the new resource on the queue. | |
| case p.resources <- r: | |
| log.Println("Release:", "In Queue") | |
| // If the queue is already at cap we close the resource. | |
| default: | |
| log.Println("Release:", "Closing") | |
| r.Close() | |
| } | |
| } | |
| // Close will shutdown the pool and close all existing resources. | |
| func (p *Pool) Close() { | |
| // Secure this operation with the Release operation. | |
| p.m.Lock() | |
| defer p.m.Unlock() | |
| // If the pool is already close, don't do anything. | |
| if p.closed { | |
| return | |
| } | |
| // Set the pool as closed. | |
| p.closed = true | |
| // Close the channel before we drain the channel of its | |
| // resources. If we don't do this, we will have a deadlock. | |
| close(p.resources) | |
| // Close the resources | |
| for r := range p.resources { | |
| r.Close() | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment