Last active
May 26, 2026 06:51
-
-
Save g023/c2bb7b540ffe64cee76023f18f6f9365 to your computer and use it in GitHub Desktop.
OpenAI-compatible proxy for DeepSeek V4 Flash with intelligent auto context compression features
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Zero-dependency OpenAI-compatible proxy for DeepSeek V4 Flash. | |
| Author: g023 | |
| License: MIT | |
| All client‑supplied model and generation parameters are **ignored**. | |
| The proxy always uses the model, max output tokens, and other settings | |
| defined in the global configuration (see --help and the constants below). | |
| Optimisations: | |
| - System prompt compression (auto-summarized via DeepSeek API; | |
| originals stored in ./pre_sys/, summaries cached in ./post_sys/) | |
| - Markdown block deduplication (keeps only the latest occurrence full) | |
| - Conversation summarisation triggers when token budget is exceeded | |
| - Assistant reasoning is cached to avoid redundant re‑generation | |
| - Inter‑message content fingerprinting & deduplication (Feature F-1) | |
| - Removes repeated boilerplate segments (environment_info, userMemory, | |
| reminderInstructions, etc.) from user messages across conversation turns. | |
| - Segments are hashed (SHA‑256), duplicates replaced with an empty string | |
| (or a minimal placeholder if the message becomes empty). | |
| - Per‑conversation fingerprint storage with LRU eviction. | |
| * Reads from local file K.dat for API key if DEEPSEEK_API_KEY env var is not set. | |
| * just a proof of concept pet project. Do not expose this server to the internet. | |
| """ | |
| import argparse | |
| import collections | |
| import copy | |
| import hashlib | |
| import http.server | |
| import json | |
| import logging | |
| import os | |
| import re | |
| import signal | |
| import socketserver | |
| import sys | |
| import threading | |
| import time | |
| import urllib.error | |
| import urllib.request | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
# ==============================================================================
# Global configuration – these forcibly override every client request
# ==============================================================================
# Upstream API root; every upstream call in this file posts to
# f"{DEEPSEEK_BASE}/chat/completions".
DEEPSEEK_BASE = "https://api.deepseek.com"
DEFAULT_MODEL = "deepseek-v4-flash" # "deepseek-v4-flash" "deepseek-v4-pro" # model that will *always* be used
# Max entries in _assistant_cache (conversation-prefix hash -> assistant message
# including its reasoning_content).
MAX_CACHE_SIZE = 500 # LRU cache for assistant reasoning
# Context budget in *estimated* tokens: compared against estimate_tokens()
# heuristics, not a real tokenizer count.
MAX_CONTEXT = 128000 # tokens (context size)
SUMMARY_RATIO = 0.8 # trigger summarisation at 80 % of MAX_CONTEXT
# NOTE(review): bound once at import time; reassigning DEFAULT_MODEL later
# (e.g. from command-line options, if the rest of the file does that) does not
# update SUMMARY_MODEL — confirm this is intended.
SUMMARY_MODEL = DEFAULT_MODEL # model used for the summarisation call
# NOTE(review): equal to MAX_CONTEXT; confirm the upstream model accepts this
# many output tokens.
MAX_OUTPUT_TOKENS = 128000 # max tokens to generate (overrides client)
THINKING_MODE = "auto" # "enabled", "disabled", or "auto" (default)
# --------------------------------------------------------------------------
# Local file save toggles – set to False to disable disk writes
# --------------------------------------------------------------------------
SAVE_PREPOST_MSGS = False # save pre/post message dumps to ./pre_msg/ and ./post_msg/
SAVE_PREPOST_SYSTEM = True # save original/summarized system prompts to ./pre_sys/ and ./post_sys/
# --------------------------------------------------------------------------
# Retry configuration for summarisation calls
# --------------------------------------------------------------------------
# Total number of attempts per summarisation call (not "extra" retries).
SUMMARISE_MAX_RETRIES = 3
# Sleep before retry n is SUMMARISE_RETRY_BASE_SLEEP * 2**(n-1): 2 s, 4 s, ...
SUMMARISE_RETRY_BASE_SLEEP = 2.0 # seconds, doubled each attempt
# --------------------------------------------------------------------------
# Feature F-1: Inter‑message content fingerprinting & deduplication
# --------------------------------------------------------------------------
MAX_FINGERPRINT_HISTORY = 100 # max number of segments stored per conversation
# Known boilerplate XML tags – each as (open_tag, close_tag)
# Tags are located with plain substring search on these exact literals, so an
# opening tag carrying attributes (e.g. '<context id="x">') is not recognised.
_BOILERPLATE_PATTERNS = {
    "environment_info": ("<environment_info>", "</environment_info>"),
    "workspace_info": ("<workspace_info>", "</workspace_info>"),
    "userMemory": ("<userMemory>", "</userMemory>"),
    "sessionMemory": ("<sessionMemory>", "</sessionMemory>"),
    "repoMemory": ("<repoMemory>", "</repoMemory>"),
    "context": ("<context>", "</context>"),
    "reminderInstructions": ("<reminderInstructions>", "</reminderInstructions>"),
    "additional_skills_reminder": ("<additional_skills_reminder>", "</additional_skills_reminder>"),
    "editorContext": ("<editorContext>", "</editorContext>"),
}
# Fingerprint storage: conv_id -> { segment_hash: (first_index, full_segment_text) }
# Shared between request-handling threads; _segment_fp_lock is the lock meant
# to guard it.
_segment_fingerprints: Dict[str, Dict[str, Tuple[int, str]]] = {}
_segment_fp_lock = threading.Lock()
# Read the DeepSeek API key
def _load_api_key(key_file: str = "K.dat") -> str:
    """Return the DeepSeek API key, or terminate the process if none is available.

    Lookup order:
    1. the ``DEEPSEEK_API_KEY`` environment variable;
    2. the contents of *key_file*, resolved against the current working directory.

    Surrounding whitespace is stripped from either source. A stray trailing
    newline would otherwise be sent inside the ``Authorization: Bearer ...``
    header.

    If neither source yields a non-empty key, a specific error is printed to
    stderr and the process exits with status 1. The three failure cases are
    reported separately: file missing, file unreadable, file empty.
    """
    env_key = os.environ.get("DEEPSEEK_API_KEY", "").strip()
    if env_key:
        return env_key
    try:
        with open(key_file, "r", encoding="utf-8") as fh:
            file_key = fh.read().strip()
    except FileNotFoundError:
        print(f"ERROR: DEEPSEEK_API_KEY environment variable not set and {key_file} not found.",
              file=sys.stderr)
        sys.exit(1)
    except (OSError, UnicodeDecodeError) as exc:
        print(f"ERROR: DEEPSEEK_API_KEY environment variable not set and {key_file} "
              f"could not be read: {exc}", file=sys.stderr)
        sys.exit(1)
    if not file_key:
        print(f"ERROR: DEEPSEEK_API_KEY environment variable not set and {key_file} is empty.",
              file=sys.stderr)
        sys.exit(1)
    return file_key


DSEEK_KEY = _load_api_key()
# ==============================================================================
# Logging / dev feedback – directories for pre/post messages
# ==============================================================================
PRE_MSG_DIR = Path("./pre_msg")
POST_MSG_DIR = Path("./post_msg")
| def _save_json_message(dir_path: Path, prefix: str, messages: list, msg_hash: str): | |
| """Save a message list as a tab-indented JSON file.""" | |
| try: | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") | |
| filename = f"{timestamp}_{msg_hash}.json" | |
| filepath = dir_path / filename | |
| with open(filepath, "w", encoding="utf-8") as f: | |
| json.dump(messages, f, ensure_ascii=False, indent="\t") | |
| except Exception: | |
| logging.exception(f"Failed to save {prefix} message dump") | |
| def _md5_of_messages(messages: list) -> str: | |
| """Short (8-char) MD5 hash of the messages list (stable).""" | |
| data = json.dumps(messages, sort_keys=True, ensure_ascii=False).encode() | |
| return hashlib.md5(data).hexdigest()[:8] | |
# ==============================================================================
# Improved token estimation – CJK‑aware heuristic
# ==============================================================================
# Inclusive (lo, hi) code-point ranges that _is_cjk() counts as CJK ideographs.
# NOTE(review): Hiragana/Katakana (U+3040–U+30FF), Hangul syllables
# (U+AC00–U+D7AF) and CJK Compatibility Ideographs (U+F900–U+FAFF) are not
# listed, so _is_cjk() does not count Japanese kana or Korean text as CJK —
# confirm this is intended for estimate_tokens().
_CJK_RANGES = [
    (0x4E00, 0x9FFF),   # CJK Unified Ideographs
    (0x3400, 0x4DBF),   # CJK Unified Ideographs Extension A
    (0x20000, 0x2A6DF), # Extension B
    (0x2A700, 0x2B73F), # Extension C
    (0x2B740, 0x2B81F), # Extension D
    (0x2B820, 0x2CEAF), # Extension E
    (0x2CEB0, 0x2EBEF), # Extension F
    (0x30000, 0x3134F), # Extension G
    (0x31350, 0x323AF), # Extension H
]
# Individual punctuation code points also counted as CJK:
# - ideographic space, ideographic comma and full stop (U+3000–U+3002);
# - full-width ， ． ： ； ！ ？;
# - the 「」『』【】《》 bracket pairs.
_CJK_PUNCT = {0x3000, 0x3001, 0x3002, 0xFF0C, 0xFF0E, 0xFF1A, 0xFF1B, 0xFF01, 0xFF1F,
              0x300C, 0x300D, 0x300E, 0x300F, 0x3010, 0x3011, 0x300A, 0x300B}
def _is_cjk(cp: int) -> bool:
    """Return True if code point *cp* counts as CJK for token estimation.

    That is, *cp* is one of the punctuation marks in ``_CJK_PUNCT`` or lies in
    one of the ideograph ranges of ``_CJK_RANGES`` (both bounds inclusive).
    """
    return cp in _CJK_PUNCT or any(lo <= cp <= hi for lo, hi in _CJK_RANGES)
def estimate_tokens(text: str) -> int:
    """Cheaply approximate how many tokens *text* occupies (no tokenizer used).

    Rules:
    - empty text -> 0;
    - more than half of the characters are CJK (per ``_is_cjk``) -> one token
      per character;
    - otherwise -> one token per 3.5 characters, rounded down, minimum 1.
      This is conservative for code and English text.
    """
    n_chars = len(text)
    if n_chars == 0:
        return 0
    n_cjk = sum(map(_is_cjk, map(ord, text)))
    if n_cjk / n_chars > 0.5:
        # Mostly CJK: roughly one token per character.
        return n_chars
    return max(1, int(n_chars // 3.5))
# ==============================================================================
# LRU Cache for assistant reasoning
# ==============================================================================
class LRUCache:
    """Size-bounded mapping with least-recently-used eviction.

    Both ``get`` (on a hit) and ``set`` mark a key as most recently used. Once
    more than ``maxsize`` keys are stored, the least recently used key is
    dropped. Every operation runs under one internal lock.
    """

    def __init__(self, maxsize: int):
        self.maxsize = maxsize
        self._cache = collections.OrderedDict()
        self._lock = threading.Lock()

    def get(self, key: str):
        """Return the value stored under *key* (refreshing its recency), else None."""
        with self._lock:
            try:
                self._cache.move_to_end(key)
            except KeyError:
                return None
            return self._cache[key]

    def set(self, key: str, value: dict):
        """Store *value* as the most recent entry for *key*.

        If the cache then holds more than ``maxsize`` keys, the oldest entry
        is evicted.
        """
        with self._lock:
            self._cache[key] = value
            self._cache.move_to_end(key)
            if len(self._cache) > self.maxsize:
                self._cache.popitem(last=False)
# Process-wide, in-memory cache of assistant replies. Each key is the
# _conv_hash of (conversation prefix + assistant reply without
# reasoning_content); each value is the full assistant message, including
# reasoning_content. It is written by cache_assistant_message(), read by
# inject_reasoning(), and holds at most MAX_CACHE_SIZE entries.
_assistant_cache = LRUCache(MAX_CACHE_SIZE)
| # ============================================================================== | |
| # Hashing utilities | |
| # ============================================================================== | |
| def _stable_hash(obj: Any) -> str: | |
| """Stable SHA256 hash of a JSON‑serialisable object (dict or list).""" | |
| return hashlib.sha256( | |
| json.dumps(obj, sort_keys=True, ensure_ascii=False).encode() | |
| ).hexdigest() | |
| def _conv_hash(messages: List[dict]) -> str: | |
| """Hash of a message list – only fields that affect the conversation identity.""" | |
| important_keys = {"role", "content", "tool_calls", "name", "tool_call_id"} | |
| cleaned = [{k: v for k, v in m.items() if k in important_keys} for m in messages] | |
| return _stable_hash(cleaned) | |
| # ============================================================================== | |
| # System prompt compression – auto-summarize via DeepSeek API | |
| # ============================================================================== | |
| PRE_SYS_DIR = Path("./pre_sys") | |
| POST_SYS_DIR = Path("./post_sys") | |
| _sys_lock = threading.Lock() | |
| def _ensure_sys_dirs(): | |
| PRE_SYS_DIR.mkdir(parents=True, exist_ok=True) | |
| POST_SYS_DIR.mkdir(parents=True, exist_ok=True) | |
| def _sys_original_path(sys_hash: str) -> Path: | |
| return PRE_SYS_DIR / f"{sys_hash}.txt" | |
| def _sys_summary_path(sys_hash: str) -> Path: | |
| return POST_SYS_DIR / f"{sys_hash}.txt" | |
| def load_summarized_prompt(sys_hash: str) -> Optional[str]: | |
| """Return the cached summarized prompt if it exists, else None.""" | |
| path = _sys_summary_path(sys_hash) | |
| if path.exists(): | |
| try: | |
| return path.read_text(encoding="utf-8") | |
| except Exception: | |
| logging.warning(f"Failed to read summarized prompt {sys_hash}") | |
| return None | |
| def save_original_prompt(sys_hash: str, content: str): | |
| """Atomically save the original system prompt to disk (thread‑safe).""" | |
| path = _sys_original_path(sys_hash) | |
| if path.exists(): | |
| return # already saved | |
| tmp_path = path.with_suffix(".tmp") | |
| with _sys_lock: | |
| try: | |
| tmp_path.write_text(content, encoding="utf-8") | |
| tmp_path.rename(path) | |
| except Exception as e: | |
| logging.warning(f"Failed to save original prompt {sys_hash}: {e}") | |
| def save_summarized_prompt(sys_hash: str, content: str): | |
| """Atomically write a summarized prompt to disk (thread‑safe).""" | |
| path = _sys_summary_path(sys_hash) | |
| tmp_path = path.with_suffix(".tmp") | |
| with _sys_lock: | |
| try: | |
| tmp_path.write_text(content, encoding="utf-8") | |
| tmp_path.rename(path) | |
| except Exception as e: | |
| logging.warning(f"Failed to write summarized prompt {sys_hash}: {e}") | |
def summarize_system_prompt(original: str, api_key: str) -> str:
    """Call DeepSeek to produce a concise summary of a system prompt.

    Sends one single-turn request to ``SUMMARY_MODEL`` (temperature 0,
    thinking disabled, at most 2000 output tokens) and returns the reply text.

    Retries with exponential backoff (``SUMMARISE_RETRY_BASE_SLEEP * 2**(n-1)``)
    on:
    - network errors;
    - 5xx / 408 / 429 responses;
    - empty replies.

    Gives up immediately, because an identical retry cannot succeed, on:
    - other 4xx responses (e.g. bad key, malformed request);
    - a reply cut off at max_tokens. A truncated compression would silently
      drop instructions, so it is never returned.

    Raises:
        RuntimeError: if no usable summary could be obtained.
    """
    summary_prompt = (
        "You are a prompt compression assistant. Summarize the following system prompt "
        "as concisely as possible while preserving ALL critical instructions, constraints, "
        "formatting rules, and behavioral guidelines. Remove redundancy, examples, and "
        "verbose explanations. Output ONLY the compressed prompt — no commentary.\n\n"
        f"{original}"
    )
    payload = {
        "model": SUMMARY_MODEL,
        "messages": [{"role": "user", "content": summary_prompt}],
        "max_tokens": 2000,
        "temperature": 0.0,
        "thinking": {"type": "disabled"},
    }
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    last_exc: Optional[BaseException] = None
    for attempt in range(1, SUMMARISE_MAX_RETRIES + 1):
        retryable = True
        try:
            req = urllib.request.Request(
                f"{DEEPSEEK_BASE}/chat/completions",
                data=json.dumps(payload).encode(),
                headers=headers,
                method="POST",
            )
            with urllib.request.urlopen(req, timeout=120) as resp:
                body = json.loads(resp.read().decode())
            choice = body["choices"][0]
            content = choice["message"]["content"]
            if not isinstance(content, str) or not content.strip():
                raise ValueError("empty summary returned")
            if choice.get("finish_reason") == "length":
                # Deterministic request: a retry would be truncated the same way.
                retryable = False
                raise ValueError("summary truncated by the max_tokens limit")
            return content
        except urllib.error.HTTPError as e:
            last_exc = e
            # Client errors (except timeout / rate limit) will not fix themselves.
            retryable = not (400 <= e.code < 500 and e.code not in (408, 429))
        except Exception as e:
            last_exc = e
        if not retryable:
            logging.error("System prompt summarization failed with a non-retryable error: %s",
                          last_exc)
            break
        if attempt < SUMMARISE_MAX_RETRIES:
            sleep_time = SUMMARISE_RETRY_BASE_SLEEP * (2 ** (attempt - 1))
            logging.warning("System prompt summarization attempt %d failed, retrying in %.1fs: %s",
                            attempt, sleep_time, last_exc)
            time.sleep(sleep_time)
        else:
            logging.error("System prompt summarization failed after %d attempts", SUMMARISE_MAX_RETRIES)
    raise RuntimeError(f"System prompt summarization failed: {last_exc}")
# ==============================================================================
# Markdown block deduplication (fixed – no index corruption)
# ==============================================================================
# Fenced block: ``` or ~~~, an optional info string such as "python", "c++",
# "objective-c" or "c#", a newline, then the body lazily up to the same fence.
_FENCE_PATTERN = re.compile(r"(```|~~~)([\w.+#-]*)\n(.*?)\1", re.DOTALL)

# Replaces the body of an earlier duplicate block. The trailing newline keeps
# the closing fence on its own line, so the fence still terminates the block.
_CODE_OMITTED_PLACEHOLDER = ".. (code omitted, see later version) ..\n"


def deduplicate_markdown_blocks(messages: List[dict]) -> int:
    """
    For each fenced code block, keep the **last** occurrence full; replace the
    body of earlier identical occurrences with a placeholder. Blocks are
    compared by the SHA-256 of their body only; the fence and info string are
    ignored.

    Only string ``content`` fields are scanned; other messages are untouched.
    A block body no longer than the placeholder is left alone, since replacing
    it would not save anything. Modifies messages in-place.

    Returns the count of blocks replaced.
    """
    occurrences: List[Dict[str, Any]] = []  # in message order, then text order
    last_seen: Dict[str, int] = {}          # body hash -> index of its last occurrence
    for msg_idx, msg in enumerate(messages):
        content = msg.get("content", "")
        if not isinstance(content, str):
            continue
        for match in _FENCE_PATTERN.finditer(content):
            body = match.group(3)
            digest = hashlib.sha256(body.encode()).hexdigest()
            last_seen[digest] = len(occurrences)
            occurrences.append({
                "msg_idx": msg_idx,
                "start": match.start(3),
                "end": match.end(3),
                "hash": digest,
                "length": len(body),
            })

    by_message: Dict[int, List[Tuple[int, Dict[str, Any]]]] = {}
    for occ_id, occ in enumerate(occurrences):
        by_message.setdefault(occ["msg_idx"], []).append((occ_id, occ))

    replaced_count = 0
    for msg_idx, occs in by_message.items():
        original = messages[msg_idx]["content"]
        pieces: List[str] = []
        cursor = 0
        for occ_id, occ in occs:  # ascending start offsets (finditer order)
            if last_seen[occ["hash"]] == occ_id:
                continue  # latest copy stays in full
            if occ["length"] <= len(_CODE_OMITTED_PLACEHOLDER):
                continue  # replacing would not shrink the text
            pieces.append(original[cursor:occ["start"]])
            pieces.append(_CODE_OMITTED_PLACEHOLDER)
            cursor = occ["end"]
            replaced_count += 1
        if pieces:
            pieces.append(original[cursor:])
            messages[msg_idx]["content"] = "".join(pieces)
    return replaced_count
# ==============================================================================
# Feature F-1: Inter-message content fingerprinting & deduplication
# ==============================================================================
def _get_conversation_base_id(messages: List[dict]) -> str:
    """
    Return a conversation ID that stays stable as the conversation grows.

    The ID is the SHA-256 hex digest of "<first system text>\\n<first user
    text>", stripped. Before hashing, every known boilerplate tag pair (with
    its contents) is cut out of the user text. Only messages whose content is
    a plain string are considered.

    If both parts end up empty, the function falls back to
    ``_conv_hash(messages)``. That value changes every turn but still isolates
    conversations that consist only of boilerplate.
    """

    def first_text(role: str) -> Optional[str]:
        # Content of the first message with this role whose content is a str.
        return next(
            (m["content"] for m in messages
             if m.get("role") == role and isinstance(m.get("content"), str)),
            None,
        )

    system_text = first_text("system") or ""
    user_text = first_text("user")
    core_user = ""
    if user_text is not None:
        for open_tag, close_tag in _BOILERPLATE_PATTERNS.values():
            # Crude but sufficient for fingerprinting: drop tag + contents.
            tag_span = re.escape(open_tag) + r".*?" + re.escape(close_tag)
            user_text = re.sub(tag_span, "", user_text, flags=re.DOTALL)
        core_user = user_text.strip()

    fingerprint_source = f"{system_text}\n{core_user}".strip()
    if fingerprint_source:
        return hashlib.sha256(fingerprint_source.encode()).hexdigest()
    return _conv_hash(messages)
# Upper bound on conversations tracked in _segment_fingerprints; the least
# recently used conversation is evicted beyond this.
_MAX_FINGERPRINT_CONVERSATIONS = 1000


def _strip_repeated_segments(content: str, seen: set,
                             first_seen: List[Tuple[str, str]]) -> Tuple[str, int]:
    """Remove boilerplate segments of *content* whose SHA-256 is already in *seen*.

    A segment is an opening tag from ``_BOILERPLATE_PATTERNS`` through the
    first matching closing tag. Each new segment is added to *seen* and
    appended to *first_seen* as ``(hash, text)``.

    Returns the rewritten content and the number of segments removed.
    """
    removed = 0
    for open_tag, close_tag in _BOILERPLATE_PATTERNS.values():
        pos = 0
        while True:
            start = content.find(open_tag, pos)
            if start == -1:
                break
            end = content.find(close_tag, start + len(open_tag))
            if end == -1:
                break
            end += len(close_tag)
            segment = content[start:end]
            seg_hash = hashlib.sha256(segment.encode()).hexdigest()
            if seg_hash in seen:
                # Duplicate: cut it out and rescan from the same offset.
                content = content[:start] + content[end:]
                removed += 1
                pos = start
            else:
                seen.add(seg_hash)
                first_seen.append((seg_hash, segment))
                pos = end
    return content, removed


def _record_segment_fingerprints(conv_id: str, first_seen: List[Tuple[str, str]]) -> None:
    """Merge this request's segments into the per-conversation history.

    All reads and writes happen under ``_segment_fp_lock``.

    Eviction inside one conversation: the history holds at most
    MAX_FINGERPRINT_HISTORY segments, dropping the oldest first. Dict
    insertion order equals the stored index order.

    Eviction across conversations: at most _MAX_FINGERPRINT_CONVERSATIONS are
    kept, dropping the least recently used.
    """
    with _segment_fp_lock:
        history = _segment_fingerprints.pop(conv_id, None)
        if history is None:
            history = {}
        _segment_fingerprints[conv_id] = history  # re-insert => most recently used
        while len(_segment_fingerprints) > _MAX_FINGERPRINT_CONVERSATIONS:
            del _segment_fingerprints[next(iter(_segment_fingerprints))]
        next_index = max((entry[0] for entry in history.values()), default=-1) + 1
        for seg_hash, segment in first_seen:
            if seg_hash in history:
                continue
            history[seg_hash] = (next_index, segment)
            next_index += 1
            while history and len(history) > MAX_FINGERPRINT_HISTORY:
                del history[next(iter(history))]


def deduplicate_user_message_segments(
    messages: List[dict],
    conv_id: str
) -> Tuple[List[dict], int]:
    """
    For each user message, find boilerplate segments and remove those that
    already appeared EARLIER IN THIS SAME REQUEST. The first occurrence of
    every segment is kept verbatim. Modifies messages in-place and returns
    (messages, segments_removed).

    Why per-request: clients resend the whole history each turn and the
    upstream API is stateless. If duplicates were judged against segments
    remembered from previous requests, every copy, including the first, would
    be removed from turn 2 on, and the model would never see that information
    again. Keeping the first copy also keeps the request prefix stable across
    turns.

    Segments seen in this request are still recorded per *conv_id* in the
    shared fingerprint store.

    Only user messages whose content actually lost a segment are rewritten.
    Such a message is stripped of surrounding whitespace, and replaced with
    "(no new content)" if nothing remains.
    """
    seen: set = set()
    first_seen: List[Tuple[str, str]] = []
    segments_removed = 0
    for msg in messages:
        if msg.get("role") != "user":
            continue
        content = msg.get("content", "")
        if not isinstance(content, str):
            continue
        new_content, removed = _strip_repeated_segments(content, seen, first_seen)
        if not removed:
            continue  # untouched messages keep their exact text
        segments_removed += removed
        msg["content"] = new_content.strip() or "(no new content)"
    _record_segment_fingerprints(conv_id, first_seen)
    return messages, segments_removed
| # ============================================================================== | |
| # Conversation summarisation – with token-aware split & retries | |
| # ============================================================================== | |
| def _total_tokens(messages: List[dict]) -> int: | |
| """Estimate total token count for a list of messages.""" | |
| total = 0 | |
| for m in messages: | |
| content = m.get("content", "") | |
| if isinstance(content, str): | |
| total += estimate_tokens(content) | |
| for tc in m.get("tool_calls", []): | |
| total += estimate_tokens(json.dumps(tc.get("function", {}).get("arguments", ""))) | |
| return total | |
def _summarise_messages_with_retry(summarise_payload: dict, api_key: str) -> str:
    """
    Call DeepSeek summarisation with retries (exponential backoff).

    Returns the reply text, guaranteed to be a non-empty string.

    Retries on:
    - network errors;
    - 5xx / 408 / 429 responses;
    - empty or null replies.

    Stops immediately on any other 4xx response, since an identical retry
    cannot succeed. The backoff before retry n is
    ``SUMMARISE_RETRY_BASE_SLEEP * 2**(n-1)``.

    Raises RuntimeError if no usable reply was obtained.
    """
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    last_exc: Optional[BaseException] = None
    for attempt in range(1, SUMMARISE_MAX_RETRIES + 1):
        try:
            req = urllib.request.Request(
                f"{DEEPSEEK_BASE}/chat/completions",
                data=json.dumps(summarise_payload).encode(),
                headers=headers,
                method="POST",
            )
            with urllib.request.urlopen(req, timeout=120) as resp:
                body = json.loads(resp.read().decode())
            content = body["choices"][0]["message"]["content"]
            # A null/empty reply would later be concatenated into the system
            # prompt (str + None -> TypeError), so treat it as a failure.
            if not isinstance(content, str) or not content.strip():
                raise ValueError("summarisation returned empty content")
            return content
        except urllib.error.HTTPError as e:
            last_exc = e
            if 400 <= e.code < 500 and e.code not in (408, 429):
                logging.error("Summarisation failed with non-retryable HTTP %d", e.code)
                break
        except Exception as e:
            last_exc = e
        if attempt < SUMMARISE_MAX_RETRIES:
            sleep_time = SUMMARISE_RETRY_BASE_SLEEP * (2 ** (attempt - 1))
            logging.warning("Summarisation attempt %d failed, retrying in %.1fs: %s",
                            attempt, sleep_time, last_exc)
            time.sleep(sleep_time)
        else:
            logging.error("Summarisation failed after %d attempts", SUMMARISE_MAX_RETRIES)
    raise RuntimeError(f"Summarisation API call failed: {last_exc}")
| def _merge_system_messages(messages: List[dict]) -> Tuple[Optional[str], List[dict]]: | |
| """Extract and merge all system messages into a single string. Returns the merged | |
| content (or None if none) and the remaining non‑system messages.""" | |
| systems = [m["content"] for m in messages if m["role"] == "system" and isinstance(m.get("content"), str)] | |
| others = [m for m in messages if m["role"] != "system"] | |
| merged = "\n".join(systems) if systems else None | |
| return merged, others | |
def _safe_summary_split(non_system: List[dict], idx: int) -> int:
    """Adjust a proposed split index for summarisation.

    Guarantees:
    - at least the newest message stays out of the summary;
    - an assistant ``tool_calls`` message is never separated from the
      ``tool`` results that follow it.

    First walks forward, as the original design did. If that would consume
    every message, walks backward from the capped index instead. Returns 0
    when no safe split exists (nothing is summarised). Expects a non-empty
    list.
    """
    n = len(non_system)
    idx = min(idx, n - 1)  # always keep the final message verbatim
    fwd = idx
    while fwd < n:
        role = non_system[fwd].get("role")
        if role == "tool":
            # Belongs to a preceding assistant: move the split past it.
            fwd += 1
            continue
        if (role == "assistant" and non_system[fwd].get("tool_calls")
                and fwd + 1 < n and non_system[fwd + 1].get("role") == "tool"):
            # Move past the assistant and all of its tool responses.
            fwd += 1
            while fwd < n and non_system[fwd].get("role") == "tool":
                fwd += 1
            continue
        break
    if fwd < n:
        return fwd
    back = idx
    while back > 0 and non_system[back].get("role") == "tool":
        back -= 1
    return back


def _summary_transcript(to_summarise: List[dict]) -> str:
    """Render messages as "[role]: text" lines for the summarisation prompt.

    Assistant tool calls are included as "[role tool_call]: name(arguments)"
    lines, so decisions made through tools are not lost from the summary.
    """
    lines: List[str] = []
    for m in to_summarise:
        role = m.get("role", "unknown")
        content = m.get("content", "")
        if isinstance(content, str):
            lines.append(f"[{role}]: {content}\n")
        for tc in m.get("tool_calls") or []:
            func = tc.get("function") if isinstance(tc, dict) else None
            if isinstance(func, dict):
                lines.append(f"[{role} tool_call]: {func.get('name', '')}({func.get('arguments', '')})\n")
    return "".join(lines)


def maybe_summarize(messages: List[dict], api_key: str,
                    max_context: int = MAX_CONTEXT,
                    ratio: float = SUMMARY_RATIO) -> Tuple[List[dict], bool]:
    """
    If total estimated tokens > max_context * ratio, summarise the oldest
    non-system messages and replace them with a condensed summary.

    System messages: all of them are merged into one (see
    _merge_system_messages), so the result contains at most ONE system
    message. When summarisation runs, the summary is appended to that system
    message, or becomes it if there was none.

    Split point:
    1. Messages are accumulated from the oldest until BOTH hold: the token
       deficit (total - threshold) is covered, AND at least half of the
       non-system messages are included.
    2. If that never happens, half of them (minimum one) are used.
    3. The split is adjusted by _safe_summary_split, so a tool-call/tool-result
       pair is never broken and the newest message is always kept verbatim.

    Returns (new_messages, summarised). The input list object itself is
    returned only when it needs no change at all. If the summarisation API
    fails or returns nothing, a fixed truncation notice stands in for the
    summary and the flag is still True.
    """
    threshold = int(max_context * ratio)
    total = _total_tokens(messages)
    merged_sys, non_system = _merge_system_messages(messages)
    if total <= threshold:
        if merged_sys is not None:
            return [{"role": "system", "content": merged_sys}] + non_system, False
        return messages, False

    sys_prefix = [{"role": "system", "content": merged_sys}] if merged_sys else []
    if not non_system:
        return sys_prefix, False

    deficit = total - threshold
    half = len(non_system) // 2
    idx = max(1, half)  # fallback when the loop never satisfies both conditions
    accumulated = 0
    for i, msg in enumerate(non_system):
        # _total_tokens tolerates content=None (assistant tool-call messages),
        # whereas estimate_tokens(msg.get("content", "")) would raise TypeError.
        accumulated += _total_tokens([msg])
        if accumulated >= deficit and i >= half:
            idx = i + 1  # summarise up to and including this message
            break
    idx = _safe_summary_split(non_system, idx)

    to_summarise = non_system[:idx]
    to_keep = non_system[idx:]
    if not to_summarise:
        return sys_prefix + non_system, False

    summary_prompt = (
        "Summarise the following conversation excerpt. "
        "Retain all critical facts, decisions, and code fragments. "
        "Be concise but complete.\n\n"
    )
    payload = {
        "model": SUMMARY_MODEL,
        "messages": [
            {"role": "user", "content": summary_prompt + _summary_transcript(to_summarise)}
        ],
        "max_tokens": 1000,
        "temperature": 0.0,
        "thinking": {"type": "disabled"},
    }
    try:
        summary = _summarise_messages_with_retry(payload, api_key)
    except RuntimeError as e:
        logging.warning(f"Summarisation failed, falling back to truncation: {e}")
        summary = None
    if not isinstance(summary, str) or not summary.strip():
        if summary is not None:
            logging.warning("Summarisation returned empty content, falling back to truncation")
        summary = "[Earlier conversation truncated due to length]"

    new_messages: List[dict] = []
    if merged_sys:
        new_sys_content = merged_sys + "\n\n[Earlier conversation summary]\n" + summary
        new_messages.append({"role": "system", "content": new_sys_content})
    else:
        new_messages.append({"role": "system", "content": f"[Earlier conversation summary]\n{summary}"})
    new_messages.extend(to_keep)
    return new_messages, True
# ==============================================================================
# Reasoning injection helpers
# ==============================================================================
def cache_assistant_message(original_msgs: List[dict], assistant_msg: dict):
    """Remember an assistant reply (with its reasoning) for later turns.

    The reply is stored only if it has ``tool_calls`` or non-empty
    ``reasoning_content``. The cache key is the _conv_hash of *original_msgs*
    plus the reply with ``reasoning_content`` removed, i.e. the shape a client
    is expected to echo back. The stored value is *assistant_msg* itself.
    """
    has_tool_calls = bool(assistant_msg.get("tool_calls"))
    has_reasoning = bool(assistant_msg.get("reasoning_content"))
    if not (has_tool_calls or has_reasoning):
        return
    visible_reply = {key: value for key, value in assistant_msg.items()
                     if key != "reasoning_content"}
    key_messages = [dict(m) for m in original_msgs] + [visible_reply]
    _assistant_cache.set(_conv_hash(key_messages), assistant_msg)
def inject_reasoning(messages: List[dict]):
    """Restore cached ``reasoning_content`` onto tool-call assistant messages.

    Each assistant message that has ``tool_calls`` but no ``reasoning_content``
    key is looked up by the _conv_hash of the conversation up to and
    including it. If the cached entry carries reasoning, that reasoning is
    copied onto the message in place.
    """
    for position, message in enumerate(messages):
        needs_lookup = (
            message.get("role") == "assistant"
            and message.get("tool_calls")
            and "reasoning_content" not in message
        )
        if not needs_lookup:
            continue
        hit = _assistant_cache.get(_conv_hash(messages[: position + 1]))
        if hit and "reasoning_content" in hit:
            message["reasoning_content"] = hit["reasoning_content"]
def should_disable_thinking(messages: List[dict]) -> bool:
    """Return True if thinking should be disabled for this conversation.

    That is the case as soon as some assistant message has non-empty
    ``tool_calls`` and no ``reasoning_content`` key: its reasoning is
    unavailable, so the model is not asked to produce new reasoning.
    """
    for message in messages:
        if message.get("role") != "assistant":
            continue
        if not message.get("tool_calls"):
            continue
        if "reasoning_content" in message:
            continue
        return True
    return False
# ==============================================================================
# DeepSeek API helpers
# ==============================================================================
def _make_deepseek_request(payload: dict, stream: bool) -> urllib.request.Request:
    """Build (but do not send) a POST to DeepSeek's /chat/completions endpoint.

    The request carries:
    - *payload* as a JSON body;
    - a bearer token from DSEEK_KEY;
    - an Accept header of ``text/event-stream`` when *stream* is true, else
      ``application/json``.
    """
    accept = "text/event-stream" if stream else "application/json"
    body = json.dumps(payload).encode()
    return urllib.request.Request(
        url=f"{DEEPSEEK_BASE}/chat/completions",
        data=body,
        headers={
            "Authorization": f"Bearer {DSEEK_KEY}",
            "Content-Type": "application/json",
            "Accept": accept,
        },
        method="POST",
    )
def deepseek_nonstream(payload: dict) -> dict:
    """Perform a non‑streaming request and return the decoded JSON response.

    Raises:
        RuntimeError: on an HTTP error status. The message contains the status
            code and the error body, decoded leniently (invalid UTF-8 is
            replaced). If the body cannot be read at all, it is left empty
            rather than masking the HTTP status.

    Network-level failures (``urllib.error.URLError``, timeouts) and invalid
    JSON in a successful response propagate unchanged.
    """
    req = _make_deepseek_request(payload, stream=False)
    try:
        with urllib.request.urlopen(req, timeout=600) as resp:
            return json.loads(resp.read().decode())
    except urllib.error.HTTPError as e:
        error_body = ""
        if e.fp:
            try:
                error_body = e.read().decode("utf-8", errors="replace")
            except Exception:
                # Best effort only: a failed body read must not hide the status.
                pass
        raise RuntimeError(f"DeepSeek HTTP {e.code}: {error_body}") from e
# ==============================================================================
# Streaming buffer for accumulating tool calls
# ==============================================================================
class StreamBuffer:
    """Accumulate chat-completion chunks into a single assistant message.

    Accepts two shapes:
    - streaming chunks (``choices[].delta``): text fields are concatenated and
      tool-call fragments are merged by their ``index``;
    - one-shot responses (``choices[].message``): values replace what was
      accumulated.

    The last non-null ``finish_reason`` and ``usage`` seen are kept.

    A ``null`` value in a chunk means "nothing new here", so nulls are
    skipped rather than treated as a reset. Streaming responses may carry
    e.g. ``"reasoning_content": null`` on every answer chunk. A reset would
    wipe the accumulated reasoning as soon as the answer text starts.
    """

    def __init__(self):
        self.reasoning = ""
        self.content = ""
        self.tool_calls: Dict[int, dict] = {}  # index -> merged tool call
        self.finish_reason: Optional[str] = None
        self.usage: Optional[dict] = None

    def process_chunk(self, chunk: dict) -> None:
        """Fold one parsed chunk (or a full non-streaming response) into the buffer."""
        for choice in chunk.get("choices") or []:
            delta = choice.get("delta") or {}
            rc = delta.get("reasoning_content")
            if rc is not None:
                self.reasoning += rc
            ct = delta.get("content")
            if ct is not None:
                self.content += ct
            for tc in delta.get("tool_calls") or []:
                self._merge_tool_call_delta(tc)

            msg = choice.get("message")
            if isinstance(msg, dict):
                rc_msg = msg.get("reasoning_content")
                if rc_msg is not None:
                    self.reasoning = rc_msg
                ct_msg = msg.get("content")
                if ct_msg is not None:
                    self.content = ct_msg
                msg_tool_calls = msg.get("tool_calls")
                if isinstance(msg_tool_calls, list) and msg_tool_calls:
                    self.tool_calls = {
                        i: dict(tc) for i, tc in enumerate(msg_tool_calls) if isinstance(tc, dict)
                    }

            finish = choice.get("finish_reason")
            if finish is not None:
                self.finish_reason = finish
        usage = chunk.get("usage")
        if usage is not None:
            self.usage = usage

    def _merge_tool_call_delta(self, tc: Any) -> None:
        """Merge one streamed tool-call fragment into ``self.tool_calls``.

        ``id``, ``type`` and ``function.name`` are overwritten when present;
        ``function.arguments`` fragments are appended. Fragments without an
        ``index`` are ignored.
        """
        if not isinstance(tc, dict):
            return
        idx = tc.get("index")
        if idx is None:
            return
        cur = self.tool_calls.get(idx)
        if cur is None:
            cur = {
                "id": "",
                "type": "function",
                "function": {
                    "name": "",
                    "arguments": "",
                },
            }
            self.tool_calls[idx] = cur
        tid = tc.get("id")
        if tid is not None:
            cur["id"] = tid
        ttype = tc.get("type")
        if ttype is not None:
            cur["type"] = ttype
        func = tc.get("function")
        if not isinstance(func, dict):
            return
        name = func.get("name")
        if name is not None:
            cur["function"]["name"] = name
        args = func.get("arguments")
        if args is not None:
            cur["function"]["arguments"] += args

    def build_assistant_message(self) -> dict:
        """Return the accumulated assistant message.

        Empty reasoning, content and tool-call fields are omitted. Tool calls
        are ordered by their stream index.
        """
        msg: Dict[str, Any] = {"role": "assistant"}
        if self.reasoning:
            msg["reasoning_content"] = self.reasoning
        if self.content:
            msg["content"] = self.content
        if self.tool_calls:
            msg["tool_calls"] = [self.tool_calls[k] for k in sorted(self.tool_calls)]
        return msg
| # ============================================================================== | |
| # HTTP Request Handler | |
| # ============================================================================== | |
class ProxyHandler(http.server.BaseHTTPRequestHandler):
    """Serve ``POST /v1/chat/completions`` by optimising and forwarding to DeepSeek.

    Each request runs the pipeline: markdown-block dedup, user-segment dedup,
    conversation summarisation, system-prompt compression and reasoning
    injection. The payload is then sent upstream with the globally configured
    model, max_tokens and thinking mode. The assistant reply (including
    ``reasoning_content``) is handed to the reasoning cache, but
    ``reasoning_content`` is removed from what the client receives.
    """

    def log_message(self, format, *args):
        """Route http.server's access log through :mod:`logging`.

        The parameter name ``format`` is dictated by the base-class signature.
        """
        logging.info("%s - %s", self.client_address[0], format % args)

    # ------------------------------------------------------------------
    # Entry point
    # ------------------------------------------------------------------
    def do_POST(self):
        """Validate the request, run the optimisation pipeline and dispatch upstream."""
        # Ignore a query string so "/v1/chat/completions?x=1" still routes here.
        if self.path.partition("?")[0] != "/v1/chat/completions":
            self.send_error(404)
            return
        client_req = self._read_json_body()
        if client_req is None:
            return  # an error response has already been sent
        messages = client_req.get("messages", [])
        # --- Early validation: messages must be a non-empty list of objects ---
        if not isinstance(messages, list) or not messages:
            self.send_error(400, "Empty or invalid 'messages' array")
            return
        if not all(isinstance(m, dict) for m in messages):
            self.send_error(400, "Each entry in 'messages' must be a JSON object")
            return
        stream = client_req.get("stream", False)
        # The pipeline itself can call the DeepSeek API (summarisation), so it
        # shares the error handling of the final dispatch instead of letting an
        # exception drop the connection without any HTTP response.
        try:
            payload, original_messages = self._prepare_payload(client_req, messages, stream)
            if stream:
                self._handle_stream(payload, original_messages)
            else:
                self._handle_nonstream(payload, original_messages)
        except RuntimeError as e:
            logging.error("Upstream error: %s", e)
            self._send_error_safely(502, "Upstream error", str(e))
        except Exception:
            logging.exception("Unexpected proxy error")
            self._send_error_safely(500, "Internal proxy error")

    # ------------------------------------------------------------------
    # Request parsing / pipeline helpers
    # ------------------------------------------------------------------
    def _read_json_body(self):
        """Read the request body and decode it as a JSON object.

        Returns:
            The decoded dict, or None after a 400 response has already been
            sent (unparsable or negative Content-Length, empty body, invalid
            JSON / non-UTF-8 bytes, or a top-level value that is not an object).
        """
        try:
            content_length = int(self.headers.get("Content-Length", 0))
        except (TypeError, ValueError):
            self.send_error(400, "Invalid Content-Length")
            return None
        if content_length < 0:
            # rfile.read() with a negative size would block until the client closes.
            self.send_error(400, "Invalid Content-Length")
            return None
        if content_length == 0:
            self.send_error(400, "Empty body")
            return None
        body = self.rfile.read(content_length)
        try:
            client_req = json.loads(body)
        except ValueError:
            # Covers JSONDecodeError and UnicodeDecodeError (non-UTF-8 bytes).
            self.send_error(400, "Invalid JSON")
            return None
        if not isinstance(client_req, dict):
            self.send_error(400, "Request body must be a JSON object")
            return None
        return client_req

    def _prepare_payload(self, client_req, messages, stream):
        """Run the optimisation pipeline on *messages* and build the upstream payload.

        *messages* is mutated in place by the dedup step.

        Returns:
            ``(payload, original_messages)``. *original_messages* is a deep copy
            of the client's messages taken before any mutation; it is what the
            reasoning cache is keyed on.
        """
        # Deep copy before any in-place mutations
        original_messages = copy.deepcopy(messages)
        # ---------- Pre-processing metrics ----------
        original_msg_count = len(original_messages)
        original_tokens = _total_tokens(original_messages)
        msg_hash = _md5_of_messages(original_messages)
        # Save original message for developer introspection
        if SAVE_PREPOST_MSGS:
            _save_json_message(PRE_MSG_DIR, "pre", original_messages, msg_hash)
        # 1. Markdown block deduplication
        replaced_blocks = deduplicate_markdown_blocks(messages)
        # 2. Inter-message content fingerprinting & deduplication (Feature F-1)
        conv_id = _get_conversation_base_id(original_messages)
        messages, deduped_segments = deduplicate_user_message_segments(messages, conv_id)
        # 3. Conversation summarisation (token-aware split)
        messages, summarized = maybe_summarize(
            messages,
            api_key=DSEEK_KEY,
            max_context=MAX_CONTEXT,
            ratio=SUMMARY_RATIO,
        )
        # 4. System prompt compression – auto-summarize & cache
        compressed_prompts_found = self._compress_system_prompts(messages)
        # 5. Reasoning injection
        inject_reasoning(messages)
        # 6. Build final payload
        payload = self._build_payload(client_req, messages, stream)
        # ---------- Post-processing metrics ----------
        final_msg_count = len(messages)
        final_tokens = _total_tokens(messages)
        compression_pct = (1 - final_tokens / original_tokens) * 100 if original_tokens > 0 else 0.0
        if SAVE_PREPOST_MSGS:
            _save_json_message(POST_MSG_DIR, "post", messages, msg_hash)
        logging.info(
            f"REQ {msg_hash} | msgs: {original_msg_count} → {final_msg_count} "
            f"| tokens: {original_tokens} → {final_tokens} ({compression_pct:+.1f}%) "
            f"| blocks dedup'd: {replaced_blocks} "
            f"| dedup'd segments: {deduped_segments} "
            f"| summarized: {'yes' if summarized else 'no'} "
            f"| compressed prompts: {compressed_prompts_found} "
            f"| stream: {stream} "
            f"| thinking: {payload['thinking']['type']}"
        )
        return payload, original_messages

    def _compress_system_prompts(self, messages):
        """Replace each string-valued system prompt in *messages* with its summary.

        A cached summary (load_summarized_prompt) is used when present.
        Otherwise one is generated with summarize_system_prompt. If generation
        raises RuntimeError or yields an empty summary, the original prompt is
        kept.

        Nothing is done when the server object carries a truthy
        ``disable_compression`` attribute (the --disable-compression flag).

        Returns:
            The number of system prompts that were replaced.
        """
        server = getattr(self, "server", None)
        if getattr(server, "disable_compression", False):
            return 0
        compressed = 0
        for msg in messages:
            if msg.get("role") != "system" or not isinstance(msg.get("content"), str):
                continue
            sys_content = msg["content"]
            h = _stable_hash({"content": sys_content})
            # Save original if not already saved
            if SAVE_PREPOST_SYSTEM:
                save_original_prompt(h, sys_content)
            # Check for cached summarized version
            summarized_sys = load_summarized_prompt(h)
            if summarized_sys:
                logging.debug("Using summarized system prompt for hash %s", h[:12])
            else:
                # Generate summary via DeepSeek API
                try:
                    logging.info("Generating summarized system prompt for hash %s", h[:12])
                    summarized_sys = summarize_system_prompt(sys_content, DSEEK_KEY)
                    if not summarized_sys:
                        # Never wipe the system prompt with an empty summary.
                        logging.warning("Empty system prompt summary for hash %s, using original", h[:12])
                        continue
                    if SAVE_PREPOST_SYSTEM:
                        save_summarized_prompt(h, summarized_sys)
                except RuntimeError as e:
                    logging.warning("Failed to summarize system prompt, using original: %s", e)
                    continue
            msg["content"] = summarized_sys
            compressed += 1
        return compressed

    @staticmethod
    def _build_payload(client_req, messages, stream):
        """Build the upstream request body from *client_req*.

        The client's extra keys (e.g. tools) are kept. model, max_tokens,
        thinking and stream are overridden with the proxy's forced settings.
        """
        payload = dict(client_req)
        payload["messages"] = messages
        payload["model"] = DEFAULT_MODEL
        payload["max_tokens"] = MAX_OUTPUT_TOKENS
        if THINKING_MODE in ("enabled", "disabled"):
            thinking_type = THINKING_MODE
        else:
            thinking_type = "disabled" if should_disable_thinking(messages) else "enabled"
        payload["thinking"] = {"type": thinking_type}
        payload["stream"] = stream
        return payload

    # ------------------------------------------------------------------
    # Error helpers
    # ------------------------------------------------------------------
    def _send_error_safely(self, code, message, explain=None):
        """Send an error response, tolerating a client that has already disconnected.

        Only the short fixed *message* goes into the status line, because
        http.server writes it there verbatim. Free-form detail (e.g. upstream
        error bodies with newlines or non-Latin-1 text) goes into *explain*,
        which is HTML-escaped into the response body.
        """
        try:
            self.send_error(code, message, explain or None)
        except OSError:
            pass

    @staticmethod
    def _http_error_detail(err):
        """Best-effort decode of an upstream HTTPError body; '' if unreadable."""
        try:
            return err.read().decode("utf-8", errors="replace")
        except Exception:
            return ""

    # ------------------------------------------------------------------
    # Upstream dispatch
    # ------------------------------------------------------------------
    def _handle_nonstream(self, payload, original_msgs):
        """Forward a non-streaming request and relay the JSON response.

        The first choice's assistant message (with reasoning) is passed to the
        reasoning cache. ``reasoning_content`` is then removed from every
        choice's message before the response is sent to the client.
        """
        resp = deepseek_nonstream(payload)
        choices = resp.get("choices") if isinstance(resp, dict) else None
        if not isinstance(choices, list) or not choices or not isinstance(choices[0], dict):
            logging.error("DeepSeek returned empty choices array")
            self.send_error(502, "Empty response from upstream")
            return
        raw_msg = choices[0].get("message")
        assistant_msg = dict(raw_msg) if isinstance(raw_msg, dict) else {}
        # Give the cache its own copy: the client-facing messages built below
        # are separate dicts, so stripping reasoning cannot affect what the
        # cache holds even if it keeps a reference.
        cache_assistant_message(original_msgs, dict(assistant_msg))
        for choice in choices:
            if isinstance(choice, dict) and isinstance(choice.get("message"), dict):
                choice["message"] = {
                    k: v for k, v in choice["message"].items() if k != "reasoning_content"
                }
        body = json.dumps(resp).encode()
        try:
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        except OSError as e:
            logging.warning("Client disconnected while sending non‑stream response: %s", e)

    def _handle_stream(self, payload, original_msgs):
        """Forward a streaming request and relay its SSE events to the client.

        Upstream failures before the 200 header is sent become a 502. Once the
        stream has started, nothing here raises: the response is committed, so
        an error page can no longer be sent.
        """
        req = _make_deepseek_request(payload, stream=True)
        try:
            upstream = urllib.request.urlopen(req, timeout=600)
        except urllib.error.HTTPError as e:
            logging.error("Upstream connection error: %s", e)
            self._send_error_safely(502, "Upstream error", self._http_error_detail(e))
            return
        except OSError as e:
            # URLError, plus timeouts/resets raised while reading the response
            # headers, which urlopen does not wrap in URLError.
            logging.error("Upstream connection error: %s", e)
            self._send_error_safely(502, "Upstream error", str(e))
            return
        with upstream:
            try:
                self.send_response(200)
                self.send_header("Content-Type", "text/event-stream")
                self.send_header("Cache-Control", "no-cache")
                self.end_headers()
            except OSError as e:
                logging.warning("Client disconnected before stream started: %s", e)
                return
            buffer = StreamBuffer()
            try:
                done = self._relay_sse(upstream, buffer)
            except OSError as e:
                logging.warning("Client disconnected during stream: %s", e)
                done = True
            except Exception:
                logging.exception("Unexpected error while streaming response")
                done = True
            if not done:
                # Upstream ended without [DONE]; terminate the client stream cleanly.
                try:
                    self.wfile.write(b"data: [DONE]\n\n")
                    self.wfile.flush()
                except OSError:
                    pass
        # The response is already committed. A caching failure must not reach
        # do_POST's handler, which would write a second status line into the stream.
        try:
            assistant_msg = buffer.build_assistant_message()
            if assistant_msg.get("tool_calls") or assistant_msg.get("reasoning_content"):
                cache_assistant_message(original_msgs, assistant_msg)
        except Exception:
            logging.exception("Failed to cache streamed assistant message")

    def _relay_sse(self, upstream, buffer):
        """Copy SSE lines from *upstream* to the client, feeding chunks into *buffer*.

        - JSON-object ``data:`` events go to ``buffer.process_chunk`` first.
          They are then re-serialised without ``reasoning_content`` and
          terminated with a blank line.
        - Other ``data:`` payloads are passed through verbatim as a terminated
          event.
        - Non-blank, non-data lines (comments, ``event:``/``id:`` fields) are
          passed through once.
        - Upstream blank separator lines are dropped, since every emitted data
          event is already terminated. Passing them through was what doubled
          the newlines.

        Returns:
            True once ``data: [DONE]`` has been forwarded; False if upstream
            ended without it.
        """
        write = self.wfile.write
        flush = self.wfile.flush
        for raw in upstream:
            text = raw.decode("utf-8", errors="replace") if isinstance(raw, bytes) else raw
            line = text.rstrip("\r\n")
            if not line.startswith("data:"):
                if line:
                    write(line.encode() + b"\n")
                    flush()
                continue
            data_part = line[5:].strip()
            if data_part == "[DONE]":
                write(b"data: [DONE]\n\n")
                flush()
                return True
            try:
                chunk = json.loads(data_part)
            except json.JSONDecodeError:
                chunk = None
            if not isinstance(chunk, dict):
                write(line.encode() + b"\n\n")
                flush()
                continue
            buffer.process_chunk(chunk)
            for choice in chunk.get("choices") or []:
                if not isinstance(choice, dict):
                    continue
                for key in ("delta", "message"):
                    part = choice.get(key)
                    if isinstance(part, dict):
                        part.pop("reasoning_content", None)
            write(f"data: {json.dumps(chunk)}\n\n".encode())
            flush()
        return False
| # ============================================================================== | |
| # Main – with fixed Ctrl+C handling (avoid deadlock) and dev directories | |
| # ============================================================================== | |
def main():
    """Parse CLI flags, apply them to the module-level settings and run the proxy.

    Blocks in ``serve_forever`` until SIGINT/SIGTERM. The signal handler runs
    ``server.shutdown()`` on a helper thread, because ``shutdown()`` waits for
    ``serve_forever`` to exit and would deadlock if called on that thread.
    Invalid numeric flags exit with argparse's usage error (status 2).
    """
    global MAX_CONTEXT, SUMMARY_RATIO, MAX_OUTPUT_TOKENS, THINKING_MODE
    parser = argparse.ArgumentParser(description="DeepSeek V4 Flash OpenAI Proxy (globals forced)")
    parser.add_argument("--port", type=int, default=8080, help="Listening port")
    # Loopback by default: this proxy spends the owner's API key and must not
    # be reachable from other hosts unless explicitly requested.
    parser.add_argument("--host", default="127.0.0.1",
                        help="Bind address (default: 127.0.0.1; use 0.0.0.0 to listen on all interfaces)")
    parser.add_argument("--max-context", type=int, default=128000, help="Max context tokens")
    parser.add_argument("--summarize-ratio", type=float, default=0.8,
                        help="Trigger summarisation when tokens exceed ratio * max-context")
    parser.add_argument("--disable-compression", action="store_true",
                        help="Do not auto-summarize system prompts (use originals as-is)")
    parser.add_argument("--max-output-tokens", type=int, default=128000,
                        help="Force this many max tokens for generation (overrides client)")
    parser.add_argument("--thinking", choices=["enabled", "disabled", "auto"], default="auto",
                        help="Force thinking mode (default: auto)")
    args = parser.parse_args()
    # Reject nonsensical values up front so they surface as a usage error
    # rather than a traceback or silently odd behaviour later.
    if not 0 <= args.port <= 65535:
        parser.error("--port must be in the range 0-65535")
    if args.max_context <= 0:
        parser.error("--max-context must be a positive integer")
    if not 0 < args.summarize_ratio <= 1:
        parser.error("--summarize-ratio must be in the range (0, 1]")
    if args.max_output_tokens <= 0:
        parser.error("--max-output-tokens must be a positive integer")
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s"
    )
    if not args.disable_compression and SAVE_PREPOST_SYSTEM:
        _ensure_sys_dirs()
    MAX_CONTEXT = args.max_context
    SUMMARY_RATIO = args.summarize_ratio
    MAX_OUTPUT_TOKENS = args.max_output_tokens
    THINKING_MODE = args.thinking
    # Ensure developer dump directories exist
    if SAVE_PREPOST_MSGS:
        PRE_MSG_DIR.mkdir(parents=True, exist_ok=True)
        POST_MSG_DIR.mkdir(parents=True, exist_ok=True)

    class _ProxyServer(socketserver.ThreadingTCPServer):
        # Allow an immediate restart on the same port while old sockets sit in
        # TIME_WAIT (http.server.HTTPServer sets this by default as well).
        allow_reuse_address = True
        # Request threads must not keep the process alive after shutdown.
        daemon_threads = True

    try:
        server = _ProxyServer((args.host, args.port), ProxyHandler)
    except OSError as e:
        raise SystemExit(f"Cannot listen on {args.host}:{args.port}: {e}") from e
    # Read by ProxyHandler (via self.server) to honour --disable-compression.
    server.disable_compression = args.disable_compression

    # ---- Graceful shutdown WITHOUT deadlocking the main thread ----
    shutdown_lock = threading.Lock()
    shutting_down = False

    def _shutdown(signum, frame):
        nonlocal shutting_down
        with shutdown_lock:
            if shutting_down:
                return  # repeated signals are ignored
            shutting_down = True
        logging.info("Received signal %s, shutting down.", signum)
        threading.Thread(target=server.shutdown, daemon=True).start()

    signal.signal(signal.SIGTERM, _shutdown)
    signal.signal(signal.SIGINT, _shutdown)
    logging.info(f"Proxy listening on {args.host}:{args.port}")
    logging.info(f"Forced model: {DEFAULT_MODEL}, max_tokens: {MAX_OUTPUT_TOKENS}, thinking: {THINKING_MODE}")
    logging.info("System prompt compression: %s", "disabled" if args.disable_compression else "enabled")
    if SAVE_PREPOST_MSGS:
        logging.info(f"Pre/post message dumps: {PRE_MSG_DIR} / {POST_MSG_DIR}")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        server.server_close()
        logging.info("Server stopped.")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment