Created
December 11, 2018 16:13
-
-
Save banks/f8429dfbf6b3c8c0145afeb9be16caa4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"flag" | |
"fmt" | |
"net" | |
"os" | |
"os/signal" | |
"time" | |
"github.com/hashicorp/consul/agent/pool" | |
"github.com/hashicorp/consul/agent/structs" | |
"golang.org/x/time/rate" | |
) | |
func main() { | |
var nBlockers int | |
var nInstances int | |
var instanceChurn float64 | |
var serverAddr = &net.TCPAddr{net.ParseIP("127.0.0.1"), 8300, ""} | |
flag.IntVar(&nBlockers, "c", 10, "number of concurrent blocking query clients to run") | |
flag.IntVar(&nInstances, "n", 1000, "number of service instances to register") | |
flag.Float64Var(&instanceChurn, "r", 1.0, "instance churn rate in changes/s") | |
flag.Parse() | |
connPool := &pool.ConnPool{ | |
SrcAddr: nil, | |
LogOutput: os.Stderr, | |
MaxTime: time.Hour, | |
MaxStreams: 1000000, | |
TLSWrapper: nil, | |
ForceTLS: false, | |
} | |
fmt.Printf("==> Registering %d instances of test service...\n", nInstances) | |
// Register services | |
for i := 0; i < nInstances; i++ { | |
args := structs.RegisterRequest{ | |
Datacenter: "dc1", | |
Node: fmt.Sprintf("test-%08x", i), | |
Address: "127.0.0.1", | |
Service: &structs.NodeService{ | |
Service: "test", | |
Address: "", | |
Port: 10000 + i, | |
}, | |
} | |
var resp struct{} | |
err := connPool.RPC("dc1", serverAddr, 3, | |
"Catalog.Register", false, &args, &resp) | |
if err != nil { | |
panic(err) | |
} | |
} | |
fmt.Printf("==> Registered %d instances of test service\n", nInstances) | |
fmt.Printf("==> Starting %d blocking clients on health endpoint\n", nBlockers) | |
for i := 0; i < nBlockers; i++ { | |
go runBlocker(i, connPool, serverAddr) | |
} | |
fmt.Printf("==> Starting churn process at churn rate %f Hz\n", instanceChurn) | |
go doChurn(connPool, serverAddr, instanceChurn) | |
// Run until signal | |
ch := make(chan os.Signal) | |
signal.Notify(ch, os.Kill, os.Interrupt) | |
<-ch | |
} | |
func doChurn(connPool *pool.ConnPool, serverAddr net.Addr, r float64) { | |
lim := rate.NewLimiter(rate.Limit(r), 1) | |
dereg := true | |
for { | |
lim.Wait(context.Background()) | |
// Flip one instance | |
fmt.Printf(" churn dereg: %v\n", dereg) | |
if dereg { | |
args := structs.DeregisterRequest{ | |
Datacenter: "dc1", | |
Node: fmt.Sprintf("test-%08x", 0), | |
ServiceID: "test", | |
} | |
var resp struct{} | |
err := connPool.RPC("dc1", serverAddr, 3, | |
"Catalog.Deregister", false, &args, &resp) | |
if err != nil { | |
panic(err) | |
} | |
} else { | |
args := structs.RegisterRequest{ | |
Datacenter: "dc1", | |
Node: fmt.Sprintf("test-%08x", 0), | |
Address: "127.0.0.1", | |
Service: &structs.NodeService{ | |
Service: "test", | |
Address: "", | |
Port: 10000, | |
}, | |
} | |
var resp struct{} | |
err := connPool.RPC("dc1", serverAddr, 3, | |
"Catalog.Register", false, &args, &resp) | |
if err != nil { | |
panic(err) | |
} | |
} | |
dereg = !dereg | |
} | |
} | |
func runBlocker(n int, connPool *pool.ConnPool, serverAddr net.Addr) { | |
args := structs.ServiceSpecificRequest{ | |
Datacenter: "dc1", | |
ServiceName: "test", | |
Source: structs.QuerySource{ | |
Datacenter: "dc1", | |
Node: "test-1", | |
}, | |
QueryOptions: structs.QueryOptions{ | |
MaxQueryTime: 10 * time.Minute, | |
}, | |
} | |
for { | |
var resp *structs.IndexedCheckServiceNodes | |
err := connPool.RPC("dc1", serverAddr, 3, | |
"Health.ServiceNodes", false, &args, &resp) | |
if err != nil { | |
panic(err) | |
} | |
fmt.Printf(" %08d Got blocking response at index %d\n", n, resp.Index) | |
args.QueryOptions.MinQueryIndex = resp.Index | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment