Skip to content

Instantly share code, notes, and snippets.

@mkingori
Last active July 9, 2019 12:44
Show Gist options
  • Select an option

  • Save mkingori/ea0136957fce414742e48a40fede0ede to your computer and use it in GitHub Desktop.

Select an option

Save mkingori/ea0136957fce414742e48a40fede0ede to your computer and use it in GitHub Desktop.

Revisions

  1. mkingori revised this gist Jul 9, 2019. 1 changed file with 6 additions and 7 deletions.
    13 changes: 6 additions & 7 deletions workerpool.go
    Original file line number Diff line number Diff line change
    @@ -3,7 +3,6 @@ package main
    import (
    "fmt"
    "log"
    "sync"
    "time"
    )

    @@ -14,7 +13,7 @@ var (

    // Pool holds the params for the worker pool
    type Pool struct {
    WG *sync.WaitGroup
    // WG *sync.WaitGroup

    Jobs chan int
    Results chan int
    @@ -24,7 +23,7 @@ type Pool struct {
    // NewPool initializes the worker pool
    func NewPool() *Pool {
    return &Pool{
    WG: &sync.WaitGroup{},
    // WG: &sync.WaitGroup{},
    Jobs: make(chan int, maxPoolQueue),
    Results: make(chan int, maxPoolQueue),
    Errors: make(chan error, maxPoolQueue),
    @@ -39,7 +38,7 @@ func (p *Pool) Run() error {

    for j := 1; j <= maxPoolQueue; j++ {
    p.Jobs <- j
    p.WG.Add(1)
    // p.WG.Add(1)
    }

    close(p.Jobs)
    @@ -50,11 +49,11 @@ func (p *Pool) Run() error {
    return err
    default:
    fmt.Println(<-p.Results)
    p.WG.Done()
    // p.WG.Done()
    }
    }

    p.WG.Wait()
    // p.WG.Wait()

    return nil
    }
    @@ -83,4 +82,4 @@ func main() {
    if err := p.Run(); err != nil {
    log.Fatal(err)
    }
    }
    }
  2. mkingori created this gist Jul 9, 2019.
    86 changes: 86 additions & 0 deletions workerpool.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,86 @@
    package main

    import (
    "fmt"
    "log"
    "sync"
    "time"
    )

    var (
    maxPoolQueue = 100
    maxPoolWorker = 3
    )

    // Pool holds the params for the worker pool
    type Pool struct {
    WG *sync.WaitGroup

    Jobs chan int
    Results chan int
    Errors chan error
    }

    // NewPool initializes the worker pool
    func NewPool() *Pool {
    return &Pool{
    WG: &sync.WaitGroup{},
    Jobs: make(chan int, maxPoolQueue),
    Results: make(chan int, maxPoolQueue),
    Errors: make(chan error, maxPoolQueue),
    }
    }

    // Run runs the worker pool
    func (p *Pool) Run() error {
    for w := 1; w <= maxPoolWorker; w++ {
    go p.worker(w)
    }

    for j := 1; j <= maxPoolQueue; j++ {
    p.Jobs <- j
    p.WG.Add(1)
    }

    close(p.Jobs)

    for a := 1; a <= maxPoolQueue; a++ {
    select {
    case err := <-p.Errors:
    return err
    default:
    fmt.Println(<-p.Results)
    p.WG.Done()
    }
    }

    p.WG.Wait()

    return nil
    }

    func (p *Pool) worker(id int) {

    for j := range p.Jobs {

    fmt.Println("Worker", id, "processing job", j)

    // Simulation of error to be returned
    if j == 20 {
    p.Errors <- fmt.Errorf("Error occured on job %d", j)
    continue
    }

    p.Results <- j * 2
    time.Sleep(time.Second)
    }
    }

    func main() {

    p := NewPool()

    if err := p.Run(); err != nil {
    log.Fatal(err)
    }
    }