Skip to content

Instantly share code, notes, and snippets.

@OhYee
Last active September 4, 2025 09:15
Show Gist options
  • Select an option

  • Save OhYee/95ffeb9aa469f593eb8d5062dd9caec1 to your computer and use it in GitHub Desktop.

Select an option

Save OhYee/95ffeb9aa469f593eb8d5062dd9caec1 to your computer and use it in GitHub Desktop.
大模型请求抓取,抄提示词用
package main
import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/http/httputil"
"net/url"
"os"
"regexp"
"strconv"
"strings"
"time"
)
// =====================================================================
// 配置和工具函数
// =====================================================================
// 启动时自动加载 .env 文件中的环境变量
func init() {
loadEnvFile()
}
// loadEnvFile 从 .env 文件加载环境变量
func loadEnvFile() {
data, err := os.ReadFile(".env")
if err != nil {
return // 文件不存在则忽略
}
// 逐行解析环境变量
lines := strings.Split(string(data), "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
// 跳过注释行和空行
if strings.HasPrefix(line, "#") || !strings.Contains(line, "=") {
continue
}
// 解析 KEY=VALUE 格式
parts := strings.SplitN(line, "=", 2)
if len(parts) == 2 {
key := strings.TrimSpace(parts[0])
value := strings.TrimSpace(parts[1])
// 只设置尚未存在的环境变量
if os.Getenv(key) == "" {
fmt.Printf("🔧 设置环境变量: %s = %s\n", key, value)
os.Setenv(key, value)
}
}
}
}
// formatElapsedTime 将时间间隔格式化为人性化的字符串
// 例如: 100ms, 5s, 1m25s
func formatElapsedTime(elapsed time.Duration) string {
if elapsed < time.Second {
// 小于1秒,显示毫秒
return fmt.Sprintf("after %dms", elapsed.Milliseconds())
} else if elapsed < time.Minute {
// 小于1分钟,显示秒
seconds := elapsed.Seconds()
if seconds == float64(int(seconds)) {
return fmt.Sprintf("after %ds", int(seconds))
}
return fmt.Sprintf("after %.1fs", seconds)
} else {
// 1分钟及以上,显示分钟和秒
minutes := int(elapsed.Minutes())
seconds := int(elapsed.Seconds()) % 60
if seconds == 0 {
return fmt.Sprintf("after %dm", minutes)
}
return fmt.Sprintf("after %dm%ds", minutes, seconds)
}
}
// unescapeUnicodeBytes 解码字节流中的 Unicode 转义序列
// 处理 \uXXXX 和 \\uXXXX 两种格式
func unescapeUnicodeBytes(data []byte) []byte {
re := regexp.MustCompile(`\\?\\u([0-9a-fA-F]{4})`)
return re.ReplaceAllFunc(data, func(match []byte) []byte {
// 处理双反斜杠的情况
if len(match) >= 6 && match[0] == '\\' && match[1] == '\\' {
match = match[1:] // 去掉一个反斜杠
}
// 提取十六进制部分并转换为 Unicode 字符
hex := string(match[len(match)-4:])
if codePoint, err := strconv.ParseInt(hex, 16, 32); err == nil {
return []byte(string(rune(codePoint)))
}
return match
})
}
// extractJSONFromStreamChunk 从流式数据块中提取 JSON 内容
// 专门用于解析 SSE (Server-Sent Events) 格式的数据
func extractJSONFromStreamChunk(chunk []byte) (content string, found bool) {
chunkStr := string(chunk)
// 检查是否是 SSE 数据格式
if !strings.HasPrefix(chunkStr, "data: ") {
return "", false
}
// 解析 JSON 数据
var jsonData map[string]any
if err := json.Unmarshal(chunk[6:], &jsonData); err != nil {
return "", false
}
// 提取嵌套的内容字段: choices[0].delta.content
if choices, ok := jsonData["choices"].([]any); ok && len(choices) > 0 {
if choice, ok := choices[0].(map[string]any); ok {
if delta, ok := choice["delta"].(map[string]any); ok {
if content, ok := delta["content"].(string); ok {
return content, true
}
}
}
}
return "", false
}
// =====================================================================
// HTTP 响应拦截器
// =====================================================================
// StreamLogger 负责记录和显示流式响应数据
// 自动处理 gzip 压缩,统一输出格式
type StreamLogger struct {
originalWriter http.ResponseWriter // 原始的响应写入器
startTime time.Time // 请求开始时间
decompressor *StreamDecompressor // 统一的流解压器
contentBuffer bytes.Buffer // 累积的内容缓冲区
}
// NewStreamLogger 创建新的流式日志记录器
func NewStreamLogger(writer http.ResponseWriter, startTime time.Time) *StreamLogger {
return &StreamLogger{
originalWriter: writer,
startTime: startTime,
decompressor: NewStreamDecompressor(startTime),
}
}
// Header 返回响应头
func (sl *StreamLogger) Header() http.Header {
return sl.originalWriter.Header()
}
// WriteHeader 写入响应状态码和头部
func (sl *StreamLogger) WriteHeader(statusCode int) {
// 检查是否为 gzip 压缩并初始化解压器
if encoding := sl.Header().Get("Content-Encoding"); strings.Contains(strings.ToLower(encoding), "gzip") {
fmt.Printf("\n\033[1;2;35m[检测到 gzip 压缩 - 自动解压处理]\033[0m\n\n")
sl.decompressor.EnableGzipMode()
}
sl.originalWriter.WriteHeader(statusCode)
}
// Write 写入响应数据 - 统一处理压缩和非压缩数据
func (sl *StreamLogger) Write(data []byte) (int, error) {
// 通过解压器处理数据(无论是否压缩都走同一个流程)
sl.decompressor.ProcessData(data, sl.logStreamChunk)
// 将原始数据写入响应
return sl.originalWriter.Write(data)
}
// Close 清理资源
func (sl *StreamLogger) Close() error {
sl.decompressor.Close()
sl.logFinalContent()
return nil
}
// logStreamChunk 记录流数据块(统一的处理函数)
func (sl *StreamLogger) logStreamChunk(chunk []byte) {
elapsed := time.Since(sl.startTime)
fmt.Printf("\033[1;36m[%s]\033[0m ", formatElapsedTime(elapsed))
fmt.Printf("\033[35m%s\033[0m", string(chunk))
// 尝试从流数据中提取内容并缓存
if content, found := extractJSONFromStreamChunk(chunk); found {
sl.contentBuffer.WriteString(content)
}
}
// logFinalContent 记录最终累积的内容
func (sl *StreamLogger) logFinalContent() {
if sl.contentBuffer.Len() > 0 {
elapsed := time.Since(sl.startTime)
fmt.Printf("\n\033[1;36m[%s]\033[0m ", formatElapsedTime(elapsed))
fmt.Printf("\033[1;2;35m[流结束]\033[0m\n\n")
fmt.Printf("\033[2;3;35m[累积内容开始]\033[0m\n")
fmt.Printf("\033[35m%s\033[0m\n", sl.contentBuffer.String())
fmt.Printf("\033[2;3;35m[累积内容结束]\033[0m\n")
}
}
// =====================================================================
// 统一流解压器
// =====================================================================
// StreamChunkHandler 定义处理解压后数据块的回调函数类型
type StreamChunkHandler func(chunk []byte)
// StreamDecompressor 统一处理压缩和非压缩的流式数据
// 自动检测并处理 gzip 压缩,输出统一格式的原始数据
type StreamDecompressor struct {
startTime time.Time // 请求开始时间
isGzipMode bool // 是否为 gzip 压缩模式
pipeReader *io.PipeReader // gzip 解压管道读取端
pipeWriter *io.PipeWriter // gzip 解压管道写入端
gzipReader *gzip.Reader // gzip 解压读取器
chunkHandler StreamChunkHandler // 数据块处理回调函数
}
// NewStreamDecompressor 创建新的流解压器
func NewStreamDecompressor(startTime time.Time) *StreamDecompressor {
return &StreamDecompressor{
startTime: startTime,
isGzipMode: false,
}
}
// EnableGzipMode 启用 gzip 解压模式
func (sd *StreamDecompressor) EnableGzipMode() {
sd.isGzipMode = true
sd.pipeReader, sd.pipeWriter = io.Pipe()
}
// ProcessData 处理输入数据(自动判断是否需要解压)
func (sd *StreamDecompressor) ProcessData(data []byte, handler StreamChunkHandler) {
if sd.isGzipMode {
// 第一次调用时启动解压协程
if sd.chunkHandler == nil {
sd.chunkHandler = handler
go sd.processGzipStream()
}
// gzip 模式:将压缩数据写入管道进行解压
sd.writeToGzipPipe(data)
} else {
// 直接模式:直接处理原始数据
handler(data)
}
}
// writeToGzipPipe 将压缩数据写入 gzip 解压管道
func (sd *StreamDecompressor) writeToGzipPipe(data []byte) {
if sd.pipeWriter == nil {
return
}
go func() {
if _, err := sd.pipeWriter.Write(data); err != nil {
elapsed := time.Since(sd.startTime)
fmt.Printf("\n\033[1;36m[%s]\033[0m ", formatElapsedTime(elapsed))
fmt.Printf("\033[33m[解压管道写入错误: %v]\033[0m\n", err)
}
}()
}
// processGzipStream 处理 gzip 解压流(在独立协程中运行)
func (sd *StreamDecompressor) processGzipStream() {
defer sd.pipeReader.Close()
// 初始化 gzip 读取器
var err error
sd.gzipReader, err = gzip.NewReader(sd.pipeReader)
if err != nil {
elapsed := time.Since(sd.startTime)
fmt.Printf("\n\033[1;36m[%s]\033[0m ", formatElapsedTime(elapsed))
fmt.Printf("\033[33m[gzip 读取器初始化错误: %v]\033[0m\n", err)
return
}
defer sd.gzipReader.Close()
// 持续读取并解压数据
buffer := make([]byte, 4096)
for {
n, err := sd.gzipReader.Read(buffer)
if n > 0 {
// 解压成功的数据通过回调函数处理(与非压缩数据使用相同的处理逻辑)
chunk := buffer[:n]
if sd.chunkHandler != nil {
sd.chunkHandler(chunk)
}
}
if err == io.EOF {
break
}
if err != nil {
elapsed := time.Since(sd.startTime)
fmt.Printf("\n\033[1;36m[%s]\033[0m ", formatElapsedTime(elapsed))
fmt.Printf("\033[33m[gzip 解压错误: %v]\033[0m\n", err)
break
}
}
}
// Close 关闭解压器和相关资源
func (sd *StreamDecompressor) Close() {
if sd.pipeWriter != nil {
sd.pipeWriter.Close()
}
if sd.gzipReader != nil {
sd.gzipReader.Close()
}
}
// =====================================================================
// 流式传输支持
// =====================================================================
// FlushingWriter 支持流式传输的写入器包装
type FlushingWriter struct {
http.ResponseWriter
flusher http.Flusher
}
// Write 写入数据并立即刷新
func (fw *FlushingWriter) Write(data []byte) (int, error) {
n, err := fw.ResponseWriter.Write(data)
if err != nil {
return n, err
}
// 立即刷新缓冲区,确保流式传输
fw.flusher.Flush()
return n, nil
}
// =====================================================================
// HTTP 代理核心逻辑
// =====================================================================
// createProxyHandler 创建 HTTP 反向代理处理器
func createProxyHandler(targetURL *url.URL) http.HandlerFunc {
proxy := httputil.NewSingleHostReverseProxy(targetURL)
// 配置请求处理逻辑
setupRequestHandler(proxy, targetURL)
// 配置响应处理逻辑
setupResponseHandler(proxy)
// 返回主处理函数
return func(w http.ResponseWriter, r *http.Request) {
handleProxyRequest(proxy, w, r)
}
}
// setupRequestHandler 配置请求处理逻辑
func setupRequestHandler(proxy *httputil.ReverseProxy, targetURL *url.URL) {
originalDirector := proxy.Director
proxy.Director = func(req *http.Request) {
// 执行原始的 Director 逻辑
originalDirector(req)
// 设置代理目标
req.RequestURI = req.URL.RequestURI()
req.Host = targetURL.Host
req.URL.Host = targetURL.Host
req.URL.Scheme = targetURL.Scheme
// 记录请求信息
logRequest(req)
}
}
// setupResponseHandler 配置响应处理逻辑
func setupResponseHandler(proxy *httputil.ReverseProxy) {
proxy.ModifyResponse = func(resp *http.Response) error {
logResponseStart(resp)
return nil
}
}
// handleProxyRequest 处理代理请求
func handleProxyRequest(proxy *httputil.ReverseProxy, w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
// 创建流式日志记录器
logger := NewStreamLogger(w, startTime)
defer logger.Close()
// 如果支持流式传输,启用自动刷新
if flusher, ok := logger.originalWriter.(http.Flusher); ok {
logger.originalWriter = &FlushingWriter{
ResponseWriter: logger.originalWriter,
flusher: flusher,
}
}
// 在请求头中记录开始时间,用于响应处理
r.Header.Set("X-Start-Time", fmt.Sprint(startTime.UnixNano()))
// 执行代理请求
proxy.ServeHTTP(logger, r)
}
// =====================================================================
// 日志记录函数
// =====================================================================
// logRequest 记录请求信息
func logRequest(req *http.Request) {
// 打印请求行
fmt.Printf("\n\033[1;34m%s\033[0m \033[3;34m%s\033[0m\n", req.Method, req.URL.String())
// 打印请求头
for name, values := range req.Header {
for _, value := range values {
fmt.Printf("\033[1;2;34m%s\033[0m: \033[2;3;34m%s\033[0m\n", name, value)
}
}
// 打印请求体
logRequestBody(req)
}
// logRequestBody 记录请求体
func logRequestBody(req *http.Request) {
if req.Body == nil || req.ContentLength <= 0 {
return
}
// 读取请求体
bodyBytes, err := io.ReadAll(req.Body)
if err != nil {
return
}
// 解码 Unicode 转义序列
bodyBytes = unescapeUnicodeBytes(bodyBytes)
// 尝试格式化 JSON
if formattedJSON := tryFormatJSON(bodyBytes); formattedJSON != "" {
fmt.Printf("\n\033[34m%s\033[0m\n", formattedJSON)
} else {
fmt.Printf("\n\033[34m%s\033[0m\n\n", string(bodyBytes))
}
// 重新创建请求体供后续使用
req.Body = io.NopCloser(strings.NewReader(string(bodyBytes)))
}
// tryFormatJSON 尝试格式化 JSON 数据
func tryFormatJSON(data []byte) string {
var jsonObj map[string]any
if err := json.Unmarshal(data, &jsonObj); err != nil {
return ""
}
formatted, err := json.MarshalIndent(jsonObj, "", " ")
if err != nil {
return ""
}
return string(unescapeUnicodeBytes(formatted))
}
// logResponseStart 记录响应开始信息
func logResponseStart(resp *http.Response) {
// 从请求头中获取开始时间
startTimeNano, _ := strconv.ParseInt(resp.Request.Header.Get("X-Start-Time"), 10, 64)
startTime := time.Unix(0, startTimeNano)
elapsed := time.Since(startTime)
// 打印响应状态码(带时间戳)
fmt.Printf("\033[1;36m[%s]\033[0m ", formatElapsedTime(elapsed))
fmt.Printf("\033[1;35m%d\033[0m\n", resp.StatusCode)
// 检查压缩类型
if encoding := resp.Header.Get("Content-Encoding"); strings.Contains(strings.ToLower(encoding), "gzip") {
fmt.Printf("\n\033[1;2;35m[检测到 gzip 压缩]\033[0m\n\n")
}
// 打印响应头
for name, values := range resp.Header {
for _, value := range values {
fmt.Printf("\033[1;2;35m%s\033[0m: \033[2;3;35m%s\033[0m\n", name, value)
}
}
}
// =====================================================================
// 主程序入口
// =====================================================================
func main() {
// 获取配置参数
config := loadConfiguration()
// 创建代理处理器
proxyHandler := createProxyHandler(config.targetURL)
// 设置路由
setupRoutes(proxyHandler, config.targetURL)
// 显示启动信息
printStartupInfo(config)
// 启动 HTTP 服务器
log.Fatal(http.ListenAndServe(":"+config.port, nil))
}
// ProxyConfig 代理服务器配置
type ProxyConfig struct {
targetURL *url.URL
port string
}
// loadConfiguration 加载配置参数
func loadConfiguration() *ProxyConfig {
// 获取目标 URL
targetURLStr := os.Getenv("BASE_URL")
if targetURLStr == "" {
targetURLStr = os.Getenv("ENDPOINT")
}
if targetURLStr == "" {
log.Fatal("❌ 环境变量 BASE_URL 或 ENDPOINT 未设置")
}
// 解析目标 URL
targetURL, err := url.Parse(targetURLStr)
if err != nil {
log.Fatalf("❌ 无效的目标 URL: %v", err)
}
// 获取监听端口
port := os.Getenv("PORT")
if port == "" {
port = "8080"
}
return &ProxyConfig{
targetURL: targetURL,
port: port,
}
}
// setupRoutes 设置路由
func setupRoutes(proxyHandler http.HandlerFunc, targetURL *url.URL) {
// 主代理路由
http.HandleFunc("/", proxyHandler)
// 健康检查端点
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "OK - Proxy target: %s\n", targetURL.String())
})
}
// printStartupInfo 打印启动信息
func printStartupInfo(config *ProxyConfig) {
fmt.Printf("🚀 HTTP 代理服务器启动\n")
fmt.Printf(" 监听端口: %s\n", config.port)
fmt.Printf(" 代理目标: %s\n", config.targetURL.String())
fmt.Printf(" 健康检查: http://localhost:%s/health\n", config.port)
fmt.Printf(" 功能特性: 流式响应记录 + 时间戳显示 + gzip 解压支持\n")
fmt.Println("========================================")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment