@fullstackwebdev
Last active February 20, 2025 05:54
Dataset: lytang/LLM-AggreFact (ExpertQA subset)
Model: Qwen/Qwen2.5-VL-7B-Instruct

Results so far:
- two_stage_optimized (direct OpenAI chat-completion judge, no reasoning, zero-shot): 60-70%
- LabeledFewShot: 81.25%
- BootstrapFewShotWithRandomSearch: 84.50%
- MIPROv2: 88.50%
- COPRO: still running
import httpx
import asyncio
import pandas as pd
from datasets import load_dataset
from typing import List, Dict, Any, Optional, Tuple
from collections import Counter
import json
from tqdm.asyncio import tqdm_asyncio
# Configuration
API_BASE_URL = "http://localhost:6002/v1"
MAX_SAMPLES = 500 # Set to None for full dataset
REPEATS = 3
MAX_WORKERS = 40 # Concurrent API calls
MODEL = "hello"
# Prompts
JUDGE_PROMPT = """
Determine whether the provided claim is consistent with the corresponding document. Consistency in this context implies that
all information presented in the claim is substantiated by the document. If not, it should be considered inconsistent.
Document: {doc}
Claim: {claim}
Please assess the claim's consistency with the document by responding with either "yes" or "no".
Answer:
"""
VERIFIER_PROMPT = """
Check if the given answer correctly reflects whether the claim is consistent with the corresponding document. Consistency in this context implies that
all information presented in the claim is substantiated by the document. If not, it should be considered inconsistent. "yes" means the claim is consistent with the document, and "no" means the claim is not consistent with the document.
Document: {doc}
Claim: {claim}
Answer: {choice}
Answer Justification: {explanation}
If you think the answer is correct, return the answer as is. If it's incorrect, return the opposite (if answer = "yes", return "no", and if answer = "no", return "yes").
"""
# Modified Judge prompt using falsification logic
FALSIFIER_PROMPT = """
Examine the claim against the document and try to find any potential contradictions or unsupported statements.
Your task is to determine if the claim can be falsified based on the document.
Document: {doc}
Claim: {claim}
Step 1: List any specific statements in the claim.
Step 2: For each statement, determine if it's fully supported by the document, partially supported, or contradicted.
Step 3: If any statement is contradicted or not supported by the document, the claim is inconsistent.
Is the claim falsified based on the document? Respond with "falsified" if you found any contradiction
or unsupported statement, or "unfalsified" if all statements are fully supported.
Answer:
"""
# Modified prompts to extract confidence scores
SCORE_JUDGE_PROMPT = """
Determine whether the provided claim is consistent with the corresponding document. Consistency implies that
all information presented in the claim is substantiated by the document.
Document: {doc}
Claim: {claim}
First, rate your confidence in your assessment on a scale of 1-5, where:
1 = Very uncertain
2 = Somewhat uncertain
3 = Moderately confident
4 = Confident
5 = Very confident
Then provide your final assessment as either "yes" (consistent) or "no" (inconsistent).
Format your response as:
Confidence: [1-5]
Answer: [yes/no]
Explanation: [your reasoning]
"""
# Modified pooling logic using confidence scores
def weighted_pool(results: List[Dict[str, Any]]) -> Tuple[str, float]:
yes_confidence = 0
no_confidence = 0
for result in results:
if 'confidence' in result:
if result['verified_choice'] == 'yes':
yes_confidence += result['confidence']
else:
no_confidence += result['confidence']
else:
# Fallback to unweighted counting if confidence is not available
if result['verified_choice'] == 'yes':
yes_confidence += 1
else:
no_confidence += 1
final_choice = 'yes' if yes_confidence > no_confidence else 'no'
total_confidence = yes_confidence + no_confidence
confidence_score = max(yes_confidence, no_confidence) / total_confidence if total_confidence > 0 else 0.5
return final_choice, confidence_score
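# Worked example (illustrative): three repeats with verified choices/confidences
#   ("yes", 4), ("yes", 3), ("no", 5)
# give yes_confidence=7 vs no_confidence=5, so weighted_pool returns ("yes", 7/12 ≈ 0.58).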
# API Client
class OpenAIClient:
def __init__(self, base_url: str, max_retries: int = 3, timeout: int = 30):
self.base_url = base_url
self.max_retries = max_retries
self.timeout = timeout
self.client = httpx.AsyncClient(timeout=timeout, base_url=base_url)
async def chat_completion(self,
messages: List[Dict[str, str]],
model: str = MODEL,
temperature: float = 0.7) -> Dict[str, Any]:
for attempt in range(self.max_retries):
try:
response = await self.client.post(
"/chat/completions",
json={
"model": model,
"messages": messages,
"temperature": temperature
}
)
response.raise_for_status()
return response.json()
except (httpx.HTTPError, httpx.TimeoutException) as e:
if attempt == self.max_retries - 1:
raise e
await asyncio.sleep(2 ** attempt) # Exponential backoff
async def close(self):
await self.client.aclose()
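# Note: with max_retries=3, a failing request is retried after sleeping 1s and then 2s
# (2 ** attempt); the third consecutive failure is re-raised to the caller.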
# Pipeline Components
class Judge:
def __init__(self, client: OpenAIClient, temperature: float = 0.7):
self.client = client
self.temperature = temperature
async def evaluate(self, doc: str, claim: str) -> Tuple[str, str]:
prompt = JUDGE_PROMPT.format(doc=doc, claim=claim)
response = await self.client.chat_completion(
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": prompt}
],
temperature=self.temperature
)
content = response["choices"][0]["message"]["content"]
# Parse for yes/no and explanation
parts = content.lower().split("\n", 1)
choice = "yes" if "yes" in parts[0] else "no"
explanation = parts[1] if len(parts) > 1 else ""
return choice, explanation
class ConfidenceJudge:
def __init__(self, client: OpenAIClient, temperature: float = 0.7):
self.client = client
self.temperature = temperature
async def evaluate(self, doc: str, claim: str) -> Tuple[str, str, int]:
prompt = SCORE_JUDGE_PROMPT.format(doc=doc, claim=claim)
response = await self.client.chat_completion(
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": prompt}
],
temperature=self.temperature
)
content = response["choices"][0]["message"]["content"]
# Parse confidence, choice and explanation
confidence = 3 # Default moderate confidence
choice = "no" # Default conservative answer
explanation = ""
lines = content.strip().split('\n')
for line in lines:
line = line.strip().lower()
if line.startswith('confidence:'):
try:
confidence_str = line.split(':', 1)[1].strip()
confidence = int(confidence_str[0]) # Extract first digit
confidence = max(1, min(5, confidence)) # Ensure 1-5 range
except (ValueError, IndexError):
pass
elif line.startswith('answer:'):
answer_part = line.split(':', 1)[1].strip()
choice = "yes" if "yes" in answer_part else "no"
elif line.startswith('explanation:'):
explanation = line.split(':', 1)[1].strip()
return choice, explanation, confidence
class Falsifier:
def __init__(self, client: OpenAIClient, temperature: float = 0.5):
self.client = client
self.temperature = temperature
async def evaluate(self, doc: str, claim: str) -> Tuple[str, str]:
prompt = FALSIFIER_PROMPT.format(doc=doc, claim=claim)
response = await self.client.chat_completion(
messages=[
{"role": "system", "content": "You are a critical evaluator focused on finding inconsistencies."},
{"role": "user", "content": prompt}
],
temperature=self.temperature
)
content = response["choices"][0]["message"]["content"]
parts = content.lower().split("\n", 1)
# Convert falsification result to yes/no format
falsified = "falsified" in parts[0]
choice = "no" if falsified else "yes"
explanation = parts[1] if len(parts) > 1 else ""
return choice, explanation
class Verifier:
def __init__(self, client: OpenAIClient, temperature: float = 0.0):
self.client = client
self.temperature = temperature
async def verify(self, doc: str, claim: str, choice: str, explanation: str) -> str:
prompt = VERIFIER_PROMPT.format(
doc=doc,
claim=claim,
choice=choice,
explanation=explanation
)
response = await self.client.chat_completion(
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": prompt}
],
temperature=self.temperature
)
content = response["choices"][0]["message"]["content"].lower()
return "yes" if "yes" in content else "no"
class TwoStagePipeline:
def __init__(self, client: OpenAIClient, repeats: int = 3):
self.client = client
self.judge = Judge(client)
self.verifier = Verifier(client)
self.repeats = repeats
async def process_single(self, doc: str, claim: str) -> List[Dict[str, Any]]:
results = []
for _ in range(self.repeats):
choice, explanation = await self.judge.evaluate(doc, claim)
verified_choice = await self.verifier.verify(doc, claim, choice, explanation)
results.append({
"judge_choice": choice,
"judge_explanation": explanation,
"verified_choice": verified_choice
})
return results
def max_pool(self, results: List[Dict[str, Any]]) -> str:
# Get the most common verified choice
choices = [r["verified_choice"] for r in results]
return Counter(choices).most_common(1)[0][0]
class EnhancedPipeline:
def __init__(self, client: OpenAIClient, repeats: int = 3):
self.client = client
self.judge = Judge(client)
self.falsifier = Falsifier(client)
self.verifier = Verifier(client)
self.repeats = repeats
async def process_single(self, doc: str, claim: str) -> List[Dict[str, Any]]:
results = []
for _ in range(self.repeats):
# Run both standard evaluation and falsification in parallel
judge_task = asyncio.create_task(self.judge.evaluate(doc, claim))
falsifier_task = asyncio.create_task(self.falsifier.evaluate(doc, claim))
judge_choice, judge_explanation = await judge_task
falsifier_choice, falsifier_explanation = await falsifier_task
# If judge and falsifier disagree, use verifier to resolve
if judge_choice != falsifier_choice:
verified_choice = await self.verifier.verify(
doc, claim, judge_choice,
f"Judge: {judge_explanation}\nFalsifier: {falsifier_explanation}"
)
else:
# If they agree, use their consensus
verified_choice = judge_choice
results.append({
"judge_choice": judge_choice,
"falsifier_choice": falsifier_choice,
"judge_explanation": judge_explanation,
"falsifier_explanation": falsifier_explanation,
"verified_choice": verified_choice
})
return results
def max_pool(self, results: List[Dict[str, Any]]) -> str:
# Get the most common verified choice
choices = [r["verified_choice"] for r in results]
return Counter(choices).most_common(1)[0][0]
class ConfidencePipeline:
def __init__(self, client: OpenAIClient, repeats: int = 3):
self.client = client
self.judge = ConfidenceJudge(client)
self.verifier = Verifier(client, temperature=0.0)
self.repeats = repeats
async def process_single(self, doc: str, claim: str) -> List[Dict[str, Any]]:
results = []
for _ in range(self.repeats):
choice, explanation, confidence = await self.judge.evaluate(doc, claim)
verified_choice = await self.verifier.verify(doc, claim, choice, explanation)
# Apply confidence weighting - reduce confidence if verifier disagrees
if verified_choice != choice:
confidence = max(1, confidence - 2)
results.append({
"judge_choice": choice,
"judge_explanation": explanation,
"verified_choice": verified_choice,
"confidence": confidence
})
return results
def max_pool(self, results: List[Dict[str, Any]]) -> str:
return weighted_pool(results)[0]
class OptimizedPipeline:
def __init__(self, client: OpenAIClient, repeats: int = 3):
self.client = client
self.confidence_judge = ConfidenceJudge(client)
self.falsifier = Falsifier(client)
self.verifier = Verifier(client)
self.repeats = repeats
async def process_single(self, doc: str, claim: str) -> List[Dict[str, Any]]:
results = []
for _ in range(self.repeats):
# Run both evaluations in parallel
judge_task = asyncio.create_task(self.confidence_judge.evaluate(doc, claim))
falsifier_task = asyncio.create_task(self.falsifier.evaluate(doc, claim))
judge_choice, judge_explanation, confidence = await judge_task
falsifier_choice, falsifier_explanation = await falsifier_task
# Decision logic:
# 1. If falsifier finds contradiction (falsifier_choice is "no"), it overrides
# 2. Otherwise, use judge with confidence score
# 3. If confidence is low, use verifier to confirm
verified_choice = judge_choice
# Falsification overrides on "no" (inconsistent) findings
if falsifier_choice == "no":
if judge_choice == "yes" and confidence >= 4:
# Only if judge is very confident, verify the conflict
verified_choice = await self.verifier.verify(
doc, claim, judge_choice,
f"Judge ({confidence}/5): {judge_explanation}\nFalsifier: {falsifier_explanation}"
)
else:
verified_choice = "no" # Trust falsification
confidence = max(confidence, 4) # Boost confidence for falsification
# For uncertain judgments, verify
elif confidence <= 3:
verified_choice = await self.verifier.verify(
doc, claim, judge_choice, judge_explanation
)
results.append({
"judge_choice": judge_choice,
"falsifier_choice": falsifier_choice,
"judge_explanation": judge_explanation,
"falsifier_explanation": falsifier_explanation,
"verified_choice": verified_choice,
"confidence": confidence
})
return results
def max_pool(self, results: List[Dict[str, Any]]) -> Tuple[str, float]:
return weighted_pool(results)
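# Usage sketch (assumes an OpenAI-compatible server at API_BASE_URL; see main() below for the full run):
#   client = OpenAIClient(API_BASE_URL)
#   pipeline = OptimizedPipeline(client, repeats=REPEATS)
#   repeat_results = asyncio.run(pipeline.process_single(doc, claim))
#   choice, confidence = pipeline.max_pool(repeat_results)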
# Dataset processing
async def load_and_process_dataset():
print("Loading dataset...")
dataset = load_dataset('lytang/LLM-AggreFact')
test_data = dataset['test'].filter(lambda row: row['dataset'] == 'ExpertQA')
if MAX_SAMPLES:
test_data = test_data.select(range(min(MAX_SAMPLES, len(test_data))))
return test_data
# Main execution
async def main():
# Initialize API client
client = OpenAIClient(API_BASE_URL)
# Choose the pipeline to use
# pipeline = TwoStagePipeline(client, repeats=REPEATS)
# pipeline = EnhancedPipeline(client, repeats=REPEATS)
# pipeline = ConfidencePipeline(client, repeats=REPEATS)
pipeline = OptimizedPipeline(client, repeats=REPEATS)
# Load dataset
test_data = await load_and_process_dataset()
print(f"Loaded {len(test_data)} examples from ExpertQA dataset")
# Process examples
results = []
semaphore = asyncio.Semaphore(MAX_WORKERS)
    async def process_with_semaphore(idx, example):
async with semaphore:
doc = example['doc']
claim = example['claim']
label = example['label']
try:
stage_results = await pipeline.process_single(doc, claim)
# Handle different pipeline types
if isinstance(pipeline, OptimizedPipeline):
final_choice, confidence_score = pipeline.max_pool(stage_results)
return {
"id": len(results),
"doc": doc,
"claim": claim,
"ground_truth": label,
"judge_results": stage_results,
"prediction": final_choice == "yes",
"pooled_choice": final_choice,
"confidence_score": confidence_score
}
else:
final_choice = pipeline.max_pool(stage_results)
return {
"id": len(results),
"doc": doc,
"claim": claim,
"ground_truth": label,
"judge_results": stage_results,
"prediction": final_choice == "yes",
"pooled_choice": final_choice
}
except Exception as e:
print(f"Error processing example: {e}")
return None
print("Processing examples...")
    tasks = [process_with_semaphore(i, example) for i, example in enumerate(test_data)]  # pass the index so "id" is stable
processed_results = await tqdm_asyncio.gather(*tasks)
results.extend([r for r in processed_results if r is not None])
# Convert to DataFrame for analysis
df = pd.DataFrame(results)
# Calculate statistics
if len(df) > 0:
accuracy = (df['prediction'] == df['ground_truth']).mean()
print(f"\nEvaluation Results:")
print(f"Total examples: {len(df)}")
print(f"Accuracy: {accuracy:.4f}")
# Confusion matrix
tp = ((df['prediction'] == True) & (df['ground_truth'] == True)).sum()
tn = ((df['prediction'] == False) & (df['ground_truth'] == False)).sum()
fp = ((df['prediction'] == True) & (df['ground_truth'] == False)).sum()
fn = ((df['prediction'] == False) & (df['ground_truth'] == True)).sum()
# Calculate precision, recall and F1
precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
specificity = tn / (tn + fp) if (tn + fp) > 0 else 0
print(f"\nConfusion Matrix:")
print(f"True Positives: {tp}")
print(f"True Negatives: {tn}")
print(f"False Positives: {fp}")
print(f"False Negatives: {fn}")
print(f"\nClassification Metrics:")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1_score:.4f}")
print(f"Specificity: {specificity:.4f}")
# Add confidence analysis if available
if 'confidence_score' in df.columns:
avg_confidence = df['confidence_score'].mean()
correct_confidence = df.loc[df['prediction'] == df['ground_truth'], 'confidence_score'].mean()
incorrect_confidence = df.loc[df['prediction'] != df['ground_truth'], 'confidence_score'].mean()
print(f"\nConfidence Analysis:")
print(f"Average confidence: {avg_confidence:.4f}")
print(f"Confidence when correct: {correct_confidence:.4f}")
print(f"Confidence when incorrect: {incorrect_confidence:.4f}")
# Add confidence metrics to metrics dataframe
confidence_metrics = {
'Metric': ['Average Confidence', 'Confidence When Correct', 'Confidence When Incorrect'],
'Value': [avg_confidence, correct_confidence, incorrect_confidence]
}
confidence_df = pd.DataFrame(confidence_metrics)
# Create metrics dataframe
metrics_df = pd.DataFrame({
'Metric': ['Accuracy', 'Precision', 'Recall', 'F1 Score', 'Specificity',
'True Positives', 'True Negatives', 'False Positives', 'False Negatives'],
'Value': [accuracy, precision, recall, f1_score, specificity, tp, tn, fp, fn]
})
# Append confidence metrics if available
if 'confidence_score' in df.columns:
metrics_df = pd.concat([metrics_df, confidence_df], ignore_index=True)
# Save results
df.to_csv('evaluation_results.csv', index=False)
metrics_df.to_csv('evaluation_metrics.csv', index=False)
print("\nResults saved to evaluation_results.csv")
print("Metrics saved to evaluation_metrics.csv")
else:
print("No valid results obtained")
await client.close()
# Analysis visualization function if needed
def analyze_results(results_file='evaluation_results.csv', metrics_file='evaluation_metrics.csv'):
"""
Analyze and visualize results after running the evaluation.
This can be called separately after the main execution.
"""
import matplotlib.pyplot as plt
import seaborn as sns
# Load results
df = pd.read_csv(results_file)
metrics = pd.read_csv(metrics_file)
# Set up plots
plt.figure(figsize=(15, 10))
# 1. Confusion matrix heatmap
    cm_lookup = metrics.set_index('Metric')['Value']
    # Build the matrix explicitly so rows are actual [Negative, Positive] and
    # columns are predicted [Negative, Positive], matching the heatmap labels below
    cm_values = [[cm_lookup['True Negatives'], cm_lookup['False Positives']],
                 [cm_lookup['False Negatives'], cm_lookup['True Positives']]]
plt.subplot(2, 2, 1)
sns.heatmap(cm_values, annot=True, fmt='g', cmap='Blues',
xticklabels=['Negative', 'Positive'],
yticklabels=['Negative', 'Positive'])
plt.title('Confusion Matrix')
plt.ylabel('Actual')
plt.xlabel('Predicted')
# 2. Performance metrics
perf_metrics = metrics[metrics['Metric'].isin(['Accuracy', 'Precision', 'Recall', 'F1 Score', 'Specificity'])]
plt.subplot(2, 2, 2)
sns.barplot(x='Metric', y='Value', data=perf_metrics)
plt.title('Performance Metrics')
plt.ylim(0, 1)
plt.xticks(rotation=45)
# 3. Confidence distribution if available
if 'confidence_score' in df.columns:
plt.subplot(2, 2, 3)
sns.histplot(df['confidence_score'], bins=20, kde=True)
plt.title('Confidence Score Distribution')
plt.xlabel('Confidence Score')
plt.ylabel('Count')
# 4. Confidence vs correctness
plt.subplot(2, 2, 4)
df['correct'] = df['prediction'] == df['ground_truth']
sns.boxplot(x='correct', y='confidence_score', data=df)
plt.title('Confidence Score by Prediction Correctness')
plt.xlabel('Prediction Correct')
plt.ylabel('Confidence Score')
plt.tight_layout()
plt.savefig('evaluation_analysis.png')
print("Analysis visualization saved to evaluation_analysis.png")
plt.close()
if __name__ == "__main__":
asyncio.run(main())
# Uncomment to run visualization after evaluation
# analyze_results()
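# To reproduce (assumption: an OpenAI-compatible server is listening at API_BASE_URL and
# serving a model registered under the name in MODEL), run this file directly with Python.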
import dspy
import asyncio
from typing import Dict, Any, List
from datasets import load_dataset
from tqdm import tqdm
# Configure DSPy with your preferred LLM
API_BASE = "http://localhost:6002/v1/"
MODEL_NAME = "openai/hello"
lm = dspy.LM(MODEL_NAME, cache=True, model_type="chat", api_key="..", api_base=API_BASE)
dspy.configure(lm=lm)
# Define Signatures for our modules
class JudgeSignature(dspy.Signature):
"""Determine if a claim is consistent with a document."""
document = dspy.InputField(desc="Document contents to check.")
claim = dspy.InputField(desc="Claim to verify.")
reasoning = dspy.OutputField(desc="Step-by-step reasoning about the claim's consistency.")
is_consistent = dspy.OutputField(desc="Whether the claim is consistent with the document (yes/no).")
class VerifierSignature(dspy.Signature):
"""Verify the consistency judgment by checking for errors."""
document = dspy.InputField(desc="Document contents to check.")
claim = dspy.InputField(desc="Claim to verify.")
judge_reasoning = dspy.InputField(desc="Judge's reasoning about consistency.")
judge_decision = dspy.InputField(desc="Judge's decision (yes/no).")
verification_reasoning = dspy.OutputField(desc="Step-by-step verification of the judge's reasoning.")
verified_result = dspy.OutputField(desc="Final verified result (yes/no).")
# Judge Module using Chain-of-Thought
class JudgeModule(dspy.Module):
def __init__(self):
self.judge = dspy.ChainOfThought(JudgeSignature)
def forward(self, document: str, claim: str) -> Dict[str, Any]:
result = self.judge(document=document, claim=claim)
return {
'reasoning': result.reasoning,
'is_consistent': result.is_consistent.lower() == 'yes'
}
# Verifier Module using Chain-of-Thought
class VerifierModule(dspy.Module):
def __init__(self):
self.verifier = dspy.ChainOfThought(VerifierSignature)
def forward(self, document: str, claim: str, judge_reasoning: str, judge_decision: bool) -> Dict[str, Any]:
result = self.verifier(
document=document,
claim=claim,
judge_reasoning=judge_reasoning,
judge_decision="yes" if judge_decision else "no"
)
verified_result = result.verified_result.lower() == 'yes'
return {
'verification_reasoning': result.verification_reasoning,
'verified_result': verified_result
}
# Full Pipeline that combines Judge and Verifier with explicit output
class TwoStagePipeline(dspy.Module):
def __init__(self):
self.judge = JudgeModule()
self.verifier = VerifierModule()
def forward(self, document: str, claim: str):
# Stage 1: Judge
judge_result = self.judge(document=document, claim=claim)
# Stage 2: Verify
verify_result = self.verifier(
document=document,
claim=claim,
judge_reasoning=judge_result['reasoning'],
judge_decision=judge_result['is_consistent']
)
# Create proper output that matches what's expected by metric
verified_result = verify_result['verified_result']
# Return result as object with verified_result attribute
return dspy.Prediction(verified_result=verified_result)
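# Usage sketch: pred = TwoStagePipeline()(document=doc, claim=claim)
# pred.verified_result is a bool (True when the verifier's final answer is "yes").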
# Define evaluation metric with better debugging
def consistency_metric(example, prediction, trace=None):
"""Evaluate if the model's prediction matches the ground truth with detailed logging."""
# Get predicted value
if hasattr(prediction, 'verified_result'):
predicted_value = prediction.verified_result
else:
try:
predicted_value = prediction['verified_result']
except:
print(f"Warning: Can't find verified_result in prediction: {prediction}")
return False
# Convert prediction to boolean
if isinstance(predicted_value, str):
prediction_value = predicted_value.lower() == 'yes'
else:
prediction_value = bool(predicted_value)
# Get ground truth
if isinstance(example.label, bool):
ground_truth = example.label
else:
ground_truth = bool(example.label)
# Log comparison for debugging
print(f"Comparing - Ground truth: {ground_truth}, Prediction: {prediction_value}")
return ground_truth == prediction_value
# Load and prepare dataset
def prepare_dataset():
"""Load and prepare the dataset for training and evaluation."""
dataset = load_dataset('lytang/LLM-AggreFact')
# Filter to just use ExpertQA subset for this example
test_data = dataset['test'].filter(lambda row: row['dataset'] == 'ExpertQA')
# Limit size for demonstration purposes
max_samples = 1000
if len(test_data) > max_samples:
test_data = test_data.select(range(max_samples))
# Convert to DSPy examples with proper input fields set
dspy_examples = []
for item in test_data:
example = dspy.Example(
document=item['doc'],
claim=item['claim'],
label=item['label']
).with_inputs('document', 'claim') # Explicitly set the input fields
dspy_examples.append(example)
# import random
# Set random seed for reproducibility
# random.seed(42)
# Shuffle the data
# shuffled_examples = dspy_examples.copy()
# random.shuffle(shuffled_examples)
# Take examples for training and development
# trainset = shuffled_examples[:16]
# devset = shuffled_examples[16:32]
train_size = int(len(dspy_examples) * 0.8)
trainset = dspy_examples[:train_size]
devset = dspy_examples[train_size:]
return trainset, devset
# Bootstrap with Random Search optimization
def optimize_pipeline(trainset, devset):
"""Bootstrap the pipeline using BootstrapFewShotWithRandomSearch."""
# Create base pipeline
base_pipeline = TwoStagePipeline()
# Initialize BootstrapFewShotWithRandomSearch
from dspy.teleprompt import BootstrapFewShotWithRandomSearch
print("Creating BootstrapFewShotWithRandomSearch optimizer...")
bootstrap_optimizer = BootstrapFewShotWithRandomSearch(
metric=consistency_metric,
max_labeled_demos=4, # Maximum number of human-labeled demonstrations to use
max_bootstrapped_demos=4, # Maximum number of bootstrapped demonstrations to generate
num_candidate_programs=8, # Number of candidate programs to generate and evaluate
num_threads=40, # Number of threads to use for parallelization
)
print("Created BootstrapFewShotWithRandomSearch optimizer")
optimized_pipeline = bootstrap_optimizer.compile(
student=base_pipeline.deepcopy(), # Create a deep copy to avoid modifying the original
trainset=trainset,
valset=devset # Provide validation set for evaluation during optimization
)
print("Compiled BootstrapFewShotWithRandomSearch optimizer")
# Save optimized pipeline
optimized_pipeline.save("bootstrap_random_search_pipeline.json")
print("Saved optimized pipeline")
# Evaluate optimized pipeline
print(f"Devset size: {len(devset)}")
print(f"Trainset size: {len(trainset)}")
evaluate = dspy.Evaluate(devset=devset, metric=consistency_metric)
print("Evaluating optimized pipeline...")
score = evaluate(optimized_pipeline)
print("Evaluated optimized pipeline")
print(f"Optimized pipeline score: {score:.4f}")
return optimized_pipeline
# Simplified process examples without confidence
def process_examples(pipeline, examples, repeats=1):
"""Process multiple examples with the optimized pipeline."""
results = []
for example in tqdm(examples):
# Get a single prediction
prediction = pipeline(document=example.document, claim=example.claim)
# Get the verified result (boolean)
if hasattr(prediction, 'verified_result'):
predicted_result = prediction.verified_result
else:
predicted_result = prediction['verified_result']
# Convert to boolean if needed
if isinstance(predicted_result, str):
predicted_result = predicted_result.lower() == 'yes'
results.append({
"document": example.document,
"claim": example.claim,
"ground_truth": example.label,
"prediction": predicted_result,
"correct": bool(predicted_result) == bool(example.label)
})
return results
# Main execution
async def main():
# Prepare dataset
print("Preparing dataset...")
trainset, devset = prepare_dataset()
print(f"Dataset prepared: {len(trainset)} training examples, {len(devset)} evaluation examples")
# Optimize pipeline
optimized_pipeline = optimize_pipeline(trainset, devset)
# Process examples with optimized pipeline
print("Processing evaluation examples...")
eval_results = process_examples(optimized_pipeline, devset)
# Calculate and print metrics
correct_count = sum(1 for r in eval_results if r['correct'])
accuracy = correct_count / len(eval_results)
print(f"\nEvaluation Results:")
print(f"Total examples: {len(eval_results)}")
print(f"Correct predictions: {correct_count}")
print(f"Accuracy: {accuracy:.4f}")
if __name__ == "__main__":
asyncio.run(main())
# Average Metric: 19.00 / 22 (86.4%): 69%|███████████████████████████████████████████████████████████████████████████████████████████████▌ | 22/32 [00:55<00:20, 2.09s/it]Comparing - Ground truth: True, Prediction: True
# Average Metric: 20.00 / 23 (87.0%): 72%|███████████████████████████████████████████████████████████████████████████████████████████████████▉ | 23/32 [00:58<00:22, 2.46s/it]Comparing - Ground truth: False, Prediction: True
# Average Metric: 20.00 / 24 (83.3%): 75%|████████████████████████████████████████████████████████████████████████████████████████████████████████▎ | 24/32 [01:02<00:22, 2.86s/it]Comparing - Ground truth: True, Prediction: True
# Average Metric: 21.00 / 25 (84.0%): 78%|████████████████████████████████████████████████████████████████████████████████████████████████████████████▌ | 25/32 [01:04<00:17, 2.44s/it]Comparing - Ground truth: True, Prediction: True
# Average Metric: 22.00 / 26 (84.6%): 81%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████▉ | 26/32 [01:04<00:11, 1.91s/it]Comparing - Ground truth: True, Prediction: True
# Average Metric: 23.00 / 27 (85.2%): 84%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▎ | 27/32 [01:09<00:14, 2.82s/it]Comparing - Ground truth: False, Prediction: False
# Average Metric: 24.00 / 28 (85.7%): 88%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▋ | 28/32 [01:12<00:11, 2.86s/it]Comparing - Ground truth: True, Prediction: True
# Average Metric: 25.00 / 29 (86.2%): 91%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▉ | 29/32 [01:13<00:06, 2.17s/it]Comparing - Ground truth: True, Prediction: True
# Average Metric: 26.00 / 30 (86.7%): 94%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▎ | 30/32 [01:16<00:04, 2.40s/it]Comparing - Ground truth: False, Prediction: True
# Average Metric: 26.00 / 31 (83.9%): 97%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▋ | 31/32 [01:18<00:02, 2.51s/it]Comparing - Ground truth: True, Prediction: False
# Average Metric: 26.00 / 32 (81.2%): 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 32/32 [01:21<00:00, 2.56s/it]
# 2025/02/19 20:34:06 INFO dspy.evaluate.evaluate: Average Metric: 26 / 32 (81.2%)
# Scores so far: [81.25, 81.25, 81.25, 84.38, 81.25, 78.12, 84.38, 78.12, 71.88, 84.38, 81.25]
# Best score so far: 84.38
# 11 candidate programs found.
# Compiled BootstrapFewShotWithRandomSearch optimizer
# Saved optimized pipeline
# Devset size: 32
# Trainset size: 16
# Evaluating optimized pipeline...
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: False, Prediction: False
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: False, Prediction: False
# Comparing - Ground truth: True, Prediction: False
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: False, Prediction: False
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: False, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: False
# Comparing - Ground truth: False, Prediction: False
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: False, Prediction: True
# Comparing - Ground truth: True, Prediction: False
# 2025/02/19 20:34:06 INFO dspy.evaluate.evaluate: Average Metric: 27 / 32 (84.4%)
# Evaluated optimized pipeline
# Optimized pipeline score: 84.3800
# Processing evaluation examples...
# 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 32/32 [00:00<00:00, 2524.93it/s]
# Evaluation Results:
# Total examples: 32
# Correct predictions: 27
# Accuracy: 0.8438
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: False, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# 2025/02/19 21:07:47 INFO dspy.evaluate.evaluate: Average Metric: 169 / 200 (84.5%)
# Evaluated optimized pipeline
# Optimized pipeline score: 84.5000
# Processing evaluation examples...
# 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 200/200 [00:00<00:00, 4974.80it/s]
# Evaluation Results:
# Total examples: 200
# Correct predictions: 169
# Accuracy: 0.8450
# Average Metric: 38.00 / 50 (76.0%): 92%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▉ | 46/50 [03:38<00:20, 5.21s/it]Comparing - Ground truth: True, Prediction: True
# Average Metric: 39.00 / 50 (78.0%): 94%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▋ | 47/50 [03:43<00:15, 5.13s/it]Comparing - Ground truth: True, Prediction: True
# Average Metric: 40.00 / 50 (80.0%): 96%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▍ | 48/50 [03:49<00:10, 5.29s/it]Comparing - Ground truth: True, Prediction: True
# Average Metric: 41.00 / 50 (82.0%): 98%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▏ | 49/50 [03:54<00:05, 5.35s/it]Comparing - Ground truth: True, Prediction: True
# Average Metric: 42.00 / 50 (84.0%): 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 50/50 [03:58<00:00, 4.76s/it]
# 2025/02/19 21:46:52 INFO dspy.evaluate.evaluate: Average Metric: 42 / 50 (84.0%)
# Compiled COPRO optimizer
# Saved optimized pipeline
# Devset size: 200
# Trainset size: 800
# Evaluating optimized pipeline...
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
import dspy
from typing import Dict, Any
from datasets import load_dataset
from tqdm import tqdm
# Configure DSPy with your preferred LLM
API_BASE = "http://localhost:6002/v1/"
MODEL_NAME = "openai/hello"
lm = dspy.LM(MODEL_NAME, cache=True, model_type="chat", api_key="..", api_base=API_BASE)
dspy.configure(lm=lm)
# Define Signatures for our modules
class JudgeSignature(dspy.Signature):
"""Determine if a claim is consistent with a document."""
document = dspy.InputField(desc="Document contents to check.")
claim = dspy.InputField(desc="Claim to verify.")
reasoning = dspy.OutputField(desc="Step-by-step reasoning about the claim's consistency.")
is_consistent = dspy.OutputField(desc="Whether the claim is consistent with the document (yes/no).")
class VerifierSignature(dspy.Signature):
"""Verify the consistency judgment by checking for errors."""
document = dspy.InputField(desc="Document contents to check.")
claim = dspy.InputField(desc="Claim to verify.")
judge_reasoning = dspy.InputField(desc="Judge's reasoning about consistency.")
judge_decision = dspy.InputField(desc="Judge's decision (yes/no).")
verification_reasoning = dspy.OutputField(desc="Step-by-step verification of the judge's reasoning.")
verified_result = dspy.OutputField(desc="Final verified result (yes/no).")
# Judge Module using Chain-of-Thought
class JudgeModule(dspy.Module):
def __init__(self):
self.judge = dspy.ChainOfThought(JudgeSignature)
def forward(self, document: str, claim: str) -> Dict[str, Any]:
result = self.judge(document=document, claim=claim)
return {
'reasoning': result.reasoning,
'is_consistent': result.is_consistent.lower() == 'yes'
}
# Verifier Module using Chain-of-Thought
class VerifierModule(dspy.Module):
def __init__(self):
self.verifier = dspy.ChainOfThought(VerifierSignature)
def forward(self, document: str, claim: str, judge_reasoning: str, judge_decision: bool) -> Dict[str, Any]:
result = self.verifier(
document=document,
claim=claim,
judge_reasoning=judge_reasoning,
judge_decision="yes" if judge_decision else "no"
)
verified_result = result.verified_result.lower() == 'yes'
return {
'verification_reasoning': result.verification_reasoning,
'verified_result': verified_result
}
# Full Pipeline that combines Judge and Verifier with explicit output
class TwoStagePipeline(dspy.Module):
def __init__(self):
self.judge = JudgeModule()
self.verifier = VerifierModule()
def forward(self, document: str, claim: str):
# Stage 1: Judge
judge_result = self.judge(document=document, claim=claim)
# Stage 2: Verify
verify_result = self.verifier(
document=document,
claim=claim,
judge_reasoning=judge_result['reasoning'],
judge_decision=judge_result['is_consistent']
)
# Create proper output that matches what's expected by metric
verified_result = verify_result['verified_result']
# Return result as object with verified_result attribute
return dspy.Prediction(verified_result=verified_result)
# Define evaluation metric with better debugging
def consistency_metric(example, prediction, trace=None):
"""Evaluate if the model's prediction matches the ground truth with detailed logging."""
# Get predicted value
if hasattr(prediction, 'verified_result'):
predicted_value = prediction.verified_result
else:
try:
predicted_value = prediction['verified_result']
except:
print(f"Warning: Can't find verified_result in prediction: {prediction}")
return False
# Convert prediction to boolean
if isinstance(predicted_value, str):
prediction_value = predicted_value.lower() == 'yes'
else:
prediction_value = bool(predicted_value)
# Get ground truth
if isinstance(example.label, bool):
ground_truth = example.label
else:
ground_truth = bool(example.label)
# Log comparison for debugging
print(f"Comparing - Ground truth: {ground_truth}, Prediction: {prediction_value}")
return ground_truth == prediction_value
# Load and prepare dataset
def prepare_dataset():
"""Load and prepare the dataset for training and evaluation."""
dataset = load_dataset('lytang/LLM-AggreFact')
# Filter to just use ExpertQA subset for this example
test_data = dataset['test'].filter(lambda row: row['dataset'] == 'ExpertQA')
# Limit size for demonstration purposes
max_samples = 1000
if len(test_data) > max_samples:
test_data = test_data.select(range(max_samples))
# Convert to DSPy examples
dspy_examples = []
for item in test_data:
example = dspy.Example(
document=item['doc'],
claim=item['claim'],
label=item['label']
).with_inputs('document', 'claim') # Explicitly set the input fields
dspy_examples.append(example)
train_size = int(len(dspy_examples) * 0.8)
trainset = dspy_examples[:train_size]
devset = dspy_examples[train_size:]
return trainset, devset
# Optimize with COPRO
def optimize_pipeline_with_copro(trainset, devset):
"""Optimize the pipeline using COPRO."""
# Create base pipeline
base_pipeline = TwoStagePipeline()
# Initialize COPRO
from dspy.teleprompt import COPRO
print("Creating COPRO optimizer...")
copro_optimizer = COPRO(
metric=consistency_metric,
breadth=5, # Number of candidate instructions to consider
depth=3, # Number of optimization iterations
num_threads=8, # For parallel evaluation
auto="light", # Setting auto parameter for easier configuration
verbose=True # Enable verbose logging
)
print("Created COPRO optimizer")
# Optimize program with COPRO
print("Optimizing program with COPRO...")
optimized_pipeline = copro_optimizer.compile(
base_pipeline.deepcopy(), # Create a deep copy to avoid modifying the original
trainset=trainset[:50], # Using a subset for faster optimization
# valset=devset[:50], # Using a subset for validation
eval_kwargs={'display_progress': True}
)
print("Compiled COPRO optimizer")
# Save optimized pipeline
optimized_pipeline.save("copro_optimized_pipeline.json")
print("Saved optimized pipeline")
# Evaluate optimized pipeline
print(f"Devset size: {len(devset)}")
print(f"Trainset size: {len(trainset)}")
evaluate = dspy.Evaluate(devset=devset, metric=consistency_metric)
print("Evaluating optimized pipeline...")
score = evaluate(optimized_pipeline)
print("Evaluated optimized pipeline")
print(f"Optimized pipeline score: {score:.4f}")
return optimized_pipeline
# Ensemble optimization
def optimize_with_ensemble(trainset, devset):
"""Create an ensemble of optimized pipelines."""
from dspy.teleprompt import BootstrapFewShotWithRandomSearch
from dspy.teleprompt.ensemble import Ensemble
print("Creating ensemble of optimized pipelines...")
# First, create multiple candidate programs using BootstrapFewShotWithRandomSearch
bootstrap_optimizer = BootstrapFewShotWithRandomSearch(
metric=consistency_metric,
max_bootstrapped_demos=2,
num_candidate_programs=5, # Create 5 candidate programs
num_threads=8
)
base_pipeline = TwoStagePipeline()
# Compile to get multiple candidate programs
candidate_programs = bootstrap_optimizer.compile(
student=base_pipeline.deepcopy(),
trainset=trainset[:100], # Using subset for faster optimization
valset=devset[:50]
)
# Create ensemble using majority vote
ensemble_optimizer = Ensemble(reduce_fn=dspy.majority)
final_program = ensemble_optimizer.compile(candidate_programs)
# Save ensemble pipeline
final_program.save("ensemble_optimized_pipeline.json")
# Evaluate ensemble pipeline
evaluate = dspy.Evaluate(devset=devset, metric=consistency_metric)
ensemble_score = evaluate(final_program)
print(f"Ensemble pipeline score: {ensemble_score:.4f}")
return final_program
# Process examples without confidence
def process_examples(pipeline, examples):
"""Process multiple examples with the optimized pipeline."""
results = []
for example in tqdm(examples):
# Get a single prediction
prediction = pipeline(document=example.document, claim=example.claim)
# Get the verified result (boolean)
if hasattr(prediction, 'verified_result'):
predicted_result = prediction.verified_result
else:
predicted_result = prediction['verified_result']
# Convert to boolean if needed
if isinstance(predicted_result, str):
predicted_result = predicted_result.lower() == 'yes'
results.append({
"document": example.document,
"claim": example.claim,
"ground_truth": example.label,
"prediction": predicted_result,
"correct": bool(predicted_result) == bool(example.label)
})
return results
# Main execution
def main():
# Prepare dataset
print("Preparing dataset...")
trainset, devset = prepare_dataset()
print(f"Dataset prepared: {len(trainset)} training examples, {len(devset)} evaluation examples")
# Option 1: Optimize with COPRO
copro_pipeline = optimize_pipeline_with_copro(trainset, devset)
# Option 2: Optimize with Ensemble
ensemble_pipeline = optimize_with_ensemble(trainset, devset)
# Process examples with both pipelines for comparison
print("Processing evaluation examples with COPRO pipeline...")
copro_results = process_examples(copro_pipeline, devset)
print("Processing evaluation examples with Ensemble pipeline...")
ensemble_results = process_examples(ensemble_pipeline, devset)
# Calculate and print metrics for both approaches
copro_correct = sum(1 for r in copro_results if r['correct'])
copro_accuracy = copro_correct / len(copro_results)
ensemble_correct = sum(1 for r in ensemble_results if r['correct'])
ensemble_accuracy = ensemble_correct / len(ensemble_results)
print(f"\nEvaluation Results:")
print(f"Total examples: {len(devset)}")
print(f"COPRO correct predictions: {copro_correct}")
print(f"COPRO accuracy: {copro_accuracy:.4f}")
print(f"Ensemble correct predictions: {ensemble_correct}")
print(f"Ensemble accuracy: {ensemble_accuracy:.4f}")
# Determine which approach performed better
if copro_accuracy > ensemble_accuracy:
print("\nCOPRO optimization performed better in this case.")
return copro_pipeline
else:
print("\nEnsemble optimization performed better in this case.")
return ensemble_pipeline
if __name__ == "__main__":
main()
# python ./verify_dspy.py
# Preparing dataset...
# Dataset prepared: 16 training examples, 32 evaluation examples
# Created LabeledFewShot optimizer
# Compiled LabeledFewShot optimizer
# Saving optimized pipeline
# Devset size: 32
# Trainset size: 16
# Evaluating optimized pipeline
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: False, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: False, Prediction: False
# Comparing - Ground truth: True, Prediction: False
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: False, Prediction: False
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: False
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: False, Prediction: False
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: False
# Comparing - Ground truth: False, Prediction: False
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: False, Prediction: True
# Comparing - Ground truth: True, Prediction: False
# 2025/02/19 20:20:08 INFO dspy.evaluate.evaluate: Average Metric: 26 / 32 (81.2%)
# Evaluated optimized pipeline
# Optimized pipeline score: 81.2500
# Processing evaluation examples...
# 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 32/32 [00:00<00:00, 2989.53it/s]
# Evaluation Results:
# Total examples: 32
# Correct predictions: 26
# Accuracy: 0.8125
import dspy
import asyncio
from typing import Dict, Any, List
from datasets import load_dataset
from tqdm import tqdm
# Configure DSPy with your preferred LLM
API_BASE = "http://localhost:6002/v1/"
MODEL_NAME = "openai/hello"
lm = dspy.LM(MODEL_NAME, cache=True, model_type="chat", api_key="..", api_base=API_BASE)
dspy.configure(lm=lm)
# Define Signatures for our modules
class JudgeSignature(dspy.Signature):
"""Determine if a claim is consistent with a document."""
document = dspy.InputField(desc="Document contents to check.")
claim = dspy.InputField(desc="Claim to verify.")
reasoning = dspy.OutputField(desc="Step-by-step reasoning about the claim's consistency.")
is_consistent = dspy.OutputField(desc="Whether the claim is consistent with the document (yes/no).")
class VerifierSignature(dspy.Signature):
"""Verify the consistency judgment by checking for errors."""
document = dspy.InputField(desc="Document contents to check.")
claim = dspy.InputField(desc="Claim to verify.")
judge_reasoning = dspy.InputField(desc="Judge's reasoning about consistency.")
judge_decision = dspy.InputField(desc="Judge's decision (yes/no).")
verification_reasoning = dspy.OutputField(desc="Step-by-step verification of the judge's reasoning.")
verified_result = dspy.OutputField(desc="Final verified result (yes/no).")
# Judge Module using Chain-of-Thought
class JudgeModule(dspy.Module):
def __init__(self):
self.judge = dspy.ChainOfThought(JudgeSignature)
def forward(self, document: str, claim: str) -> Dict[str, Any]:
result = self.judge(document=document, claim=claim)
return {
'reasoning': result.reasoning,
'is_consistent': result.is_consistent.lower() == 'yes'
}
# Verifier Module using Chain-of-Thought
class VerifierModule(dspy.Module):
def __init__(self):
self.verifier = dspy.ChainOfThought(VerifierSignature)
def forward(self, document: str, claim: str, judge_reasoning: str, judge_decision: bool) -> Dict[str, Any]:
result = self.verifier(
document=document,
claim=claim,
judge_reasoning=judge_reasoning,
judge_decision="yes" if judge_decision else "no"
)
verified_result = result.verified_result.lower() == 'yes'
return {
'verification_reasoning': result.verification_reasoning,
'verified_result': verified_result
}
# Full Pipeline that combines Judge and Verifier with explicit output
class TwoStagePipeline(dspy.Module):
def __init__(self):
self.judge = JudgeModule()
self.verifier = VerifierModule()
def forward(self, document: str, claim: str):
# Stage 1: Judge
judge_result = self.judge(document=document, claim=claim)
# Stage 2: Verify
verify_result = self.verifier(
document=document,
claim=claim,
judge_reasoning=judge_result['reasoning'],
judge_decision=judge_result['is_consistent']
)
# Create proper output that matches what's expected by metric
verified_result = verify_result['verified_result']
# Return result as object with verified_result attribute
return dspy.Prediction(verified_result=verified_result)
# Define evaluation metric with better debugging
def consistency_metric(example, prediction, trace=None):
"""Evaluate if the model's prediction matches the ground truth with detailed logging."""
# Get predicted value
if hasattr(prediction, 'verified_result'):
predicted_value = prediction.verified_result
else:
try:
predicted_value = prediction['verified_result']
except:
print(f"Warning: Can't find verified_result in prediction: {prediction}")
return False
# Convert prediction to boolean
if isinstance(predicted_value, str):
prediction_value = predicted_value.lower() == 'yes'
else:
prediction_value = bool(predicted_value)
# Get ground truth
if isinstance(example.label, bool):
ground_truth = example.label
else:
ground_truth = bool(example.label)
# Log comparison for debugging
print(f"Comparing - Ground truth: {ground_truth}, Prediction: {prediction_value}")
return ground_truth == prediction_value
# Load and prepare dataset
def prepare_dataset():
"""Load and prepare the dataset for training and evaluation."""
dataset = load_dataset('lytang/LLM-AggreFact')
# Filter to just use ExpertQA subset for this example
test_data = dataset['test'].filter(lambda row: row['dataset'] == 'ExpertQA')
# Limit size for demonstration purposes
max_samples = 1000
if len(test_data) > max_samples:
test_data = test_data.select(range(max_samples))
# Convert to DSPy examples with proper input fields set
dspy_examples = []
for item in test_data:
example = dspy.Example(
document=item['doc'],
claim=item['claim'],
label=item['label']
).with_inputs('document', 'claim') # Explicitly set the input fields
dspy_examples.append(example)
# # Split into train/dev
# train_size = int(len(dspy_examples) * 0.8)
# trainset = dspy_examples[:train_size]
# devset = dspy_examples[train_size:]
import random
# Assuming dspy_examples is your list of examples
# Set random seed for reproducibility
random.seed(42)
# Shuffle the data
shuffled_examples = dspy_examples.copy()
random.shuffle(shuffled_examples)
    # Take the first 16 shuffled examples for training
    trainset = shuffled_examples[:16]
    # Use the first 32 shuffled examples as the dev set (note: this overlaps with the 16 training examples)
    devset = shuffled_examples[:32]
return trainset, devset
# Simple LabeledFewShot optimization
def optimize_pipeline(trainset, devset):
"""Bootstrap the pipeline using LabeledFewShot."""
# Create base pipeline
base_pipeline = TwoStagePipeline()
# Initialize LabeledFewShot
from dspy.teleprompt import LabeledFewShot
labeled_fewshot_optimizer = LabeledFewShot()
print("Created LabeledFewShot optimizer")
optimized_pipeline = labeled_fewshot_optimizer.compile(
student=base_pipeline,
trainset=trainset
)
print("Compiled LabeledFewShot optimizer")
# Save optimized pipeline
optimized_pipeline.save("labeled_fewshot_pipeline.json")
print("Saving optimized pipeline")
# Evaluate optimized pipeline
print(f"Devset size: {len(devset)}")
print(f"Trainset size: {len(trainset)}")
evaluate = dspy.Evaluate(devset=devset, metric=consistency_metric)
print("Evaluating optimized pipeline")
score = evaluate(optimized_pipeline)
print("Evaluated optimized pipeline")
print(f"Optimized pipeline score: {score:.4f}")
return optimized_pipeline
# Simplified process examples without confidence
def process_examples(pipeline, examples, repeats=1):
"""Process multiple examples with the optimized pipeline."""
results = []
for example in tqdm(examples):
# Get a single prediction
prediction = pipeline(document=example.document, claim=example.claim)
# Get the verified result (boolean)
if hasattr(prediction, 'verified_result'):
predicted_result = prediction.verified_result
else:
predicted_result = prediction['verified_result']
# Convert to boolean if needed
if isinstance(predicted_result, str):
predicted_result = predicted_result.lower() == 'yes'
results.append({
"document": example.document,
"claim": example.claim,
"ground_truth": example.label,
"prediction": predicted_result,
"correct": bool(predicted_result) == bool(example.label)
})
return results
# Main execution
async def main():
# Prepare dataset
print("Preparing dataset...")
trainset, devset = prepare_dataset()
print(f"Dataset prepared: {len(trainset)} training examples, {len(devset)} evaluation examples")
# Optimize pipeline
optimized_pipeline = optimize_pipeline(trainset, devset)
# Process examples with optimized pipeline
print("Processing evaluation examples...")
eval_results = process_examples(optimized_pipeline, devset)
# Calculate and print metrics
correct_count = sum(1 for r in eval_results if r['correct'])
accuracy = correct_count / len(eval_results)
print(f"\nEvaluation Results:")
print(f"Total examples: {len(eval_results)}")
print(f"Correct predictions: {correct_count}")
print(f"Accuracy: {accuracy:.4f}")
if __name__ == "__main__":
asyncio.run(main())
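# Console output below was pasted from a separate MIPROv2 optimization run.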
# █████████████████████████████████████████████████████ | 95/100 [01:51<00:08, 1.71s/it]Comparing - Ground truth: False, Prediction: True
# Average Metric: 74.00 / 96 (77.1%): 96%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▍ | 96/100 [01:53<00:07, 1.76s/it]Comparing - Ground truth: True, Prediction: True
# Average Metric: 75.00 / 97 (77.3%): 97%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▊ | 97/100 [01:53<00:04, 1.56s/it]Comparing - Ground truth: True, Prediction: True
# Average Metric: 76.00 / 98 (77.6%): 98%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▏ | 98/100 [01:57<00:04, 2.02s/it]Comparing - Ground truth: True, Prediction: True
# Average Metric: 77.00 / 99 (77.8%): 99%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▌ | 99/100 [02:00<00:02, 2.18s/it]Comparing - Ground truth: False, Prediction: True
# Average Metric: 77.00 / 100 (77.0%): 100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 100/100 [02:04<00:00, 1.25s/it]
# 2025/02/19 21:04:54 INFO dspy.evaluate.evaluate: Average Metric: 77 / 100 (77.0%)
# 2025/02/19 21:04:54 INFO dspy.teleprompt.mipro_optimizer_v2: New best full eval score! Score: 77.0
# 2025/02/19 21:04:54 INFO dspy.teleprompt.mipro_optimizer_v2: Full eval scores so far: [73.0, 77.0]
# 2025/02/19 21:04:54 INFO dspy.teleprompt.mipro_optimizer_v2: Best full score so far: 77.0
# 2025/02/19 21:04:54 INFO dspy.teleprompt.mipro_optimizer_v2: =======================
# 2025/02/19 21:04:54 INFO dspy.teleprompt.mipro_optimizer_v2:
# 2025/02/19 21:04:54 INFO dspy.teleprompt.mipro_optimizer_v2: Returning best identified program with score 77.0!
# Compiled MIPROv2 optimizer
# Saved optimized pipeline
# Devset size: 200
# Trainset size: 800
# Evaluating optimized pipeline...
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# Comparing - Ground truth: False, Prediction: True
# Comparing - Ground truth: True, Prediction: True
# 2025/02/19 21:21:10 INFO dspy.evaluate.evaluate: Average Metric: 177 / 200 (88.5%)
# Evaluated optimized pipeline
# Optimized pipeline score: 88.5000
# Processing evaluation examples...
# 100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 200/200 [00:00<00:00, 4238.45it/s]
# Evaluation Results:
# Total examples: 200
# Correct predictions: 177
# Accuracy: 0.8850
import dspy
import asyncio
from typing import Dict, Any, List
from datasets import load_dataset
from tqdm import tqdm
# Configure DSPy with your preferred LLM
API_BASE = "http://localhost:6002/v1/"
MODEL_NAME = "openai/hello"
lm = dspy.LM(MODEL_NAME, cache=True, model_type="chat", api_key="..", api_base=API_BASE)
dspy.configure(lm=lm)
# Define Signatures for our modules
class JudgeSignature(dspy.Signature):
"""Determine if a claim is consistent with a document."""
document = dspy.InputField(desc="Document contents to check.")
claim = dspy.InputField(desc="Claim to verify.")
reasoning = dspy.OutputField(desc="Step-by-step reasoning about the claim's consistency.")
is_consistent = dspy.OutputField(desc="Whether the claim is consistent with the document (yes/no).")
class VerifierSignature(dspy.Signature):
"""Verify the consistency judgment by checking for errors."""
document = dspy.InputField(desc="Document contents to check.")
claim = dspy.InputField(desc="Claim to verify.")
judge_reasoning = dspy.InputField(desc="Judge's reasoning about consistency.")
judge_decision = dspy.InputField(desc="Judge's decision (yes/no).")
verification_reasoning = dspy.OutputField(desc="Step-by-step verification of the judge's reasoning.")
verified_result = dspy.OutputField(desc="Final verified result (yes/no).")
# Judge Module using Chain-of-Thought
class JudgeModule(dspy.Module):
    def __init__(self):
        super().__init__()
        self.judge = dspy.ChainOfThought(JudgeSignature)
def forward(self, document: str, claim: str) -> Dict[str, Any]:
result = self.judge(document=document, claim=claim)
return {
'reasoning': result.reasoning,
'is_consistent': result.is_consistent.lower() == 'yes'
}
# Verifier Module using Chain-of-Thought
class VerifierModule(dspy.Module):
    def __init__(self):
        super().__init__()
        self.verifier = dspy.ChainOfThought(VerifierSignature)
def forward(self, document: str, claim: str, judge_reasoning: str, judge_decision: bool) -> Dict[str, Any]:
result = self.verifier(
document=document,
claim=claim,
judge_reasoning=judge_reasoning,
judge_decision="yes" if judge_decision else "no"
)
verified_result = result.verified_result.lower() == 'yes'
return {
'verification_reasoning': result.verification_reasoning,
'verified_result': verified_result
}
# Full Pipeline that combines Judge and Verifier with explicit output
class TwoStagePipeline(dspy.Module):
    def __init__(self):
        super().__init__()
        self.judge = JudgeModule()
        self.verifier = VerifierModule()
def forward(self, document: str, claim: str):
# Stage 1: Judge
judge_result = self.judge(document=document, claim=claim)
# Stage 2: Verify
verify_result = self.verifier(
document=document,
claim=claim,
judge_reasoning=judge_result['reasoning'],
judge_decision=judge_result['is_consistent']
)
# Create proper output that matches what's expected by metric
verified_result = verify_result['verified_result']
# Return result as object with verified_result attribute
return dspy.Prediction(verified_result=verified_result)
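# Quick usage sketch for TwoStagePipeline. The document/claim strings are invented for
# illustration; the call is wrapped in a function so nothing runs at import time.
def demo_single_prediction():
    pipeline = TwoStagePipeline()
    pred = pipeline(
        document="The Eiffel Tower is located in Paris and was completed in 1889.",
        claim="The Eiffel Tower was completed in 1889.",
    )
    print(pred.verified_result)  # True when the verifier agrees the claim is supported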
# Define evaluation metric with better debugging
def consistency_metric(example, prediction, trace=None):
"""Evaluate if the model's prediction matches the ground truth with detailed logging."""
# Get predicted value
if hasattr(prediction, 'verified_result'):
predicted_value = prediction.verified_result
else:
try:
predicted_value = prediction['verified_result']
        except (KeyError, TypeError):
print(f"Warning: Can't find verified_result in prediction: {prediction}")
return False
# Convert prediction to boolean
if isinstance(predicted_value, str):
prediction_value = predicted_value.lower() == 'yes'
else:
prediction_value = bool(predicted_value)
# Get ground truth
if isinstance(example.label, bool):
ground_truth = example.label
else:
ground_truth = bool(example.label)
# Log comparison for debugging
print(f"Comparing - Ground truth: {ground_truth}, Prediction: {prediction_value}")
return ground_truth == prediction_value
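# Minimal sanity check (sketch) for consistency_metric, using hand-built objects so the
# string/boolean handling can be exercised without any LLM calls.
def check_consistency_metric():
    example = dspy.Example(document="doc", claim="claim", label=True).with_inputs('document', 'claim')
    assert consistency_metric(example, dspy.Prediction(verified_result="yes")) is True
    assert consistency_metric(example, dspy.Prediction(verified_result=False)) is False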
# Load and prepare dataset
def prepare_dataset():
"""Load and prepare the dataset for training and evaluation."""
dataset = load_dataset('lytang/LLM-AggreFact')
# Filter to just use ExpertQA subset for this example
test_data = dataset['test'].filter(lambda row: row['dataset'] == 'ExpertQA')
# Limit size for demonstration purposes
max_samples = 1000
if len(test_data) > max_samples:
test_data = test_data.select(range(max_samples))
# Convert to DSPy examples with proper input fields set
dspy_examples = []
for item in test_data:
example = dspy.Example(
document=item['doc'],
claim=item['claim'],
label=item['label']
).with_inputs('document', 'claim') # Explicitly set the input fields
dspy_examples.append(example)
train_size = int(len(dspy_examples) * 0.8)
trainset = dspy_examples[:train_size]
devset = dspy_examples[train_size:]
return trainset, devset
# Optimize with MIPROv2
def optimize_pipeline(trainset, devset):
"""Optimize the pipeline using MIPROv2."""
# Create base pipeline
base_pipeline = TwoStagePipeline()
# Initialize MIPROv2
from dspy.teleprompt import MIPROv2
print("Creating MIPROv2 optimizer...")
mipro_optimizer = MIPROv2(
metric=consistency_metric,
auto="light", # Can choose between light, medium, and heavy optimization runs
)
print("Created MIPROv2 optimizer")
# Optimize program with MIPRO
print("Optimizing program with MIPRO...")
optimized_pipeline = mipro_optimizer.compile(
base_pipeline.deepcopy(), # Create a deep copy to avoid modifying the original
trainset=trainset,
max_bootstrapped_demos=3,
max_labeled_demos=4,
requires_permission_to_run=False,
)
print("Compiled MIPROv2 optimizer")
# Save optimized pipeline
optimized_pipeline.save("mipro_optimized_pipeline.json")
print("Saved optimized pipeline")
# Evaluate optimized pipeline
print(f"Devset size: {len(devset)}")
print(f"Trainset size: {len(trainset)}")
evaluate = dspy.Evaluate(devset=devset, metric=consistency_metric)
print("Evaluating optimized pipeline...")
score = evaluate(optimized_pipeline)
print("Evaluated optimized pipeline")
print(f"Optimized pipeline score: {score:.4f}")
return optimized_pipeline
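# Reloading a saved pipeline (sketch): dspy modules expose save()/load(), so the JSON
# written above can be restored into a fresh TwoStagePipeline without re-optimizing.
def load_optimized_pipeline(path: str = "mipro_optimized_pipeline.json") -> TwoStagePipeline:
    pipeline = TwoStagePipeline()
    pipeline.load(path)
    return pipeline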
# Simplified process examples without confidence
def process_examples(pipeline, examples, repeats=1):
"""Process multiple examples with the optimized pipeline."""
results = []
for example in tqdm(examples):
# Get a single prediction
prediction = pipeline(document=example.document, claim=example.claim)
# Get the verified result (boolean)
if hasattr(prediction, 'verified_result'):
predicted_result = prediction.verified_result
else:
predicted_result = prediction['verified_result']
# Convert to boolean if needed
if isinstance(predicted_result, str):
predicted_result = predicted_result.lower() == 'yes'
results.append({
"document": example.document,
"claim": example.claim,
"ground_truth": example.label,
"prediction": predicted_result,
"correct": bool(predicted_result) == bool(example.label)
})
return results
# Main execution
async def main():
# Prepare dataset
print("Preparing dataset...")
trainset, devset = prepare_dataset()
print(f"Dataset prepared: {len(trainset)} training examples, {len(devset)} evaluation examples")
# Optimize pipeline
optimized_pipeline = optimize_pipeline(trainset, devset)
# Process examples with optimized pipeline
print("Processing evaluation examples...")
eval_results = process_examples(optimized_pipeline, devset)
# Calculate and print metrics
correct_count = sum(1 for r in eval_results if r['correct'])
accuracy = correct_count / len(eval_results)
print(f"\nEvaluation Results:")
print(f"Total examples: {len(eval_results)}")
print(f"Correct predictions: {correct_count}")
print(f"Accuracy: {accuracy:.4f}")
if __name__ == "__main__":
asyncio.run(main())