Skip to content

Instantly share code, notes, and snippets.

@slok
Created August 31, 2024 16:09
Show Gist options
  • Save slok/c713f3ee9ace754b57eecfea4d04170b to your computer and use it in GitHub Desktop.
Save slok/c713f3ee9ace754b57eecfea4d04170b to your computer and use it in GitHub Desktop.
Kubernetes port forward example in go (without kubectl) using official K8s Go libs.
package main
import (
"context"
"fmt"
"log/slog"
"net/http"
"net/url"
"os"
"os/signal"
"strings"
"syscall"
"k8s.io/apimachinery/pkg/util/httpstream"
_ "k8s.io/client-go/plugin/pkg/client/auth" // Init all available Kube client auth systems.
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/portforward"
"k8s.io/client-go/transport/spdy"
)
func main() {
ctx := context.Background()
err := run(ctx)
if err != nil {
fmt.Fprintf(os.Stdout, "Error: %s", err)
os.Exit(1)
return
}
}
func run(ctx context.Context) error {
ctx, stop := signal.NotifyContext(ctx, syscall.SIGTERM, syscall.SIGINT)
defer stop()
const (
kubeConfig = "/home/slok/.kube/config"
kubeCtx = "fonoa-jazz"
pod = "doge-7dd9bb5474-7jplq"
podNS = "doge-jazz"
podPort = 8080
localPort = 4444
)
// Load kubectl with context.
config, err := LoadKubeCtxConfig(kubeConfig, kubeCtx)
if err != nil {
return fmt.Errorf("could not load kubernetes configuration: %w", err)
}
slog.Info("Opening port forward...")
pf := NewPortForwarder(config)
err = pf.PortForward(ctx, podNS, pod, localPort, podPort)
if err != nil {
return fmt.Errorf("could not create port-forward: %w", err)
}
return nil
}
// LoadKubeCtxConfig loads kubernetes configuration based on flags and ensures we use the provided context.
func LoadKubeCtxConfig(kubeconfigPath, kubeCtx string) (*rest.Config, error) {
cfg, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{
ExplicitPath: kubeconfigPath,
},
&clientcmd.ConfigOverrides{
CurrentContext: kubeCtx,
}).ClientConfig()
if err != nil {
return nil, fmt.Errorf("could not load configuration: %w", err)
}
// Set better cli rate limiter.
cfg.QPS = 100
cfg.Burst = 100
return cfg, nil
}
// PortForwarder will open a local port forward with a given pod in a kubernetes cluster.
type PortForwarder struct {
config *rest.Config
outLogger stdoutLogger
errLogger stderrLogger
}
func NewPortForwarder(config *rest.Config) PortForwarder {
return PortForwarder{
config: config,
outLogger: stdoutLogger{},
errLogger: stderrLogger{},
}
}
func (p PortForwarder) PortForward(ctx context.Context, ns string, pod string, portLocal int, portPod int) error {
path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", ns, pod)
host := strings.TrimPrefix(p.config.Host, "https://")
spdyDialer, err := p.spdyDialer(host, path)
if err != nil {
return fmt.Errorf("could not create SPDY based dialer: %w", err)
}
websocketDialer, err := p.websocketDialer(host, path)
if err != nil {
return fmt.Errorf("could not create websocket based dialer: %w", err)
}
// Try websockets (>=v1.30), if not fallback ot SPDY.
dialer := portforward.NewFallbackDialer(websocketDialer, spdyDialer, func(err error) bool {
return httpstream.IsUpgradeFailure(err) || httpstream.IsHTTPSProxyError(err)
})
// Control graceful shutdown by using the context. On context end, stop the port forward.
stopCh := make(chan struct{})
go func() {
<-ctx.Done()
stopCh <- struct{}{}
slog.Info("Port forward closed")
}()
fw, err := portforward.New(dialer, []string{fmt.Sprintf("%d:%d", portLocal, portPod)}, stopCh, nil, p.outLogger, p.errLogger)
if err != nil {
return err
}
// Execute the port forward, this is a blocking operation.
return fw.ForwardPorts()
}
func (p PortForwarder) spdyDialer(host, path string) (httpstream.Dialer, error) {
// Create the SPDY round tripper will handle the request/response.
transport, upgrader, err := spdy.RoundTripperFor(p.config)
if err != nil {
return nil, fmt.Errorf("could not create SPDY roundtripper: %w", err)
}
// Create the SPDY dialer that will connect the client with the server.
httpCli := &http.Client{Transport: transport}
dialer := spdy.NewDialer(upgrader, httpCli, http.MethodPost, &url.URL{Scheme: "https", Path: path, Host: host})
return dialer, nil
}
func (p PortForwarder) websocketDialer(host, path string) (httpstream.Dialer, error) {
dialer, err := portforward.NewSPDYOverWebsocketDialer(&url.URL{Scheme: "https", Path: path, Host: host}, p.config)
if err != nil {
return nil, fmt.Errorf("could not create websocket dialer: %w", err)
}
return dialer, nil
}
type stdoutLogger struct{}
func (s stdoutLogger) Write(p []byte) (n int, err error) {
slog.Info(string(p))
return len(p), nil
}
type stderrLogger struct{}
func (s stderrLogger) Write(p []byte) (n int, err error) {
slog.Error(string(p))
return len(p), nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment