package main import ( "bytes" "fmt" "net" "runtime" "sync" "sync/atomic" "time" "github.com/valyala/fasthttp" ) var ( // Important to do these conversions only once an not // every time we need them. strSlashA = []byte("/a") strSlashB = []byte("/b") strDefaultBody = []byte("test") strDestURLSlashA = []byte("http://127.0.0.1:1337/a") strDestURLSlashB = []byte("http://127.0.0.1:1337/b") client = &fasthttp.Client{ NoDefaultUserAgentHeader: true, // Don't send: User-Agent: fasthttp MaxConnsPerHost: 10000, ReadBufferSize: 4096, // Make sure to set this big enough that your whole request can be read at once. WriteBufferSize: 4096, // Same but for your response. ReadTimeout: time.Second, WriteTimeout: time.Second, MaxIdleConnDuration: time.Minute, DisableHeaderNamesNormalizing: true, // If you set the case on your headers correctly you can enable this. } // Put everything in pools to prevent garbage. bytesPool = sync.Pool{ New: func() interface{} { b := make([]byte, 0) return &b }, } responsePool = sync.Pool{ New: func() interface{} { return make(chan *fasthttp.Response) }, } // Our request counters. aRequests int64 bRequests int64 ) func handler(ctx *fasthttp.RequestCtx) { path := ctx.Path() // Requests to /a will trigger 10 requests to /b if bytes.HasPrefix(path, strSlashA) { atomic.AddInt64(&aRequests, 1) // This is the proper way to handle a []byte in a pool. _combinedBodies := bytesPool.Get().(*[]byte) combinedBodies := (*_combinedBodies)[:0] defer func() { *_combinedBodies = combinedBodies bytesPool.Put(_combinedBodies) }() c := responsePool.Get().(chan *fasthttp.Response) defer responsePool.Put(c) for i := 0; i < 10; i++ { go func() { req := fasthttp.AcquireRequest() res := fasthttp.AcquireResponse() req.SetRequestURIBytes(strDestURLSlashB) if err := fasthttp.Do(req, res); err != nil { println(err.Error()) } fasthttp.ReleaseRequest(req) // Don't release res here, that will be done at the other side of the channel. c <- res }() } // We know we started 10 goroutines, so also read 10 responses from the channel. for i := 0; i < 10; i++ { res := <-c if res.StatusCode() != fasthttp.StatusOK { println(res.StatusCode()) } else { // Combine all response bodies. combinedBodies = append(combinedBodies, res.Body()...) fasthttp.ReleaseResponse(res) } } // Send back all 10 response bodies combined. ctx.SetBody(combinedBodies) ctx.Response.SetStatusCode(fasthttp.StatusOK) } else if bytes.HasPrefix(path, strSlashB) { atomic.AddInt64(&bRequests, 1) // Just send some default body. ctx.SetBody(strDefaultBody) ctx.Response.SetStatusCode(fasthttp.StatusOK) } else { ctx.Response.SetStatusCode(fasthttp.StatusNotFound) } } func main() { // Completely disable memory profiling if we aren't going to use it. // If we don't do this the profiler will take a sample every 0.5MiB bytes allocated. runtime.MemProfileRate = 0 ln, err := net.Listen("tcp4", "127.0.0.1:1337") if err != nil { panic(err) } defer ln.Close() s := &fasthttp.Server{ Handler: handler, NoDefaultServerHeader: true, // Don't send Server: fasthttp ReadBufferSize: 4096, // Make sure these are big enough. WriteBufferSize: 4096, ReadTimeout: time.Second, WriteTimeout: time.Second, IdleTimeout: time.Minute, // This can be long for keep-alive connections. DisableHeaderNamesNormalizing: true, // If you're not going to look at headers or know the casing you can set this. NoDefaultContentType: true, // Don't send Content-Type: text/plain if no Content-Type is set manually. } go func() { if err := s.Serve(ln); err != nil { panic(err) } }() for i := 0; i < 4; i++ { go func() { for { req := fasthttp.AcquireRequest() res := fasthttp.AcquireResponse() req.SetRequestURIBytes(strDestURLSlashA) if err := client.Do(req, res); err != nil { println(err.Error()) } if len(res.Body()) != 10*len(strDefaultBody) { println(string(res.Body())) } fasthttp.ReleaseRequest(req) fasthttp.ReleaseResponse(res) } }() } // Print the request counters every second. for { time.Sleep(time.Second) a := atomic.SwapInt64(&aRequests, 0) b := atomic.SwapInt64(&bRequests, 0) fmt.Println(a, b, runtime.NumGoroutine()) } }