Last active
July 9, 2019 12:44
-
-
Save mkingori/ea0136957fce414742e48a40fede0ede to your computer and use it in GitHub Desktop.
Revisions
-
mkingori revised this gist
Jul 9, 2019 . 1 changed file with 6 additions and 7 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -3,7 +3,6 @@ package main import ( "fmt" "log" "time" ) @@ -14,7 +13,7 @@ var ( // Pool holds the params for the worker pool type Pool struct { // WG *sync.WaitGroup Jobs chan int Results chan int @@ -24,7 +23,7 @@ type Pool struct { // NewPool initializes the worker pool func NewPool() *Pool { return &Pool{ // WG: &sync.WaitGroup{}, Jobs: make(chan int, maxPoolQueue), Results: make(chan int, maxPoolQueue), Errors: make(chan error, maxPoolQueue), @@ -39,7 +38,7 @@ func (p *Pool) Run() error { for j := 1; j <= maxPoolQueue; j++ { p.Jobs <- j // p.WG.Add(1) } close(p.Jobs) @@ -50,11 +49,11 @@ func (p *Pool) Run() error { return err default: fmt.Println(<-p.Results) // p.WG.Done() } } // p.WG.Wait() return nil } @@ -83,4 +82,4 @@ func main() { if err := p.Run(); err != nil { log.Fatal(err) } } -
mkingori created this gist
Jul 9, 2019 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,86 @@ package main import ( "fmt" "log" "sync" "time" ) var ( maxPoolQueue = 100 maxPoolWorker = 3 ) // Pool holds the params for the worker pool type Pool struct { WG *sync.WaitGroup Jobs chan int Results chan int Errors chan error } // NewPool initializes the worker pool func NewPool() *Pool { return &Pool{ WG: &sync.WaitGroup{}, Jobs: make(chan int, maxPoolQueue), Results: make(chan int, maxPoolQueue), Errors: make(chan error, maxPoolQueue), } } // Run runs the worker pool func (p *Pool) Run() error { for w := 1; w <= maxPoolWorker; w++ { go p.worker(w) } for j := 1; j <= maxPoolQueue; j++ { p.Jobs <- j p.WG.Add(1) } close(p.Jobs) for a := 1; a <= maxPoolQueue; a++ { select { case err := <-p.Errors: return err default: fmt.Println(<-p.Results) p.WG.Done() } } p.WG.Wait() return nil } func (p *Pool) worker(id int) { for j := range p.Jobs { fmt.Println("Worker", id, "processing job", j) // Simulation of error to be returned if j == 20 { p.Errors <- fmt.Errorf("Error occured on job %d", j) continue } p.Results <- j * 2 time.Sleep(time.Second) } } func main() { p := NewPool() if err := p.Run(); err != nil { log.Fatal(err) } }