Created
October 23, 2021 18:51
-
-
Save YuriyNasretdinov/be43cf200c2c3a160f13e3b5d784aa3c to your computer and use it in GitHub Desktop.
Revisions
-
YuriyNasretdinov created this gist
Oct 23, 2021 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,68 @@ package main import ( "flag" "log" "time" "sync" "sync/atomic" "github.com/valyala/fasthttp" ) var ( kittenhouse = flag.Bool("kittenhouse", false, "Whether or not to send INSERTs in kittenhouse format") clickhouse = flag.Bool("clickhouse-async", false, "Whether or not to use clickhouse-async") clickhouseWait = flag.Bool("clickhouse-wait", false, "Whether or not to wait until INSERT has completed before sending a new request") persistent = flag.Bool("persistent", false, "(for kittenhouse) Whether or not use persistent mode") addr = flag.String("addr", "127.0.0.1:8124", "Address of bulk inserter") requests = flag.Int("total-requests", 1000000, "total number of single requests to send") concurrency = flag.Int("concurrency", 5000, "concurrency of the requests") ) func main() { flag.Parse() table := `InsertTest_buffer(id)` postURL := "http://" + (*addr) + "/?query=INSERT%20INTO%20" + table + "%20VALUES" if *clickhouse { waitParam := "wait_for_async_insert=0" if *clickhouseWait { waitParam = "wait_for_async_insert=1" } postURL = "http://" + (*addr) + "/?" + waitParam + "&async_insert=1&query=INSERT%20INTO%20" + table + "%20FORMAT%20TSV" } bodyStr := "(1)" if *clickhouse { bodyStr = "1\n" } body := []byte(bodyStr) start := time.Now() var wg sync.WaitGroup cl := &fasthttp.Client{ MaxConnsPerHost: 50000, } requestsLeft := int32(*requests) log.Printf("Sending %d total requests to %q with concurrency %d", *requests, postURL, *concurrency) for j := 0; j < *concurrency; j++ { wg.Add(1) go func() { for atomic.AddInt32(&requestsLeft, -1) >= 0 { req := fasthttp.AcquireRequest() resp := fasthttp.AcquireResponse() req.Reset() resp.Reset() req.SetBodyRaw(body) req.SetRequestURI(postURL) req.Header.SetMethod("POST") err := cl.Do(req, resp) if err != nil { log.Fatalf("Request failed: %v", err) } if resp.StatusCode() != 200 { log.Fatalf("ClickHouse responded with status code %d: %s", resp.StatusCode(), resp) } } wg.Done() }() } wg.Wait() log.Printf("QPS: %.1f (for %s)", float64((*requests))/(float64(time.Since(start))/float64(time.Second)), time.Since(start)) }