Skip to content

Instantly share code, notes, and snippets.

@blinkinglight
Forked from harlow/golang_job_queue.md
Created September 4, 2016 17:03
Show Gist options
  • Select an option

  • Save blinkinglight/8c83f72672c86e50076554d375b59d99 to your computer and use it in GitHub Desktop.

Select an option

Save blinkinglight/8c83f72672c86e50076554d375b59d99 to your computer and use it in GitHub Desktop.
Job queues in Golang

Job queues in Golang

A running example of the code from:

http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang http://nesv.github.io/golang/2014/02/25/worker-queues-in-go.html

I made a few adjustments to the code:

  • Use non-exported private methods
  • Remove global variables
  • Bring the flags closer to their usage in main()

Usage

Run the application:

PORT=5000 go run main.go -max_workers 5

Use flags to adjust the max_workers and max_queue_size to override the default values.

Curl from another terminal window:

for i in {1..15}; do curl localhost:5000/work -d name=$USER$i -d delay=$(expr $i % 5)s; done
package main
import (
"flag"
"fmt"
"net/http"
"os"
"time"
)
type Job struct {
Name string
Delay time.Duration
}
// NewDispatcher creates, and returns a new Dispatcher object.
func NewDispatcher(jobQueue chan Job, maxWorkers int) *Dispatcher {
workerPool := make(chan chan Job, maxWorkers)
return &Dispatcher{
jobQueue: jobQueue,
maxWorkers: maxWorkers,
workerPool: workerPool,
}
}
type Dispatcher struct {
workerPool chan chan Job
maxWorkers int
jobQueue chan Job
}
func (d *Dispatcher) run() {
for i := 0; i < d.maxWorkers; i++ {
fmt.Println("Starting worker:", i+1)
worker := NewWorker(i+1, d.workerPool)
worker.start()
}
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-d.jobQueue:
go func() {
fmt.Printf("adding %s to workerJobQueue\n", job.Name)
workerJobQueue := <-d.workerPool
workerJobQueue <- job
}()
}
}
}
// NewWorker creates, and returns a new Worker object.
// It takes an id for identification and a reference to the
// worker pool from the dispatcher.
func NewWorker(id int, workerPool chan chan Job) Worker {
return Worker{
id: id,
jobQueue: make(chan Job),
workerPool: workerPool,
quitChan: make(chan bool),
}
}
type Worker struct {
id int
jobQueue chan Job
workerPool chan chan Job
quitChan chan bool
}
// This function "starts" the worker by starting a goroutine, that is
// an infinite "for-select" loop.
func (w Worker) start() {
go func() {
for {
// Add my job queue into the worker pool.
w.workerPool <- w.jobQueue
select {
case job := <-w.jobQueue:
// Wait for dispatcher to add job to jobQueue
fmt.Printf("worker%d: started job %s, blocking for %f seconds\n", w.id, job.Name, job.Delay.Seconds())
time.Sleep(job.Delay)
fmt.Printf("worker%d: completed %s!\n", w.id, job.Name)
case <-w.quitChan:
// We have been asked to stop
fmt.Printf("worker%d stopping\n", w.id)
return
}
}
}()
}
// Stop tells the worker to stop listening for work requests.
func (w Worker) stop() {
go func() {
w.quitChan <- true
}()
}
func requestHandler(w http.ResponseWriter, r *http.Request, jobQueue chan Job) {
// Make sure we can only be called with an HTTP POST request.
if r.Method != "POST" {
w.Header().Set("Allow", "POST")
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Parse the delay.
delay, err := time.ParseDuration(r.FormValue("delay"))
if err != nil {
http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest)
return
}
// Validate delay is in range 1 to 10 seconds.
if delay.Seconds() < 1 || delay.Seconds() > 10 {
http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest)
return
}
// Set name and validate value.
name := r.FormValue("name")
if name == "" {
http.Error(w, "You must specify a name.", http.StatusBadRequest)
return
}
// Create Job and push the work onto the jobQueue.
job := Job{Name: name, Delay: delay}
jobQueue <- job
// Render success.
w.WriteHeader(http.StatusCreated)
}
func main() {
// Parse the command-line flags.
maxWorkers := flag.Int("max_workers", 5, "The number of workers to start")
maxQueueSize := flag.Int("max_queue_size", 100, "The size of job queue")
flag.Parse()
// Create the job queue.
jobQueue := make(chan Job, *maxQueueSize)
// Start the dispatcher.
dispatcher := NewDispatcher(jobQueue, *maxWorkers)
dispatcher.run()
// Start the HTTP handler.
http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
requestHandler(w, r, jobQueue)
})
http.ListenAndServe(":"+os.Getenv("PORT"), nil)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment