Skip to content

Instantly share code, notes, and snippets.

@paskozdilar
Created September 4, 2025 18:20
Show Gist options
  • Save paskozdilar/f854f25bdef38ffedf55633e6b2dfeda to your computer and use it in GitHub Desktop.
Save paskozdilar/f854f25bdef38ffedf55633e6b2dfeda to your computer and use it in GitHub Desktop.
ClientStreaming websocket proxy for gRPC Gateway
package ws
import (
"bufio"
"context"
"io"
"net/http"
"time"
"github.com/gorilla/websocket"
)
func NewWrapper(h http.Handler) http.Handler {
return &wrapper{
h: h,
u: websocket.Upgrader{},
}
}
type wrapper struct {
h http.Handler
u websocket.Upgrader
}
func (w *wrapper) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
conn, err := w.u.Upgrade(wr, r, nil)
if err != nil {
http.Error(wr, "Error upgrading to websocket", http.StatusInternalServerError)
return
}
w.ServeWS(conn, r)
}
// Forward WebSocket connection to the embedded HTTP handler.
func (w *wrapper) ServeWS(ws *websocket.Conn, r *http.Request) {
// WebSocket connection / HTTP request pipes for passing data around
wsReader, wsWriter := io.Pipe()
httpReader, httpWriter := io.Pipe()
// Create mock *http.Request and http.ResponseWriter for executing
// ServeHTTP
ctx, cancel := context.WithCancel(r.Context())
defer cancel()
url := r.URL.String()
r, err := http.NewRequestWithContext(ctx, http.MethodPost, url, eofPipeReader{httpReader})
if err != nil {
return
}
fwdr := newResponseForwarder(wsWriter, r.Header)
// Forward data from pipes into websocket
go func() {
// NOTE: check socketRead comment
socketRead(httpWriter, ws)
r.Body.Close()
wsWriter.Close()
cancel()
}()
go func() {
socketWrite(ws, wsReader)
ws.WriteControl(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""),
time.Now().Add(5*time.Second))
ws.Close()
}()
// Execute HTTP request
w.h.ServeHTTP(fwdr, r)
wsReader.Close()
}
// listen to websocket messages and write them to writer
//
// NOTE:
// Since grpc-gateway introduced io.Body draining, ServerStreaming methods
// require Body to be read completely before the method is started.
// This is why we read only one message from the websocket and then close the
// writer.
//
// This means that we cannot use this function for ClientStreaming or
// Bidirectional methods.
//
// TODO:
// Implement a generic solution.
// This may require work on grpc-gateway itself.
func socketRead(w io.WriteCloser, ws *websocket.Conn) error {
buf := bufio.NewWriter(w)
// for {
// _, message, err := ws.ReadMessage()
// if err != nil {
// return
// }
// buf.Write(message)
// buf.WriteRune('\n')
// if err := buf.Flush(); err != nil {
// return
// }
// }
_, message, err := ws.ReadMessage()
if err != nil {
return err
}
buf.Write(message)
buf.WriteRune('\n')
if err := buf.Flush(); err != nil {
return err
}
w.Close()
for {
_, _, err := ws.ReadMessage()
if err != nil {
return err
}
}
}
// listen to reader message and write them to websocket
func socketWrite(ws *websocket.Conn, r io.Reader) error {
buf := bufio.NewReader(r)
for {
s, err := buf.ReadString('\n')
if err != nil {
return err
}
if err := ws.WriteMessage(websocket.TextMessage, []byte(s)); err != nil {
return err
}
}
}
// Implementation of http.ResponseWriter interface that redirects all response
// data to embedded *io.PipeWriter.
//
// Useful for passing custom writers to http.Handler.ServeHTTP() method.
type responseForwarder struct {
*io.PipeWriter
h http.Header
}
func newResponseForwarder(w *io.PipeWriter, h http.Header) http.ResponseWriter {
return &responseForwarder{w, h}
}
func (rf *responseForwarder) Header() http.Header {
return rf.h
}
func (rf *responseForwarder) WriteHeader(int) {
}
func (rf *responseForwarder) Flush() {
}
type eofPipeReader struct {
*io.PipeReader
}
func (r eofPipeReader) Read(p []byte) (int, error) {
n, err := r.PipeReader.Read(p)
if err == io.ErrClosedPipe {
err = io.EOF
}
return n, err
}
// Error wrapper for sending errors to all calls
func NewErrorWrapper(errmsg string) http.Handler {
return &errorWrapper{
errmsg: []byte(errmsg),
u: websocket.Upgrader{},
}
}
type errorWrapper struct {
errmsg []byte
u websocket.Upgrader
}
func (ew *errorWrapper) ServeHTTP(wr http.ResponseWriter, r *http.Request) {
conn, err := ew.u.Upgrade(wr, r, nil)
if err != nil {
http.Error(wr, "Error upgrading to websocket", http.StatusInternalServerError)
return
}
// Write error message
conn.WriteMessage(websocket.CloseAbnormalClosure, ew.errmsg)
conn.Close()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment