-
-
Save mkingori/ea0136957fce414742e48a40fede0ede to your computer and use it in GitHub Desktop.
| package main | |
| import ( | |
| "fmt" | |
| "log" | |
| "time" | |
| ) | |
// Sizing parameters for the worker pool.
var (
	// maxPoolQueue is the buffer size NewPool gives each channel and the
	// number of jobs Run enqueues.
	maxPoolQueue = 100

	// maxPoolWorker is the number of worker goroutines Run starts.
	maxPoolWorker = 3
)
// Pool holds the channels shared between Run and the worker goroutines.
type Pool struct {
	Jobs    chan int   // job numbers handed to the workers
	Results chan int   // one value per job that completed successfully
	Errors  chan error // one value per job that failed
}
| // NewPool initializes the worker pool | |
| func NewPool() *Pool { | |
| return &Pool{ | |
| // WG: &sync.WaitGroup{}, | |
| Jobs: make(chan int, maxPoolQueue), | |
| Results: make(chan int, maxPoolQueue), | |
| Errors: make(chan error, maxPoolQueue), | |
| } | |
| } | |
| // Run runs the worker pool | |
| func (p *Pool) Run() error { | |
| for w := 1; w <= maxPoolWorker; w++ { | |
| go p.worker(w) | |
| } | |
| for j := 1; j <= maxPoolQueue; j++ { | |
| p.Jobs <- j | |
| // p.WG.Add(1) | |
| } | |
| close(p.Jobs) | |
| for a := 1; a <= maxPoolQueue; a++ { | |
| select { | |
| case err := <-p.Errors: | |
| return err | |
| default: | |
| fmt.Println(<-p.Results) | |
| // p.WG.Done() | |
| } | |
| } | |
| // p.WG.Wait() | |
| return nil | |
| } | |
| func (p *Pool) worker(id int) { | |
| for j := range p.Jobs { | |
| fmt.Println("Worker", id, "processing job", j) | |
| // Simulation of error to be returned | |
| if j == 20 { | |
| p.Errors <- fmt.Errorf("Error occured on job %d", j) | |
| continue | |
| } | |
| p.Results <- j * 2 | |
| time.Sleep(time.Second) | |
| } | |
| } | |
| func main() { | |
| p := NewPool() | |
| if err := p.Run(); err != nil { | |
| log.Fatal(err) | |
| } | |
| } |
I was just working with the concept of allocate, process and gather with worker pools as a part of learning.
That being said, there is no way for the worker routines to close the results channel to signal that they are all done, especially when we don't know how many results to expect and therefore how many to read from the channel.
I guess it's possible to read from the buffered channels when they are not full. Issues will arise when you try to receive from an empty channel or send to a full channel.
I have dropped the usage of &sync.WaitGroup{} in that code snippet and it works just fine. Same as when the wait group was in place.
package main
import (
"fmt"
"log"
"sync"
"time"
)
// Sizing parameters for the worker pool.
var (
// maxPoolQueue is the buffer size NewPool gives each channel and the
// number of jobs Run enqueues.
maxPoolQueue = 100
// maxPoolWorker is the number of worker goroutines Run starts.
maxPoolWorker = 3
)
// Pool groups the wait group and the channels that Run and the worker
// goroutines share.
type Pool struct {
	// WG is the wait group associated with the pool.
	WG *sync.WaitGroup

	// Jobs carries job numbers to the workers; Results carries one value
	// back for every job that succeeded.
	Jobs, Results chan int

	// Errors carries one value for every job that failed.
	Errors chan error
}
// NewPool returns a Pool with an allocated wait group and with Jobs, Results
// and Errors channels each buffered to maxPoolQueue entries.
//
// The wait group must not be left nil: Run calls Wait on it, and calling
// Wait through a nil *sync.WaitGroup panics with a nil pointer dereference.
func NewPool() *Pool {
	return &Pool{
		WG:      &sync.WaitGroup{},
		Jobs:    make(chan int, maxPoolQueue),
		Results: make(chan int, maxPoolQueue),
		Errors:  make(chan error, maxPoolQueue),
	}
}
// Run starts maxPoolWorker workers, enqueues jobs 1..maxPoolQueue and then
// gathers one outcome per job.
//
// Every result is printed as it arrives. The first error received from a
// worker is returned; nil is returned once all jobs have produced a result.
func (p *Pool) Run() error {
	for id := 1; id <= maxPoolWorker; id++ {
		go p.worker(id)
	}

	// NewPool buffers Jobs to maxPoolQueue, so these sends do not block.
	for job := 1; job <= maxPoolQueue; job++ {
		p.Jobs <- job
	}
	close(p.Jobs) // ends each worker's range loop once the queue is drained

	// Each job yields exactly one value on either Results or Errors, so wait
	// on both channels at once. Blocking on Results alone would delay error
	// reporting until the next result shows up, and would hang forever if
	// the only outstanding outcomes were errors.
	for done := 0; done < maxPoolQueue; done++ {
		select {
		case err := <-p.Errors:
			return err
		case res := <-p.Results:
			fmt.Println(res)
		}
	}

	// Run never calls Add on the wait group, so Wait returns at once. The
	// nil check keeps a Pool built without a wait group from panicking here.
	if p.WG != nil {
		p.WG.Wait()
	}
	return nil
}
// worker drains p.Jobs until the channel is closed and empty. Each job is
// logged; job 20 is reported on p.Errors as a simulated failure, and every
// other job sends its number doubled on p.Results and then pauses for one
// second.
func (p *Pool) worker(id int) {
	// failingJob is the job number used to simulate an error.
	const failingJob = 20

	for {
		job, open := <-p.Jobs
		if !open {
			return
		}
		fmt.Println("Worker", id, "processing job", job)

		if job != failingJob {
			p.Results <- job * 2
			time.Sleep(time.Second)
			continue
		}
		p.Errors <- fmt.Errorf("Error occured on job %d", job)
	}
}
func main() {
p := NewPool()
if err := p.Run(); err != nil {
log.Fatal(err)
}
}
I agree that it should work. I added the WaitGroups to make the code more reliable. Anyway, I am just learning, and maybe it's ideal not to have WaitGroups and channels together.
Anyway thanks so much @dmigwi for the insights.
there is no way for the worker routines to close the results channel to mark that they are all done especially when we don't know the number of results we expect to help us know the number of results to read from the channel.
There is, but I would recommend a specific implementation based on the planned use case; some are not worth the hassle.
Okay. Thanks
I added the WaitGroups to make the code more reliable
I hold an opposing view on this matter. To me, the less code you use to implement a feature, the more readable that code is. Also, other implementations could make your code more readable but slower, so it's often recommended that you evaluate all the choices at hand.
😎
I still can't tell why you decided to use sync wait groups and channels together. In my opinion the channels alone should do the work.
The other thing you should take into consideration is that data from the buffered channels
`Jobs`, `Results` or `Errors` cannot be read till the channels are full. Find more information here: https://tour.golang.org/concurrency/3
I would have recommended an optimized version of the code if only I knew its intended application.