-
-
Save blinkinglight/8c83f72672c86e50076554d375b59d99 to your computer and use it in GitHub Desktop.
Revisions
-
harlow revised this gist
Dec 22, 2015 . 1 changed file with 53 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,53 @@ package main import ( _ "expvar" "fmt" "math/rand" "sync" "time" ) type job struct { name string duration time.Duration } type worker struct { id int } func (w worker) process(j job) { fmt.Printf("worker%d: started %s, working for %fs\n", w.id, j.name, j.duration.Seconds()) time.Sleep(j.duration) fmt.Printf("worker%d: completed %s!\n", w.id, j.name) } func main() { wg := &sync.WaitGroup{} jobCh := make(chan job) // start workers for i := 0; i < 10; i++ { wg.Add(1) w := worker{i} go func(w worker) { for j := range jobCh { w.process(j) } wg.Done() }(w) } // add jobs to queue for i := 0; i < 100; i++ { name := fmt.Sprintf("job-%d", i) duration := time.Duration(rand.Intn(1000)) * time.Millisecond fmt.Printf("adding: %s %s\n", name, duration) jobCh <- job{name, duration} } // close jobCh and wait for workers to complete close(jobCh) wg.Wait() } -
harlow revised this gist
Dec 22, 2015 . 1 changed file with 0 additions and 54 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,54 +0,0 @@ -
harlow revised this gist
Dec 22, 2015 . 1 changed file with 3 additions and 3 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -27,10 +27,10 @@ The test run with Pprof show performance characteristics remain the same between ## Run the Application Boot either the `worker_original.go` or the `worker_refactored.go` applications. Use flags to adjust the `max_workers` and `max_queue_size` to override the default values. $ go run worker_original.go -max_workers 5 cURL the application from another terminal window: $ for i in {1..15}; do curl localhost:8080/work -d name=job$i -d delay=$(expr $i % 9 + 1)s; done -
harlow revised this gist
Dec 22, 2015 . 4 changed files with 19 additions and 23 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -19,15 +19,15 @@ Simplify the worker queue by removing the `Dispatcher`. * Creates workers directly and passes job queue to them https://gist.github.com/harlow/dbcd639cf8d396a2ab73#file-worker_refactored-go ### Performance The test run with Pprof show performance characteristics remain the same between both examples. ## Run the Application $ go run worker_original.go -max_workers 5 Use flags to adjust the `max_workers` and `max_queue_size` to override the default values. File renamed without changes.This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -20,8 +20,9 @@ type worker struct { } func (w worker) process(j job) { fmt.Printf("worker%d: started %s, working for %f seconds\n", w.id, j.name, j.duration.Seconds()) time.Sleep(j.duration) fmt.Printf("worker%d: completed %s!\n", w.id, j.name) } func requestHandler(jobCh chan job, w http.ResponseWriter, r *http.Request) { This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -18,42 +18,37 @@ type worker struct { } func (w worker) process(j job) { fmt.Printf("worker%d: started %s, working for %f seconds\n", w.id, j.Name, j.Delay.Seconds()) time.Sleep(j.duration) fmt.Printf("worker%d: completed %s!\n", w.id, j.Name) } func main() { wg := &sync.WaitGroup{} jobCh := make(chan job) // start workers for i := 0; i < 10; i++ { wg.Add(1) w := &worker{i} go func(w worker) { for j := range jobCh { w.process(j) } wg.Done() }(w) } // add jobs to queue for i := 0; i < 100; i++ { name := fmt.Sprintf("job-%d", i) duration := time.Duration(rand.Intn(1000)) * time.Millisecond fmt.Printf("adding: %s %s\n", name, duration) jobCh <- job{name, duration} } // close jobCh and wait for workers to be done close(jobCh) wg.Wait() } -
harlow revised this gist
Dec 13, 2015 . 1 changed file with 10 additions and 28 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -15,35 +15,13 @@ type job struct { duration time.Duration } type worker struct { id int } func (w worker) process(j job) { fmt.Printf("worker %d: started %s, duration: %f seconds\n", w.id, j.name, j.duration.Seconds()) time.Sleep(j.duration) } func requestHandler(jobCh chan job, w http.ResponseWriter, r *http.Request) { @@ -99,8 +77,12 @@ func main() { // create workers for i := 0; i < *maxWorkers; i++ { w := worker{i} go func(w worker) { for j := range jobCh { w.process(j) } }(w) } // handler for adding jobs -
harlow revised this gist
Dec 13, 2015 . 1 changed file with 59 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,59 @@ package main import ( _ "expvar" "fmt" "math/rand" "sync" "time" ) type job struct { name string duration time.Duration } type worker struct { id int } func (w worker) process(j job) { fmt.Printf("worker%d: doing %s (%s)\n", w.id, j.name, j.duration) time.Sleep(j.duration) } func startWorkers(jobCh chan job, wg *sync.WaitGroup) { for i := 0; i < 10; i++ { w := &worker{i} wg.Add(1) go func(w *worker) { for j := range jobCh { w.process(j) } wg.Done() }(w) } } func addJobs(jobCh chan job) { for i := 0; i < 100; i++ { name := fmt.Sprintf("job-%d", i) duration := time.Duration(rand.Intn(1000)) * time.Millisecond job := job{name, duration} fmt.Printf("adding: %s %s\n", job.name, job.duration) jobCh <- job } close(jobCh) } func main() { wg := &sync.WaitGroup{} jobCh := make(chan job) // start workers and add jobs startWorkers(jobCh, wg) addJobs(jobCh) // wait for workers to complete wg.Wait() } -
harlow revised this gist
Dec 13, 2015 . 1 changed file with 117 additions and 117 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -2,159 +2,159 @@ package main import ( _ "expvar" "flag" "fmt" "net/http" _ "net/http/pprof" "os" "time" ) // Job holds the attributes needed to perform unit of work. type Job struct { Name string Delay time.Duration } // NewWorker creates takes a numeric id and a channel w/ worker pool. func NewWorker(id int, workerPool chan chan Job) Worker { return Worker{ id: id, jobQueue: make(chan Job), workerPool: workerPool, quitChan: make(chan bool), } } type Worker struct { id int jobQueue chan Job workerPool chan chan Job quitChan chan bool } func (w Worker) start() { go func() { for { // Add my jobQueue to the worker pool. w.workerPool <- w.jobQueue select { case job := <-w.jobQueue: // Dispatcher has added a job to my jobQueue. fmt.Printf("worker%d: started %s, blocking for %f seconds\n", w.id, job.Name, job.Delay.Seconds()) time.Sleep(job.Delay) fmt.Printf("worker%d: completed %s!\n", w.id, job.Name) case <-w.quitChan: // We have been asked to stop. fmt.Printf("worker%d stopping\n", w.id) return } } }() } func (w Worker) stop() { go func() { w.quitChan <- true }() } // NewDispatcher creates, and returns a new Dispatcher object. func NewDispatcher(jobQueue chan Job, maxWorkers int) *Dispatcher { workerPool := make(chan chan Job, maxWorkers) return &Dispatcher{ jobQueue: jobQueue, maxWorkers: maxWorkers, workerPool: workerPool, } } type Dispatcher struct { workerPool chan chan Job maxWorkers int jobQueue chan Job } func (d *Dispatcher) run() { for i := 0; i < d.maxWorkers; i++ { worker := NewWorker(i+1, d.workerPool) worker.start() } go d.dispatch() } func (d *Dispatcher) dispatch() { for { select { case job := <-d.jobQueue: go func() { fmt.Printf("fetching workerJobQueue for: %s\n", job.Name) workerJobQueue := <-d.workerPool fmt.Printf("adding %s to workerJobQueue\n", job.Name) workerJobQueue <- job }() } } } func requestHandler(w http.ResponseWriter, r *http.Request, jobQueue chan Job) { // Make sure we can only be called with an HTTP POST request. if r.Method != "POST" { w.Header().Set("Allow", "POST") w.WriteHeader(http.StatusMethodNotAllowed) return } // Parse the delay. delay, err := time.ParseDuration(r.FormValue("delay")) if err != nil { http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest) return } // Validate delay is in range 1 to 10 seconds. if delay.Seconds() < 1 || delay.Seconds() > 10 { http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest) return } // Set name and validate value. name := r.FormValue("name") if name == "" { http.Error(w, "You must specify a name.", http.StatusBadRequest) return } // Create Job and push the work onto the jobQueue. job := Job{Name: name, Delay: delay} jobQueue <- job // Render success. w.WriteHeader(http.StatusCreated) } func main() { var ( maxWorkers = flag.Int("max_workers", 5, "The number of workers to start") maxQueueSize = flag.Int("max_queue_size", 100, "The size of job queue") port = flag.String("port", "8080", "The server port") ) flag.Parse() // Create the job queue. jobQueue := make(chan Job, *maxQueueSize) // Start the dispatcher. dispatcher := NewDispatcher(jobQueue, *maxWorkers) dispatcher.run() // Start the HTTP handler. http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) { requestHandler(w, r, jobQueue) }) log.Fatal(http.ListenAndServe(":"+*port, nil)) } -
harlow revised this gist
Dec 13, 2015 . 2 changed files with 32 additions and 41 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -12,11 +12,10 @@ Small refactorings made to original code: * Use non-exported private methods * Remove global variables * Bring the flags closer to their usage in main() ### Step 2 Simplify the worker queue by removing the `Dispatcher`. * Creates workers directly and passes job queue to them @@ -34,4 +33,4 @@ Use flags to adjust the `max_workers` and `max_queue_size` to override the defau cURL from another terminal window: $ for i in {1..15}; do curl localhost:8080/work -d name=job$i -d delay=$(expr $i % 9 + 1)s; done This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -10,42 +10,34 @@ import ( "time" ) type job struct { name string duration time.Duration } func newWorker(id int) *worker { return &worker{ id: id, quitChan: make(chan bool), } } type worker struct { id int quitChan chan bool } func (w worker) process(jobCh chan job) { for { select { case j := <-jobCh: fmt.Printf("worker %d: started %s, duration: %f seconds\n", w.id, j.name, j.duration.Seconds()) time.Sleep(j.duration) case <-w.quitChan: fmt.Printf("worker %d: stopped\n", w.id) return } } } func (w worker) stop() { @@ -54,23 +46,23 @@ func (w worker) stop() { }() } func requestHandler(jobCh chan job, w http.ResponseWriter, r *http.Request) { // Make sure we can only be called with an HTTP POST request. if r.Method != "POST" { w.Header().Set("Allow", "POST") w.WriteHeader(http.StatusMethodNotAllowed) return } // Parse the durations. duration, err := time.ParseDuration(r.FormValue("delay")) if err != nil { http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest) return } // Validate delay is in range 1 to 10 seconds. if duration.Seconds() < 1 || duration.Seconds() > 10 { http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest) return } @@ -82,11 +74,11 @@ func requestHandler(jobQueue chan Job, w http.ResponseWriter, r *http.Request) { return } // Create Job and push the work onto the jobCh. job := job{name, duration} go func() { fmt.Printf("added: %s %s\n", job.name, job.duration) jobCh <- job }() // Render success. @@ -102,18 +94,18 @@ func main() { ) flag.Parse() // create job channel jobCh := make(chan job, *maxQueueSize) // create workers for i := 0; i < *maxWorkers; i++ { worker := newWorker(i) go worker.process(jobCh) } // handler for adding jobs http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) { requestHandler(jobCh, w, r) }) log.Fatal(http.ListenAndServe(":"+*port, nil)) } -
harlow revised this gist
Nov 29, 2015 . 1 changed file with 2 additions and 4 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -22,7 +22,7 @@ Simplify the worker queue by removing the `Dispatcher`. https://gist.github.com/harlow/dbcd639cf8d396a2ab73#file-example_worker_only-go ### Performance The test run with Pprof show performance characteristics remain the same between both examples. @@ -34,6 +34,4 @@ Use flags to adjust the `max_workers` and `max_queue_size` to override the defau cURL from another terminal window: $ for i in {1..15}; do curl localhost:8080/work -d name=job$i -d delay=$(expr $i % 9 + 1)s; done -
harlow revised this gist
Nov 29, 2015 . 2 changed files with 2 additions and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -34,6 +34,6 @@ Use flags to adjust the `max_workers` and `max_queue_size` to override the defau cURL from another terminal window: $ for i in {1..15}; do curl localhost:8080/work -d name=job$i -d delay=$(expr $i % 9 + 1)s; done This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -85,6 +85,7 @@ func requestHandler(jobQueue chan Job, w http.ResponseWriter, r *http.Request) { // Create Job and push the work onto the jobQueue. job := Job{Name: name, Delay: delay} go func() { fmt.Printf("added: %s %s\n", job.Name, job.Delay) jobQueue <- job }() -
harlow revised this gist
Nov 29, 2015 . 1 changed file with 1 addition and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,4 +1,3 @@ package main import ( @@ -105,7 +104,7 @@ func main() { // Create the job queue. jobQueue := make(chan Job, *maxQueueSize) // Create new workers with access to jobQueue for i := 0; i < *maxWorkers; i++ { worker := newWorker(i, jobQueue) worker.start() -
harlow revised this gist
Nov 29, 2015 . 2 changed files with 2 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,3 +1,4 @@ // Original code with Dispatcher package main import ( This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,3 +1,4 @@ // Refactored code with Dispatcher removed package main import ( -
harlow revised this gist
Nov 29, 2015 . 3 changed files with 115 additions and 100 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -2,11 +2,12 @@ A running example of the code from: * http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang * http://nesv.github.io/golang/2014/02/25/worker-queues-in-go.html ### Step 1 Small refactorings made to original code: * Use non-exported private methods * Remove global variables @@ -15,20 +16,24 @@ I made a few adjustments to the code: ### Step 2 Simplify the worker queue by removing the `Dispatcher`. * Creates workers directly and passes job queue to them https://gist.github.com/harlow/dbcd639cf8d396a2ab73#file-example_worker_only-go ## Performance The test run with Pprof show performance characteristics remain the same between both examples. ## Run the Application $ go run example_dispatcher.go -max_workers 5 Use flags to adjust the `max_workers` and `max_queue_size` to override the default values. cURL from another terminal window: $ for i in {1..15}; do curl localhost:8080/work -d name=$USER$i -d delay=$(expr $i % 9 + 1)s; done This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -138,8 +138,9 @@ func requestHandler(w http.ResponseWriter, r *http.Request, jobQueue chan Job) { func main() { var ( maxWorkers = flag.Int("max_workers", 5, "The number of workers to start") maxQueueSize = flag.Int("max_queue_size", 100, "The size of job queue") port = flag.String("port", "8080", "The server port") ) flag.Parse() @@ -154,5 +155,5 @@ func main() { http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) { requestHandler(w, r, jobQueue) }) log.Fatal(http.ListenAndServe(":"+*port, nil)) } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,109 +1,118 @@ package main import ( _ "expvar" "flag" "fmt" "log" "net/http" _ "net/http/pprof" "time" ) // Job holds the attributes needed to perform unit of work. type Job struct { Name string Delay time.Duration } func newWorker(id int, jobQueue chan Job) worker { return worker{ id: id, jobQueue: jobQueue, quitChan: make(chan bool), } } type worker struct { id int jobQueue chan Job quitChan chan bool } func (w worker) start() { go func() { for { select { case job := <-w.jobQueue: // Dispatcher has added a job to my jobQueue. fmt.Printf("worker%d: started %s, blocking for %f seconds\n", w.id, job.Name, job.Delay.Seconds()) time.Sleep(job.Delay) fmt.Printf("worker%d: completed %s!\n", w.id, job.Name) case <-w.quitChan: // We have been asked to stop. fmt.Printf("worker%d stopping\n", w.id) return } } }() } func (w worker) stop() { go func() { w.quitChan <- true }() } func requestHandler(jobQueue chan Job, w http.ResponseWriter, r *http.Request) { // Make sure we can only be called with an HTTP POST request. if r.Method != "POST" { w.Header().Set("Allow", "POST") w.WriteHeader(http.StatusMethodNotAllowed) return } // Parse the delay. delay, err := time.ParseDuration(r.FormValue("delay")) if err != nil { http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest) return } // Validate delay is in range 1 to 10 seconds. if delay.Seconds() < 1 || delay.Seconds() > 10 { http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest) return } // Set name and validate value. name := r.FormValue("name") if name == "" { http.Error(w, "You must specify a name.", http.StatusBadRequest) return } // Create Job and push the work onto the jobQueue. job := Job{Name: name, Delay: delay} go func() { jobQueue <- job }() // Render success. w.WriteHeader(http.StatusCreated) return } func main() { var ( maxQueueSize = flag.Int("max_queue_size", 100, "The size of job queue") maxWorkers = flag.Int("max_workers", 5, "The number of workers to start") port = flag.String("port", "8080", "The server port") ) flag.Parse() // Create the job queue. jobQueue := make(chan Job, *maxQueueSize) // Create new workers and add to queue for i := 0; i < *maxWorkers; i++ { worker := newWorker(i, jobQueue) worker.start() } // Start the HTTP handler. http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) { requestHandler(jobQueue, w, r) }) log.Fatal(http.ListenAndServe(":"+*port, nil)) } -
harlow revised this gist
Nov 29, 2015 . 1 changed file with 4 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -86,8 +86,10 @@ func requestHandler(w http.ResponseWriter, r *http.Request, jobQueue chan Job) { func main() { // Parse the command-line flags. var( maxWorkers = flag.Int("max_workers", 5, "The number of workers to start") maxQueueSize = flag.Int("max_queue_size", 100, "The size of job queue") ) flag.Parse() // Create the job queue. -
harlow revised this gist
Nov 29, 2015 . 1 changed file with 8 additions and 4 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -4,13 +4,21 @@ A running example of the code from: http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang http://nesv.github.io/golang/2014/02/25/worker-queues-in-go.html ### Step 1 I made a few adjustments to the code: * Use non-exported private methods * Remove global variables * Bring the flags closer to their usage in main() * Usage ### Step 2 Removing the `Dispatcher`. A simplified example that removes the dispatcher and creates workers directly. https://gist.github.com/harlow/dbcd639cf8d396a2ab73#file-example_worker_only-go ## Run the Application $ PORT=5000 go run example_dispatcher.go -max_workers 5 @@ -21,10 +29,6 @@ Curl from another terminal window: $ for i in {1..15}; do curl localhost:5001/work -d name=$USER$i -d delay=$(expr $i % 9 + 1)s; done ## Performance From what I can tell using Pprof the performance characteristics seem to remain the same between both examples. -
harlow revised this gist
Nov 29, 2015 . 1 changed file with 4 additions and 3 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -137,9 +137,10 @@ func requestHandler(w http.ResponseWriter, r *http.Request, jobQueue chan Job) { } func main() { var ( maxWorkers = flag.Int("max_workers", 5, "The number of workers to start") maxQueueSize = flag.Int("max_queue_size", 100, "The size of job queue") ) flag.Parse() // Create the job queue. -
harlow revised this gist
Nov 29, 2015 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -15,7 +15,7 @@ I made a few adjustments to the code: $ PORT=5000 go run example_dispatcher.go -max_workers 5 Use flags to adjust the `max_workers` and `max_queue_size` to override the default values. Curl from another terminal window: -
harlow revised this gist
Nov 29, 2015 . 1 changed file with 4 additions and 4 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -6,10 +6,10 @@ http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang http I made a few adjustments to the code: * Use non-exported private methods * Remove global variables * Bring the flags closer to their usage in main() * Usage ## Run the Application -
harlow revised this gist
Jul 13, 2015 . 1 changed file with 2 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -11,7 +11,7 @@ Remove global variables Bring the flags closer to their usage in main() Usage ## Run the Application $ PORT=5000 go run example_dispatcher.go -max_workers 5 @@ -21,7 +21,7 @@ Curl from another terminal window: $ for i in {1..15}; do curl localhost:5001/work -d name=$USER$i -d delay=$(expr $i % 9 + 1)s; done ## Remove the Dispatcher A simplified example that skips the dispatcher and creates workers directly. See `example_worker_only.go` for code. -
harlow revised this gist
Jul 13, 2015 . 1 changed file with 6 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -11,7 +11,7 @@ Remove global variables Bring the flags closer to their usage in main() Usage ## Run the application $ PORT=5000 go run example_dispatcher.go -max_workers 5 @@ -23,4 +23,8 @@ Curl from another terminal window: ## Remove Dispatcher A simplified example that skips the dispatcher and creates workers directly. See `example_worker_only.go` for code. ## Performance From what I can tell using Pprof the performance characteristics seem to remain the same between both examples. -
harlow revised this gist
Jul 13, 2015 . 1 changed file with 3 additions and 3 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,4 +1,4 @@ # Golang Job Queue A running example of the code from: @@ -21,6 +21,6 @@ Curl from another terminal window: $ for i in {1..15}; do curl localhost:5001/work -d name=$USER$i -d delay=$(expr $i % 9 + 1)s; done ## Remove Dispatcher A simplified example that skips the dispatcher and creates workers directly. See `example_worker_only.go` for code. The performance characteristics seem to remain the same. -
harlow revised this gist
Jul 13, 2015 . 4 changed files with 173 additions and 188 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -2,31 +2,25 @@ A running example of the code from: http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang http://nesv.github.io/golang/2014/02/25/worker-queues-in-go.html I made a few adjustments to the code: Use non-exported private methods Remove global variables Bring the flags closer to their usage in main() Usage ## Run the application: $ PORT=5000 go run example_dispatcher.go -max_workers 5 Use flags to adjust the max_workers and max_queue_size to override the default values. Curl from another terminal window: $ for i in {1..15}; do curl localhost:5001/work -d name=$USER$i -d delay=$(expr $i % 9 + 1)s; done Remove Dispatcher A simplified example that skips the dispatcher and creates workers directly. See main_worker_only.go the performance characteristics seem to remain the same. This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,159 +0,0 @@ This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,157 @@ package main import ( "flag" "fmt" "net/http" "os" "time" _ "expvar" _ "net/http/pprof" ) // Job holds the attributes needed to perform unit of work. type Job struct { Name string Delay time.Duration } // NewWorker creates takes a numeric id and a channel w/ worker pool. func NewWorker(id int, workerPool chan chan Job) Worker { return Worker{ id: id, jobQueue: make(chan Job), workerPool: workerPool, quitChan: make(chan bool), } } type Worker struct { id int jobQueue chan Job workerPool chan chan Job quitChan chan bool } func (w Worker) start() { go func() { for { // Add my jobQueue to the worker pool. w.workerPool <- w.jobQueue select { case job := <-w.jobQueue: // Dispatcher has added a job to my jobQueue. fmt.Printf("worker%d: started %s, blocking for %f seconds\n", w.id, job.Name, job.Delay.Seconds()) time.Sleep(job.Delay) fmt.Printf("worker%d: completed %s!\n", w.id, job.Name) case <-w.quitChan: // We have been asked to stop. fmt.Printf("worker%d stopping\n", w.id) return } } }() } func (w Worker) stop() { go func() { w.quitChan <- true }() } // NewDispatcher creates, and returns a new Dispatcher object. func NewDispatcher(jobQueue chan Job, maxWorkers int) *Dispatcher { workerPool := make(chan chan Job, maxWorkers) return &Dispatcher{ jobQueue: jobQueue, maxWorkers: maxWorkers, workerPool: workerPool, } } type Dispatcher struct { workerPool chan chan Job maxWorkers int jobQueue chan Job } func (d *Dispatcher) run() { for i := 0; i < d.maxWorkers; i++ { worker := NewWorker(i+1, d.workerPool) worker.start() } go d.dispatch() } func (d *Dispatcher) dispatch() { for { select { case job := <-d.jobQueue: go func() { fmt.Printf("fetching workerJobQueue for: %s\n", job.Name) workerJobQueue := <-d.workerPool fmt.Printf("adding %s to workerJobQueue\n", job.Name) workerJobQueue <- job }() } } } func requestHandler(w http.ResponseWriter, r *http.Request, jobQueue chan Job) { // Make sure we can only be called with an HTTP POST request. if r.Method != "POST" { w.Header().Set("Allow", "POST") w.WriteHeader(http.StatusMethodNotAllowed) return } // Parse the delay. delay, err := time.ParseDuration(r.FormValue("delay")) if err != nil { http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest) return } // Validate delay is in range 1 to 10 seconds. if delay.Seconds() < 1 || delay.Seconds() > 10 { http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest) return } // Set name and validate value. name := r.FormValue("name") if name == "" { http.Error(w, "You must specify a name.", http.StatusBadRequest) return } // Create Job and push the work onto the jobQueue. job := Job{Name: name, Delay: delay} jobQueue <- job // Render success. w.WriteHeader(http.StatusCreated) } func main() { // Parse the command-line flags. maxWorkers := flag.Int("max_workers", 5, "The number of workers to start") maxQueueSize := flag.Int("max_queue_size", 100, "The size of job queue") flag.Parse() // Create the job queue. jobQueue := make(chan Job, *maxQueueSize) // Start the dispatcher. dispatcher := NewDispatcher(jobQueue, *maxWorkers) dispatcher.run() // Start the HTTP handler. http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) { requestHandler(w, r, jobQueue) }) http.ListenAndServe(":"+os.Getenv("PORT"), nil) } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,5 +1,3 @@ package main import ( @@ -8,6 +6,8 @@ import ( "net/http" "os" "time" _ "expvar" _ "net/http/pprof" ) // Job holds the attributes needed to perform unit of work. @@ -36,23 +36,15 @@ func (w Worker) start() { for { select { case job := <-w.jobQueue: // Dispatcher has added a job to my jobQueue. fmt.Printf("worker%d: started %s, blocking for %f seconds\n", w.id, job.Name, job.Delay.Seconds()) time.Sleep(job.Delay) fmt.Printf("worker%d: completed %s!\n", w.id, job.Name) } } }() } func requestHandler(w http.ResponseWriter, r *http.Request, jobQueue chan Job) { // Make sure we can only be called with an HTTP POST request. if r.Method != "POST" { @@ -86,9 +78,10 @@ func requestHandler(w http.ResponseWriter, r *http.Request, jobQueue chan Job) { go func() { jobQueue <- job }() // Render success. w.WriteHeader(http.StatusCreated) return } func main() { -
harlow renamed this gist
Jul 13, 2015 . 1 changed file with 0 additions and 0 deletions.There are no files selected for viewing
File renamed without changes. -
harlow revised this gist
Jul 13, 2015 . 2 changed files with 0 additions and 0 deletions.There are no files selected for viewing
File renamed without changes.File renamed without changes. -
harlow revised this gist
Jul 13, 2015 . 2 changed files with 4 additions and 2 deletions.There are no files selected for viewing
File renamed without changes.This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -83,8 +83,10 @@ func requestHandler(w http.ResponseWriter, r *http.Request, jobQueue chan Job) { // Create Job and push the work onto the jobQueue. job := Job{Name: name, Delay: delay} go func() { jobQueue <- job }() // Render success. w.WriteHeader(http.StatusCreated) } -
harlow revised this gist
Jul 12, 2015 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -27,6 +27,6 @@ Curl from another terminal window: for i in {1..15}; do curl localhost:5000/work -d name=$USER$i -d delay=$(expr $i % 5)s; done ``` ## Remove Dispatcher A simplified example that skips the dispatcher and creates workers directly. See `main_worker_only.go` the performance characteristics seem to remain the same. -
harlow revised this gist
Jul 12, 2015 . 2 changed files with 117 additions and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -25,4 +25,8 @@ Curl from another terminal window: ``` for i in {1..15}; do curl localhost:5000/work -d name=$USER$i -d delay=$(expr $i % 5)s; done ``` ## Worker Only Queue A simplified example that skips the dispatcher and creates workers directly. See `main_worker_only.go` the performance characteristics seem to remain the same. This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,112 @@ // Removed dispatcher and create workers directly in main() package main import ( "flag" "fmt" "net/http" "os" "time" ) // Job holds the attributes needed to perform unit of work. type Job struct { Name string Delay time.Duration } // NewWorker creates takes a numeric id and a channel w/ worker pool. func NewWorker(id int, jobQueue chan Job) Worker { return Worker{ id: id, jobQueue: jobQueue, quitChan: make(chan bool), } } type Worker struct { id int jobQueue chan Job quitChan chan bool } func (w Worker) start() { go func() { for { select { case job := <-w.jobQueue: fmt.Printf("worker%d: started %s, blocking for %f seconds\n", w.id, job.Name, job.Delay.Seconds()) time.Sleep(job.Delay) fmt.Printf("worker%d: completed %s!\n", w.id, job.Name) case <-w.quitChan: fmt.Printf("worker%d stopping\n", w.id) return } } }() } func (w Worker) stop() { go func() { w.quitChan <- true }() } func requestHandler(w http.ResponseWriter, r *http.Request, jobQueue chan Job) { // Make sure we can only be called with an HTTP POST request. if r.Method != "POST" { w.Header().Set("Allow", "POST") w.WriteHeader(http.StatusMethodNotAllowed) return } // Parse the delay. delay, err := time.ParseDuration(r.FormValue("delay")) if err != nil { http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest) return } // Validate delay is in range 1 to 10 seconds. if delay.Seconds() < 1 || delay.Seconds() > 10 { http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest) return } // Set name and validate value. name := r.FormValue("name") if name == "" { http.Error(w, "You must specify a name.", http.StatusBadRequest) return } // Create Job and push the work onto the jobQueue. job := Job{Name: name, Delay: delay} jobQueue <- job // Render success. w.WriteHeader(http.StatusCreated) } func main() { // Parse the command-line flags. maxWorkers := flag.Int("max_workers", 5, "The number of workers to start") maxQueueSize := flag.Int("max_queue_size", 100, "The size of job queue") flag.Parse() // Create the job queue. jobQueue := make(chan Job, *maxQueueSize) // Start the dispatcher. for i := 0; i < *maxWorkers; i++ { worker := NewWorker(i, jobQueue) worker.start() } // Start the HTTP handler. http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) { requestHandler(w, r, jobQueue) }) http.ListenAndServe(":"+os.Getenv("PORT"), nil) } -
harlow revised this gist
Jul 12, 2015 . 1 changed file with 7 additions and 12 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -45,7 +45,7 @@ func (d *Dispatcher) dispatch() { select { case job := <-d.jobQueue: go func() { fmt.Printf("adding %s to workerJobQueue\n", job.Name) workerJobQueue := <-d.workerPool workerJobQueue <- job }() @@ -83,9 +83,9 @@ func (w Worker) start() { select { case job := <-w.jobQueue: // Wait for dispatcher to add job to jobQueue fmt.Printf("worker%d: started job %s, blocking for %f seconds\n", w.id, job.Name, job.Delay.Seconds()) time.Sleep(job.Delay) fmt.Printf("worker%d: completed %s!\n", w.id, job.Name) case <-w.quitChan: // We have been asked to stop fmt.Printf("worker%d stopping\n", w.id) @@ -117,29 +117,24 @@ func requestHandler(w http.ResponseWriter, r *http.Request, jobQueue chan Job) { return } // Validate delay is in range 1 to 10 seconds. if delay.Seconds() < 1 || delay.Seconds() > 10 { http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest) return } // Set name and validate value. name := r.FormValue("name") if name == "" { http.Error(w, "You must specify a name.", http.StatusBadRequest) return } // Create Job and push the work onto the jobQueue. job := Job{Name: name, Delay: delay} jobQueue <- job // Render success. w.WriteHeader(http.StatusCreated) } -
harlow revised this gist
Jul 12, 2015 . 1 changed file with 0 additions and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -10,7 +10,6 @@ I made a few adjustments to the code: * Use non-exported private methods * Remove global variables * Bring the flags closer to their usage in main() ## Usage -
harlow revised this gist
Jul 12, 2015 . 1 changed file with 1 addition and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -10,6 +10,7 @@ I made a few adjustments to the code: * Use non-exported private methods * Remove global variables * Bring the flags closer to their usage in main() * Give workers an ID they can log with ## Usage
NewerOlder