Created
August 31, 2024 16:09
-
-
Save slok/c713f3ee9ace754b57eecfea4d04170b to your computer and use it in GitHub Desktop.
Kubernetes port forward example in go (without kubectl) using official K8s Go libs.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"log/slog" | |
"net/http" | |
"net/url" | |
"os" | |
"os/signal" | |
"strings" | |
"syscall" | |
"k8s.io/apimachinery/pkg/util/httpstream" | |
_ "k8s.io/client-go/plugin/pkg/client/auth" // Init all available Kube client auth systems. | |
"k8s.io/client-go/rest" | |
"k8s.io/client-go/tools/clientcmd" | |
"k8s.io/client-go/tools/portforward" | |
"k8s.io/client-go/transport/spdy" | |
) | |
func main() { | |
ctx := context.Background() | |
err := run(ctx) | |
if err != nil { | |
fmt.Fprintf(os.Stdout, "Error: %s", err) | |
os.Exit(1) | |
return | |
} | |
} | |
func run(ctx context.Context) error { | |
ctx, stop := signal.NotifyContext(ctx, syscall.SIGTERM, syscall.SIGINT) | |
defer stop() | |
const ( | |
kubeConfig = "/home/slok/.kube/config" | |
kubeCtx = "fonoa-jazz" | |
pod = "doge-7dd9bb5474-7jplq" | |
podNS = "doge-jazz" | |
podPort = 8080 | |
localPort = 4444 | |
) | |
// Load kubectl with context. | |
config, err := LoadKubeCtxConfig(kubeConfig, kubeCtx) | |
if err != nil { | |
return fmt.Errorf("could not load kubernetes configuration: %w", err) | |
} | |
slog.Info("Opening port forward...") | |
pf := NewPortForwarder(config) | |
err = pf.PortForward(ctx, podNS, pod, localPort, podPort) | |
if err != nil { | |
return fmt.Errorf("could not create port-forward: %w", err) | |
} | |
return nil | |
} | |
// LoadKubeCtxConfig loads kubernetes configuration based on flags and ensures we use the provided context. | |
func LoadKubeCtxConfig(kubeconfigPath, kubeCtx string) (*rest.Config, error) { | |
cfg, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig( | |
&clientcmd.ClientConfigLoadingRules{ | |
ExplicitPath: kubeconfigPath, | |
}, | |
&clientcmd.ConfigOverrides{ | |
CurrentContext: kubeCtx, | |
}).ClientConfig() | |
if err != nil { | |
return nil, fmt.Errorf("could not load configuration: %w", err) | |
} | |
// Set better cli rate limiter. | |
cfg.QPS = 100 | |
cfg.Burst = 100 | |
return cfg, nil | |
} | |
// PortForwarder will open a local port forward with a given pod in a kubernetes cluster. | |
type PortForwarder struct { | |
config *rest.Config | |
outLogger stdoutLogger | |
errLogger stderrLogger | |
} | |
func NewPortForwarder(config *rest.Config) PortForwarder { | |
return PortForwarder{ | |
config: config, | |
outLogger: stdoutLogger{}, | |
errLogger: stderrLogger{}, | |
} | |
} | |
func (p PortForwarder) PortForward(ctx context.Context, ns string, pod string, portLocal int, portPod int) error { | |
path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", ns, pod) | |
host := strings.TrimPrefix(p.config.Host, "https://") | |
spdyDialer, err := p.spdyDialer(host, path) | |
if err != nil { | |
return fmt.Errorf("could not create SPDY based dialer: %w", err) | |
} | |
websocketDialer, err := p.websocketDialer(host, path) | |
if err != nil { | |
return fmt.Errorf("could not create websocket based dialer: %w", err) | |
} | |
// Try websockets (>=v1.30), if not fallback ot SPDY. | |
dialer := portforward.NewFallbackDialer(websocketDialer, spdyDialer, func(err error) bool { | |
return httpstream.IsUpgradeFailure(err) || httpstream.IsHTTPSProxyError(err) | |
}) | |
// Control graceful shutdown by using the context. On context end, stop the port forward. | |
stopCh := make(chan struct{}) | |
go func() { | |
<-ctx.Done() | |
stopCh <- struct{}{} | |
slog.Info("Port forward closed") | |
}() | |
fw, err := portforward.New(dialer, []string{fmt.Sprintf("%d:%d", portLocal, portPod)}, stopCh, nil, p.outLogger, p.errLogger) | |
if err != nil { | |
return err | |
} | |
// Execute the port forward, this is a blocking operation. | |
return fw.ForwardPorts() | |
} | |
func (p PortForwarder) spdyDialer(host, path string) (httpstream.Dialer, error) { | |
// Create the SPDY round tripper will handle the request/response. | |
transport, upgrader, err := spdy.RoundTripperFor(p.config) | |
if err != nil { | |
return nil, fmt.Errorf("could not create SPDY roundtripper: %w", err) | |
} | |
// Create the SPDY dialer that will connect the client with the server. | |
httpCli := &http.Client{Transport: transport} | |
dialer := spdy.NewDialer(upgrader, httpCli, http.MethodPost, &url.URL{Scheme: "https", Path: path, Host: host}) | |
return dialer, nil | |
} | |
func (p PortForwarder) websocketDialer(host, path string) (httpstream.Dialer, error) { | |
dialer, err := portforward.NewSPDYOverWebsocketDialer(&url.URL{Scheme: "https", Path: path, Host: host}, p.config) | |
if err != nil { | |
return nil, fmt.Errorf("could not create websocket dialer: %w", err) | |
} | |
return dialer, nil | |
} | |
type stdoutLogger struct{} | |
func (s stdoutLogger) Write(p []byte) (n int, err error) { | |
slog.Info(string(p)) | |
return len(p), nil | |
} | |
type stderrLogger struct{} | |
func (s stderrLogger) Write(p []byte) (n int, err error) { | |
slog.Error(string(p)) | |
return len(p), nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment