Created
September 4, 2025 18:20
-
-
Save paskozdilar/f854f25bdef38ffedf55633e6b2dfeda to your computer and use it in GitHub Desktop.
ClientStreaming websocket proxy for gRPC Gateway
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package ws | |
| import ( | |
| "bufio" | |
| "context" | |
| "io" | |
| "net/http" | |
| "time" | |
| "github.com/gorilla/websocket" | |
| ) | |
| func NewWrapper(h http.Handler) http.Handler { | |
| return &wrapper{ | |
| h: h, | |
| u: websocket.Upgrader{}, | |
| } | |
| } | |
| type wrapper struct { | |
| h http.Handler | |
| u websocket.Upgrader | |
| } | |
| func (w *wrapper) ServeHTTP(wr http.ResponseWriter, r *http.Request) { | |
| conn, err := w.u.Upgrade(wr, r, nil) | |
| if err != nil { | |
| http.Error(wr, "Error upgrading to websocket", http.StatusInternalServerError) | |
| return | |
| } | |
| w.ServeWS(conn, r) | |
| } | |
| // Forward WebSocket connection to the embedded HTTP handler. | |
| func (w *wrapper) ServeWS(ws *websocket.Conn, r *http.Request) { | |
| // WebSocket connection / HTTP request pipes for passing data around | |
| wsReader, wsWriter := io.Pipe() | |
| httpReader, httpWriter := io.Pipe() | |
| // Create mock *http.Request and http.ResponseWriter for executing | |
| // ServeHTTP | |
| ctx, cancel := context.WithCancel(r.Context()) | |
| defer cancel() | |
| url := r.URL.String() | |
| r, err := http.NewRequestWithContext(ctx, http.MethodPost, url, eofPipeReader{httpReader}) | |
| if err != nil { | |
| return | |
| } | |
| fwdr := newResponseForwarder(wsWriter, r.Header) | |
| // Forward data from pipes into websocket | |
| go func() { | |
| // NOTE: check socketRead comment | |
| socketRead(httpWriter, ws) | |
| r.Body.Close() | |
| wsWriter.Close() | |
| cancel() | |
| }() | |
| go func() { | |
| socketWrite(ws, wsReader) | |
| ws.WriteControl(websocket.CloseMessage, | |
| websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), | |
| time.Now().Add(5*time.Second)) | |
| ws.Close() | |
| }() | |
| // Execute HTTP request | |
| w.h.ServeHTTP(fwdr, r) | |
| wsReader.Close() | |
| } | |
| // listen to websocket messages and write them to writer | |
| // | |
| // NOTE: | |
| // Since grpc-gateway introduced io.Body draining, ServerStreaming methods | |
| // require Body to be read completely before the method is started. | |
| // This is why we read only one message from the websocket and then close the | |
| // writer. | |
| // | |
| // This means that we cannot use this function for ClientStreaming or | |
| // Bidirectional methods. | |
| // | |
| // TODO: | |
| // Implement a generic solution. | |
| // This may require work on grpc-gateway itself. | |
| func socketRead(w io.WriteCloser, ws *websocket.Conn) error { | |
| buf := bufio.NewWriter(w) | |
| // for { | |
| // _, message, err := ws.ReadMessage() | |
| // if err != nil { | |
| // return | |
| // } | |
| // buf.Write(message) | |
| // buf.WriteRune('\n') | |
| // if err := buf.Flush(); err != nil { | |
| // return | |
| // } | |
| // } | |
| _, message, err := ws.ReadMessage() | |
| if err != nil { | |
| return err | |
| } | |
| buf.Write(message) | |
| buf.WriteRune('\n') | |
| if err := buf.Flush(); err != nil { | |
| return err | |
| } | |
| w.Close() | |
| for { | |
| _, _, err := ws.ReadMessage() | |
| if err != nil { | |
| return err | |
| } | |
| } | |
| } | |
| // listen to reader message and write them to websocket | |
| func socketWrite(ws *websocket.Conn, r io.Reader) error { | |
| buf := bufio.NewReader(r) | |
| for { | |
| s, err := buf.ReadString('\n') | |
| if err != nil { | |
| return err | |
| } | |
| if err := ws.WriteMessage(websocket.TextMessage, []byte(s)); err != nil { | |
| return err | |
| } | |
| } | |
| } | |
// responseForwarder is an http.ResponseWriter whose body writes go straight
// to the embedded *io.PipeWriter.
//
// It makes it possible to hand a custom writer to http.Handler.ServeHTTP().
type responseForwarder struct {
	*io.PipeWriter             // receives the response body via the promoted Write
	h              http.Header // header map returned by Header
}

// newResponseForwarder builds a response writer that writes the body to w
// and exposes h as its header map.
func newResponseForwarder(w *io.PipeWriter, h http.Header) http.ResponseWriter {
	return &responseForwarder{PipeWriter: w, h: h}
}

// Header returns the header map supplied at construction time.
func (rf *responseForwarder) Header() http.Header { return rf.h }

// WriteHeader does nothing; the status code is discarded.
func (rf *responseForwarder) WriteHeader(int) {}

// Flush does nothing; it exists so the type has a Flush method.
func (rf *responseForwarder) Flush() {}
// eofPipeReader wraps an *io.PipeReader and reports a closed pipe as a
// regular end of stream.
type eofPipeReader struct {
	*io.PipeReader
}

// Read reads from the underlying pipe, translating io.ErrClosedPipe into
// io.EOF. Every other result is passed through unchanged.
func (r eofPipeReader) Read(p []byte) (int, error) {
	switch n, err := r.PipeReader.Read(p); err {
	case io.ErrClosedPipe:
		return n, io.EOF
	default:
		return n, err
	}
}
| // Error wrapper for sending errors to all calls | |
| func NewErrorWrapper(errmsg string) http.Handler { | |
| return &errorWrapper{ | |
| errmsg: []byte(errmsg), | |
| u: websocket.Upgrader{}, | |
| } | |
| } | |
| type errorWrapper struct { | |
| errmsg []byte | |
| u websocket.Upgrader | |
| } | |
| func (ew *errorWrapper) ServeHTTP(wr http.ResponseWriter, r *http.Request) { | |
| conn, err := ew.u.Upgrade(wr, r, nil) | |
| if err != nil { | |
| http.Error(wr, "Error upgrading to websocket", http.StatusInternalServerError) | |
| return | |
| } | |
| // Write error message | |
| conn.WriteMessage(websocket.CloseAbnormalClosure, ew.errmsg) | |
| conn.Close() | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment