Last active
March 4, 2023 19:22
-
-
Save akhenakh/38dbfea70dc36964e23acc19777f3869 to your computer and use it in GitHub Desktop.
Example of graceful shutdown with a gRPC health server and an HTTP server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Both probes exec grpc_health_probe inside the container against the
# dedicated gRPC health port (:6666 is the default of the service's
# -healthPort flag).
# NOTE(review): with no -service flag the probe presumably checks the overall
# ("") health status rather than grpc.health.v1.<serviceName> — confirm.
readinessProbe:
  exec:
    command: ["/root/grpc_health_probe", "-addr=:6666"]
  initialDelaySeconds: 1
livenessProbe:
  exec:
    command: ["/root/grpc_health_probe", "-addr=:6666"]
  initialDelaySeconds: 2
imagePullPolicy: IfNotPresent
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
stdlog "log" | |
"net" | |
"net/http" | |
"os" | |
"os/signal" | |
"syscall" | |
"time" | |
"github.com/go-kit/kit/log/level" | |
"github.com/namsral/flag" | |
"golang.org/x/sync/errgroup" | |
"google.golang.org/grpc" | |
"google.golang.org/grpc/health" | |
healthpb "google.golang.org/grpc/health/grpc_health_v1" | |
"google.golang.org/grpc/keepalive" | |
) | |
const serviceName = "myserviced" | |
var ( | |
version = "no version" | |
httpPort = flag.Int("httpPort", 8888, "http port") | |
grpcPort = flag.Int("grpcPort", 9200, "grpc port") | |
healthPort = flag.Int("healthPort", 6666, "grpc health port") | |
logLevel = flag.String("logLevel", "INFO", "Log level, INFO|WARNING|DEBUG|ERROR") | |
gcpProjectID = flag.String("gcpProjectID", "none", "GCP Project ID") | |
dev = flag.Bool("dev", false, "Enable development tooling") | |
grpcServer *grpc.Server | |
grpcHealthServer *grpc.Server | |
httpServer *http.Server | |
) | |
func main() { | |
flag.Parse() | |
logger, err := probe.InitLogger(serviceName, *logLevel) | |
if err != nil { | |
stdlog.Fatal("can't init logger", err) | |
} | |
level.Info(logger).Log("msg", "Starting app", "version", version) | |
ctx := context.Background() | |
ctx, cancel := context.WithCancel(ctx) | |
defer cancel() | |
interrupt := make(chan os.Signal, 1) | |
signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM) | |
defer signal.Stop(interrupt) | |
g, ctx := errgroup.WithContext(ctx) | |
// web server metrics | |
g.Go(func() error { | |
httpServer = &http.Server{ | |
Addr: fmt.Sprintf(":%d", *httpPort), | |
ReadTimeout: 10 * time.Second, | |
WriteTimeout: 10 * time.Second, | |
} | |
level.Info(logger).Log("msg", fmt.Sprintf("HTTP Metrics server serving at :%d", *httpPort)) | |
if err := httpServer.ListenAndServe(); err != http.ErrServerClosed { | |
return err | |
} | |
return nil | |
}) | |
// gRPC Health Server | |
healthServer := health.NewServer() | |
g.Go(func() error { | |
grpcHealthServer = grpc.NewServer() | |
healthpb.RegisterHealthServer(grpcHealthServer, healthServer) | |
haddr := fmt.Sprintf(":%d", *healthPort) | |
hln, err := net.Listen("tcp", haddr) | |
if err != nil { | |
level.Error(logger).Log("msg", "gRPC Health server: failed to listen", "error", err) | |
os.Exit(2) | |
} | |
level.Info(logger).Log("msg", fmt.Sprintf("gRPC health server serving at %s", haddr)) | |
return grpcHealthServer.Serve(hln) | |
}) | |
// gRPC server | |
g.Go(func() error { | |
addr := fmt.Sprintf(":%d", *grpcPort) | |
ln, err := net.Listen("tcp", addr) | |
if err != nil { | |
level.Error(logger).Log("msg", "gRPC server: failed to listen", "error", err) | |
os.Exit(2) | |
} | |
server := &myservice.Server{ | |
Logger: logger, | |
Health: healthServer, | |
ServiceName: serviceName, | |
} | |
grpcServer = grpc.NewServer( | |
// MaxConnectionAge is just to avoid long connection, to facilitate load balancing | |
// MaxConnectionAgeGrace will torn them, default to infinity | |
grpc.KeepaliveParams(keepalive.ServerParameters{MaxConnectionAge: 2 * time.Minute}), | |
grpc.StatsHandler(&ocgrpc_propag.ServerHandler{}), | |
) | |
myservice.RegisterNyServiceServer(grpcServer, server) | |
level.Info(logger).Log("msg", fmt.Sprintf("gRPC server serving at %s", addr)) | |
healthServer.SetServingStatus(fmt.Sprintf("grpc.health.v1.%s", serviceName), healthpb.HealthCheckResponse_SERVING) | |
return grpcServer.Serve(ln) | |
}) | |
select { | |
case <-interrupt: | |
break | |
case <-ctx.Done(): | |
break | |
} | |
level.Warn(logger).Log("msg", "received shutdown signal") | |
cancel() | |
healthServer.SetServingStatus(fmt.Sprintf("grpc.health.v1.%s", serviceName), healthpb.HealthCheckResponse_NOT_SERVING) | |
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) | |
defer shutdownCancel() | |
if httpServer != nil { | |
_ = httpServer.Shutdown(shutdownCtx) | |
} | |
if grpcServer != nil { | |
grpcServer.GracefulStop() | |
} | |
if grpcHealthServer != nil { | |
grpcHealthServer.GracefulStop() | |
} | |
err = g.Wait() | |
if err != nil { | |
level.Error(logger).Log("msg", "server returning an error", "error", err) | |
os.Exit(2) | |
} | |
} |
Simply copy the `grpc_health_probe` binary into your image in the Dockerfile, e.g.:
FROM gcr.io/distroless/static
# The service binary and grpc_health_probe both land in /root/. The
# Kubernetes exec probes call the latter as /root/grpc_health_probe.
WORKDIR /root/
COPY geoipd grpc_health_probe ./
ENTRYPOINT ["./geoipd"]
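The `readinessProbe` executes the given command inside the container, so the probe binary must be present in that container's image. It can be built from another image or shipped in the same image as the service.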
Why is cancel() called explicitly even though there is a deferred call above?
We want cancel() to happen early, right after the signal catch.
The defer cancel() is superfluous. Good catch.
I think it's a redundant operation, as the inner errgroup context is either already cancelled (the `case <-ctx.Done()` branch, due to an error) or will be cancelled right after the first shutdown/stop. When the context is cancelled outside the errgroup flow, the group's goroutines keep running despite the cancellation, because the code inside the g.Go() goroutines does not depend on the cancelled context.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
please explain this
command: ["/root/grpc_health_probe", "-addr=:6666"]
How does this binary find its way into a k8s pod? Is it added during the Docker build? Is it the same app or a different binary?