Skip to content

Instantly share code, notes, and snippets.

@Dev-iL
Created July 13, 2025 08:03
Show Gist options
  • Save Dev-iL/3a310fd6199e0579697413c7671f2c45 to your computer and use it in GitHub Desktop.
Save Dev-iL/3a310fd6199e0579697413c7671f2c45 to your computer and use it in GitHub Desktop.
Apache Mailing List Thread Fetcher and Reconstructor
#!/usr/bin/env python3
"""Apache Mailing List Thread Fetcher and Reconstructor.
This module fetches, reconstructs, and caches Apache mailing list threads in a tree structure
suitable for LLM summarization. It supports multiple data sources with a cascade fallback:
1. Local YAML cache
2. Local mbox files
3. Apache mailing list API
Requirements:
pip install requests ruamel.yaml mail-parser-reply
Dependencies:
- requests: For API calls
- ruamel.yaml: For YAML parsing with better formatting preservation
- mail_parser_reply: For cleaning email reply chains
Author: Dev-iL (with the help of Gemini and Claude)
Python: 3.10+
"""
from __future__ import annotations
import argparse
import logging
import mailbox
import re
import sys
from dataclasses import dataclass, field
from datetime import datetime
from email.utils import parsedate_to_datetime
from pathlib import Path
import requests
from mailparser_reply import EmailReplyParser
from ruamel.yaml import YAML
# Manual stand-in for typing.TYPE_CHECKING: the guard below is never true at
# runtime, so `Any` is only imported for static type checkers (annotations are
# lazy thanks to `from __future__ import annotations`).
# NOTE(review): the stdlib idiom is `from typing import TYPE_CHECKING`.
TYPE_CHECKING = False
if TYPE_CHECKING:
    from typing import Any

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
# Module-level logger, per stdlib convention.
logger = logging.getLogger(__name__)
# Configure YAML parser
def get_yaml_parser() -> YAML:
    """Build a YAML parser configured for readable round-trip output.

    Returns:
        Configured YAML parser with settings for readable output
    """
    parser = YAML()
    parser.width = 4096  # Prevent line wrapping for long strings
    parser.preserve_quotes = True
    parser.default_flow_style = False
    parser.indent(mapping=2, sequence=2, offset=0)
    return parser
# --- Data Structures ---
@dataclass
class EmailMessage:
    """A single mailing-list message plus the metadata needed for threading."""

    mid: str          # API-side permalink id
    message_id: str   # RFC 5322 Message-ID header (stripped)
    in_reply_to: str  # Message-ID of the parent message, if any (stripped)
    from_addr: str
    subject: str
    body: str
    epoch: int        # Unix timestamp; 0 when unknown

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> EmailMessage:
        """Create an EmailMessage from a dictionary."""
        raw_id = data.get("message-id", "")
        raw_reply = data.get("in-reply-to", "")
        return cls(
            mid=data.get("mid", ""),
            message_id=raw_id.strip(),
            in_reply_to=raw_reply.strip(),
            from_addr=data.get("from", ""),
            subject=data.get("subject", ""),
            body=data.get("body", ""),
            epoch=data.get("epoch", 0),
        )

    def to_dict(self) -> dict[str, Any]:
        """Convert EmailMessage to a dictionary."""
        return {
            "mid": self.mid,
            "message-id": self.message_id,
            "in-reply-to": self.in_reply_to,
            "from": self.from_addr,
            "subject": self.subject,
            "body": self.body,
            "epoch": self.epoch,
        }
@dataclass
class EmailNode:
    """A node in the conversation tree representing an email message.

    Attributes:
        message: The email message data
        parent: Reference to the parent node (if any)
        children: List of child nodes (replies to this message)
    """

    message: EmailMessage
    parent: EmailNode | None = None
    children: list[EmailNode] = field(default_factory=list)

    @property
    def message_id(self) -> str:
        """Message-ID of the wrapped email."""
        return self.message.message_id

    def to_dict(self) -> dict[str, Any]:
        """Serialize this node and its entire subtree to plain dictionaries."""
        return {
            "message": self.message.to_dict(),
            "children": [reply.to_dict() for reply in self.children],
        }

    @staticmethod
    def from_dict(data: dict[str, Any]) -> EmailNode:
        """Rebuild an EmailNode (and its subtree) from its dict form.

        Args:
            data: Dictionary containing message data and children

        Returns:
            Reconstructed EmailNode with its subtree
        """
        node = EmailNode(EmailMessage.from_dict(data["message"]))
        for child_data in data.get("children", []):
            child = EmailNode.from_dict(child_data)
            child.parent = node  # re-establish the back-reference
            node.children.append(child)
        return node

    def __repr__(self) -> str:
        return f"<EmailNode: {self.message_id}>"
# --- Exceptions ---
class ThreadFetchError(Exception):
    """Raised when thread fetching fails.

    Covers network failures, invalid JSON, and empty thread data from the
    thread-structure endpoint (see get_thread_metadata); the CLI exits 1
    when this propagates.
    """
class MessageFetchError(Exception):
    """Raised when individual message fetching fails.

    NOTE(review): not raised anywhere in this file — per-message API
    failures are logged and surfaced as a None return instead; apparently
    kept for forward compatibility of the public API.
    """
# --- Data Loading & Fetching ---
def get_thread_metadata(thread_id: str) -> tuple[list[str], dict[str, str]]:
    """Fetch the thread structure to get all message IDs and their mapping.

    Args:
        thread_id: The Apache mailing list thread ID

    Returns:
        Tuple of (list of message IDs, mapping of mid to message-id)

    Raises:
        ThreadFetchError: If the thread structure cannot be fetched
    """
    logger.info(f"Discovering all messages in thread {thread_id}...")
    try:
        response = requests.get(
            f"https://lists.apache.org/api/thread.lua?id={thread_id}", timeout=30
        )
        response.raise_for_status()
        thread_data = response.json()
    except requests.RequestException as e:
        msg = f"Could not fetch thread structure: {e}"
        raise ThreadFetchError(msg) from e
    except ValueError as e:
        msg = f"Invalid JSON response: {e}"
        raise ThreadFetchError(msg) from e

    thread_root = thread_data.get("thread", {})
    if not thread_root:
        msg = "No thread data found in API response"
        raise ThreadFetchError(msg)

    all_mids: set[str] = set()
    mid_to_message_id_map: dict[str, str] = {}

    # Preorder walk of the nested thread structure with an explicit stack
    # (children pushed reversed so they are visited in document order).
    stack = [thread_root]
    while stack:
        node = stack.pop()
        if "mid" in node and "message-id" in node:
            all_mids.add(node["mid"])
            mid_to_message_id_map[node["mid"]] = node["message-id"].strip()
        stack.extend(reversed(node.get("children", [])))

    logger.info(f"Found {len(all_mids)} messages in thread")
    return list(all_mids), mid_to_message_id_map
def load_from_yaml(filepath: str | Path) -> tuple[list[dict[str, Any]], list[str]]:
    """Load processed messages and their IDs from the YAML storage file.

    Args:
        filepath: Path to the YAML file

    Returns:
        Tuple of (list of processed messages, list of processed message IDs);
        both empty when the cache is absent or unreadable.
    """
    cache_path = Path(filepath)
    if not cache_path.exists():
        logger.info(f"No existing cache found at '{cache_path}'")
        return [], []

    try:
        with cache_path.open(encoding="utf-8") as f:
            data = get_yaml_parser().load(f) or {}
    except Exception as e:
        # Best-effort cache: a corrupt file means we simply refetch everything.
        logger.exception(f"Failed to parse YAML file: {e}")
        return [], []

    messages = data.get("processed_messages", [])
    mids = data.get("processed_mids", [])
    logger.info(f"Loaded {len(mids)} processed messages from cache")
    return messages, mids
def load_mbox_index(mbox_path: str | Path) -> dict[str, mailbox.Message]:
    """Parse an mbox file and index all messages by their Message-ID.

    Args:
        mbox_path: Path to the mbox file (falsy values yield an empty index)

    Returns:
        Dictionary mapping Message-ID to mailbox.Message objects
    """
    if not mbox_path:
        return {}

    path = Path(mbox_path)
    if not path.exists():
        logger.warning(f"Mbox file not found: {path}")
        return {}

    logger.info(f"Indexing mbox file at '{path}'...")
    try:
        index: dict[str, mailbox.Message] = {}
        for msg in mailbox.mbox(str(path)):
            msg_id = msg.get("Message-ID")
            # Messages without a Message-ID cannot be threaded — skip them.
            if msg_id:
                index[msg_id.strip()] = msg
        logger.info(f"Indexed {len(index)} messages from mbox")
        return index
    except Exception as e:
        logger.exception(f"Failed to read mbox file: {e}")
        return {}
def extract_body_from_message(msg: mailbox.Message) -> str:
    """Extract the plain text body from a mailbox.Message object.

    For multipart messages this walks the MIME tree and takes the first
    decodable *inline* ``text/plain`` part. Parts marked as attachments are
    skipped so that an attached ``.txt`` file is never mistaken for the
    message body (the original code took any text/plain part).

    Args:
        msg: The mailbox.Message object

    Returns:
        The extracted plain text body, or "" if none could be decoded
    """
    body = ""
    if msg.is_multipart():
        for part in msg.walk():
            if part.get_content_type() != "text/plain":
                continue
            # Fix: ignore text/plain attachments; only inline parts are body.
            if part.get_content_disposition() == "attachment":
                continue
            try:
                charset = part.get_content_charset("utf-8")
                payload = part.get_payload(decode=True)
                if payload:
                    body = payload.decode(charset, "ignore")
                    break
            except (AttributeError, UnicodeDecodeError, LookupError):
                continue
    else:
        try:
            charset = msg.get_content_charset("utf-8")
            payload = msg.get_payload(decode=True)
            if payload:
                body = payload.decode(charset, "ignore")
        except (AttributeError, UnicodeDecodeError, LookupError):
            body = ""
    return body
def convert_mbox_message_to_dict(mid: str, mbox_msg: mailbox.Message) -> dict[str, Any]:
    """Convert an mbox message object to our standard dictionary format.

    Args:
        mid: The message ID from the API
        mbox_msg: The mailbox.Message object

    Returns:
        Dictionary representation of the message
    """
    # Turn the Date header into a Unix timestamp; any parse failure maps to 0.
    epoch = 0
    try:
        parsed = parsedate_to_datetime(mbox_msg.get("Date", ""))
        if parsed:
            epoch = int(parsed.timestamp())
    except (ValueError, TypeError, AttributeError):
        epoch = 0

    return {
        "mid": mid,
        "message-id": mbox_msg.get("Message-ID", "").strip(),
        "in-reply-to": mbox_msg.get("In-Reply-To", "").strip(),
        "from": mbox_msg.get("From", ""),
        "subject": mbox_msg.get("Subject", ""),
        "body": extract_body_from_message(mbox_msg),
        "epoch": epoch,
    }
def fetch_message_from_api(mid: str) -> dict[str, Any] | None:
    """Fetch a single full message from the Apache mailing list API.

    Args:
        mid: The message ID to fetch

    Returns:
        Message data as a dictionary, or None if fetch fails
    """
    url = f"https://lists.apache.org/api/email.lua?id={mid}"
    # Single try so that requests' JSONDecodeError (a RequestException
    # subclass on modern requests) keeps hitting the first handler, exactly
    # as in the original except-clause ordering.
    try:
        resp = requests.get(url, timeout=30)
        resp.raise_for_status()
        return resp.json()
    except requests.RequestException as e:
        logger.warning(f"API fetch failed for {mid}: {e}")
    except ValueError as e:
        logger.warning(f"Invalid JSON response for {mid}: {e}")
    return None
# --- Tree Construction & Display ---
def clean_email_body(body: str) -> str:
    """Strip quoted replies, signatures, and boilerplate from an email body.

    Two-stage cleanup: mail-parser-reply first (handles most standard
    cases), then a battery of regex substitutions for artifacts the library
    tends to miss, and finally a collapse of excessive blank lines.
    """
    if not body:
        return ""

    # Stage 1: library pass.
    text = EmailReplyParser().parse_reply(body)

    # Stage 2: regex cleanup for leftovers, applied in order.
    cleanup_patterns = (
        # Lines starting with ">" — the most common quoting method.
        re.compile(r"^\s*>[> ]*.*$", re.MULTILINE),
        # "On [date], [author] wrote:" headers in several languages.
        re.compile(r"^\s*On .*(wrote|écrit|gesendet|geschrieben|scrisse|escreveu|napisal|escribió):\s*$", re.MULTILINE | re.IGNORECASE),
        # Korean date/author attribution lines.
        re.compile(r"^\d{4}년\s\d{1,2}월\s\d{1,2}일\s.*님이 작성:$", re.MULTILINE),
        # Stray email headers left in the body.
        re.compile(r"^\s*(From|Sent|To|Cc|Subject):.*$", re.MULTILINE | re.IGNORECASE),
        # Horizontal separators.
        re.compile(r"^\s*_{20,}\s*$", re.MULTILINE),
        re.compile(r"^\s*-{20,}\s*$", re.MULTILINE),
        # Common footers and disclaimers.
        re.compile(r"^-*To unsubscribe, e-mail:.*", re.MULTILINE | re.DOTALL),
        re.compile(r"^\s*CAUTION:.*$", re.MULTILINE | re.IGNORECASE),
        re.compile(r"^\s*AVERTISSEMENT:.*$", re.MULTILINE | re.IGNORECASE),
    )
    for rx in cleanup_patterns:
        text = rx.sub("", text)

    # Stage 3: compact runs of 3+ newlines down to one blank line.
    text = re.sub(r'\n{3,}', '\n\n', str(text))
    return text.strip()
def build_conversation_tree(messages: list[dict[str, Any]]) -> list[EmailNode]:
    """Build a conversation tree from a list of messages.

    Args:
        messages: List of message dictionaries

    Returns:
        List of root nodes (messages without parents in the set)
    """
    # Wrap every message in a node; index only those that have a Message-ID
    # (later duplicates overwrite earlier ones, as before).
    all_nodes = [EmailNode(EmailMessage.from_dict(d)) for d in messages]
    nodes_by_id: dict[str, EmailNode] = {
        n.message_id: n for n in all_nodes if n.message_id
    }

    # Link each node to its In-Reply-To target when present; anything
    # without a resolvable parent becomes a root.
    roots: list[EmailNode] = []
    for node in all_nodes:
        parent_id = node.message.in_reply_to
        parent = nodes_by_id.get(parent_id) if parent_id else None
        if parent is None:
            roots.append(node)
        else:
            parent.children.append(node)
            node.parent = parent

    # Chronological ordering of replies under every node.
    for node in all_nodes:
        node.children.sort(key=lambda n: n.message.epoch)

    return roots
def print_reconstructed_thread(node: EmailNode, level: int = 0) -> None:
    """Recursively print the reconstructed thread with proper indentation.

    The root message gets a full header block; replies get a one-line
    attribution so the conversation reads cleanly.

    Args:
        node: The current node to print
        level: Current indentation level
    """
    prefix = " " * level
    msg = node.message
    body_text = clean_email_body(msg.body)

    if level == 0:
        # Root message: full headers plus an underline sized to the subject.
        if msg.epoch:
            stamp = datetime.fromtimestamp(msg.epoch).strftime("%Y-%m-%d %H:%M:%S")
        else:
            stamp = "Unknown Date"
        subject = " ".join(msg.subject.replace("\n", " ").split())
        print(f"{prefix}From: {msg.from_addr}")
        print(f"{prefix}Date: {stamp}")
        print(f"{prefix}Subject: {subject}")
        print(f"{prefix}{'-' * (len(subject) + 9)}")
    else:
        # Replies: minimal attribution line only.
        print(f"{prefix}--- Reply from: {msg.from_addr} ---")

    if body_text:
        for line in body_text.split("\n"):
            print(f"{prefix}{line}")
    else:
        # Body vanished entirely during cleaning — say so explicitly.
        print(f"{prefix}[Body is empty or was entirely quoted content]")
    print()  # blank line between messages

    for reply in node.children:
        print_reconstructed_thread(reply, level + 1)
def save_to_yaml(
    filepath: str | Path,
    root_nodes: list[EmailNode],
    all_messages: list[dict[str, Any]],
) -> None:
    """Save the thread data to a YAML file.

    Args:
        filepath: Path to the output YAML file
        root_nodes: List of root nodes in the conversation tree
        all_messages: List of all message dictionaries

    Raises:
        OSError: If the file cannot be written
    """
    out_path = Path(filepath)
    processed_mids = [m.get("mid") for m in all_messages if m.get("mid")]

    # A single root serializes as a mapping, multiple roots as a list.
    tree = (
        root_nodes[0].to_dict()
        if len(root_nodes) == 1
        else [n.to_dict() for n in root_nodes]
    )

    payload = {
        "metadata": {
            "last_updated": datetime.now().isoformat(),
            "message_count": len(all_messages),
            "root_count": len(root_nodes),
        },
        "processed_mids": processed_mids,
        "processed_messages": all_messages,
        "thread_tree": tree,
    }

    try:
        with out_path.open("w", encoding="utf-8") as f:
            get_yaml_parser().dump(payload, f)
        logger.info(f"Saved {len(processed_mids)} messages to '{out_path}'")
    except OSError as e:
        logger.exception(f"Failed to save YAML file: {e}")
        raise
# --- Main Orchestrator ---
def main(
    thread_id: str,
    mbox_path: str | Path | None = None,
    yaml_path: str | Path | None = None,
) -> None:
    """Orchestrate the fetch, reconstruction, and storage of a thread.

    This function implements a cascade data source strategy:
    1. Load existing messages from YAML cache
    2. Try to source new messages from local mbox file
    3. Fall back to API for any remaining messages

    Args:
        thread_id: The Apache mailing list thread ID
        mbox_path: Optional path to an mbox file
        yaml_path: Optional path for the YAML output file

    Raises:
        ThreadFetchError: If the thread cannot be fetched
    """
    # Default output path: <thread_id>.yaml in the working directory.
    if yaml_path is None:
        yaml_path = Path(f"{thread_id}.yaml")
    try:
        # Step 1: Discover all messages in the thread (API call; raises
        # ThreadFetchError on failure).
        server_mids, mid_to_message_id_map = get_thread_metadata(thread_id)
        if not server_mids:
            logger.error("No messages found in thread")
            return
        # Step 2: Load data from local sources (cache + optional mbox).
        logger.info("Loading local data...")
        old_messages, processed_mids = load_from_yaml(yaml_path)
        mbox_index = load_mbox_index(mbox_path) if mbox_path else {}
        # Step 3: Consolidate messages using cascade logic.
        logger.info("Consolidating messages (Cache → Mbox → API)...")
        all_messages = list(old_messages)  # Start with cached messages
        processed_mids_set = set(processed_mids)
        # Determine which messages are new (not already in the YAML cache).
        new_mids = [mid for mid in server_mids if mid not in processed_mids_set]
        logger.info(f"Need to fetch {len(new_mids)} new messages")
        # Process new messages, preferring the local mbox over the network.
        api_fetch_count = 0
        mbox_fetch_count = 0
        for mid in new_mids:
            message_id = mid_to_message_id_map.get(mid, "")
            # Try mbox first (keyed by RFC Message-ID, not the API mid).
            if message_id and message_id in mbox_index:
                mbox_msg = mbox_index[message_id]
                msg_data = convert_mbox_message_to_dict(mid, mbox_msg)
                all_messages.append(msg_data)
                mbox_fetch_count += 1
                logger.debug(f"Sourced '{mid}' from mbox")
                continue
            # Fall back to API; a failed fetch is logged but not fatal.
            logger.debug(f"Fetching '{mid}' from API...")
            msg_data = fetch_message_from_api(mid)
            if msg_data:
                all_messages.append(msg_data)
                api_fetch_count += 1
            else:
                logger.error(f"Failed to fetch message {mid}")
        logger.info(f"Fetched {mbox_fetch_count} from mbox, {api_fetch_count} from API")
        # Step 4: Build the conversation tree from everything gathered.
        logger.info("Reconstructing conversation tree...")
        root_nodes = build_conversation_tree(all_messages)
        if not root_nodes:
            logger.error("Could not find any root messages in the conversation")
            return
        logger.info(f"Found {len(root_nodes)} root message(s)")
        # Step 5: Save (also refreshes the cache) and display to stdout.
        save_to_yaml(yaml_path, root_nodes, all_messages)
        print("\n" + "="*70)
        print(" R E C O N S T R U C T E D C O N V E R S A T I O N")
        print("="*70 + "\n")
        for i, root in enumerate(root_nodes):
            if len(root_nodes) > 1:
                print(f"--- THREAD ROOT {i+1} ---\n")
            print_reconstructed_thread(root)
    except ThreadFetchError as e:
        logger.exception(f"Failed to fetch thread: {e}")
        raise
    except KeyboardInterrupt:
        logger.info("Operation cancelled by user")
        raise
    except Exception as e:
        # Last-resort logging so unexpected failures leave a traceback.
        logger.exception(f"Unexpected error: {e}")
        raise
def cli() -> None:
    """Command-line interface for the thread fetcher.

    Parses arguments, optionally raises the log level to DEBUG, and maps
    known failures to conventional exit codes (1 for fetch errors, 130 for
    Ctrl-C).
    """
    parser = argparse.ArgumentParser(
        description="Fetch, reconstruct, and cache Apache mailing list threads for LLM processing.",
        # Raw formatter keeps the epilog's example layout intact;
        # %(prog)s is still substituted by argparse.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Basic usage, creates <thread_id>.yaml
%(prog)s 41b04mg0rolv0sj98jhogsztstxnqfg5
# Use a local mbox file as a primary source for new messages
%(prog)s 41b04mg0rolv0sj98jhogsztstxnqfg5 -m path/to/dev_airflow.mbox
# Specify a custom output/cache file
%(prog)s 41b04mg0rolv0sj98jhogsztstxnqfg5 -o thread_cache.yaml -v
""",
    )
    parser.add_argument(
        "thread_id",
        help="The Apache mailing list thread ID to process",
    )
    parser.add_argument(
        "-m",
        "--mbox",
        dest="mbox_path",
        type=Path,
        help="Path to an mbox file to use as a data source",
    )
    parser.add_argument(
        "-o",
        "--output",
        dest="yaml_path",
        type=Path,
        help="Path for the YAML output/cache file (default: <thread_id>.yaml)",
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="Enable verbose (DEBUG) logging",
    )
    args = parser.parse_args()
    # Configure logging level (root logger, so all module loggers follow).
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    try:
        main(args.thread_id, args.mbox_path, args.yaml_path)
    except ThreadFetchError:
        sys.exit(1)
    except KeyboardInterrupt:
        sys.exit(130)  # conventional exit code for SIGINT


if __name__ == "__main__":
    cli()
@Dev-iL
Copy link
Author

Dev-iL commented Jul 13, 2025

Prompt used for turning the output of the script into a markdown document:

Attached is a thread from the Apache mailing list discussing the topic of unit testing. Reorganize it according to the below and produce a markdown file: 
  • Mention all participants as a comma-separated list of names, with no emails or titles, near the top of the file.
* Explain the presented approaches along with their pros and cons. Note what the consensus or final conclusion was.
* Format code sections as code (python)
* Avoid "who said what" or "who supported what" - just focus on what was being said.
  • Add an appendix containing additional background on the discussed topic, explaining the problem being solved and known approaches, and providing some more context (and possibly a glossary) for technical terms used in the discussion. If additional public libraries are mentioned, attempt to retrieve more information about them for context using the context7 MCP.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment