Created October 3, 2020 00:18

Example Go HTTP server with systemd socket activation and zero-downtime restart
(GitHub gist rsms/b70b4c7fe3b25e17b4b1f6af8b007c14).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[Unit]
Description = Foo HTTP server
Requires = foo.socket
After = multi-user.target

[Service]
User = www-data
Group = www-data
WorkingDirectory = /var/foo
ExecStart = /var/foo/bin/foo-server
ExecReload = /bin/kill -HUP $MAINPID
Restart = always
NotifyAccess = main
KillMode = process
NonBlocking = true
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[Unit]
Description=Foo HTTP server socket

[Socket]
ListenStream = 80
BindIPv6Only = both
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"net/http" | |
"os" | |
"time" | |
"github.com/coreos/go-systemd/activation" | |
"rsms/systemd" | |
) | |
func main() { | |
httpRoutes := &http.ServeMux{} | |
httpServer := &http.Server{ Handler: httpRoutes, ...} | |
// http endpoint that slowly sends chunks back over the course of 10 seconds | |
httpRoutes.HandleFunc("/slow", func(w http.ResponseWriter, req *http.Request) { | |
flusher, _ := w.(http.Flusher) | |
w.Header().Set("Transfer-Encoding", "chunked") | |
for i := 0; ; { | |
fmt.Fprintf(w, "Chunk %d from server process %d\n", i, os.Getpid()) | |
flusher.Flush() // Trigger "chunked" encoding and send a chunk... | |
i++ | |
if i == 10 { | |
break | |
} | |
time.Sleep(1000 * time.Millisecond) | |
} | |
w.Header().Set("Content-Length", "0") | |
}) | |
done := systemd.EnableGracefulShutdown(func(asyncContinue func()) { | |
timeout := time.Second | |
if asyncContinue != nil { | |
timeout = 30 * time.Second | |
httpServer.Server.RegisterOnShutdown(asyncContinue) | |
} | |
ctx, cancel := context.WithTimeout(context.Background(), timeout) | |
defer cancel() | |
httpServer.Server.SetKeepAlivesEnabled(false) | |
if err := httpServer.Server.Shutdown(ctx); err != nil { | |
fmt.Printf("graceful shutdown error: %s\n", err) | |
} else { | |
fmt.Printf("graceful shutdown complete\n") | |
} | |
}) | |
if err := listenAndServe(); err != nil { | |
panic(err) | |
} | |
// wait for shutdown | |
<-done | |
} | |
func listenAndServe(s *http.Server) error { | |
// get listeners from systemd | |
listeners, err := activation.Listeners() | |
if err != nil { | |
// Note: current implementation of go-systemd/activation never returns an error so it's | |
// unclear under what conditions it might do so in the future. | |
return err | |
} | |
if len(listeners) == 0 { | |
// no systemd socket for this process | |
err = s.ListenAndServe() | |
} else if len(listeners) != 1 { | |
// We can only handle a single socket; fail if we get more than 1. | |
// If multiple sockets are provided by systemd for the process, it's better to call Serve(l) | |
// directly instead of using ListenSystemd() | |
panic("More than one socket fds from systemd") | |
} else { | |
// start accepting connections from the systemd-provided socket | |
println("using socket from systemd socket activation") | |
err = s.Serve(listeners[0]) | |
} | |
if err == http.ErrServerClosed { | |
// returned from Serve functions when server.Shutdown() was initiated | |
err = nil | |
} | |
return err | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package systemd | |
import ( | |
"context" | |
"fmt" | |
"io" | |
"net" | |
"os" | |
"os/exec" | |
"os/signal" | |
"strconv" | |
"strings" | |
"syscall" | |
"time" | |
"github.com/coreos/go-systemd/daemon" | |
) | |
// ShutdownMode enumerates shutdown styles; judging by the names, a synchronous
// and an asynchronous one.
// NOTE(review): not referenced by any code visible in this file — presumably
// kept for API compatibility; confirm before relying on it.
type ShutdownMode int

// Defined ShutdownMode values.
const (
	// ShutdownSync is the zero value of ShutdownMode.
	ShutdownSync ShutdownMode = 0
	// ShutdownAsync is ShutdownMode(1).
	ShutdownAsync ShutdownMode = 1
)
// GetInvocationId returns the systemd invocation ID of this process (the value
// of the INVOCATION_ID environment variable), or "" when the process does not
// appear to be run by systemd. It can therefore be used to test whether the
// process is controlled by systemd.
//
// INVOCATION_ID is only trusted when JOURNAL_STREAM is set too.
// NOTE(review): systemd sets JOURNAL_STREAM only when stdout or stderr is
// connected to the journal, so a systemd-managed process whose output goes
// elsewhere is reported as "not systemd" — confirm that this is intended.
func GetInvocationId() string {
	id := os.Getenv("INVOCATION_ID")
	if id == "" || os.Getenv("JOURNAL_STREAM") == "" {
		// Either no invocation ID at all, or nothing corroborating it.
		return ""
	}
	return id
}
// EnableGracefulShutdown installs signal handlers and invokes shutdownHandler when the | |
// process should shut down. | |
// | |
// The returned channel closes when shutdownHandler has been called and returned. | |
// | |
// When SIGINT or SIGTERM is received, shutdownHandler(nil) is called. | |
// This corresponds to "service NAME stop" and "service NAME restart" systemd actions. | |
// The shutdownHandler should shut down as quickly as possible. Shutdown is synchronous. | |
// | |
// When SIGHUP is received, shutdownHandler(ready func()) is called. | |
// This corresponds to "service NAME reload" systemd action. | |
// The shutdownHandler should do the following when the "ready" function is non-nil: | |
// | |
// 1. Stop accepting network requests, stop listening and reliquish any exclusive-access | |
// resources that only one process should access at a time, like file-based databases. | |
// | |
// 2. Call read() | |
// | |
// 3. Proceed with shut down as smoothly as possible, taking whatever time it needs. | |
// This may include waiting for ongoing processes or network requests to complete. | |
// | |
// When ready() is called a new process will be spawned to take the place of the current one. | |
// | |
// Common use of this function looks like this: | |
// | |
// func main() { | |
// done := EnableGracefulShutdown(func(ready func()) { | |
// stopAcceptingConnections() | |
// closeDatabase() | |
// ready() | |
// waitForCurrentWorkToFinish() | |
// }) | |
// openDatabase() | |
// startAcceptingConnections() | |
// <-done | |
// } | |
// | |
func EnableGracefulShutdown(shutdownHandler func(ready func())) chan struct{} { | |
done := make(chan struct{}) | |
quit := make(chan os.Signal, 1) | |
signal.Notify(quit, | |
// for reload: | |
syscall.SIGHUP, | |
// for stop or full restart: | |
syscall.SIGINT, syscall.SIGTERM, | |
) | |
go func() { | |
sig := <-quit | |
fmt.Printf("systemdInstallSignalHandlers got signal %v\n", sig) | |
switch sig { | |
case syscall.SIGINT, syscall.SIGTERM: | |
// Shutdown asap | |
shutdownHandler(nil) | |
case syscall.SIGHUP: | |
// Execute a short-lived process and asks systemd to track it instead of us. | |
fmt.Println("[reload] begin") | |
// continueCh is a channel used to signal that... | |
// 1. shutdown has been initialized (send value) | |
// 2. shutdown has completed (close) | |
continueCh := make(chan struct{}) | |
// Invoke user shutdown handler in a goroutine so we can wrap the channel. | |
// Use a short timeout to catch user error -- the continuation callback should be | |
// invoked usually immediately or within a few milliseconds of shutdownHandler. | |
// This way we can also avoid deadlock if shutdownHandler never calls the | |
// continuation callback (otherwise we would never exit.) | |
initCallbackTimeout := 5 * time.Second | |
ctx, cancelCtx := context.WithTimeout(context.Background(), initCallbackTimeout) | |
go func() { | |
shutdownHandler(func() { | |
fmt.Println("[reload] user called asyncContinue") | |
select { | |
case <-ctx.Done(): | |
// timeout (do not send on continueCh) | |
fmt.Println("[reload] user called asyncContinue (timeout)") | |
default: | |
// finished in time | |
fmt.Println("[reload] user called asyncContinue (ok)") | |
continueCh <- struct{}{} | |
} | |
}) | |
close(continueCh) | |
}() | |
// wait for shutdown to begin (i.e. to reliquish exclusive-access resources) | |
fmt.Println("[reload] waiting for asyncContinue") | |
select { | |
case <-continueCh: | |
cancelCtx() | |
case <-ctx.Done(): | |
// timeout | |
fmt.Fprintf( | |
os.Stderr, | |
"timeout in systemd.EnableGracefulShutdown handler: "+ | |
"Continuation function was not called within %s.\n", | |
initCallbackTimeout, | |
) | |
} | |
// spawn decoy process | |
fmt.Println("[reload] spawning decoy process") | |
pid, err := detachedSleep() | |
if err != nil { | |
fmt.Printf("error in detachedSleep %s", err.Error()) | |
} else { | |
// tell systemd to track the decoy process instead of the current process | |
daemon.SdNotify(false, fmt.Sprintf("MAINPID=%d", pid)) | |
// Wait for confirmation from systemd | |
// Since we relinquished control to the decoy process with MAINPID= this will | |
// cause a warning e.g. "Got notification message from PID 123, but reception | |
// only permitted for main PID 456" in the systemd journal log. However it is | |
// a good trick that works: when this function returns it means that systemd | |
// has received the MAINPID= message, starting a new process, and we can safely | |
// proceed with shutdown of this current process. | |
// | |
// An alternative to this is to sleep for some long period of time, like 1s, | |
// to reduce the probability of systemd missing our MAINPID message. | |
// | |
// Note that as soon as systemd has interpreted the MAINPID notification it | |
// proceeds with starting a new process, so whatever we do beyond this point | |
// only affects shutdown of this current process. | |
// | |
SdNotifyBarrier(false, time.Second) | |
// give systemd a tiny bit extra time in case the implementation interpreted | |
// the BARRIER notification internally on a separate message queue than MAINPID. | |
time.Sleep(10 * time.Millisecond) | |
} | |
// wait for shutdown to complete (this may take whatever time) | |
fmt.Println("awaiting shutdown finalization") | |
<-continueCh | |
} // switch sig | |
close(done) | |
}() | |
return done | |
} | |
func init() { | |
// As early as possible, check if we should be the decoy. | |
state := os.Getenv("__SD_SHUTDOWN") | |
os.Unsetenv("__SD_SHUTDOWN") | |
switch state { | |
case "1": | |
// First step, fork again. | |
execPath, err := selfExeFile() | |
if err != nil { | |
panic(fmt.Errorf("selfExeFile error %s", err)) | |
} | |
child, err := os.StartProcess( | |
execPath, | |
[]string{execPath}, | |
&os.ProcAttr{ | |
Env: append(os.Environ(), "__SD_SHUTDOWN=2"), | |
}) | |
if err != nil { | |
panic(fmt.Errorf("cannot execute sleep command: %s", err)) | |
} | |
// Advertise child's PID and exit. Child will be | |
// orphaned and adopted by PID 1. | |
fmt.Printf("%d", child.Pid) | |
os.Exit(0) | |
case "2": | |
// wait for systemd | |
SdNotifyBarrier(false, time.Second) | |
// time.Sleep(time.Millisecond * 10000) | |
os.Exit(0) | |
} | |
// Not the shutdown helper. Business as usual. | |
} | |
// selfExeFile returns the absolute path to ourselves. This relies on | |
// /proc/self/exe which may be a symlink to a deleted path (for | |
// example, during an upgrade). | |
func selfExeFile() (string, error) { | |
execPath, err := os.Readlink("/proc/self/exe") | |
execPath = strings.TrimSuffix(execPath, " (deleted)") | |
return execPath, err | |
} | |
// detachedSleep spawns a detached process sleeping one second and | |
// returns its PID. A full daemonization is not needed as the process | |
// is short-lived. | |
func detachedSleep() (uint64, error) { | |
selfexe, err := selfExeFile() | |
if err != nil { | |
return 0, err | |
} | |
fmt.Printf("detachedSleep starting command %q\n", selfexe) | |
cmd := exec.Command(selfexe) | |
cmd.Env = append(os.Environ(), "__SD_SHUTDOWN=1") | |
out, err := cmd.Output() | |
if err != nil { | |
return 0, err | |
} | |
pid, err := strconv.ParseUint(strings.TrimSpace(string(out)), 10, 64) | |
if err != nil { | |
return 0, fmt.Errorf("cannot parse PID of sleep command: %s", err) | |
} | |
return pid, nil | |
} | |
// SdNotifyBarrier sends a BARRIER=1 notification to the service manager socket
// named by $NOTIFY_SOCKET and waits until that message has been processed,
// modelled after libsystemd's sd_notify_barrier.
//
// The write end of a fresh pipe is attached to the message. The service manager
// closes it when it handles the message, which shows up here as EOF on the read
// end. If unsetEnvironment is true, NOTIFY_SOCKET is removed from the
// environment before connecting.
//
// It returns an error if NOTIFY_SOCKET is not set, if the message cannot be
// sent, or if EOF is not seen within timeout. In the timeout case the error
// wraps os.ErrDeadlineExceeded.
func SdNotifyBarrier(unsetEnvironment bool, timeout time.Duration) error {
	// construct unix socket address from systemd environment variable
	socketAddr := &net.UnixAddr{
		Name: os.Getenv("NOTIFY_SOCKET"),
		Net:  "unixgram",
	}
	if socketAddr.Name == "" {
		return fmt.Errorf("NOTIFY_SOCKET missing")
	}

	// create a pipe for communicating with the service manager
	pipeR, pipeW, err := os.Pipe()
	if err != nil {
		return err
	}
	defer pipeR.Close()
	// Ensures the write end is released on error paths too; on success it is
	// closed explicitly below and this second Close is a harmless no-op.
	defer pipeW.Close()

	if unsetEnvironment {
		if err := os.Unsetenv("NOTIFY_SOCKET"); err != nil {
			return err
		}
	}

	// connect to unix socket at socketAddr
	conn, err := net.DialUnix(socketAddr.Net, nil, socketAddr)
	if err != nil {
		return err
	}
	defer conn.Close()

	// Sendmsg needs a raw descriptor; conn.File returns a duplicate of the
	// socket's fd, which must be closed separately from conn.
	connFile, err := conn.File()
	if err != nil {
		return err
	}
	defer connFile.Close()

	// send the write end of the pipe over to the service manager
	rights := syscall.UnixRights(int(pipeW.Fd()))
	if err := syscall.Sendmsg(int(connFile.Fd()), []byte("BARRIER=1"), rights, nil, 0); err != nil {
		return err
	}
	// Drop our own reference to the write end so that EOF depends only on the
	// service manager closing its copy.
	pipeW.Close()

	// wait for the service manager to close the pipe
	if err := pipeR.SetReadDeadline(time.Now().Add(timeout)); err != nil {
		return err
	}
	var b [1]byte
	_, err = pipeR.Read(b[:])
	if err == io.EOF {
		return nil
	}
	return err
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment