Created
October 23, 2021 18:51
-
-
Save YuriyNasretdinov/be43cf200c2c3a160f13e3b5d784aa3c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"flag" | |
"log" | |
"time" | |
"sync" | |
"sync/atomic" | |
"github.com/valyala/fasthttp" | |
) | |
var ( | |
kittenhouse = flag.Bool("kittenhouse", false, "Whether or not to send INSERTs in kittenhouse format") | |
clickhouse = flag.Bool("clickhouse-async", false, "Whether or not to use clickhouse-async") | |
clickhouseWait = flag.Bool("clickhouse-wait", false, "Whether or not to wait until INSERT has completed before sending a new request") | |
persistent = flag.Bool("persistent", false, "(for kittenhouse) Whether or not use persistent mode") | |
addr = flag.String("addr", "127.0.0.1:8124", "Address of bulk inserter") | |
requests = flag.Int("total-requests", 1000000, "total number of single requests to send") | |
concurrency = flag.Int("concurrency", 5000, "concurrency of the requests") | |
) | |
func main() { | |
flag.Parse() | |
table := `InsertTest_buffer(id)` | |
postURL := "http://" + (*addr) + "/?query=INSERT%20INTO%20" + table + "%20VALUES" | |
if *clickhouse { | |
waitParam := "wait_for_async_insert=0" | |
if *clickhouseWait { | |
waitParam = "wait_for_async_insert=1" | |
} | |
postURL = "http://" + (*addr) + "/?" + waitParam + "&async_insert=1&query=INSERT%20INTO%20" + table + "%20FORMAT%20TSV" | |
} | |
bodyStr := "(1)" | |
if *clickhouse { | |
bodyStr = "1\n" | |
} | |
body := []byte(bodyStr) | |
start := time.Now() | |
var wg sync.WaitGroup | |
cl := &fasthttp.Client{ | |
MaxConnsPerHost: 50000, | |
} | |
requestsLeft := int32(*requests) | |
log.Printf("Sending %d total requests to %q with concurrency %d", *requests, postURL, *concurrency) | |
for j := 0; j < *concurrency; j++ { | |
wg.Add(1) | |
go func() { | |
for atomic.AddInt32(&requestsLeft, -1) >= 0 { | |
req := fasthttp.AcquireRequest() | |
resp := fasthttp.AcquireResponse() | |
req.Reset() | |
resp.Reset() | |
req.SetBodyRaw(body) | |
req.SetRequestURI(postURL) | |
req.Header.SetMethod("POST") | |
err := cl.Do(req, resp) | |
if err != nil { | |
log.Fatalf("Request failed: %v", err) | |
} | |
if resp.StatusCode() != 200 { | |
log.Fatalf("ClickHouse responded with status code %d: %s", resp.StatusCode(), resp) | |
} | |
} | |
wg.Done() | |
}() | |
} | |
wg.Wait() | |
log.Printf("QPS: %.1f (for %s)", float64((*requests))/(float64(time.Since(start))/float64(time.Second)), time.Since(start)) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment