
@mkingori
Last active July 9, 2019 12:44
package main

import (
	"fmt"
	"log"
	"time"
)

var (
	maxPoolQueue  = 100
	maxPoolWorker = 3
)

// Pool holds the params for the worker pool
type Pool struct {
	// WG *sync.WaitGroup

	Jobs    chan int
	Results chan int
	Errors  chan error
}

// NewPool initializes the worker pool
func NewPool() *Pool {
	return &Pool{
		// WG:      &sync.WaitGroup{},
		Jobs:    make(chan int, maxPoolQueue),
		Results: make(chan int, maxPoolQueue),
		Errors:  make(chan error, maxPoolQueue),
	}
}

// Run runs the worker pool: it starts the workers, queues the jobs, and
// gathers the results, returning the first error a worker reports.
func (p *Pool) Run() error {
	for w := 1; w <= maxPoolWorker; w++ {
		go p.worker(w)
	}

	for j := 1; j <= maxPoolQueue; j++ {
		p.Jobs <- j
		// p.WG.Add(1)
	}

	close(p.Jobs)

	for a := 1; a <= maxPoolQueue; a++ {
		select {
		case err := <-p.Errors:
			return err
		default:
			fmt.Println(<-p.Results)
			// p.WG.Done()
		}
	}

	// p.WG.Wait()

	return nil
}

// worker processes jobs until the Jobs channel is closed, sending each
// result (or error) back to the pool.
func (p *Pool) worker(id int) {
	for j := range p.Jobs {
		fmt.Println("Worker", id, "processing job", j)

		// Simulation of error to be returned
		if j == 20 {
			p.Errors <- fmt.Errorf("Error occurred on job %d", j)
			continue
		}

		p.Results <- j * 2
		time.Sleep(time.Second)
	}
}

func main() {
	p := NewPool()

	if err := p.Run(); err != nil {
		log.Fatal(err)
	}
}
@dmigwi commented Jul 9, 2019

I still can't tell why you decided to use sync.WaitGroups and channels together. In my opinion, the channels alone should do the work.

The other thing you should take into consideration is that data from the buffered channels (Jobs, Results, or Errors) cannot be read until the channels are full. Find more information here: https://tour.golang.org/concurrency/3

I would have recommended an optimized version of the code if only I knew its intended application.

@mkingori (Author) commented Jul 9, 2019

I was just working with the concept of allocate, process, and gather with worker pools as part of learning.

That being said, there is no way for the worker goroutines to close the results channel to signal that they are all done, especially when we don't know in advance how many results to expect and therefore how many to read from the channel.

I guess it's possible to read from the buffered channels when they are not full. Issues will arise when you try to receive from an empty channel or send to a full channel.
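
For instance, here is a minimal sketch of that behaviour (just an illustration, separate from the gist above): a send on a buffered channel only blocks once the buffer is full, and a receive only blocks while the buffer is empty.

package main

import "fmt"

func main() {
	// Buffered channel with capacity 5; it is never filled in this example.
	ch := make(chan int, 5)

	// Send two values; the buffer has room, so neither send blocks.
	ch <- 1
	ch <- 2

	// Both values can be received immediately even though the channel
	// was never full. A receive would only block on an empty buffer.
	fmt.Println(<-ch) // 1
	fmt.Println(<-ch) // 2
}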

@dmigwi commented Jul 9, 2019

I have dropped the usage of &sync.WaitGroup{} in that code snippet and it works just fine, the same as when the wait group was in place.

@dmigwi commented Jul 9, 2019

Here is my code that works:
package main

import (
	"fmt"
	"log"
	"sync"
	"time"
)

var (
	maxPoolQueue  = 100
	maxPoolWorker = 3
)

// Pool holds the params for the worker pool
type Pool struct {
	// WG *sync.WaitGroup

	Jobs    chan int
	Results chan int
	Errors  chan error
}

// NewPool initializes the worker pool
func NewPool() *Pool {
	return &Pool{
		// WG:      &sync.WaitGroup{},
		Jobs:    make(chan int, maxPoolQueue),
		Results: make(chan int, maxPoolQueue),
		Errors:  make(chan error, maxPoolQueue),
	}
}

// Run runs the worker pool
func (p *Pool) Run() error {
	for w := 1; w <= maxPoolWorker; w++ {
		go p.worker(w)
	}

	for j := 1; j <= maxPoolQueue; j++ {
		p.Jobs <- j
		// p.WG.Add(1)
	}

	close(p.Jobs)

	for a := 1; a <= maxPoolQueue; a++ {
		select {
		case err := <-p.Errors:
			return err
		default:
			fmt.Println(<-p.Results)
			// p.WG.Done()
		}
	}

	// p.WG.Wait()

	return nil
}

func (p *Pool) worker(id int) {

	for j := range p.Jobs {

		fmt.Println("Worker", id, "processing job", j)

		// Simulation of error to be returned
		if j == 20 {
			p.Errors <- fmt.Errorf("Error occurred on job %d", j)
			continue
		}

		p.Results <- j * 2
		time.Sleep(time.Second)
	}
}

func main() {

	p := NewPool()

	if err := p.Run(); err != nil {
		log.Fatal(err)
	}
}

@mkingori (Author) commented Jul 9, 2019

I agree that it should work. I added the WaitGroups to make the code more reliable. Anyway, I am just learning, and maybe it's ideal not to have WaitGroups and channels together.

Thanks so much, @dmigwi, for the insights.

@dmigwi commented Jul 9, 2019

there is no way for the worker goroutines to close the results channel to signal that they are all done, especially when we don't know in advance how many results to expect and therefore how many to read from the channel.

There is, but I would recommend a specific implementation based on the planned use case; some are not worth the hassle.
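
For example, one common pattern (just a sketch of a simplified pool, with hypothetical names jobs/results and error handling left out) is to track the workers with a sync.WaitGroup and close the results channel from a dedicated goroutine once they have all returned, so the consumer can range over the results without knowing their count in advance.

package main

import (
	"fmt"
	"sync"
)

func main() {
	const numJobs = 10
	const numWorkers = 3

	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs)

	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Drain jobs until the channel is closed.
			for j := range jobs {
				results <- j * 2
			}
		}()
	}

	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs)

	// Close results only after every worker has returned; this is what
	// lets the consumer below range over results safely.
	go func() {
		wg.Wait()
		close(results)
	}()

	for r := range results {
		fmt.Println(r)
	}
}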

@mkingori (Author) commented Jul 9, 2019

Okay. Thanks

@dmigwi commented Jul 9, 2019

I added the WaitGroups to make the code more reliable

I hold an opposing view on this matter. To me, the less code you use to implement a feature, the more readable that code tends to be. Also, other implementations could make your code more readable but slower, so it's often recommended that you evaluate all the choices at hand.

@mkingori (Author) commented Jul 9, 2019

😎
