| 
          // Original code with Dispatcher | 
        
        
           | 
          package main | 
        
        
           | 
          
 | 
        
        
           | 
          import ( | 
        
        
           | 
          	_ "expvar" | 
        
        
           | 
          	"flag" | 
        
        
           | 
          	"fmt" | 
        
        
           | 
          	"net/http" | 
        
        
           | 
          	_ "net/http/pprof" | 
        
        
           | 
          	"os" | 
        
        
           | 
          	"time" | 
        
        
           | 
          ) | 
        
        
           | 
          
 | 
        
        
           | 
          // Job describes a single unit of work submitted to the job queue.
          type Job struct {
          	// Name labels the job in the log lines printed while it is handled.
          	Name string
          	// Delay is how long the worker sleeps to simulate doing the job.
          	Delay time.Duration
          }
        
        
           | 
          
 | 
        
        
           | 
          // NewWorker creates takes a numeric id and a channel w/ worker pool. | 
        
        
           | 
          func NewWorker(id int, workerPool chan chan Job) Worker { | 
        
        
           | 
          	return Worker{ | 
        
        
           | 
          		id:         id, | 
        
        
           | 
          		jobQueue:   make(chan Job), | 
        
        
           | 
          		workerPool: workerPool, | 
        
        
           | 
          		quitChan:   make(chan bool), | 
        
        
           | 
          	} | 
        
        
           | 
          } | 
        
        
           | 
          
 | 
        
        
           | 
          type Worker struct { | 
        
        
           | 
          	id         int | 
        
        
           | 
          	jobQueue   chan Job | 
        
        
           | 
          	workerPool chan chan Job | 
        
        
           | 
          	quitChan   chan bool | 
        
        
           | 
          } | 
        
        
           | 
          
 | 
        
        
           | 
          func (w Worker) start() { | 
        
        
           | 
          	go func() { | 
        
        
           | 
          		for { | 
        
        
           | 
          			// Add my jobQueue to the worker pool. | 
        
        
           | 
          			w.workerPool <- w.jobQueue | 
        
        
           | 
          
 | 
        
        
           | 
          			select { | 
        
        
           | 
          			case job := <-w.jobQueue: | 
        
        
           | 
          				// Dispatcher has added a job to my jobQueue. | 
        
        
           | 
          				fmt.Printf("worker%d: started %s, blocking for %f seconds\n", w.id, job.Name, job.Delay.Seconds()) | 
        
        
           | 
          				time.Sleep(job.Delay) | 
        
        
           | 
          				fmt.Printf("worker%d: completed %s!\n", w.id, job.Name) | 
        
        
           | 
          			case <-w.quitChan: | 
        
        
           | 
          				// We have been asked to stop. | 
        
        
           | 
          				fmt.Printf("worker%d stopping\n", w.id) | 
        
        
           | 
          				return | 
        
        
           | 
          			} | 
        
        
           | 
          		} | 
        
        
           | 
          	}() | 
        
        
           | 
          } | 
        
        
           | 
          
 | 
        
        
           | 
          func (w Worker) stop() { | 
        
        
           | 
          	go func() { | 
        
        
           | 
          		w.quitChan <- true | 
        
        
           | 
          	}() | 
        
        
           | 
          } | 
        
        
           | 
          
 | 
        
        
           | 
          // NewDispatcher creates, and returns a new Dispatcher object. | 
        
        
           | 
          func NewDispatcher(jobQueue chan Job, maxWorkers int) *Dispatcher { | 
        
        
           | 
          	workerPool := make(chan chan Job, maxWorkers) | 
        
        
           | 
          
 | 
        
        
           | 
          	return &Dispatcher{ | 
        
        
           | 
          		jobQueue:   jobQueue, | 
        
        
           | 
          		maxWorkers: maxWorkers, | 
        
        
           | 
          		workerPool: workerPool, | 
        
        
           | 
          	} | 
        
        
           | 
          } | 
        
        
           | 
          
 | 
        
        
           | 
          type Dispatcher struct { | 
        
        
           | 
          	workerPool chan chan Job | 
        
        
           | 
          	maxWorkers int | 
        
        
           | 
          	jobQueue   chan Job | 
        
        
           | 
          } | 
        
        
           | 
          
 | 
        
        
           | 
          func (d *Dispatcher) run() { | 
        
        
           | 
          	for i := 0; i < d.maxWorkers; i++ { | 
        
        
           | 
          		worker := NewWorker(i+1, d.workerPool) | 
        
        
           | 
          		worker.start() | 
        
        
           | 
          	} | 
        
        
           | 
          
 | 
        
        
           | 
          	go d.dispatch() | 
        
        
           | 
          } | 
        
        
           | 
          
 | 
        
        
           | 
          func (d *Dispatcher) dispatch() { | 
        
        
           | 
          	for { | 
        
        
           | 
          		select { | 
        
        
           | 
          		case job := <-d.jobQueue: | 
        
        
           | 
          			go func() { | 
        
        
           | 
          				fmt.Printf("fetching workerJobQueue for: %s\n", job.Name) | 
        
        
           | 
          				workerJobQueue := <-d.workerPool | 
        
        
           | 
          				fmt.Printf("adding %s to workerJobQueue\n", job.Name) | 
        
        
           | 
          				workerJobQueue <- job | 
        
        
           | 
          			}() | 
        
        
           | 
          		} | 
        
        
           | 
          	} | 
        
        
           | 
          } | 
        
        
           | 
          
 | 
        
        
           | 
          func requestHandler(w http.ResponseWriter, r *http.Request, jobQueue chan Job) { | 
        
        
           | 
          	// Make sure we can only be called with an HTTP POST request. | 
        
        
           | 
          	if r.Method != "POST" { | 
        
        
           | 
          		w.Header().Set("Allow", "POST") | 
        
        
           | 
          		w.WriteHeader(http.StatusMethodNotAllowed) | 
        
        
           | 
          		return | 
        
        
           | 
          	} | 
        
        
           | 
          
 | 
        
        
           | 
          	// Parse the delay. | 
        
        
           | 
          	delay, err := time.ParseDuration(r.FormValue("delay")) | 
        
        
           | 
          	if err != nil { | 
        
        
           | 
          		http.Error(w, "Bad delay value: "+err.Error(), http.StatusBadRequest) | 
        
        
           | 
          		return | 
        
        
           | 
          	} | 
        
        
           | 
          
 | 
        
        
           | 
          	// Validate delay is in range 1 to 10 seconds. | 
        
        
           | 
          	if delay.Seconds() < 1 || delay.Seconds() > 10 { | 
        
        
           | 
          		http.Error(w, "The delay must be between 1 and 10 seconds, inclusively.", http.StatusBadRequest) | 
        
        
           | 
          		return | 
        
        
           | 
          	} | 
        
        
           | 
          
 | 
        
        
           | 
          	// Set name and validate value. | 
        
        
           | 
          	name := r.FormValue("name") | 
        
        
           | 
          	if name == "" { | 
        
        
           | 
          		http.Error(w, "You must specify a name.", http.StatusBadRequest) | 
        
        
           | 
          		return | 
        
        
           | 
          	} | 
        
        
           | 
          
 | 
        
        
           | 
          	// Create Job and push the work onto the jobQueue. | 
        
        
           | 
          	job := Job{Name: name, Delay: delay} | 
        
        
           | 
          	jobQueue <- job | 
        
        
           | 
          
 | 
        
        
           | 
          	// Render success. | 
        
        
           | 
          	w.WriteHeader(http.StatusCreated) | 
        
        
           | 
          } | 
        
        
           | 
          
 | 
        
        
           | 
          func main() { | 
        
        
           | 
          	var ( | 
        
        
           | 
          		maxWorkers   = flag.Int("max_workers", 5, "The number of workers to start") | 
        
        
           | 
          		maxQueueSize = flag.Int("max_queue_size", 100, "The size of job queue") | 
        
        
           | 
          		port         = flag.String("port", "8080", "The server port") | 
        
        
           | 
          	) | 
        
        
           | 
          	flag.Parse() | 
        
        
           | 
          
 | 
        
        
           | 
          	// Create the job queue. | 
        
        
           | 
          	jobQueue := make(chan Job, *maxQueueSize) | 
        
        
           | 
          
 | 
        
        
           | 
          	// Start the dispatcher. | 
        
        
           | 
          	dispatcher := NewDispatcher(jobQueue, *maxWorkers) | 
        
        
           | 
          	dispatcher.run() | 
        
        
           | 
          
 | 
        
        
           | 
          	// Start the HTTP handler. | 
        
        
           | 
          	http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) { | 
        
        
           | 
          		requestHandler(w, r, jobQueue) | 
        
        
           | 
          	}) | 
        
        
           | 
          	log.Fatal(http.ListenAndServe(":"+*port, nil)) | 
        
        
           | 
          } |