Skip to content

Instantly share code, notes, and snippets.

@g023
Last active May 26, 2026 06:51
Show Gist options
  • Select an option

  • Save g023/c2bb7b540ffe64cee76023f18f6f9365 to your computer and use it in GitHub Desktop.

Select an option

Save g023/c2bb7b540ffe64cee76023f18f6f9365 to your computer and use it in GitHub Desktop.
OpenAI-compatible proxy for DeepSeek V4 Flash with intelligent auto context compression features
#!/usr/bin/env python3
"""
Zero-dependency OpenAI-compatible proxy for DeepSeek V4 Flash.
Author: g023
License: MIT
All client‑supplied model and generation parameters are **ignored**.
The proxy always uses the model, max output tokens, and other settings
defined in the global configuration (see --help and the constants below).
Optimisations:
- System prompt compression (auto-summarized via DeepSeek API;
originals stored in ./pre_sys/, summaries cached in ./post_sys/)
- Markdown block deduplication (keeps only the latest occurrence full)
- Conversation summarisation triggers when token budget is exceeded
- Assistant reasoning is cached to avoid redundant re‑generation
- Inter‑message content fingerprinting & deduplication (Feature F-1)
- Removes repeated boilerplate segments (environment_info, userMemory,
reminderInstructions, etc.) from user messages across conversation turns.
- Segments are hashed (SHA‑256), duplicates replaced with an empty string
(or a minimal placeholder if the message becomes empty).
- Per‑conversation fingerprint storage with LRU eviction.
* Reads from local file K.dat for API key if DEEPSEEK_API_KEY env var is not set.
* just a proof of concept pet project. Do not expose this server to the internet.
"""
import argparse
import collections
import copy
import hashlib
import http.server
import json
import logging
import os
import re
import signal
import socketserver
import sys
import threading
import time
import urllib.error
import urllib.request
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# ==============================================================================
# Global configuration – these forcibly override every client request
# ==============================================================================
# Base URL of the DeepSeek API; every upstream call appends "/chat/completions".
DEEPSEEK_BASE = "https://api.deepseek.com"
# Model name written into every forwarded request, replacing the client's choice.
# The author's inline note lists "deepseek-v4-pro" as the alternative value.
DEFAULT_MODEL = "deepseek-v4-flash"
# Capacity of the LRU cache that stores assistant messages with their reasoning.
MAX_CACHE_SIZE = 500
# Context budget in estimated tokens, used to compute the summarisation threshold.
MAX_CONTEXT = 128000
# Summarisation triggers once the estimate exceeds MAX_CONTEXT * SUMMARY_RATIO.
SUMMARY_RATIO = 0.8
# Model used for the summarisation calls.
SUMMARY_MODEL = DEFAULT_MODEL
# Value forced into "max_tokens" of every forwarded request.
MAX_OUTPUT_TOKENS = 128000
# "enabled" / "disabled" force the thinking flag; any other value ("auto")
# lets the proxy decide per request.
THINKING_MODE = "auto"
# --------------------------------------------------------------------------
# Local file save toggles – set to False to disable disk writes
# --------------------------------------------------------------------------
# Dump the message list before/after processing to ./pre_msg/ and ./post_msg/.
SAVE_PREPOST_MSGS = False
# Store original system prompts in ./pre_sys/ and their summaries in ./post_sys/.
SAVE_PREPOST_SYSTEM = True
# --------------------------------------------------------------------------
# Retry configuration for summarisation calls
# --------------------------------------------------------------------------
# Number of attempts made by each summarisation helper before giving up.
SUMMARISE_MAX_RETRIES = 3
# Sleep before the first retry, in seconds; doubled after every failed attempt.
SUMMARISE_RETRY_BASE_SLEEP = 2.0
# --------------------------------------------------------------------------
# Feature F-1: Inter‑message content fingerprinting & deduplication
# --------------------------------------------------------------------------
# Maximum number of segment fingerprints kept per conversation.
MAX_FINGERPRINT_HISTORY = 100
# Known boilerplate XML tags: label -> (open_tag, close_tag). Only the tag
# pairs are used by the code; the keys are descriptive labels.
_BOILERPLATE_PATTERNS = {
"environment_info": ("<environment_info>", "</environment_info>"),
"workspace_info": ("<workspace_info>", "</workspace_info>"),
"userMemory": ("<userMemory>", "</userMemory>"),
"sessionMemory": ("<sessionMemory>", "</sessionMemory>"),
"repoMemory": ("<repoMemory>", "</repoMemory>"),
"context": ("<context>", "</context>"),
"reminderInstructions": ("<reminderInstructions>", "</reminderInstructions>"),
"additional_skills_reminder": ("<additional_skills_reminder>", "</additional_skills_reminder>"),
"editorContext": ("<editorContext>", "</editorContext>"),
}
# Fingerprint storage: conv_id -> { segment_hash: (first_index, full_segment_text) }
_segment_fingerprints: Dict[str, Dict[str, Tuple[int, str]]] = {}
# Guards every read and write of _segment_fingerprints.
_segment_fp_lock = threading.Lock()
# Resolve the DeepSeek API key once, at import time. The DEEPSEEK_API_KEY
# environment variable wins; otherwise the key is read from ./K.dat.
# Surrounding whitespace is stripped in both cases so that a trailing newline
# cannot end up inside the Authorization header.
DSEEK_KEY = os.environ.get("DEEPSEEK_API_KEY", "").strip()
if not DSEEK_KEY:
    try:
        with open("K.dat", "r", encoding="utf-8") as f:
            DSEEK_KEY = f.read().strip()
    except (OSError, UnicodeError):
        # Missing, unreadable or non-text key file: handled by the check below.
        DSEEK_KEY = ""
if not DSEEK_KEY:
    # An empty key can never authenticate, so refuse to start instead of
    # forwarding requests that are guaranteed to fail upstream.
    print("ERROR: DEEPSEEK_API_KEY environment variable not set and K.dat is "
          "missing, unreadable, or empty.",
          file=sys.stderr)
    sys.exit(1)
# ==============================================================================
# Logging / dev feedback – directories for pre/post messages
# ==============================================================================
PRE_MSG_DIR = Path("./pre_msg")
POST_MSG_DIR = Path("./post_msg")
def _save_json_message(dir_path: Path, prefix: str, messages: list, msg_hash: str):
    """Write *messages* to ``dir_path`` as a tab-indented JSON file (best effort).

    The file is named ``<timestamp>_<msg_hash>.json``. ``dir_path`` is created
    on demand so that dumps work on a fresh checkout. Any failure is logged
    with a traceback and swallowed: a debugging aid must never break request
    handling.

    Args:
        dir_path: Directory that receives the dump.
        prefix: Label ("pre" / "post") used only in the failure log message.
        messages: JSON-serialisable message list.
        msg_hash: Short hash that ties a pre dump to its post dump.
    """
    try:
        dir_path.mkdir(parents=True, exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        filepath = dir_path / f"{timestamp}_{msg_hash}.json"
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(messages, f, ensure_ascii=False, indent="\t")
    except Exception:
        logging.exception("Failed to save %s message dump", prefix)
def _md5_of_messages(messages: list) -> str:
    """Return the first 8 hex digits of the MD5 of *messages*.

    The list is serialised with sorted keys, so the result does not depend on
    dictionary key order. Used as a short request identifier, not for security.
    """
    serialised = json.dumps(messages, sort_keys=True, ensure_ascii=False)
    digest = hashlib.md5(serialised.encode())
    return digest.hexdigest()[:8]
# ==============================================================================
# Improved token estimation – CJK‑aware heuristic
# ==============================================================================
# Inclusive (low, high) code point ranges treated as CJK ideographs.
_CJK_RANGES = [
    (0x4E00, 0x9FFF),    # CJK Unified Ideographs
    (0x3400, 0x4DBF),    # CJK Unified Ideographs Extension A
    (0x20000, 0x2A6DF),  # Extension B
    (0x2A700, 0x2B73F),  # Extension C
    (0x2B740, 0x2B81F),  # Extension D
    (0x2B820, 0x2CEAF),  # Extension E
    (0x2CEB0, 0x2EBEF),  # Extension F
    (0x30000, 0x3134F),  # Extension G
    (0x31350, 0x323AF),  # Extension H
]
# Individual punctuation code points that also count as CJK.
_CJK_PUNCT = {
    0x3000, 0x3001, 0x3002, 0xFF0C, 0xFF0E, 0xFF1A, 0xFF1B, 0xFF01, 0xFF1F,
    0x300C, 0x300D, 0x300E, 0x300F, 0x3010, 0x3011, 0x300A, 0x300B,
}


def _is_cjk(cp: int) -> bool:
    """Return True when code point *cp* is listed CJK punctuation or lies in a CJK range."""
    if cp in _CJK_PUNCT:
        return True
    return any(low <= cp <= high for low, high in _CJK_RANGES)
def estimate_tokens(text: str) -> int:
    """Estimate the number of tokens in *text* without a tokenizer.

    Empty text counts as 0. When more than half of the characters are CJK the
    estimate is one token per character; otherwise it is one token per 3.5
    characters, rounded down. Non-empty text never yields less than 1.
    """
    length = len(text)
    if not length:
        return 0
    cjk_chars = 0
    for ch in text:
        if _is_cjk(ord(ch)):
            cjk_chars += 1
    if cjk_chars / length > 0.5:
        # Mostly CJK: roughly one token per character.
        return max(1, length)
    return max(1, int(length // 3.5))
# ==============================================================================
# LRU Cache for assistant reasoning
# ==============================================================================
class LRUCache:
    """Small thread-safe least-recently-used cache.

    Entries live in an ``OrderedDict`` whose order runs from least to most
    recently used; every access happens under one lock.
    """

    def __init__(self, maxsize: int):
        self.maxsize = maxsize
        self._cache = collections.OrderedDict()
        self._lock = threading.Lock()

    def get(self, key: str):
        """Return the value stored for *key* (marking it most recent), or None."""
        with self._lock:
            try:
                value = self._cache[key]
            except KeyError:
                return None
            self._cache.move_to_end(key)
            return value

    def set(self, key: str, value: dict):
        """Store *value* under *key* as the most recent entry; evict the oldest on overflow."""
        with self._lock:
            entries = self._cache
            entries[key] = value
            # Assigning to an existing key keeps its old position, so promote it.
            entries.move_to_end(key)
            if len(entries) > self.maxsize:
                entries.popitem(last=False)
_assistant_cache = LRUCache(MAX_CACHE_SIZE)
# ==============================================================================
# Hashing utilities
# ==============================================================================
def _stable_hash(obj: Any) -> str:
    """Return the SHA-256 hex digest of *obj* serialised as key-sorted JSON."""
    serialised = json.dumps(obj, sort_keys=True, ensure_ascii=False)
    return hashlib.sha256(serialised.encode()).hexdigest()


def _conv_hash(messages: List[dict]) -> str:
    """Hash a message list using only the fields that define conversation identity.

    Only ``role``, ``content``, ``tool_calls``, ``name`` and ``tool_call_id``
    take part. Fields whose value is ``None``, ``""`` or an empty list are
    treated as absent: clients echo an assistant tool-call message back as
    ``"content": null``, ``"content": ""`` or with no ``content`` key at all,
    and all three spellings must produce the same key, otherwise lookups
    keyed on this hash can never match. Entries that are not dicts are
    hashed as-is instead of raising.
    """
    important_keys = {"role", "content", "tool_calls", "name", "tool_call_id"}
    cleaned = []
    for m in messages:
        if not isinstance(m, dict):
            cleaned.append(m)
            continue
        cleaned.append({
            k: v for k, v in m.items()
            if k in important_keys and v is not None and v != "" and v != []
        })
    return _stable_hash(cleaned)
# ==============================================================================
# System prompt compression – auto-summarize via DeepSeek API
# ==============================================================================
# Directory holding original system prompts, one "<hash>.txt" file each.
PRE_SYS_DIR = Path("./pre_sys")
# Directory holding the summarised counterparts, using the same file names.
POST_SYS_DIR = Path("./post_sys")
# Serialises writes into both directories.
_sys_lock = threading.Lock()


def _ensure_sys_dirs():
    """Create both system prompt directories if they do not exist yet."""
    for directory in (PRE_SYS_DIR, POST_SYS_DIR):
        directory.mkdir(parents=True, exist_ok=True)


def _sys_original_path(sys_hash: str) -> Path:
    """Return the file path of the original prompt identified by *sys_hash*."""
    return PRE_SYS_DIR.joinpath(f"{sys_hash}.txt")


def _sys_summary_path(sys_hash: str) -> Path:
    """Return the file path of the summarised prompt identified by *sys_hash*."""
    return POST_SYS_DIR.joinpath(f"{sys_hash}.txt")
def load_summarized_prompt(sys_hash: str) -> Optional[str]:
    """Read the cached summary for *sys_hash* from disk.

    Returns the file content, or None when no summary file exists or it
    cannot be read (the latter is logged as a warning).
    """
    summary_file = _sys_summary_path(sys_hash)
    if not summary_file.exists():
        return None
    try:
        return summary_file.read_text(encoding="utf-8")
    except Exception:
        logging.warning(f"Failed to read summarized prompt {sys_hash}")
    return None
def save_original_prompt(sys_hash: str, content: str):
    """Persist an original system prompt to disk once (best effort, thread-safe).

    The text goes to a temporary sibling file first and is then moved into
    place with ``Path.replace``, which overwrites atomically on POSIX and,
    unlike ``rename``, does not fail on Windows when the target already
    exists. Failures are logged and swallowed because the copy is only kept
    for inspection.

    Args:
        sys_hash: Hash that names the file.
        content: Original system prompt text.
    """
    path = _sys_original_path(sys_hash)
    if path.exists():
        return  # already saved
    tmp_path = path.with_suffix(".tmp")
    with _sys_lock:
        # Re-check under the lock: another thread may have written the file
        # between the unlocked check above and acquiring the lock.
        if path.exists():
            return
        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            tmp_path.write_text(content, encoding="utf-8")
            tmp_path.replace(path)
        except Exception as e:
            logging.warning(f"Failed to save original prompt {sys_hash}: {e}")
            try:
                # Do not leave a half-written temporary file behind.
                tmp_path.unlink()
            except OSError:
                pass  # nothing to clean up
def save_summarized_prompt(sys_hash: str, content: str):
    """Persist a summarised system prompt to disk (best effort, thread-safe).

    Empty summaries are not written: a cached empty file would be read back
    as "no summary" on every later request and only waste a disk read. The
    text goes to a temporary sibling file first and is moved into place with
    ``Path.replace`` so that an existing summary is overwritten on every
    platform. Failures are logged and swallowed.

    Args:
        sys_hash: Hash of the original prompt; names the file.
        content: Summarised prompt text.
    """
    if not isinstance(content, str) or not content.strip():
        logging.warning(f"Refusing to cache empty summarized prompt {sys_hash}")
        return
    path = _sys_summary_path(sys_hash)
    tmp_path = path.with_suffix(".tmp")
    with _sys_lock:
        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            tmp_path.write_text(content, encoding="utf-8")
            tmp_path.replace(path)
        except Exception as e:
            logging.warning(f"Failed to write summarized prompt {sys_hash}: {e}")
            try:
                # Do not leave a half-written temporary file behind.
                tmp_path.unlink()
            except OSError:
                pass  # nothing to clean up
def summarize_system_prompt(original: str, api_key: str) -> str:
    """Ask DeepSeek for a compressed version of a system prompt.

    The request is retried up to ``SUMMARISE_MAX_RETRIES`` times with
    exponential backoff. A response whose content is missing, not a string or
    blank counts as a failed attempt: returning it would make the caller
    replace the real system prompt with nothing.

    Args:
        original: Full system prompt text.
        api_key: DeepSeek API key for the Authorization header.

    Returns:
        The non-empty compressed prompt.

    Raises:
        RuntimeError: If every attempt fails.
    """
    summary_prompt = (
        "You are a prompt compression assistant. Summarize the following system prompt "
        "as concisely as possible while preserving ALL critical instructions, constraints, "
        "formatting rules, and behavioral guidelines. Remove redundancy, examples, and "
        "verbose explanations. Output ONLY the compressed prompt — no commentary.\n\n"
        f"{original}"
    )
    payload = {
        "model": SUMMARY_MODEL,
        "messages": [{"role": "user", "content": summary_prompt}],
        "max_tokens": 2000,
        "temperature": 0.0,
        "thinking": {"type": "disabled"},
    }
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    # The body is identical for every attempt, so serialise it once.
    request_body = json.dumps(payload).encode()
    last_exc = None
    for attempt in range(1, SUMMARISE_MAX_RETRIES + 1):
        try:
            req = urllib.request.Request(
                f"{DEEPSEEK_BASE}/chat/completions",
                data=request_body,
                headers=headers,
                method="POST",
            )
            with urllib.request.urlopen(req, timeout=120) as resp:
                body = json.loads(resp.read().decode())
            summary = body["choices"][0]["message"]["content"]
            if not isinstance(summary, str) or not summary.strip():
                raise ValueError("upstream returned an empty summary")
            return summary
        except Exception as e:
            last_exc = e
            if attempt < SUMMARISE_MAX_RETRIES:
                sleep_time = SUMMARISE_RETRY_BASE_SLEEP * (2 ** (attempt - 1))
                logging.warning("System prompt summarization attempt %d failed, retrying in %.1fs: %s",
                                attempt, sleep_time, e)
                time.sleep(sleep_time)
            else:
                logging.error("System prompt summarization failed after %d attempts", SUMMARISE_MAX_RETRIES)
    raise RuntimeError(f"System prompt summarization failed: {last_exc}") from last_exc
# ==============================================================================
# Markdown block deduplication (fixed – no index corruption)
# ==============================================================================
_FENCE_PATTERN = re.compile(r"(```|~~~)(\w*)\n(.*?)\1", re.DOTALL)
# Text that stands in for the body of an earlier duplicate code block.
_OMITTED_BLOCK_PLACEHOLDER = ".. (code omitted, see later version) .."


def deduplicate_markdown_blocks(messages: List[dict]) -> int:
    """Replace earlier copies of identical fenced code blocks with a placeholder.

    For every fenced block body that occurs more than once across all string
    ``content`` fields, only the **last** occurrence keeps its text. Earlier
    occurrences have their body replaced by a short placeholder. Messages are
    modified in place.

    Two safeguards apply to each replacement:

    * The placeholder keeps the body's trailing newline, so the closing
      fence stays on its own line and the result is still valid Markdown.
    * A body that is not longer than its replacement is left alone, since
      swapping it would grow the prompt while discarding information.

    Entries that are not dicts, or whose content is not a string, are skipped.

    Returns:
        The number of block bodies that were replaced.
    """
    def body_hash(body: str) -> str:
        # surrogatepass: lone surrogates from JSON escapes must not crash hashing.
        return hashlib.sha256(body.encode("utf-8", "surrogatepass")).hexdigest()

    def text_of(msg) -> Optional[str]:
        content = msg.get("content") if isinstance(msg, dict) else None
        return content if isinstance(content, str) else None

    # Pass 1: remember, for each distinct body, the ordinal of its last occurrence.
    last_occurrence: Dict[str, int] = {}
    ordinal = 0
    for msg in messages:
        content = text_of(msg)
        if content is None:
            continue
        for match in _FENCE_PATTERN.finditer(content):
            last_occurrence[body_hash(match.group(3))] = ordinal
            ordinal += 1

    # Pass 2: walk the same matches in the same order and rewrite the earlier
    # duplicates. A message's content is only reassigned after its own scan,
    # so the ordinals line up with pass 1.
    replaced_count = 0
    ordinal = 0
    for msg in messages:
        content = text_of(msg)
        if content is None:
            continue
        pieces: List[str] = []
        cursor = 0
        for match in _FENCE_PATTERN.finditer(content):
            body = match.group(3)
            is_last = last_occurrence[body_hash(body)] == ordinal
            ordinal += 1
            if is_last:
                continue
            replacement = _OMITTED_BLOCK_PLACEHOLDER + ("\n" if body.endswith("\n") else "")
            if len(replacement) >= len(body):
                continue  # replacing would not save anything
            pieces.append(content[cursor:match.start(3)])
            pieces.append(replacement)
            cursor = match.end(3)
            replaced_count += 1
        if pieces:
            pieces.append(content[cursor:])
            msg["content"] = "".join(pieces)
    return replaced_count
# ==============================================================================
# Feature F-1: Inter-message content fingerprinting & deduplication
# ==============================================================================
def _get_conversation_base_id(messages: List[dict]) -> str:
    """Derive a conversation identifier that stays stable as messages are added.

    The identifier is the SHA-256 of the first string system message plus
    the first string user message with every known boilerplate tag pair (and
    its contents) removed. When both parts are empty the hash of the whole
    message list is used instead; that value changes on every turn.
    """
    def first_text(role: str) -> str:
        return next(
            (m["content"] for m in messages
             if m.get("role") == role and isinstance(m.get("content"), str)),
            "",
        )

    system_text = first_text("system")
    user_text = first_text("user")
    for open_tag, close_tag in _BOILERPLATE_PATTERNS.values():
        # Crude removal of each tag pair together with whatever it wraps.
        tag_regex = re.escape(open_tag) + r".*?" + re.escape(close_tag)
        user_text = re.sub(tag_regex, "", user_text, flags=re.DOTALL)
    user_text = user_text.strip()

    combined = f"{system_text}\n{user_text}".strip()
    if combined:
        return hashlib.sha256(combined.encode()).hexdigest()
    # Nothing but boilerplate: fall back to the full conversation hash.
    return _conv_hash(messages)
# Upper bound on the number of conversations whose fingerprints stay in memory.
_MAX_FINGERPRINT_CONVERSATIONS = 256


def deduplicate_user_message_segments(
    messages: List[dict],
    conv_id: str
) -> Tuple[List[dict], int]:
    """Drop repeated boilerplate segments from the user messages of one request.

    Every ``<tag>…</tag>`` segment listed in ``_BOILERPLATE_PATTERNS`` is
    hashed. The first occurrence inside *messages* is kept; later identical
    occurrences are removed. Messages are modified in place.

    The keep/remove decision is deliberately local to the request. The
    upstream API is stateless and clients resend the whole history on every
    turn, so deciding from fingerprints remembered from an earlier request
    would strip even the first occurrence and the model would never see the
    segment at all.

    The per-conversation fingerprint store is still maintained as a bounded
    record: at most ``MAX_FINGERPRINT_HISTORY`` segments per conversation and
    at most ``_MAX_FINGERPRINT_CONVERSATIONS`` conversations, least recently
    used first out in both cases.

    Args:
        messages: Message list of the current request.
        conv_id: Conversation identifier that keys the fingerprint store.

    Returns:
        ``(messages, segments_removed)``. A user message that ends up empty
        is replaced with ``"(no new content)"``.
    """
    seen_in_request = set()
    segments_removed = 0
    with _segment_fp_lock:
        # Re-inserting moves the conversation to the "most recently used" end.
        history = _segment_fingerprints.pop(conv_id, None)
        if history is None:
            history = {}
        _segment_fingerprints[conv_id] = history
        while len(_segment_fingerprints) > _MAX_FINGERPRINT_CONVERSATIONS:
            del _segment_fingerprints[next(iter(_segment_fingerprints))]

        for msg in messages:
            if not isinstance(msg, dict) or msg.get("role") != "user":
                continue
            content = msg.get("content", "")
            if not isinstance(content, str):
                continue
            new_content = content
            for open_tag, close_tag in _BOILERPLATE_PATTERNS.values():
                idx = 0
                while True:
                    start = new_content.find(open_tag, idx)
                    if start == -1:
                        break
                    end = new_content.find(close_tag, start + len(open_tag))
                    if end == -1:
                        break
                    end += len(close_tag)
                    segment = new_content[start:end]
                    seg_hash = hashlib.sha256(
                        segment.encode("utf-8", "surrogatepass")
                    ).hexdigest()
                    if seg_hash in seen_in_request:
                        # Already sent earlier in this request: cut it out and
                        # rescan from the same position.
                        new_content = new_content[:start] + new_content[end:]
                        segments_removed += 1
                        idx = start
                        continue
                    seen_in_request.add(seg_hash)
                    if seg_hash in history:
                        # Refresh recency; dict order doubles as LRU order.
                        history[seg_hash] = history.pop(seg_hash)
                    else:
                        next_index = max((i for i, _ in history.values()), default=-1) + 1
                        history[seg_hash] = (next_index, segment)
                        while len(history) > MAX_FINGERPRINT_HISTORY:
                            del history[next(iter(history))]
                    idx = end
            new_content = new_content.strip()
            if new_content == "":
                new_content = "(no new content)"
            msg["content"] = new_content
    return messages, segments_removed
# ==============================================================================
# Conversation summarisation – with token-aware split & retries
# ==============================================================================
def _total_tokens(messages: List[dict]) -> int:
    """Estimate the total token count of a message list.

    Counts every string ``content`` plus the JSON-encoded ``arguments`` of
    every tool call. Tolerates the shapes clients actually send: a
    ``tool_calls`` value of ``null`` (or anything that is not a list),
    tool-call entries without a ``function`` object, and list entries that
    are not dicts are skipped instead of raising.
    """
    total = 0
    for m in messages:
        if not isinstance(m, dict):
            continue
        content = m.get("content", "")
        if isinstance(content, str):
            total += estimate_tokens(content)
        tool_calls = m.get("tool_calls")
        if not isinstance(tool_calls, list):
            continue  # absent or explicit null
        for tc in tool_calls:
            function = tc.get("function") if isinstance(tc, dict) else None
            arguments = function.get("arguments", "") if isinstance(function, dict) else ""
            total += estimate_tokens(json.dumps(arguments))
    return total
def _summarise_messages_with_retry(summarise_payload: dict, api_key: str) -> str:
    """Send a summarisation request to DeepSeek, retrying with exponential backoff.

    A response whose content is missing, not a string or blank counts as a
    failed attempt, so callers can rely on getting a non-empty string back.

    Args:
        summarise_payload: Complete chat-completions request body.
        api_key: DeepSeek API key for the Authorization header.

    Returns:
        The non-empty summary text.

    Raises:
        RuntimeError: If all ``SUMMARISE_MAX_RETRIES`` attempts fail.
    """
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    # The body is identical for every attempt, so serialise it once.
    request_body = json.dumps(summarise_payload).encode()
    last_exc = None
    for attempt in range(1, SUMMARISE_MAX_RETRIES + 1):
        try:
            req = urllib.request.Request(
                f"{DEEPSEEK_BASE}/chat/completions",
                data=request_body,
                headers=headers,
                method="POST",
            )
            with urllib.request.urlopen(req, timeout=120) as resp:
                body = json.loads(resp.read().decode())
            summary = body["choices"][0]["message"]["content"]
            if not isinstance(summary, str) or not summary.strip():
                raise ValueError("upstream returned an empty summary")
            return summary
        except Exception as e:
            last_exc = e
            if attempt < SUMMARISE_MAX_RETRIES:
                sleep_time = SUMMARISE_RETRY_BASE_SLEEP * (2 ** (attempt - 1))
                logging.warning("Summarisation attempt %d failed, retrying in %.1fs: %s",
                                attempt, sleep_time, e)
                time.sleep(sleep_time)
            else:
                logging.error("Summarisation failed after %d attempts", SUMMARISE_MAX_RETRIES)
    raise RuntimeError(f"Summarisation API call failed: {last_exc}") from last_exc
def _merge_system_messages(messages: List[dict]) -> Tuple[Optional[str], List[dict]]:
    """Split *messages* into one merged system prompt and everything else.

    System messages are joined with newlines in their original order.
    Content given as a list of parts contributes the ``text`` of each part,
    so such a system message is no longer dropped silently. Messages without
    a ``role`` key, and entries that are not dicts, are treated as
    non-system instead of raising.

    Returns:
        ``(merged, others)``: *merged* is the joined system text, or None
        when no system message carried usable text; *others* holds the
        non-system messages in their original order.
    """
    systems: List[str] = []
    others: List[dict] = []
    for m in messages:
        if not isinstance(m, dict) or m.get("role") != "system":
            others.append(m)
            continue
        content = m.get("content")
        if isinstance(content, list):
            texts = [
                part["text"] for part in content
                if isinstance(part, dict) and isinstance(part.get("text"), str)
            ]
            content = "\n".join(texts) if texts else None
        if isinstance(content, str):
            systems.append(content)
    merged = "\n".join(systems) if systems else None
    return merged, others
def _safe_split_index(non_system: List[dict], idx: int) -> int:
    """Adjust a split index so that no tool result loses its assistant request
    and at least the newest message group is always kept.

    Messages before the returned index are summarised, the rest are kept.
    The index first moves forward past ``tool`` messages, since those belong
    to an assistant message before the split. An assistant message carrying
    ``tool_calls`` is itself a valid start of the kept part. If moving
    forward would leave nothing to keep, the index moves backward to the
    start of the last message group instead.
    """
    def role_at(i: int):
        m = non_system[i]
        return m.get("role") if isinstance(m, dict) else None

    count = len(non_system)
    forward = idx
    while forward < count and role_at(forward) == "tool":
        forward += 1
    if forward < count:
        return forward
    backward = min(idx, count - 1)
    while backward > 0 and role_at(backward) == "tool":
        backward -= 1
    return backward


def maybe_summarize(messages: List[dict], api_key: str,
                    max_context: int = MAX_CONTEXT,
                    ratio: float = SUMMARY_RATIO) -> Tuple[List[dict], bool]:
    """Summarise the oldest part of a conversation when it exceeds the budget.

    When the estimated token total is above ``max_context * ratio``, the
    oldest non-system messages are sent to the summarisation model and
    replaced by a summary appended to the single system message. If the
    summarisation call fails, a fixed truncation notice is used instead.

    The split always leaves the newest message group in place and never
    separates a tool result from the assistant message that requested it.
    When nothing can be summarised under those rules, the messages are
    returned unsummarised.

    Args:
        messages: Full message list of the request.
        api_key: DeepSeek API key for the summarisation call.
        max_context: Context budget in estimated tokens.
        ratio: Fraction of *max_context* that triggers summarisation.

    Returns:
        ``(new_messages, summarised)``. The result holds at most one system
        message.
    """
    threshold = int(max_context * ratio)
    total = _total_tokens(messages)
    merged_sys, non_system = _merge_system_messages(messages)

    if total <= threshold:
        if merged_sys is not None:
            return [{"role": "system", "content": merged_sys}] + non_system, False
        return messages, False

    system_prefix = [{"role": "system", "content": merged_sys}] if merged_sys else []
    if not non_system:
        return system_prefix, False

    # Token-aware split: walk from the oldest message until the overshoot is
    # covered AND at least half of the messages are included.
    deficit = total - threshold
    half = len(non_system) // 2
    split = max(1, half)  # fallback when the overshoot is never covered
    accumulated = 0
    for i, msg in enumerate(non_system):
        # _total_tokens copes with null content and null tool_calls, which
        # assistant tool-call messages routinely carry.
        accumulated += _total_tokens([msg])
        if accumulated >= deficit and i >= half:
            split = i + 1
            break
    split = _safe_split_index(non_system, split)

    to_summarise = non_system[:split]
    to_keep = non_system[split:]
    if not to_summarise:
        return system_prefix + non_system, False

    summary_prompt = (
        "Summarise the following conversation excerpt. "
        "Retain all critical facts, decisions, and code fragments. "
        "Be concise but complete.\n\n"
    )
    summarise_text = "".join(
        f"[{m.get('role')}]: {m['content']}\n"
        for m in to_summarise
        if isinstance(m, dict) and isinstance(m.get("content"), str)
    )
    payload = {
        "model": SUMMARY_MODEL,
        "messages": [
            {"role": "user", "content": summary_prompt + summarise_text}
        ],
        "max_tokens": 1000,
        "temperature": 0.0,
        "thinking": {"type": "disabled"},
    }
    fallback_summary = "[Earlier conversation truncated due to length]"
    try:
        summary = _summarise_messages_with_retry(payload, api_key)
    except RuntimeError as e:
        logging.warning(f"Summarisation failed, falling back to truncation: {e}")
        summary = fallback_summary
    if not isinstance(summary, str) or not summary.strip():
        # Guard the string concatenation below against a null/blank result.
        summary = fallback_summary

    new_messages: List[dict] = []
    if merged_sys:
        new_sys_content = merged_sys + "\n\n[Earlier conversation summary]\n" + summary
        new_messages.append({"role": "system", "content": new_sys_content})
    else:
        new_messages.append({"role": "system", "content": f"[Earlier conversation summary]\n{summary}"})
    new_messages.extend(to_keep)
    return new_messages, True
# ==============================================================================
# Reasoning injection helpers
# ==============================================================================
def cache_assistant_message(original_msgs: List[dict], assistant_msg: dict):
    """Remember an assistant reply so its reasoning can be re-attached later.

    Only replies that carry tool calls or reasoning are stored. The cache
    key is the conversation hash of *original_msgs* followed by the reply
    without its ``reasoning_content``; the stored value is the full reply.
    """
    has_tool_calls = bool(assistant_msg.get("tool_calls"))
    has_reasoning = bool(assistant_msg.get("reasoning_content"))
    if not (has_tool_calls or has_reasoning):
        return
    conversation = [m.copy() for m in original_msgs]
    conversation.append(
        {field: value for field, value in assistant_msg.items() if field != "reasoning_content"}
    )
    cache_key = _conv_hash(conversation)
    _assistant_cache.set(cache_key, assistant_msg)
def inject_reasoning(messages: List[dict]):
    """Re-attach cached ``reasoning_content`` to assistant tool-call messages.

    For each assistant message that has tool calls but no reasoning, the
    conversation up to and including that message is hashed and looked up in
    the assistant cache. On a hit that carries reasoning, it is copied into
    the message in place.
    """
    for position, message in enumerate(messages, start=1):
        is_candidate = (
            message.get("role") == "assistant"
            and message.get("tool_calls")
            and "reasoning_content" not in message
        )
        if not is_candidate:
            continue
        hit = _assistant_cache.get(_conv_hash(messages[:position]))
        if hit and "reasoning_content" in hit:
            message["reasoning_content"] = hit["reasoning_content"]
def should_disable_thinking(messages: List[dict]) -> bool:
    """Tell whether thinking has to be switched off for this conversation.

    Returns True as soon as one assistant message has tool calls but no
    ``reasoning_content`` key, and False otherwise.
    """
    for message in messages:
        if message.get("role") != "assistant":
            continue
        if message.get("tool_calls") and "reasoning_content" not in message:
            return True
    return False
# ==============================================================================
# DeepSeek API helpers
# ==============================================================================
def _make_deepseek_request(payload: dict, stream: bool) -> urllib.request.Request:
    """Build the POST request for the DeepSeek chat-completions endpoint.

    *payload* is sent as the JSON body. The ``Accept`` header asks for an
    event stream when *stream* is true and for plain JSON otherwise.
    """
    if stream:
        accept = "text/event-stream"
    else:
        accept = "application/json"
    url = f"{DEEPSEEK_BASE}/chat/completions"
    encoded_payload = json.dumps(payload).encode()
    return urllib.request.Request(
        url,
        data=encoded_payload,
        headers={
            "Authorization": f"Bearer {DSEEK_KEY}",
            "Content-Type": "application/json",
            "Accept": accept,
        },
        method="POST",
    )
def deepseek_nonstream(payload: dict) -> dict:
    """Perform a non-streaming chat-completions request.

    Every upstream failure is reported as ``RuntimeError`` so the caller can
    answer with a single "bad gateway" path: HTTP error statuses, connection
    and timeout errors, truncated responses, and bodies that are not valid
    JSON.

    Args:
        payload: Complete request body.

    Returns:
        The decoded JSON response.

    Raises:
        RuntimeError: On any upstream failure.
    """
    import http.client  # local import: only the exception type is needed here

    req = _make_deepseek_request(payload, stream=False)
    try:
        with urllib.request.urlopen(req, timeout=600) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as e:
        # errors="replace": a non-UTF-8 error page must not mask the HTTP error.
        error_body = e.read().decode("utf-8", errors="replace") if e.fp else ""
        raise RuntimeError(f"DeepSeek HTTP {e.code}: {error_body}") from e
    except (OSError, http.client.HTTPException) as e:
        # URLError, timeouts and connection resets are all OSError subclasses.
        raise RuntimeError(f"DeepSeek request failed: {e}") from e
    try:
        return json.loads(raw.decode("utf-8"))
    except ValueError as e:
        # Covers both UnicodeDecodeError and json.JSONDecodeError.
        raise RuntimeError(f"DeepSeek returned an invalid JSON body: {e}") from e
# ==============================================================================
# Streaming buffer for accumulating tool calls
# ==============================================================================
class StreamBuffer:
    """Accumulates streamed chat-completion chunks into one assistant message.

    Reasoning text, content text and tool-call fragments are collected
    across chunks so the complete message can be rebuilt once the stream has
    ended.
    """

    def __init__(self):
        self.reasoning = ""
        self.content = ""
        # Tool calls keyed by their stream "index"; arguments arrive in pieces.
        self.tool_calls: Dict[int, dict] = {}
        self.finish_reason: Optional[str] = None
        self.usage: Optional[dict] = None

    def process_chunk(self, chunk: dict) -> None:
        """Fold one decoded stream chunk into the buffer.

        ``null`` values mean "nothing in this chunk" and are ignored. A
        delta typically carries both ``content`` and ``reasoning_content``
        with the inactive one set to null, so treating null as a reset would
        wipe the reasoning collected so far as soon as the answer starts.
        Null ``delta`` and ``tool_calls`` values are tolerated as well.
        """
        for choice in chunk.get("choices") or []:
            delta = choice.get("delta") or {}
            reasoning_piece = delta.get("reasoning_content")
            if reasoning_piece:
                self.reasoning += reasoning_piece
            content_piece = delta.get("content")
            if content_piece:
                self.content += content_piece
            for tc in delta.get("tool_calls") or []:
                if not isinstance(tc, dict):
                    continue
                idx = tc.get("index")
                if idx is None:
                    continue
                cur = self.tool_calls.setdefault(idx, {
                    "id": "",
                    "type": "function",
                    "function": {"name": "", "arguments": ""},
                })
                tid = tc.get("id")
                if tid is not None:
                    cur["id"] = tid
                ttype = tc.get("type")
                if ttype is not None:
                    cur["type"] = ttype
                func_raw = tc.get("function")
                func = func_raw if isinstance(func_raw, dict) else {}
                name = func.get("name")
                if name is not None:
                    cur["function"]["name"] = name
                args = func.get("arguments")
                if args is not None:
                    # Arguments are streamed as string fragments.
                    cur["function"]["arguments"] += args
            message = choice.get("message")
            if isinstance(message, dict):
                # A full (non-delta) message replaces what was accumulated.
                rc_msg = message.get("reasoning_content")
                ct_msg = message.get("content")
                if rc_msg is not None:
                    self.reasoning = rc_msg
                if ct_msg is not None:
                    self.content = ct_msg
            finish = choice.get("finish_reason")
            if finish is not None:
                self.finish_reason = finish
        if chunk.get("usage") is not None:
            self.usage = chunk["usage"]

    def build_assistant_message(self) -> dict:
        """Return the assistant message assembled so far.

        ``reasoning_content``, ``content`` and ``tool_calls`` are included
        only when non-empty; tool calls are ordered by their stream index.
        """
        msg: Dict[str, Any] = {"role": "assistant"}
        if self.reasoning:
            msg["reasoning_content"] = self.reasoning
        if self.content:
            msg["content"] = self.content
        if self.tool_calls:
            msg["tool_calls"] = [self.tool_calls[k] for k in sorted(self.tool_calls)]
        return msg
# ==============================================================================
# HTTP Request Handler
# ==============================================================================
class ProxyHandler(http.server.BaseHTTPRequestHandler):
def log_message(self, format, *args):
    """Send the server's per-request log line to ``logging`` at INFO level.

    The line is prefixed with the client's IP address.
    """
    client_ip = self.client_address[0]
    rendered = format % args
    logging.info("%s - %s", client_ip, rendered)
def do_POST(self):
    """Handle ``POST /v1/chat/completions``: validate, compress, forward.

    Malformed requests are answered with 400 instead of crashing the
    handler: an unparsable or non-positive Content-Length, invalid JSON, a
    body that is not an object, or ``messages`` that is not a non-empty list
    of objects. Errors during pre-processing yield 500 and upstream failures
    yield 502.
    """
    if self.path != "/v1/chat/completions":
        self.send_error(404)
        return
    try:
        content_length = int(self.headers.get("Content-Length", 0))
    except (TypeError, ValueError):
        self.send_error(400, "Invalid Content-Length")
        return
    if content_length <= 0:
        # A negative length would make rfile.read() block until EOF.
        self.send_error(400, "Empty body")
        return
    body = self.rfile.read(content_length)
    try:
        client_req = json.loads(body)
    except ValueError:
        # Covers json.JSONDecodeError and invalid UTF-8.
        self.send_error(400, "Invalid JSON")
        return
    if not isinstance(client_req, dict):
        self.send_error(400, "Request body must be a JSON object")
        return
    messages = client_req.get("messages", [])
    if not isinstance(messages, list) or not messages:
        self.send_error(400, "Empty or invalid 'messages' array")
        return
    if not all(isinstance(m, dict) for m in messages):
        self.send_error(400, "Every entry in 'messages' must be an object")
        return
    stream = client_req.get("stream", False)
    # Deep copy before any in-place mutation: the untouched list is what
    # the reasoning cache is keyed on.
    original_messages = copy.deepcopy(messages)

    try:
        payload = self._prepare_payload(client_req, messages, original_messages, stream)
    except Exception:
        logging.exception("Request pre-processing failed")
        self._send_error_safely(500, "Internal proxy error")
        return

    try:
        if stream:
            self._handle_stream(payload, original_messages)
        else:
            self._handle_nonstream(payload, original_messages)
    except RuntimeError as e:
        logging.error("Upstream error: %s", e)
        # The upstream text goes into the response body ("explain"), not the
        # status line, which cannot carry newlines or non-Latin-1 characters.
        self._send_error_safely(502, "Upstream error", str(e))
    except Exception:
        logging.exception("Unexpected proxy error")
        self._send_error_safely(500, "Internal proxy error")

def _send_error_safely(self, code, message, explain=None):
    """Send an error response, ignoring failures such as a closed connection."""
    try:
        self.send_error(code, message, explain)
    except Exception:
        logging.debug("Could not deliver error response %s to client", code)

def _compress_system_prompts(self, messages):
    """Swap each string system prompt for its cached or freshly made summary.

    A summary is used only when it is non-empty and shorter than the
    original. Returns the number of prompts that were replaced.
    """
    compressed = 0
    for msg in messages:
        if msg.get("role") != "system" or not isinstance(msg.get("content"), str):
            continue
        sys_content = msg["content"]
        h = _stable_hash({"content": sys_content})
        if SAVE_PREPOST_SYSTEM:
            save_original_prompt(h, sys_content)
        summarized_sys = load_summarized_prompt(h)
        if summarized_sys:
            logging.debug("Using summarized system prompt for hash %s", h[:12])
        else:
            try:
                logging.info("Generating summarized system prompt for hash %s", h[:12])
                summarized_sys = summarize_system_prompt(sys_content, DSEEK_KEY)
            except RuntimeError as e:
                logging.warning("Failed to summarize system prompt, using original: %s", e)
                continue
            if SAVE_PREPOST_SYSTEM and summarized_sys:
                save_summarized_prompt(h, summarized_sys)
        if summarized_sys and len(summarized_sys) < len(sys_content):
            msg["content"] = summarized_sys
            compressed += 1
    return compressed

def _prepare_payload(self, client_req, messages, original_messages, stream):
    """Run the compression pipeline and build the upstream request body.

    Order matters:

    1. Reasoning injection runs first, while *messages* still equals what
       the client sent. Cache keys are computed from unmodified messages,
       so a lookup after any rewriting step could never match.
    2. Markdown block deduplication.
    3. Boilerplate segment deduplication.
    4. System prompt compression, before conversation summarisation, so
       the cache key is the client's stable system prompt rather than a
       prompt with an ever-changing conversation summary appended.
    5. Conversation summarisation.
    """
    original_msg_count = len(original_messages)
    original_tokens = _total_tokens(original_messages)
    msg_hash = _md5_of_messages(original_messages)
    if SAVE_PREPOST_MSGS:
        _save_json_message(PRE_MSG_DIR, "pre", original_messages, msg_hash)

    inject_reasoning(messages)
    replaced_blocks = deduplicate_markdown_blocks(messages)
    conv_id = _get_conversation_base_id(original_messages)
    messages, deduped_segments = deduplicate_user_message_segments(messages, conv_id)
    compressed_prompts_found = self._compress_system_prompts(messages)
    messages, summarized = maybe_summarize(
        messages,
        api_key=DSEEK_KEY,
        max_context=MAX_CONTEXT,
        ratio=SUMMARY_RATIO,
    )

    payload = dict(client_req)
    payload["messages"] = messages
    payload["model"] = DEFAULT_MODEL
    payload["max_tokens"] = MAX_OUTPUT_TOKENS
    if THINKING_MODE == "enabled":
        payload["thinking"] = {"type": "enabled"}
    elif THINKING_MODE == "disabled":
        payload["thinking"] = {"type": "disabled"}
    else:
        payload["thinking"] = {
            "type": "disabled" if should_disable_thinking(messages) else "enabled"
        }
    payload["stream"] = stream

    final_msg_count = len(messages)
    final_tokens = _total_tokens(messages)
    compression_pct = (1 - final_tokens / original_tokens) * 100 if original_tokens > 0 else 0.0
    if SAVE_PREPOST_MSGS:
        _save_json_message(POST_MSG_DIR, "post", messages, msg_hash)
    logging.info(
        f"REQ {msg_hash} | msgs: {original_msg_count} → {final_msg_count} "
        f"| tokens: {original_tokens} → {final_tokens} ({compression_pct:+.1f}%) "
        f"| blocks dedup'd: {replaced_blocks} "
        f"| dedup'd segments: {deduped_segments} "
        f"| summarized: {'yes' if summarized else 'no'} "
        f"| compressed prompts: {compressed_prompts_found} "
        f"| stream: {stream} "
        f"| thinking: {payload['thinking']['type']}"
    )
    return payload
def _handle_nonstream(self, payload, original_msgs):
    """Forward a non-streaming completion and return it to the client as JSON.

    The first choice's assistant message is handed to
    ``cache_assistant_message`` *before* ``reasoning_content`` is removed,
    so the cache sees the reasoning while the client does not.

    Args:
        payload: Request body passed to ``deepseek_nonstream``.
        original_msgs: The client's original message list, passed through
            to ``cache_assistant_message``.

    Raises:
        Whatever ``deepseek_nonstream`` raises; the caller handles
        upstream failures.
    """
    resp = deepseek_nonstream(payload)
    choices = resp.get("choices")
    if not choices:
        logging.error("DeepSeek returned empty choices array")
        self.send_error(502, "Empty response from upstream")
        return

    first = choices[0]
    # `or {}` also covers an explicit `"message": null` from upstream.
    assistant_msg = dict(first.get("message") or {})
    cache_assistant_message(original_msgs, assistant_msg)
    assistant_msg.pop("reasoning_content", None)
    first["message"] = assistant_msg

    # Keep parity with the streaming path, which strips reasoning from
    # every choice rather than only the first one.
    for extra in choices[1:]:
        extra_msg = extra.get("message")
        if isinstance(extra_msg, dict):
            extra_msg.pop("reasoning_content", None)

    body = json.dumps(resp).encode()
    try:
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)
    except (BrokenPipeError, ConnectionResetError, OSError) as e:
        logging.warning("Client disconnected while sending non‑stream response: %s", e)
def _handle_stream(self, payload, original_msgs):
    """Relay a streaming completion from upstream to the client as SSE.

    Each ``data:`` line is parsed as JSON, fed to a ``StreamBuffer``, has
    ``reasoning_content`` removed from every choice and is re-emitted.
    Other lines (comments, blank separators, other SSE fields) are
    forwarded unchanged.  When the stream ends, the message assembled by
    the buffer is passed to ``cache_assistant_message`` if it carries tool
    calls or reasoning.

    Args:
        payload: Request body passed to ``_make_deepseek_request``.
        original_msgs: The client's original message list, passed through
            to ``cache_assistant_message``.
    """
    req = _make_deepseek_request(payload, stream=True)
    try:
        with urllib.request.urlopen(req, timeout=600) as upstream:
            self.send_response(200)
            self.send_header("Content-Type", "text/event-stream")
            self.send_header("Cache-Control", "no-cache")
            self.end_headers()
            buffer = StreamBuffer()
            done = False
            try:
                for line in upstream:
                    line_str = line.decode() if isinstance(line, bytes) else line
                    bare = line_str.rstrip("\r\n")
                    if not line_str.startswith("data:"):
                        # Forward with exactly one terminator.  Appending a
                        # second newline would turn every `event:`/`id:`
                        # field line into a premature event boundary.
                        self.wfile.write(bare.encode() + b"\n")
                        self.wfile.flush()
                        continue
                    data_part = line_str[5:].strip()
                    if data_part == "[DONE]":
                        self.wfile.write(b"data: [DONE]\n\n")
                        self.wfile.flush()
                        done = True
                        break
                    try:
                        chunk = json.loads(data_part)
                    except json.JSONDecodeError:
                        # Pass undecodable payloads through as a complete event.
                        self.wfile.write(bare.encode() + b"\n\n")
                        self.wfile.flush()
                        continue
                    # The buffer must see the chunk before reasoning is stripped.
                    buffer.process_chunk(chunk)
                    for choice in chunk.get("choices") or []:
                        if "delta" in choice:
                            choice["delta"].pop("reasoning_content", None)
                        if "message" in choice:
                            choice["message"].pop("reasoning_content", None)
                    self.wfile.write(f"data: {json.dumps(chunk)}\n\n".encode())
                    self.wfile.flush()
            except (BrokenPipeError, ConnectionResetError, OSError) as e:
                logging.warning("Client disconnected during stream: %s", e)
                done = True
            except Exception:
                logging.exception("Unexpected error while streaming response")
                done = True
            if not done:
                # Upstream ended without a terminator; send one ourselves.
                try:
                    self.wfile.write(b"data: [DONE]\n\n")
                    self.wfile.flush()
                except (BrokenPipeError, ConnectionResetError, OSError):
                    pass
            assistant_msg = buffer.build_assistant_message()
            if assistant_msg.get("tool_calls") or assistant_msg.get("reasoning_content"):
                cache_assistant_message(original_msgs, assistant_msg)
    except urllib.error.URLError as e:
        # HTTPError is a subclass of URLError, so this clause covers both.
        logging.error("Upstream connection error: %s", e)
        error_detail = ""
        if isinstance(e, urllib.error.HTTPError):
            try:
                error_detail = e.read().decode(errors="replace")
            except Exception:
                # Best effort only: the status code is still reported below.
                logging.debug("Could not read upstream error body", exc_info=True)
        else:
            error_detail = str(e.reason)
        if not error_detail.strip():
            error_detail = str(e)
        else:
            logging.error("Upstream error detail: %s", error_detail)
        # send_error() puts its message into the HTTP status line, so it
        # must be one bounded line of Latin-1 text.  A raw upstream body
        # (JSON or an HTML error page) would otherwise corrupt the response
        # or raise UnicodeEncodeError.
        max_reason_len = 512
        printable = "".join(ch if ch.isprintable() else " " for ch in error_detail)
        reason = " ".join(printable.split())[:max_reason_len]
        reason = reason.encode("latin-1", "replace").decode("latin-1")
        self.send_error(502, f"Upstream error: {reason}")
# ==============================================================================
# Main – with fixed Ctrl+C handling (avoid deadlock) and dev directories
# ==============================================================================
def main():
    """Parse CLI options, apply them to the module globals and run the proxy.

    Overrides ``MAX_CONTEXT``, ``SUMMARY_RATIO``, ``MAX_OUTPUT_TOKENS`` and
    ``THINKING_MODE`` from the command line, creates the optional dump
    directories, then serves requests until SIGINT or SIGTERM is received.
    """
    global MAX_CONTEXT, SUMMARY_RATIO, MAX_OUTPUT_TOKENS, THINKING_MODE

    parser = argparse.ArgumentParser(description="DeepSeek V4 Flash OpenAI Proxy (globals forced)")
    parser.add_argument("--port", type=int, default=8080, help="Listening port")
    # NOTE(review): the default binds every interface although the module
    # docstring warns against exposing the proxy -- consider 127.0.0.1.
    parser.add_argument("--host", default="0.0.0.0", help="Bind address")
    parser.add_argument("--max-context", type=int, default=128000, help="Max context tokens")
    parser.add_argument("--summarize-ratio", type=float, default=0.8,
                        help="Trigger summarisation when tokens exceed ratio * max-context")
    parser.add_argument("--disable-compression", action="store_true",
                        help="Do not auto-summarize system prompts (use originals as-is)")
    parser.add_argument("--max-output-tokens", type=int, default=128000,
                        help="Force this many max tokens for generation (overrides client)")
    parser.add_argument("--thinking", choices=["enabled", "disabled", "auto"], default="auto",
                        help="Force thinking mode (default: auto)")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s"
    )

    # NOTE(review): inside main() --disable-compression only gates creation
    # of the system-prompt directories; confirm the request handler also
    # honours the flag.
    if not args.disable_compression and SAVE_PREPOST_SYSTEM:
        _ensure_sys_dirs()

    MAX_CONTEXT = args.max_context
    SUMMARY_RATIO = args.summarize_ratio
    MAX_OUTPUT_TOKENS = args.max_output_tokens
    THINKING_MODE = args.thinking

    # Ensure developer dump directories exist
    if SAVE_PREPOST_MSGS:
        PRE_MSG_DIR.mkdir(parents=True, exist_ok=True)
        POST_MSG_DIR.mkdir(parents=True, exist_ok=True)

    class _ProxyServer(socketserver.ThreadingTCPServer):
        # Must be a class attribute: it is read while the socket is bound.
        # Without it a quick restart can fail with "Address already in use".
        allow_reuse_address = True
        daemon_threads = True

    server = _ProxyServer((args.host, args.port), ProxyHandler)

    # ---- Graceful shutdown WITHOUT deadlocking the main thread ----
    # The lock is a one-shot latch: acquired once, never released.  A
    # blocking acquire here could deadlock, because a second signal may
    # interrupt the handler while it already holds the (non-reentrant) lock.
    shutdown_latch = threading.Lock()

    def _shutdown(signum, frame):
        if not shutdown_latch.acquire(blocking=False):
            return  # shutdown already in progress
        logging.info("Received signal %s, shutting down.", signum)
        # server.shutdown() blocks until serve_forever() returns, so it must
        # run off the main thread that is executing serve_forever().
        threading.Thread(target=server.shutdown, daemon=True).start()

    signal.signal(signal.SIGTERM, _shutdown)
    signal.signal(signal.SIGINT, _shutdown)

    logging.info("Proxy listening on %s:%s", args.host, args.port)
    logging.info("Forced model: %s, max_tokens: %s, thinking: %s",
                 DEFAULT_MODEL, MAX_OUTPUT_TOKENS, THINKING_MODE)
    logging.info("Pre/post message dumps: %s / %s", PRE_MSG_DIR, POST_MSG_DIR)

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        server.server_close()
        logging.info("Server stopped.")


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment