Skip to content

Instantly share code, notes, and snippets.

@YuriyNasretdinov
Created October 23, 2021 18:51
Show Gist options
  • Select an option

  • Save YuriyNasretdinov/be43cf200c2c3a160f13e3b5d784aa3c to your computer and use it in GitHub Desktop.

Select an option

Save YuriyNasretdinov/be43cf200c2c3a160f13e3b5d784aa3c to your computer and use it in GitHub Desktop.

Revisions

  1. YuriyNasretdinov created this gist Oct 23, 2021.
    68 changes: 68 additions & 0 deletions inserter-improved.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,68 @@
    package main
    import (
    "flag"
    "log"
    "time"
    "sync"
    "sync/atomic"
    "github.com/valyala/fasthttp"
    )
    var (
    kittenhouse = flag.Bool("kittenhouse", false, "Whether or not to send INSERTs in kittenhouse format")
    clickhouse = flag.Bool("clickhouse-async", false, "Whether or not to use clickhouse-async")
    clickhouseWait = flag.Bool("clickhouse-wait", false, "Whether or not to wait until INSERT has completed before sending a new request")
    persistent = flag.Bool("persistent", false, "(for kittenhouse) Whether or not use persistent mode")
    addr = flag.String("addr", "127.0.0.1:8124", "Address of bulk inserter")
    requests = flag.Int("total-requests", 1000000, "total number of single requests to send")
    concurrency = flag.Int("concurrency", 5000, "concurrency of the requests")
    )
    func main() {
    flag.Parse()
    table := `InsertTest_buffer(id)`
    postURL := "http://" + (*addr) + "/?query=INSERT%20INTO%20" + table + "%20VALUES"
    if *clickhouse {
    waitParam := "wait_for_async_insert=0"
    if *clickhouseWait {
    waitParam = "wait_for_async_insert=1"
    }
    postURL = "http://" + (*addr) + "/?" + waitParam + "&async_insert=1&query=INSERT%20INTO%20" + table + "%20FORMAT%20TSV"
    }
    bodyStr := "(1)"
    if *clickhouse {
    bodyStr = "1\n"
    }
    body := []byte(bodyStr)
    start := time.Now()
    var wg sync.WaitGroup
    cl := &fasthttp.Client{
    MaxConnsPerHost: 50000,
    }

    requestsLeft := int32(*requests)

    log.Printf("Sending %d total requests to %q with concurrency %d", *requests, postURL, *concurrency)
    for j := 0; j < *concurrency; j++ {
    wg.Add(1)
    go func() {
    for atomic.AddInt32(&requestsLeft, -1) >= 0 {
    req := fasthttp.AcquireRequest()
    resp := fasthttp.AcquireResponse()
    req.Reset()
    resp.Reset()
    req.SetBodyRaw(body)
    req.SetRequestURI(postURL)
    req.Header.SetMethod("POST")
    err := cl.Do(req, resp)
    if err != nil {
    log.Fatalf("Request failed: %v", err)
    }
    if resp.StatusCode() != 200 {
    log.Fatalf("ClickHouse responded with status code %d: %s", resp.StatusCode(), resp)
    }
    }
    wg.Done()
    }()
    }
    wg.Wait()
    log.Printf("QPS: %.1f (for %s)", float64((*requests))/(float64(time.Since(start))/float64(time.Second)), time.Since(start))
    }