Created
July 13, 2025 08:03
-
-
Save Dev-iL/3a310fd6199e0579697413c7671f2c45 to your computer and use it in GitHub Desktop.
Apache Mailing List Thread Fetcher and Reconstructor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
"""Apache Mailing List Thread Fetcher and Reconstructor. | |
This module fetches, reconstructs, and caches Apache mailing list threads in a tree structure | |
suitable for LLM summarization. It supports multiple data sources with a cascade fallback: | |
1. Local YAML cache | |
2. Local mbox files | |
3. Apache mailing list API | |
Requirements: | |
pip install requests ruamel.yaml mail-parser-reply | |
Dependencies: | |
- requests: For API calls | |
- ruamel.yaml: For YAML parsing with better formatting preservation | |
- mail_parser_reply: For cleaning email reply chains | |
Author: Dev-iL (with the help of Gemini and Claude) | |
Python: 3.10+ | |
""" | |
from __future__ import annotations | |
import argparse | |
import logging | |
import mailbox | |
import re | |
import sys | |
from dataclasses import dataclass, field | |
from datetime import datetime | |
from email.utils import parsedate_to_datetime | |
from pathlib import Path | |
import requests | |
from mailparser_reply import EmailReplyParser | |
from ruamel.yaml import YAML | |
TYPE_CHECKING = False | |
if TYPE_CHECKING: | |
from typing import Any | |
# Configure logging
# Module-wide setup: timestamped, level-tagged messages at INFO by default.
# cli() raises the root logger to DEBUG when --verbose is given.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
# Shared module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
# Configure YAML parser
def get_yaml_parser() -> YAML:
    """Build a YAML parser configured for readable round-trip output.

    Returns:
        A ``ruamel.yaml.YAML`` instance tuned for human-friendly dumps.
    """
    parser = YAML()
    parser.preserve_quotes = True
    parser.default_flow_style = False
    # A very wide limit keeps long message bodies from being wrapped.
    parser.width = 4096
    parser.indent(mapping=2, sequence=2, offset=0)
    return parser
# --- Data Structures --- | |
@dataclass
class EmailMessage:
    """Represents an email message with all relevant metadata.

    Field names mirror the Apache mailing-list API keys (dashes become
    underscores; ``from`` becomes ``from_addr``).
    """

    mid: str  # Apache list-archive message ID (API "mid")
    message_id: str  # RFC 5322 Message-ID header
    in_reply_to: str  # RFC 5322 In-Reply-To header ("" for thread roots)
    from_addr: str  # sender, from the "From" header
    subject: str
    body: str
    epoch: int  # Unix timestamp of the message (0 when unknown)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> EmailMessage:
        """Create an EmailMessage from a dictionary.

        Tolerates keys that are missing *or* present with an explicit
        ``None`` value: ``data.get(key, "")`` would pass ``None`` through
        and crash on ``.strip()``, so every field falls back via ``or``.
        """
        return cls(
            mid=data.get("mid") or "",
            message_id=(data.get("message-id") or "").strip(),
            in_reply_to=(data.get("in-reply-to") or "").strip(),
            from_addr=data.get("from") or "",
            subject=data.get("subject") or "",
            body=data.get("body") or "",
            epoch=data.get("epoch") or 0,
        )

    def to_dict(self) -> dict[str, Any]:
        """Convert EmailMessage to a dictionary (inverse of from_dict)."""
        return {
            "mid": self.mid,
            "message-id": self.message_id,
            "in-reply-to": self.in_reply_to,
            "from": self.from_addr,
            "subject": self.subject,
            "body": self.body,
            "epoch": self.epoch,
        }
@dataclass
class EmailNode:
    """A node in the conversation tree representing an email message.

    Attributes:
        message: The email message data
        parent: Reference to the parent node (if any)
        children: List of child nodes (replies to this message)
    """

    message: EmailMessage
    parent: EmailNode | None = None
    children: list[EmailNode] = field(default_factory=list)

    @property
    def message_id(self) -> str:
        """Get the message ID of this node."""
        return self.message.message_id

    def to_dict(self) -> dict[str, Any]:
        """Convert the node and its subtree to a dictionary for serialization.

        The ``parent`` reference is deliberately omitted so the structure
        stays acyclic; :meth:`from_dict` rebuilds it.

        Returns:
            Dictionary representation of the node and its children
        """
        return {
            "message": self.message.to_dict(),
            "children": [child.to_dict() for child in self.children],
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> EmailNode:
        """Reconstruct an EmailNode from a dictionary.

        A classmethod (not staticmethod) per the alternate-constructor
        convention; call shape ``EmailNode.from_dict(data)`` is unchanged.

        Args:
            data: Dictionary containing message data and children

        Returns:
            Reconstructed EmailNode with its subtree and parent links
        """
        node = cls(EmailMessage.from_dict(data["message"]))
        for child_data in data.get("children", []):
            child = cls.from_dict(child_data)
            # Re-establish the upward link dropped by to_dict().
            child.parent = node
            node.children.append(child)
        return node

    def __repr__(self) -> str:
        return f"<EmailNode: {self.message_id}>"
# --- Exceptions --- | |
class ThreadFetchError(Exception):
    """Raised when the thread structure cannot be fetched or parsed from the API."""
class MessageFetchError(Exception):
    """Raised when individual message fetching fails."""
    # NOTE(review): not raised anywhere in this module; API failures are
    # logged and return None instead (see fetch_message_from_api).
# --- Data Loading & Fetching --- | |
def get_thread_metadata(thread_id: str) -> tuple[list[str], dict[str, str]]:
    """Fetch the thread structure to get all message IDs and their mapping.

    Args:
        thread_id: The Apache mailing list thread ID

    Returns:
        Tuple of (list of message IDs, mapping of mid to message-id)

    Raises:
        ThreadFetchError: If the thread structure cannot be fetched
    """
    logger.info("Discovering all messages in thread %s...", thread_id)
    try:
        # Pass the id via params= so requests URL-encodes it, instead of
        # interpolating it raw into the URL where unusual characters
        # could produce a malformed request.
        response = requests.get(
            "https://lists.apache.org/api/thread.lua",
            params={"id": thread_id},
            timeout=30,
        )
        response.raise_for_status()
        thread_data = response.json()
    except requests.RequestException as e:
        msg = f"Could not fetch thread structure: {e}"
        raise ThreadFetchError(msg) from e
    except ValueError as e:
        msg = f"Invalid JSON response: {e}"
        raise ThreadFetchError(msg) from e

    all_mids: set[str] = set()
    mid_to_message_id_map: dict[str, str] = {}

    def _collect_metadata(node: dict[str, Any]) -> None:
        """Recursively collect mid/message-id pairs from the thread tree."""
        if "mid" in node and "message-id" in node:
            all_mids.add(node["mid"])
            mid_to_message_id_map[node["mid"]] = node["message-id"].strip()
        for child in node.get("children", []):
            _collect_metadata(child)

    thread_root = thread_data.get("thread", {})
    if not thread_root:
        msg = "No thread data found in API response"
        raise ThreadFetchError(msg)

    _collect_metadata(thread_root)
    logger.info("Found %d messages in thread", len(all_mids))
    return list(all_mids), mid_to_message_id_map
def load_from_yaml(filepath: str | Path) -> tuple[list[dict[str, Any]], list[str]]:
    """Load processed messages and their IDs from the YAML storage file.

    Missing or unreadable cache files are treated as an empty cache.

    Args:
        filepath: Path to the YAML file

    Returns:
        Tuple of (list of processed messages, list of processed message IDs)
    """
    cache_file = Path(filepath)
    if not cache_file.exists():
        logger.info(f"No existing cache found at '{cache_file}'")
        return [], []
    try:
        with open(cache_file, encoding="utf-8") as f:
            data = get_yaml_parser().load(f) or {}
    except Exception as e:
        # Best-effort cache: a corrupt file just means we re-fetch.
        logger.exception(f"Failed to parse YAML file: {e}")
        return [], []
    messages = data.get("processed_messages", [])
    mids = data.get("processed_mids", [])
    logger.info(f"Loaded {len(mids)} processed messages from cache")
    return messages, mids
def load_mbox_index(mbox_path: str | Path) -> dict[str, mailbox.Message]:
    """Parse an mbox file and index all messages by their Message-ID.

    Messages without a Message-ID header are skipped; read failures
    yield an empty index rather than raising.

    Args:
        mbox_path: Path to the mbox file

    Returns:
        Dictionary mapping Message-ID to mailbox.Message objects
    """
    if not mbox_path:
        return {}
    path = Path(mbox_path)
    if not path.exists():
        logger.warning(f"Mbox file not found: {path}")
        return {}
    logger.info(f"Indexing mbox file at '{path}'...")
    index: dict[str, mailbox.Message] = {}
    try:
        for message in mailbox.mbox(str(path)):
            message_id = message.get("Message-ID")
            if message_id:
                index[message_id.strip()] = message
    except Exception as e:
        logger.exception(f"Failed to read mbox file: {e}")
        return {}
    logger.info(f"Indexed {len(index)} messages from mbox")
    return index
def extract_body_from_message(msg: mailbox.Message) -> str:
    """Extract the plain text body from a mailbox.Message object.

    For multipart messages, the first text/plain part whose payload
    decodes successfully is used; parts that fail to decode are skipped.
    Any failure yields an empty string.

    Args:
        msg: The mailbox.Message object

    Returns:
        The extracted plain text body (possibly empty)
    """

    def _decode(part: mailbox.Message) -> str | None:
        """Decode one part's payload to text, or None when unavailable."""
        try:
            raw = part.get_payload(decode=True)
            if not raw:
                return None
            return raw.decode(part.get_content_charset("utf-8"), "ignore")
        except (AttributeError, UnicodeDecodeError, LookupError):
            return None

    if not msg.is_multipart():
        return _decode(msg) or ""
    for part in msg.walk():
        if part.get_content_type() == "text/plain":
            text = _decode(part)
            if text is not None:
                return text
    return ""
def convert_mbox_message_to_dict(mid: str, mbox_msg: mailbox.Message) -> dict[str, Any]:
    """Convert an mbox message object to our standard dictionary format.

    Args:
        mid: The message ID from the API
        mbox_msg: The mailbox.Message object

    Returns:
        Dictionary representation of the message
    """
    # Derive the epoch from the Date header; fall back to 0 when the
    # header is missing or cannot be parsed.
    try:
        parsed_date = parsedate_to_datetime(mbox_msg.get("Date", ""))
        epoch = int(parsed_date.timestamp()) if parsed_date else 0
    except (ValueError, TypeError, AttributeError):
        epoch = 0
    return {
        "mid": mid,
        "message-id": mbox_msg.get("Message-ID", "").strip(),
        "in-reply-to": mbox_msg.get("In-Reply-To", "").strip(),
        "from": mbox_msg.get("From", ""),
        "subject": mbox_msg.get("Subject", ""),
        "body": extract_body_from_message(mbox_msg),
        "epoch": epoch,
    }
def fetch_message_from_api(mid: str) -> dict[str, Any] | None:
    """Fetch a single full message from the Apache mailing list API.

    Args:
        mid: The message ID to fetch

    Returns:
        Message data as a dictionary, or None if fetch fails
    """
    try:
        # params= lets requests URL-encode the id safely instead of
        # splicing it into the URL by hand (matches get_thread_metadata).
        response = requests.get(
            "https://lists.apache.org/api/email.lua",
            params={"id": mid},
            timeout=30,
        )
        response.raise_for_status()
        return response.json()
    except requests.RequestException as e:
        logger.warning(f"API fetch failed for {mid}: {e}")
        return None
    except ValueError as e:
        # requests' JSONDecodeError subclasses ValueError, so this also
        # covers malformed JSON bodies.
        logger.warning(f"Invalid JSON response for {mid}: {e}")
        return None
# --- Tree Construction & Display --- | |
# Cleanup patterns for artifacts mail-parser-reply may miss. Compiled once
# at import time rather than on every clean_email_body() call.
_CLEANUP_PATTERNS = [
    # Lines starting with ">", the most common quoting method.
    re.compile(r"^\s*>[> ]*.*$", re.MULTILINE),
    # "On [date], [author] wrote:" and similar international attribution lines.
    re.compile(
        r"^\s*On .*(wrote|écrit|gesendet|geschrieben|scrisse|escreveu|napisal|escribió):\s*$",
        re.MULTILINE | re.IGNORECASE,
    ),
    # Korean date/author attribution lines.
    re.compile(r"^\d{4}년\s\d{1,2}월\s\d{1,2}일\s.*님이 작성:$", re.MULTILINE),
    # Standard email headers that are sometimes left in the body.
    re.compile(r"^\s*(From|Sent|To|Cc|Subject):.*$", re.MULTILINE | re.IGNORECASE),
    # Horizontal lines/separators.
    re.compile(r"^\s*_{20,}\s*$", re.MULTILINE),
    re.compile(r"^\s*-{20,}\s*$", re.MULTILINE),
    # Common footers and disclaimers (DOTALL drops everything after the marker).
    re.compile(r"^-*To unsubscribe, e-mail:.*", re.MULTILINE | re.DOTALL),
    re.compile(r"^\s*CAUTION:.*$", re.MULTILINE | re.IGNORECASE),
    re.compile(r"^\s*AVERTISSEMENT:.*$", re.MULTILINE | re.IGNORECASE),
]

# Collapses runs of three or more newlines down to a single blank line.
_EXCESS_NEWLINES = re.compile(r"\n{3,}")


def clean_email_body(body: str) -> str:
    """Clean an email body of quoted replies, signatures, and other noise.

    Three stages: (1) mail-parser-reply handles the standard reply-chain
    cases; (2) the module-level regexes above remove artifacts it misses;
    (3) leftover blank-line runs are compacted.

    Args:
        body: Raw email body text.

    Returns:
        The cleaned body, stripped of leading/trailing whitespace.
    """
    if not body:
        return ""
    cleaned = str(EmailReplyParser().parse_reply(body))
    for pattern in _CLEANUP_PATTERNS:
        cleaned = pattern.sub("", cleaned)
    return _EXCESS_NEWLINES.sub("\n\n", cleaned).strip()
def build_conversation_tree(messages: list[dict[str, Any]]) -> list[EmailNode]:
    """Build a conversation tree from a list of messages.

    Args:
        messages: List of message dictionaries

    Returns:
        List of root nodes (messages without parents in the set)
    """
    # Wrap every message in a node and index those that carry a Message-ID.
    all_nodes = [EmailNode(EmailMessage.from_dict(m)) for m in messages]
    nodes_by_id = {n.message_id: n for n in all_nodes if n.message_id}

    # Link each node to its parent when the parent is present in this set;
    # anything without a resolvable parent becomes a root.
    roots: list[EmailNode] = []
    for node in all_nodes:
        parent = nodes_by_id.get(node.message.in_reply_to or "")
        if parent is not None:
            node.parent = parent
            parent.children.append(node)
        else:
            roots.append(node)

    # Order replies chronologically under each node.
    for node in all_nodes:
        node.children.sort(key=lambda n: n.message.epoch)
    return roots
def print_reconstructed_thread(node: EmailNode, level: int = 0) -> None:
    """Recursively print the reconstructed thread with proper indentation.

    The root message gets a full From/Date/Subject header; replies get a
    minimal one-line header for a cleaner conversation flow.

    Args:
        node: The current node to print
        level: Current indentation level
    """
    indent = " " * level
    msg = node.message
    body = clean_email_body(msg.body)

    if level:
        # Replies: just show who is speaking.
        print(f"{indent}--- Reply from: {msg.from_addr} ---")
    else:
        # Root: full header block with an underline sized to the subject.
        if msg.epoch:
            timestamp = datetime.fromtimestamp(msg.epoch).strftime("%Y-%m-%d %H:%M:%S")
        else:
            timestamp = "Unknown Date"
        subject = " ".join(msg.subject.replace("\n", " ").split())
        print(f"{indent}From: {msg.from_addr}")
        print(f"{indent}Date: {timestamp}")
        print(f"{indent}Subject: {subject}")
        print(f"{indent}{'-' * (len(subject) + 9)}")

    if body:
        for line in body.split("\n"):
            print(f"{indent}{line}")
    else:
        # Everything was quoted content or signature noise.
        print(f"{indent}[Body is empty or was entirely quoted content]")
    print()  # blank line between messages

    for child in node.children:
        print_reconstructed_thread(child, level + 1)
def save_to_yaml(
    filepath: str | Path,
    root_nodes: list[EmailNode],
    all_messages: list[dict[str, Any]],
) -> None:
    """Save the thread data to a YAML file.

    Args:
        filepath: Path to the output YAML file
        root_nodes: List of root nodes in the conversation tree
        all_messages: List of all message dictionaries
    """
    out_path = Path(filepath)
    mids = [m.get("mid") for m in all_messages if m.get("mid")]

    # A single root serializes as a mapping; multiple roots as a list.
    if len(root_nodes) == 1:
        tree = root_nodes[0].to_dict()
    else:
        tree = [n.to_dict() for n in root_nodes]

    payload = {
        "metadata": {
            "last_updated": datetime.now().isoformat(),
            "message_count": len(all_messages),
            "root_count": len(root_nodes),
        },
        "processed_mids": mids,
        "processed_messages": all_messages,
        "thread_tree": tree,
    }
    try:
        yaml = get_yaml_parser()
        with open(out_path, "w", encoding="utf-8") as f:
            yaml.dump(payload, f)
        logger.info(f"Saved {len(mids)} messages to '{out_path}'")
    except OSError as e:
        logger.exception(f"Failed to save YAML file: {e}")
        raise
# --- Main Orchestrator --- | |
def _fetch_new_messages(
    new_mids: list[str],
    mid_to_message_id_map: dict[str, str],
    mbox_index: dict[str, mailbox.Message],
) -> list[dict[str, Any]]:
    """Source each new mid from the mbox index when possible, else the API."""
    fetched: list[dict[str, Any]] = []
    api_fetch_count = 0
    mbox_fetch_count = 0
    for mid in new_mids:
        message_id = mid_to_message_id_map.get(mid, "")
        # Try mbox first — it is local and therefore much faster.
        if message_id and message_id in mbox_index:
            fetched.append(convert_mbox_message_to_dict(mid, mbox_index[message_id]))
            mbox_fetch_count += 1
            logger.debug(f"Sourced '{mid}' from mbox")
            continue
        # Fall back to the API.
        logger.debug(f"Fetching '{mid}' from API...")
        msg_data = fetch_message_from_api(mid)
        if msg_data:
            fetched.append(msg_data)
            api_fetch_count += 1
        else:
            logger.error(f"Failed to fetch message {mid}")
    logger.info(f"Fetched {mbox_fetch_count} from mbox, {api_fetch_count} from API")
    return fetched


def _display_thread(root_nodes: list[EmailNode]) -> None:
    """Print the reconstructed conversation banner and each root's subtree."""
    print("\n" + "=" * 70)
    print(" R E C O N S T R U C T E D C O N V E R S A T I O N")
    print("=" * 70 + "\n")
    for i, root in enumerate(root_nodes):
        if len(root_nodes) > 1:
            print(f"--- THREAD ROOT {i + 1} ---\n")
        print_reconstructed_thread(root)


def main(
    thread_id: str,
    mbox_path: str | Path | None = None,
    yaml_path: str | Path | None = None,
) -> None:
    """Orchestrate the fetch, reconstruction, and storage of a thread.

    This function implements a cascade data source strategy:
    1. Load existing messages from YAML cache
    2. Try to source new messages from local mbox file
    3. Fall back to API for any remaining messages

    Args:
        thread_id: The Apache mailing list thread ID
        mbox_path: Optional path to an mbox file
        yaml_path: Optional path for the YAML output file

    Raises:
        ThreadFetchError: If the thread cannot be fetched
    """
    # Default output path: <thread_id>.yaml in the working directory.
    if yaml_path is None:
        yaml_path = Path(f"{thread_id}.yaml")
    try:
        # Step 1: Discover all messages in the thread.
        server_mids, mid_to_message_id_map = get_thread_metadata(thread_id)
        if not server_mids:
            logger.error("No messages found in thread")
            return

        # Step 2: Load data from local sources.
        logger.info("Loading local data...")
        old_messages, processed_mids = load_from_yaml(yaml_path)
        mbox_index = load_mbox_index(mbox_path) if mbox_path else {}

        # Step 3: Consolidate messages using cascade logic.
        logger.info("Consolidating messages (Cache → Mbox → API)...")
        all_messages = list(old_messages)  # start with cached messages
        processed = set(processed_mids)
        new_mids = [mid for mid in server_mids if mid not in processed]
        logger.info(f"Need to fetch {len(new_mids)} new messages")
        all_messages.extend(
            _fetch_new_messages(new_mids, mid_to_message_id_map, mbox_index)
        )

        # Step 4: Build the conversation tree.
        logger.info("Reconstructing conversation tree...")
        root_nodes = build_conversation_tree(all_messages)
        if not root_nodes:
            logger.error("Could not find any root messages in the conversation")
            return
        logger.info(f"Found {len(root_nodes)} root message(s)")

        # Step 5: Save and display.
        save_to_yaml(yaml_path, root_nodes, all_messages)
        _display_thread(root_nodes)
    except ThreadFetchError as e:
        logger.exception(f"Failed to fetch thread: {e}")
        raise
    except KeyboardInterrupt:
        logger.info("Operation cancelled by user")
        raise
    except Exception as e:
        logger.exception(f"Unexpected error: {e}")
        raise
def cli() -> None:
    """Command-line interface for the thread fetcher.

    Parses arguments, configures logging verbosity, then dispatches to
    main(), translating known failures into conventional exit codes.
    """
    arg_parser = argparse.ArgumentParser(
        description="Fetch, reconstruct, and cache Apache mailing list threads for LLM processing.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Basic usage, creates <thread_id>.yaml
  %(prog)s 41b04mg0rolv0sj98jhogsztstxnqfg5

  # Use a local mbox file as a primary source for new messages
  %(prog)s 41b04mg0rolv0sj98jhogsztstxnqfg5 -m path/to/dev_airflow.mbox

  # Specify a custom output/cache file
  %(prog)s 41b04mg0rolv0sj98jhogsztstxnqfg5 -o thread_cache.yaml -v
""",
    )
    arg_parser.add_argument(
        "thread_id",
        help="The Apache mailing list thread ID to process",
    )
    arg_parser.add_argument(
        "-m", "--mbox",
        dest="mbox_path",
        type=Path,
        help="Path to an mbox file to use as a data source",
    )
    arg_parser.add_argument(
        "-o", "--output",
        dest="yaml_path",
        type=Path,
        help="Path for the YAML output/cache file (default: <thread_id>.yaml)",
    )
    arg_parser.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="Enable verbose (DEBUG) logging",
    )
    args = arg_parser.parse_args()

    if args.verbose:
        # Raise the root logger so per-message sourcing decisions are visible.
        logging.getLogger().setLevel(logging.DEBUG)

    try:
        main(args.thread_id, args.mbox_path, args.yaml_path)
    except ThreadFetchError:
        sys.exit(1)  # failure already logged inside main()
    except KeyboardInterrupt:
        sys.exit(130)  # conventional exit code for SIGINT
# Script entry point — delegate all work to the CLI wrapper.
if __name__ == "__main__":
    cli()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Prompt used for turning the output of the script into a markdown document: