// Scratch program exploring channel pipelines: a generator stage, a squaring
// stage, a fan-in (merge) stage, and cancellation through a shared done
// channel.
package main

import (
	"fmt"
	"sync"
	// "runtime"
)

// gen starts a goroutine that sends each value of nums, in order, on the
// returned unbuffered channel and closes that channel when it is finished.
//
// The goroutine gives up early (still closing the returned channel) as soon
// as a receive from done succeeds, which normally means done was closed.
func gen(done <-chan struct{}, nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			select {
			case out <- n:
			case <-done:
				return
			}
		}
	}()
	return out
}

// sq starts a goroutine that receives ints from in and sends the square of
// each one on the returned unbuffered channel. The returned channel is closed
// once in is closed and drained, or as soon as a receive from done succeeds.
func sq(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case out <- n * n: // square the value; forwarding n unchanged was a bug
			case <-done:
				return
			}
		}
	}()
	return out
}

// infiniteCounter starts a goroutine that sends 0, 1, 2, ... on the returned
// unbuffered channel.
//
// The goroutine has no stop signal and the channel is never closed: once the
// consumer stops receiving, the goroutine stays blocked on its send for the
// rest of the program.
func infiniteCounter() <-chan int {
	out := make(chan int)
	current := 0
	go func() {
		for {
			out <- current
			current++
		}
	}()
	return out
}

// Earlier experiments, kept for reference. They call gen, sq and merge
// without the done parameter, so they no longer match the current signatures.

// func main() {
// 	runtime.GOMAXPROCS(runtime.NumCPU())
// 	var wg sync.WaitGroup
// 	counterC := infiniteCounter()
// 	for i := 0; i < 10000; i++ {
// 		wg.Add(1)
// 		go func(id int, counter <-chan int) {
// 			defer wg.Done()
// 			for i := 0; i < 10000; i++ {
// 				value := <-counter
// 				fmt.Printf("Worker %d: %d\n", id, value)
// 			}
// 		}(i, counterC)
// 	}
// 	wg.Wait()
// }

// func main() {
// 	c := gen(2, 3)
// 	out := sq(c)
// 	fmt.Println(<-out)
// 	fmt.Println(<-out)
// }

// func main() {
// 	// Set up the pipeline and consume the output.
// 	for n := range sq(sq(gen(2, 3))) {
// 		fmt.Println(n) // 16 then 81
// 	}
// }

// func merge(cs ...<-chan int) <-chan int {
// 	var wg sync.WaitGroup
// 	out := make(chan int)
// 	// Start an output goroutine for each input channel in cs.
// 	output := func(c <-chan int) {
// 		for n := range c {
// 			out <- n
// 		}
// 		wg.Done()
// 	}
// 	wg.Add(len(cs))
// 	for _, c := range cs {
// 		go output(c)
// 	}
// 	go func() {
// 		wg.Wait()
// 		close(out)
// 	}()
// 	return out
// }

// merge fans the channels in cs into a single returned channel. One goroutine
// per input channel copies values to the output; a further goroutine closes
// the output once all of the copiers have returned.
//
// A copier returns when its input channel is closed and drained, or as soon
// as a receive from done succeeds. With no input channels the returned
// channel is closed without ever carrying a value.
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// output copies values from c to out until c is closed or done fires.
	output := func(c <-chan int) {
		// Done must run exactly once per copier. The deferred call covers
		// both the normal and the cancelled exit; a second explicit call
		// would drive the WaitGroup counter negative and panic.
		defer wg.Done()
		for n := range c {
			select {
			case out <- n:
			case <-done:
				return
			}
		}
	}

	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	// Close out only after every copier has returned, so that no copier can
	// send on a closed channel.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	// Closing done on the way out tells every pipeline goroutine to stop.
	done := make(chan struct{})
	defer close(done)

	in := gen(done, 5, 3)

	// Distribute the sq work across two goroutines that both read from in.
	c1 := sq(done, in)
	c2 := sq(done, in)

	// The merge function converts a list of channels to a single channel
	// by starting a goroutine for each inbound channel that copies
	// the values to the sole outbound channel.
	// for n := range merge(c1, c2) {
	// 	fmt.Println(n)
	// }

	// This is a resource leak: goroutines consume memory and runtime resources,
	// and heap references in goroutine stacks keep data from being garbage
	// collected. Goroutines are not garbage collected; they must exit on their own.
	// out := merge(c1, c2)
	// fmt.Println(<-out) // 4 or 9
	// panic(nil)

	// Consume only the first merged value; the deferred close(done) releases
	// the goroutines that are still blocked on their sends.
	out := merge(done, c1, c2)
	fmt.Println(<-out)

	// NOTE(review): this looks like a deliberate crash left in from
	// experimenting with goroutine state at exit. It makes the program
	// terminate with a panic on every run (since Go 1.21 the panic value is a
	// *runtime.PanicNilError); remove it once it is no longer needed.
	panic(nil)
}