Skip to content

Instantly share code, notes, and snippets.

@swyxio
Last active March 31, 2025 00:56
Show Gist options
  • Save swyxio/9ab20648acca2d2b9ce5c1e91222fecb to your computer and use it in GitHub Desktop.
Save swyxio/9ab20648acca2d2b9ce5c1e91222fecb to your computer and use it in GitHub Desktop.
pokemon hackathon submission - swyx prompted version
#!/usr/bin/env python3
# /// script
# dependencies = [
# "morphcloud",
# "requests",
# "pillow",
# "rich",
# "anthropic",
# "flask",
# ]
# ///
"""
Pokemon Agent Dashboard - Single File Version
This script provides a web interface for managing Pokemon agents.
Just run this file and open your browser to http://localhost:5000.
Dependencies: flask
"""
import os
import sys
import time
import signal
import subprocess
import threading
import re
from collections import deque
import webbrowser
from flask import Flask, request, jsonify
# Try to import MorphCloudClient for snapshot operations
try:
from morphcloud.api import MorphCloudClient
except ImportError:
print("Warning: morphcloud package not found. Some snapshot features may not work.")
MorphCloudClient = None
# The HTML interface with enhanced snapshot viewer tab
HTML_TEMPLATE = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Pokemon Agent Dashboard</title>
<style>
body, html {
margin: 0;
padding: 0;
height: 100%;
font-family: Arial, sans-serif;
}
.container {
display: flex;
height: 100vh;
}
.sidebar {
width: 320px;
background-color: #f5f5f5;
padding: 15px;
display: flex;
flex-direction: column;
overflow: hidden;
}
.controls {
flex: 0 0 auto;
}
.console, .snapshots {
flex: 1;
margin-top: 15px;
overflow: hidden;
display: flex;
flex-direction: column;
}
.console-title, .snapshots-title {
font-weight: bold;
margin-bottom: 5px;
display: flex;
justify-content: space-between;
align-items: center;
}
.console-output, .snapshots-list {
flex: 1;
background-color: #222;
color: #f0f0f0;
font-family: monospace;
padding: 10px;
overflow-y: auto;
white-space: pre-wrap;
font-size: 12px;
border-radius: 4px;
}
.tabs {
display: flex;
margin-bottom: 10px;
}
.tab {
padding: 8px 16px;
cursor: pointer;
background-color: #e0e0e0;
border: 1px solid #ccc;
border-bottom: none;
border-radius: 4px 4px 0 0;
margin-right: 5px;
}
.tab.active {
background-color: #f0f0f0;
font-weight: bold;
}
.tab-content {
display: none;
flex: 1;
overflow: hidden;
flex-direction: column;
}
.tab-content.active {
display: flex;
}
.main-view {
flex: 1;
display: flex;
flex-direction: column;
overflow: hidden;
}
.game-frame {
flex: 1;
border: none;
}
label {
display: block;
margin-top: 10px;
font-weight: bold;
}
input {
width: 100%;
padding: 8px;
margin-top: 4px;
box-sizing: border-box;
}
button {
margin-top: 10px;
padding: 8px;
width: 100%;
background-color: #4CAF50;
color: white;
border: none;
cursor: pointer;
border-radius: 4px;
}
button:hover {
background-color: #45a049;
}
button:disabled {
background-color: #cccccc;
cursor: not-allowed;
}
button.stop {
background-color: #f44336;
}
button.stop:hover {
background-color: #d32f2f;
}
.status {
margin-top: 10px;
padding: 8px;
background-color: #e0e0e0;
border-radius: 4px;
}
.clear-logs, .refresh-snapshots {
margin-top: 5px;
font-size: 12px;
padding: 4px;
background-color: #666;
width: auto;
}
/* Snapshot tree styling */
.snapshot-node {
margin-bottom: 8px;
padding: 5px;
background-color: #333;
border-radius: 3px;
cursor: pointer;
}
.snapshot-node:hover {
background-color: #444;
}
.snapshot-node.current {
background-color: #1e5922;
}
.snapshot-node .title {
font-weight: bold;
color: #4CAF50;
}
.snapshot-node .id {
color: #aaa;
font-size: 10px;
}
.snapshot-node .metadata {
color: #888;
font-size: 11px;
margin-top: 2px;
}
.snapshot-actions {
margin-top: 10px;
display: flex;
gap: 5px;
}
.snapshot-action {
font-size: 12px;
padding: 4px 8px;
margin: 0;
flex: 1;
}
.info-panel {
margin-top: 10px;
padding: 8px;
background-color: #444;
border-radius: 4px;
font-size: 12px;
color: #ddd;
}
</style>
</head>
<body>
<div class="container">
<div class="sidebar">
<div class="controls">
<h2>Pokemon Agent</h2>
<form id="agentForm">
<label for="snapshotId">Snapshot ID:</label>
<input type="text" id="snapshotId" required>
<label for="steps">Steps:</label>
<input type="number" id="steps" value="10" min="1">
<div style="display: flex; gap: 10px; margin-top: 10px;">
<div style="flex: 1;">
<button type="submit" id="startButton">Start Agent</button>
</div>
<div style="flex: 1;">
<button type="button" id="stopButton" class="stop" disabled>Stop Agent</button>
</div>
</div>
</form>
<div id="statusDisplay" class="status">Status: Idle</div>
<div style="margin-top: 10px; display: flex; gap: 10px; align-items: center;">
<label for="autoRefresh" style="margin-top: 0; display: flex; align-items: center; font-weight: normal;">
<input type="checkbox" id="autoRefresh" checked style="width: auto; margin-right: 5px;">
Auto-refresh
</label>
<select id="refreshRate" style="width: 100px;">
<option value="500">0.5s</option>
<option value="1000" selected>1s</option>
<option value="2000">2s</option>
<option value="5000">5s</option>
<option value="10000">10s</option>
</select>
<button id="manualRefresh" style="width: auto; margin-top: 0;">Refresh</button>
</div>
</div>
<div class="tabs">
<div class="tab active" data-tab="console">Console</div>
<div class="tab" data-tab="snapshots">Snapshots</div>
</div>
<div class="tab-content active" id="console-tab">
<div class="console">
<div class="console-title">
<span>Agent Output</span>
<button class="clear-logs" id="clearLogsButton">Clear</button>
</div>
<div class="console-output" id="consoleOutput"></div>
</div>
</div>
<div class="tab-content" id="snapshots-tab">
<div class="snapshots">
<div class="snapshots-title">
<span>Snapshots</span>
<button class="refresh-snapshots" id="refreshSnapshotsButton">Refresh</button>
</div>
<div class="snapshots-list" id="snapshotsList"></div>
<div class="snapshot-actions">
<button class="snapshot-action" id="loadSnapshotButton" disabled>Load Selected</button>
<button class="snapshot-action" id="viewSnapshotButton" disabled>View Details</button>
</div>
<div class="info-panel" id="snapshotInfoPanel">
Select a snapshot to view details
</div>
</div>
</div>
</div>
<div class="main-view">
<div style="display: flex; justify-content: flex-end; padding: 5px; background-color: #333;">
<button id="reloadVncButton" style="margin: 0; padding: 5px 10px; width: auto; font-size: 12px; background-color: #666;">
🔄 Reload VNC
</button>
</div>
<iframe id="gameFrame" class="game-frame" src="about:blank"></iframe>
</div>
</div>
<script>
let agentRunning = false;
let logPollingInterval = null;
let logPosition = 0;
let refreshRate = 1000; // Default refresh rate in milliseconds
let autoRefreshEnabled = true;
let currentSnapshotId = null;
let selectedSnapshotId = null;
let snapshots = []; // Store snapshot data
let currentVncUrl = null;
// Tab switching
document.querySelectorAll('.tab').forEach(tab => {
tab.addEventListener('click', function() {
// Remove active class from all tabs and contents
document.querySelectorAll('.tab').forEach(t => t.classList.remove('active'));
document.querySelectorAll('.tab-content').forEach(c => c.classList.remove('active'));
// Add active class to clicked tab and corresponding content
this.classList.add('active');
document.getElementById(this.dataset.tab + '-tab').classList.add('active');
});
});
// Handle refresh control changes
document.getElementById('autoRefresh').addEventListener('change', function(e) {
autoRefreshEnabled = e.target.checked;
if (autoRefreshEnabled && agentRunning) {
startLogPolling();
} else {
stopLogPolling();
}
});
document.getElementById('refreshRate').addEventListener('change', function(e) {
refreshRate = parseInt(e.target.value);
if (logPollingInterval) {
stopLogPolling();
if (autoRefreshEnabled && agentRunning) {
startLogPolling();
}
}
});
document.getElementById('manualRefresh').addEventListener('click', function() {
fetchLogs();
});
// Form submission handler
document.getElementById('agentForm').addEventListener('submit', async function(e) {
e.preventDefault();
if (agentRunning) return;
const snapshotId = document.getElementById('snapshotId').value;
const steps = document.getElementById('steps').value;
// Store as the current snapshot ID
currentSnapshotId = snapshotId;
// Update UI
document.getElementById('statusDisplay').textContent = "Status: Starting agent...";
document.getElementById('startButton').disabled = true;
document.getElementById('stopButton').disabled = false;
agentRunning = true;
try {
// Start the agent
const response = await fetch('/start', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
snapshotId,
steps
})
});
const result = await response.json();
if (result.success) {
document.getElementById('statusDisplay').textContent = "Status: Agent running";
// Reset the log position counter
logPosition = 0;
// Start polling for logs
startLogPolling();
// Also fetch snapshots after a short delay to let them start being created
setTimeout(fetchSnapshots, 5000);
} else {
document.getElementById('statusDisplay').textContent = "Status: Error - " + result.error;
resetAgentState();
}
} catch (error) {
document.getElementById('statusDisplay').textContent = "Status: Connection error";
console.error(error);
resetAgentState();
}
});
// Stop button handler
document.getElementById('stopButton').addEventListener('click', async function() {
if (!agentRunning) return;
document.getElementById('statusDisplay').textContent = "Status: Stopping agent...";
try {
const response = await fetch('/stop', {
method: 'POST'
});
const result = await response.json();
if (result.success) {
document.getElementById('statusDisplay').textContent = "Status: Agent stopped";
// Fetch snapshots one more time to ensure we have the final ones
fetchSnapshots();
} else {
document.getElementById('statusDisplay').textContent = "Status: Failed to stop agent";
}
} catch (error) {
document.getElementById('statusDisplay').textContent = "Status: Connection error";
console.error(error);
}
resetAgentState();
});
// Clear logs button handler
document.getElementById('clearLogsButton').addEventListener('click', function() {
document.getElementById('consoleOutput').textContent = '';
});
// Refresh snapshots button handler
document.getElementById('refreshSnapshotsButton').addEventListener('click', function() {
fetchSnapshots();
});
// Load snapshot button handler
document.getElementById('loadSnapshotButton').addEventListener('click', async function() {
if (!selectedSnapshotId) return;
// Set the selected snapshot as the current one in the form
document.getElementById('snapshotId').value = selectedSnapshotId;
// Highlight the selection as current
updateSnapshotDisplay();
// Display a message
document.getElementById('snapshotInfoPanel').textContent =
`Snapshot ${selectedSnapshotId} loaded into form. Click 'Start Agent' to begin from this snapshot.`;
});
// View snapshot details button handler
document.getElementById('viewSnapshotButton').addEventListener('click', function() {
if (!selectedSnapshotId) return;
// Find the selected snapshot
const snapshot = snapshots.find(s => s.id === selectedSnapshotId);
if (!snapshot) return;
// Display detailed information
const infoPanel = document.getElementById('snapshotInfoPanel');
let details = `Snapshot: ${snapshot.name}\n`;
details += `ID: ${snapshot.id}\n`;
details += `Created: ${new Date(snapshot.created * 1000).toLocaleString()}\n\n`;
details += `Lineage:\n`;
details += `- Parent: ${snapshot.metadata?.parent_snapshot || 'None'}\n`;
details += `- Previous: ${snapshot.metadata?.prev_snapshot || 'None'}\n`;
details += `- Step: ${snapshot.metadata?.step_number || 'Unknown'}\n`;
details += `- Dashboard Run: ${snapshot.metadata?.dashboard_run_id || 'None'}\n`;
infoPanel.textContent = details;
});
// Helper function to reset UI state
function resetAgentState() {
agentRunning = false;
document.getElementById('startButton').disabled = false;
document.getElementById('stopButton').disabled = true;
// Stop polling for logs
stopLogPolling();
}
// Snapshot selection handler
function handleSnapshotClick(snapshotId) {
selectedSnapshotId = snapshotId;
updateSnapshotDisplay();
// Enable action buttons
document.getElementById('loadSnapshotButton').disabled = false;
document.getElementById('viewSnapshotButton').disabled = false;
// Show basic info
const snapshot = snapshots.find(s => s.id === snapshotId);
if (snapshot) {
const infoPanel = document.getElementById('snapshotInfoPanel');
infoPanel.textContent = `Selected: ${snapshot.name} (${snapshot.id})\nStep: ${snapshot.metadata?.step_number || 'Unknown'}\nRun: ${snapshot.metadata?.dashboard_run_id ? snapshot.metadata.dashboard_run_id.substring(0, 8) + '...' : 'None'}\n\nClick 'View Details' for more information.`;
}
}
// Update snapshot display highlighting
function updateSnapshotDisplay() {
// Remove highlighting from all nodes
document.querySelectorAll('.snapshot-node').forEach(node => {
node.classList.remove('current');
});
// Add current class to current snapshot
if (currentSnapshotId) {
const currentNode = document.querySelector(`.snapshot-node[data-id="${currentSnapshotId}"]`);
if (currentNode) {
currentNode.classList.add('current');
}
}
}
// Start polling for new log entries
function startLogPolling() {
if (logPollingInterval) {
clearInterval(logPollingInterval);
}
if (autoRefreshEnabled) {
logPollingInterval = setInterval(fetchLogs, refreshRate);
}
}
// Stop polling for logs
function stopLogPolling() {
if (logPollingInterval) {
clearInterval(logPollingInterval);
logPollingInterval = null;
}
}
// Fetch new log entries from the server
async function fetchLogs() {
try {
const response = await fetch(`/logs?position=${logPosition}`);
const data = await response.json();
if (data.logs) {
appendLogs(data.logs);
logPosition = data.nextPosition;
}
// Check if the agent is still running
if (data.agentRunning === false && agentRunning) {
document.getElementById('statusDisplay').textContent = "Status: Agent finished";
resetAgentState();
// Fetch snapshots one more time to get final state
fetchSnapshots();
}
// Check for VNC URL
if (data.vncUrl && data.vncUrl !== 'null' && data.vncUrl !== '') {
const gameFrame = document.getElementById('gameFrame');
// Store the URL for reload button usage
currentVncUrl = data.vncUrl;
// Only update the iframe if it's not already showing this URL
if (gameFrame.src !== data.vncUrl) {
console.log("Setting game frame URL to: " + data.vncUrl);
gameFrame.src = data.vncUrl;
}
}
} catch (error) {
console.error("Error fetching logs:", error);
}
}
// Fetch snapshots from the server
async function fetchSnapshots() {
try {
const response = await fetch('/snapshots');
const data = await response.json();
if (data.snapshots) {
snapshots = data.snapshots;
renderSnapshots(data.snapshots);
}
} catch (error) {
console.error("Error fetching snapshots:", error);
}
}
// Render snapshots in the UI
function renderSnapshots(snapshotsList) {
const snapshotsContainer = document.getElementById('snapshotsList');
snapshotsContainer.innerHTML = '';
if (snapshotsList.length === 0) {
snapshotsContainer.textContent = 'No snapshots available for this session';
return;
}
// Organize snapshots by step
const snapshotsByStep = {};
// First, create the parent snapshot entry
snapshotsList.forEach(snapshot => {
const step = snapshot.metadata?.step_number ? parseInt(snapshot.metadata.step_number) : null;
if (!snapshotsByStep[step]) {
snapshotsByStep[step] = [];
}
snapshotsByStep[step].push(snapshot);
});
// Sort steps numerically
const sortedSteps = Object.keys(snapshotsByStep)
.filter(step => step !== 'null')
.map(step => parseInt(step))
.sort((a, b) => a - b);
// Add parent snapshots (those without step numbers) at the top
if (snapshotsByStep['null']) {
sortedSteps.unshift('null');
}
// Create nodes for each snapshot, grouped by step
sortedSteps.forEach(step => {
const group = snapshotsByStep[step];
group.forEach(snapshot => {
const node = document.createElement('div');
node.className = 'snapshot-node';
node.dataset.id = snapshot.id;
// Mark current snapshot
if (snapshot.id === currentSnapshotId) {
node.classList.add('current');
}
// Create snapshot content
let nodeHtml = '';
// Title (step number or parent)
if (step === 'null') {
nodeHtml += `<div class="title">Parent</div>`;
} else {
nodeHtml += `<div class="title">Step ${step}</div>`;
}
// ID
nodeHtml += `<div class="id">${snapshot.id}</div>`;
// Metadata
const metadata = snapshot.metadata || {};
let metadataText = [];
if (snapshot.name) {
metadataText.push(`Name: ${snapshot.name}`);
}
if (metadata.timestamp) {
const date = new Date(parseInt(metadata.timestamp) * 1000);
metadataText.push(`Time: ${date.toLocaleTimeString()}`);
}
if (metadata.dashboard_run_id) {
metadataText.push(`Run: ${metadata.dashboard_run_id.substring(0, 8)}...`);
}
nodeHtml += `<div class="metadata">${metadataText.join(' | ')}</div>`;
node.innerHTML = nodeHtml;
// Add click handler
node.addEventListener('click', () => handleSnapshotClick(snapshot.id));
snapshotsContainer.appendChild(node);
});
});
}
// Append new log entries to the console output
function appendLogs(logs) {
if (!logs || logs.length === 0) return;
const consoleOutput = document.getElementById('consoleOutput');
// Add each log line
logs.forEach(line => {
// Create a new div for each line
const logLine = document.createElement('div');
logLine.textContent = line;
// Add color based on log content
if (line.includes('[ERROR]') || line.includes('Error')) {
logLine.style.color = '#ff5252';
} else if (line.includes('[WARNING]') || line.includes('Warning')) {
logLine.style.color = '#ffb142';
} else if (line.includes('[Claude]')) {
logLine.style.color = '#4fc3f7';
} else if (line.includes('[Tool Use]') || line.includes('[Claude Action]')) {
logLine.style.color = '#66bb6a';
} else if (line.includes('Snapshot created')) {
logLine.style.color = '#ba68c8';
}
consoleOutput.appendChild(logLine);
});
// Auto-scroll to bottom
consoleOutput.scrollTop = consoleOutput.scrollHeight;
}
// Fetch snapshots periodically when agent is running
if (autoRefreshEnabled) {
setInterval(() => {
if (agentRunning) {
fetchSnapshots();
}
}, 5000); // Check for new snapshots every 5 seconds
}
// Reload VNC button handler
document.getElementById('reloadVncButton').addEventListener('click', function() {
const gameFrame = document.getElementById('gameFrame');
// Store the current URL
if (gameFrame.src && gameFrame.src !== 'about:blank') {
currentVncUrl = gameFrame.src;
}
// If we have a VNC URL, reload it
if (currentVncUrl) {
console.log("Reloading VNC iframe with URL: " + currentVncUrl);
// First clear the frame
gameFrame.src = 'about:blank';
// Then after a brief delay, set it back to the VNC URL
setTimeout(() => {
gameFrame.src = currentVncUrl;
}, 500);
// Log to console
const timestamp = new Date().toLocaleTimeString();
const consoleOutput = document.getElementById('consoleOutput');
const logLine = document.createElement('div');
logLine.textContent = `[${timestamp}] VNC display manually reloaded`;
logLine.style.color = '#ffb142';
consoleOutput.appendChild(logLine);
consoleOutput.scrollTop = consoleOutput.scrollHeight;
} else {
console.log("No VNC URL available to reload");
}
});
</script>
</body>
</html>
"""
# Initialize Flask app
app = Flask(__name__)
# Global variables for agent state
agent_process = None
agent_logs = deque(maxlen=1000) # Store up to 1000 log lines
log_lock = threading.Lock()
agent_running = False
vnc_url = None
parent_snapshot_id = None
morph_client = None
def extract_vnc_url(line):
"""Extract VNC URL from log line"""
match = re.search(r"(https://novnc-[^\s]*\.http\.cloud\.morph\.so[^\s]*)", line)
if match:
return match.group(1)
return None
def extract_snapshot_id(line):
"""Extract snapshot ID from log line"""
match = re.search(r"Snapshot created with ID: ([a-zA-Z0-9_]+)", line)
if match:
return match.group(1)
return None
def log_reader(process):
"""Read logs from the process stdout/stderr in real-time"""
global agent_logs, agent_running, vnc_url
for line in iter(process.stdout.readline, b''):
try:
decoded_line = line.decode('utf-8').rstrip()
# Check if this line contains the VNC URL
extracted_url = extract_vnc_url(decoded_line)
if extracted_url:
print(f"Found VNC URL: {extracted_url}")
vnc_url = extracted_url
# Add timestamp to the log line
timestamp = time.strftime("%H:%M:%S", time.localtime())
log_line = f"[{timestamp}] {decoded_line}"
# Add to log buffer with thread safety
with log_lock:
agent_logs.append(log_line)
print(log_line)
except Exception as e:
print(f"Error processing log line: {e}")
# Process has ended
with log_lock:
agent_logs.append(f"[{time.strftime('%H:%M:%S', time.localtime())}] Agent process terminated")
agent_running = False
def initialize_morph_client():
"""Initialize MorphCloud client if possible"""
global morph_client
if MorphCloudClient is not None:
try:
morph_client = MorphCloudClient()
print("MorphCloud client initialized successfully")
return True
except Exception as e:
print(f"Error initializing MorphCloud client: {e}")
return False
@app.route('/')
def index():
"""Serve the main page"""
return HTML_TEMPLATE
@app.route('/logs')
def get_logs():
"""Get new log entries since the given position"""
global agent_logs, vnc_url
position = int(request.args.get('position', 0))
with log_lock:
# Convert deque to list for easier slicing
all_logs = list(agent_logs)
# Get logs from the requested position
if position < len(all_logs):
new_logs = all_logs[position:]
next_position = len(all_logs)
else:
new_logs = []
next_position = position
return jsonify({
"logs": new_logs,
"nextPosition": next_position,
"agentRunning": agent_running,
"vncUrl": vnc_url
})
@app.route('/snapshots')
def get_snapshots():
"""Get snapshots for the current session"""
global morph_client, parent_snapshot_id
if morph_client is None:
return jsonify({"snapshots": [], "error": "MorphCloud client not available"})
if parent_snapshot_id is None:
return jsonify({"snapshots": [], "message": "No parent snapshot set for this session"})
try:
# Get snapshots that have our dashboard run ID in metadata
# Extract the original snapshot ID if we're using it for tracking
snapshots = morph_client.snapshots.list(metadata={"dashboard_run_id": parent_snapshot_id})
# Convert to dictionaries for JSON serialization
snapshot_dicts = []
for snapshot in snapshots:
snapshot_dict = {
"id": snapshot.id,
"name": getattr(snapshot, 'name', None),
"created": getattr(snapshot, 'created', 0),
"metadata": getattr(snapshot, 'metadata', {})
}
snapshot_dicts.append(snapshot_dict)
return jsonify({"snapshots": snapshot_dicts})
except Exception as e:
print(f"Error fetching snapshots: {e}")
return jsonify({"snapshots": [], "error": str(e)})
@app.route('/start', methods=['POST'])
def start_agent():
"""Start the Pokemon agent"""
global agent_process, agent_running, agent_logs, vnc_url, parent_snapshot_id
# Check if agent is already running
if agent_running:
return jsonify({"success": False, "error": "Agent is already running"})
try:
data = request.json
snapshot_id = data.get('snapshotId')
steps = data.get('steps', 10)
# Always create a new run ID for each agent start
# This ensures previous snapshots don't appear in the current run's view
run_timestamp = int(time.time())
parent_snapshot_id = f"{snapshot_id}_{run_timestamp}"
print(f"Setting new run ID for this session: {parent_snapshot_id}")
# Clear previous logs
with log_lock:
agent_logs.clear()
vnc_url = None
# Check if the agent script exists
if not os.path.exists("minimal_agent.py"):
return jsonify({
"success": False,
"error": "minimal_agent.py not found. Please ensure the agent file is in the current directory."
})
# Extract the original snapshot ID if we're using a combined run ID
actual_snapshot_id = snapshot_id
actual_parent_id = parent_snapshot_id
# If parent_snapshot_id contains a timestamp, extract the actual snapshot ID
if '_' in parent_snapshot_id:
parts = parent_snapshot_id.split('_')
if len(parts) >= 2: # Ensure it has the expected format
actual_parent_id = parts[0] # First part is the actual snapshot ID
# Build command
cmd = [
sys.executable,
"minimal_agent.py",
"--snapshot-id", actual_snapshot_id,
"--steps", str(steps),
"--no-browser", # Suppress browser auto-open since we're using the dashboard
"--parent-snapshot-id", actual_parent_id, # The actual snapshot for lineage
"--dashboard-run-id", parent_snapshot_id, # The combined run ID for filtering
"--snapshot-prefix", f"dash_{int(time.time())}"
]
# Start the process with pipes for stdout/stderr
print(f"Starting agent with command: {' '.join(cmd)}")
agent_process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
bufsize=1,
universal_newlines=False
)
agent_running = True
# Start a thread to read the logs
log_thread = threading.Thread(target=log_reader, args=(agent_process,))
log_thread.daemon = True
log_thread.start()
# Add initial log entry
with log_lock:
timestamp = time.strftime("%H:%M:%S", time.localtime())
agent_logs.append(f"[{timestamp}] Started agent with snapshot {snapshot_id} for {steps} steps")
agent_logs.append(f"[{timestamp}] Using parent snapshot {parent_snapshot_id} for lineage tracking")
agent_logs.append(f"[{timestamp}] All snapshots will be tagged with dashboard_run_id={parent_snapshot_id}")
return jsonify({
"success": True,
"message": "Agent started"
})
except Exception as e:
print(f"Error starting agent: {e}")
agent_running = False
return jsonify({"success": False, "error": str(e)})
@app.route('/stop', methods=['POST'])
def stop_agent():
global agent_process, agent_running
# Wrap everything in try/except to prevent server crashes
try:
if not agent_running or agent_process is None:
return jsonify({"success": False, "error": "No agent is running"})
# Log that we're attempting to stop
print(f"Attempting to stop agent process (PID: {agent_process.pid})")
# Create a local reference to the process
process_to_stop = agent_process
# Clear global references first to avoid deadlocks
agent_running = False
agent_process = None
# Then terminate the process
try:
process_to_stop.terminate()
process_to_stop.wait(timeout=2)
except Exception as inner_e:
print(f"Error during graceful termination: {inner_e}")
try:
process_to_stop.kill()
except:
pass # Already dead or can't be killed
# Add log entry
with log_lock:
timestamp = time.strftime("%H:%M:%S", time.localtime())
agent_logs.append(f"[{timestamp}] Agent stopped")
return jsonify({
"success": True,
"message": "Agent stopped"
})
except Exception as e:
# Critical error handling - log it but don't crash
print(f"CRITICAL ERROR in stop_agent: {e}")
import traceback
traceback.print_exc()
# Reset state to be safe
agent_running = False
agent_process = None
# Always return a response
return jsonify({"success": False, "error": f"Server error: {str(e)}"})
def main():
"""Main function"""
print("Pokemon Agent Dashboard")
print("======================")
print("1. Make sure your minimal_agent.py file is in the current directory")
print("2. Opening browser to http://127.0.0.1:5001/")
print("3. Press Ctrl+C to stop the server")
# Initialize MorphCloud client
initialize_morph_client()
# Open browser automatically
webbrowser.open("http://127.0.0.1:5001/")
# Run the Flask app
app.run(host='0.0.0.0', port=5001, threaded=True)
if __name__ == '__main__':
main()
#!/usr/bin/env python3
# /// script
# dependencies = [
# "morphcloud",
# "requests",
# "pillow",
# "rich",
# "anthropic",
# ]
# ///
"""Run a non-interactive server agent that plays Pokemon automatically.
This script combines the EmulatorClient and PokemonAgent to set up a basic agent.
"""
import io
import sys
import json
import copy
import typing
from typing import Dict, List, Optional, Any
import base64
import logging
import argparse
import time
import requests
import webbrowser
from PIL import Image
from anthropic import Anthropic
from rich.console import Console
from morphcloud.api import MorphCloudClient
# Set up logging - this will be configured properly in main() based on command line args
logger = logging.getLogger(__name__)
# Configuration
MAX_TOKENS = 4096
MODEL_NAME = "claude-3-7-sonnet-20250219"
TEMPERATURE = 0.7
USE_NAVIGATOR = True
class EmulatorClient:
def __init__(self, host: str = "127.0.0.1", port: int = 9876):
# Check if host already includes the protocol, if not add http://
if host.startswith("http://") or host.startswith("https://"):
# For MorphVM URLs, don't append port as it's handled by the URL routing
if "cloud.morph.so" in host or port is None:
self.base_url = host
# For other URLs, handle port as before
elif ":" not in host.split("/")[-1]:
self.base_url = f"{host}:{port}"
else:
# Host already has port, use it as is
self.base_url = host
else:
# For MorphVM URLs, don't append port
if "cloud.morph.so" in host:
self.base_url = f"https://{host}"
else:
self.base_url = f"http://{host}:{port}"
logger.info(f"Initialized client connecting to {self.base_url}")
def get_screenshot(self):
"""Get current screenshot as PIL Image"""
response = requests.get(f"{self.base_url}/api/screenshot")
if response.status_code != 200:
logger.error(f"Error getting screenshot: {response.status_code}")
return None
return Image.open(io.BytesIO(response.content))
def get_screenshot_base64(self):
"""Get current screenshot as base64 string"""
response = requests.get(f"{self.base_url}/api/screenshot")
if response.status_code != 200:
logger.error(f"Error getting screenshot: {response.status_code}")
return ""
return base64.b64encode(response.content).decode("utf-8")
def get_game_state(self):
"""Get complete game state from server"""
response = requests.get(f"{self.base_url}/api/game_state")
if response.status_code != 200:
logger.error(
f"Error response from server: {response.status_code} - {response.text}"
)
return {}
try:
return response.json()
except json.JSONDecodeError as e:
logger.error(f"JSON decode error: {e}")
logger.error(f"Response content: {response.text[:100]}...")
return {}
# Compatibility methods to match Emulator interface
def get_state_from_memory(self):
"""Get game state string - mimics Emulator.get_state_from_memory()"""
state_data = self.get_game_state()
return state_data.get("game_state", "")
def get_collision_map(self):
"""Get collision map - mimics Emulator.get_collision_map()"""
state_data = self.get_game_state()
return state_data.get("collision_map", "")
def get_valid_moves(self):
"""Get valid moves - mimics Emulator.get_valid_moves()"""
state_data = self.get_game_state()
return state_data.get("valid_moves", [])
def find_path(self, row, col):
"""Find path to position - mimics Emulator.find_path()"""
result = self.navigate(row, col)
if not isinstance(result, dict):
return "Failed to navigate", []
return result.get("status", "Navigation failed"), result.get("path", [])
def press_buttons(
self, buttons, wait=True, include_state=False, include_screenshot=False
):
"""Press a sequence of buttons on the Game Boy
Args:
buttons: List of buttons to press
wait: Whether to pause briefly after each button press
include_state: Whether to include game state in response
include_screenshot: Whether to include screenshot in response
Returns:
dict: Response data which may include button press result, game state, and screenshot
"""
data = {
"buttons": buttons,
"wait": wait,
"include_state": include_state,
"include_screenshot": include_screenshot,
}
response = requests.post(f"{self.base_url}/api/press_buttons", json=data)
if response.status_code != 200:
logger.error(
f"Error pressing buttons: {response.status_code} - {response.text}"
)
return {"error": f"Error: {response.status_code}"}
return response.json()
def navigate(self, row, col, include_state=False, include_screenshot=False):
"""Navigate to a specific position on the grid
Args:
row: Target row coordinate
col: Target column coordinate
include_state: Whether to include game state in response
include_screenshot: Whether to include screenshot in response
Returns:
dict: Response data which may include navigation result, game state, and screenshot
"""
data = {
"row": row,
"col": col,
"include_state": include_state,
"include_screenshot": include_screenshot,
}
response = requests.post(f"{self.base_url}/api/navigate", json=data)
if response.status_code != 200:
logger.error(f"Error navigating: {response.status_code} - {response.text}")
return {"status": f"Error: {response.status_code}", "path": []}
return response.json()
def read_memory(self, address):
"""Read a specific memory address"""
response = requests.get(f"{self.base_url}/api/memory/{address}")
if response.status_code != 200:
logger.error(
f"Error reading memory: {response.status_code} - {response.text}"
)
return {"error": f"Error: {response.status_code}"}
return response.json()
def load_state(self, state_path):
"""Load a saved state"""
data = {"state_path": state_path}
response = requests.post(f"{self.base_url}/api/load_state", json=data)
if response.status_code != 200:
logger.error(
f"Error loading state: {response.status_code} - {response.text}"
)
return {"error": f"Error: {response.status_code}"}
return response.json()
def save_screenshot(self, filename="screenshot.png"):
"""Save current screenshot to a file"""
screenshot = self.get_screenshot()
if screenshot:
screenshot.save(filename)
logger.info(f"Screenshot saved as {filename}")
return True
return False
def initialize(self, max_retries=5, retry_delay=3):
"""
Initialize method with retry capability for compatibility with Emulator
Args:
max_retries (int): Maximum number of retry attempts
retry_delay (int): Delay between retries in seconds
Returns:
bool: True if server is ready, False otherwise
"""
logger.info(f"Client initialization requested (compatibility method) with {max_retries} retries")
# Implement retry logic
for attempt in range(1, max_retries + 1):
try:
logger.info(f"Checking server status (attempt {attempt}/{max_retries})")
response = requests.get(f"{self.base_url}/api/status", timeout=10)
status = response.json()
ready = status.get("ready", False)
if ready:
logger.info("Server reports ready status")
return True
else:
logger.warning(f"Server reports not ready (attempt {attempt}/{max_retries})")
# If not ready and we have more attempts, wait before trying again
if attempt < max_retries:
logger.info(f"Waiting {retry_delay} seconds before retry...")
time.sleep(retry_delay)
except requests.exceptions.Timeout:
logger.warning(f"Connection timeout (attempt {attempt}/{max_retries})")
if attempt < max_retries:
time.sleep(retry_delay)
except requests.exceptions.ConnectionError as e:
logger.warning(f"Connection error: {e} (attempt {attempt}/{max_retries})")
if attempt < max_retries:
time.sleep(retry_delay)
except Exception as e:
logger.error(f"Error checking server status: {e} (attempt {attempt}/{max_retries})")
if attempt < max_retries:
time.sleep(retry_delay)
logger.error(f"Server not ready after {max_retries} attempts")
return False
def stop(self):
"""Empty stop method for compatibility with Emulator"""
logger.info("Client stop requested (compatibility method)")
# Nothing to do for client
pass
def get_screenshot_base64(screenshot, upscale=1):
"""Convert PIL image to base64 string."""
# Resize if needed
if upscale > 1:
new_size = (screenshot.width * upscale, screenshot.height * upscale)
screenshot = screenshot.resize(new_size)
# Convert to base64
buffered = io.BytesIO()
screenshot.save(buffered, format="PNG")
return base64.standard_b64encode(buffered.getvalue()).decode()
class PokemonAgent:
def __init__(
self,
server_host="127.0.0.1",
server_port: typing.Optional[int] = 9876,
max_history=60,
display_config=None,
morph_client=None, # Add MorphCloudClient as a parameter
parent_snapshot_id=None, # Add parent snapshot ID parameter
dashboard_run_id=None, # Add dashboard run ID parameter
):
"""Initialize the server agent.
Args:
server_host: Host where the game server is running
server_port: Port number of the game server
max_history: Maximum number of messages in history before summarization
display_config: Dictionary with display configuration options
morph_client: Optional MorphCloudClient instance for snapshot creation
parent_snapshot_id: Optional ID of the parent snapshot for lineage tracking
dashboard_run_id: Optional ID for grouping snapshots by dashboard run
"""
self.client = EmulatorClient(host=server_host, port=server_port or 9876)
self.anthropic = Anthropic()
self.running = True
self.message_history = [
{"role": "user", "content": "You may now begin playing."}
]
self.max_history = max_history
# Store the MorphCloud client and snapshot tracking IDs
self.morph_client = morph_client
self.parent_snapshot_id = parent_snapshot_id
self.dashboard_run_id = dashboard_run_id or parent_snapshot_id # Use parent as fallback
self.last_snapshot_id = parent_snapshot_id # Track the last created snapshot ID
# Set display configuration with defaults
self.display_config = display_config or {
"show_game_state": False,
"show_collision_map": False,
"quiet_mode": False,
}
# Log initialization with chosen configuration
logger.debug(f"Agent initialized with display config: {self.display_config}")
if self.morph_client and self.parent_snapshot_id:
logger.info(f"Snapshot tracking enabled. Parent snapshot: {self.parent_snapshot_id}")
if self.dashboard_run_id:
logger.info(f"Dashboard run ID for grouping snapshots: {self.dashboard_run_id}")
# Check if the server is ready
if not self.client.initialize():
logger.error(
"Server not ready - please start the server before running the agent"
)
raise RuntimeError("Server not ready")
SYSTEM_PROMPT = """You are playing Pokemon Red. You can see the game screen and control the game by executing emulator commands.
Your goal is to play through Pokemon Red and eventually defeat the Elite Four. Make decisions based on what you see on the screen.
check your tools! for example, try to use 'navigate_to' to help you move faster and better when looking at the map, with positions from game state.
Before each action, explain your reasoning briefly, plan your immediate next few steps needed (low level tool calls and actions, e.g. 'to reach the Cave from here, I need to go 1. right, 2. right, 3. right, 4. up', not high level goals like '1. explore the Cave 2. ??? 3. win!') to get there, then use the available actions to execute the next step in the game.
The game commands always register perfectly, so if you see no reaction to them, you have made an invalid command and misunderstood the game state. In battles, when you see an attack that isn't effective, you should examine your assumptions and update your beliefs. In general, search the solution space (try different things) before getting stuck in ruts.
Mistakes you have made before:
- do not talk to NPCs
- do not plan with high level goals
- DON'T FIGHT ANY BATTLES IF YOU CAN HELP IT. IF YOU ENCOUNTER A WILD (non trainer) BATTLE - JUST RUN. JUST RUN.
- do not insist on your prior knowledge about what attacks are strong against what types of Pokemon works when the evidence is the opposite
The conversation history may occasionally be summarized to save context space. If you see a message labeled "CONVERSATION HISTORY SUMMARY", this contains the key information about your progress so far. Use this information to maintain continuity in your gameplay."""
SUMMARY_PROMPT = """I need you to create a detailed summary of our conversation history up to this point. This summary will replace the full conversation history to manage the context window.
Please include:
1. Key game events and milestones you've reached
2. Important decisions you've made
3. Current objectives or goals you're working toward
4. Your current location and Pokémon team status
5. Any strategies or plans you've mentioned
The summary should be comprehensive enough that you can continue gameplay without losing important context about what has happened so far."""
AVAILABLE_TOOLS = [
{
"name": "press_buttons",
"description": "Press a sequence of buttons on the Game Boy.",
"input_schema": {
"type": "object",
"properties": {
"buttons": {
"type": "array",
"items": {
"type": "string",
"enum": [
"a",
"b",
"start",
"select",
"up",
"down",
"left",
"right",
],
},
"description": "List of buttons to press in sequence. Valid buttons: 'a', 'b', 'start', 'select', 'up', 'down', 'left', 'right'",
},
"wait": {
"type": "boolean",
"description": "Whether to wait for a brief period after pressing each button. Defaults to true.",
},
},
"required": ["buttons"],
},
}
]
# Add navigation tool if enabled
if USE_NAVIGATOR:
AVAILABLE_TOOLS.append(
{
"name": "navigate_to",
"description": "Automatically navigate to a position on the map grid. The screen is divided into a 9x10 grid, with the top-left corner as (0, 0). This tool is only available in the overworld.",
"input_schema": {
"type": "object",
"properties": {
"row": {
"type": "integer",
"description": "The row coordinate to navigate to (0-8).",
},
"col": {
"type": "integer",
"description": "The column coordinate to navigate to (0-9).",
},
},
"required": ["row", "col"],
},
}
)
def process_tool_call(self, tool_call):
"""Process a single tool call."""
tool_name = tool_call.name
tool_input = tool_call.input
# In quiet mode, only log at debug level
if self.display_config["quiet_mode"]:
logger.debug(f"Processing tool call: {tool_name}")
else:
logger.info(f"Processing tool call: {tool_name}")
if tool_name == "press_buttons":
buttons = tool_input["buttons"]
wait = tool_input.get("wait", True)
# Log the button press action
if self.display_config["quiet_mode"]:
logger.debug(f"[Buttons] Pressing: {buttons} (wait={wait})")
else:
logger.info(f"[Buttons] Pressing: {buttons} (wait={wait})")
# Use enhanced client method to get result, state, and screenshot in one call
response = self.client.press_buttons(
buttons, wait=wait, include_state=True, include_screenshot=True
)
# Extract results from response
result = response.get("result", f"Pressed buttons: {', '.join(buttons)}")
# Get game state from response or fetch it if not included
if "game_state" in response:
memory_info = response["game_state"].get("game_state", "")
if self.display_config["show_game_state"]:
logger.info(f"[Memory State from response]")
logger.info(memory_info)
else:
logger.debug(f"[Memory State from response]")
logger.debug(memory_info)
collision_map = response["game_state"].get("collision_map", "")
if collision_map and self.display_config["show_collision_map"]:
logger.info(f"[Collision Map from response]\n{collision_map}")
elif collision_map:
logger.debug(f"[Collision Map from response]\n{collision_map}")
else:
# Fallback to separate calls if state not included
memory_info = self.client.get_state_from_memory()
if self.display_config["show_game_state"]:
logger.info(f"[Memory State after action]")
logger.info(memory_info)
else:
logger.debug(f"[Memory State after action]")
logger.debug(memory_info)
collision_map = self.client.get_collision_map()
if collision_map and self.display_config["show_collision_map"]:
logger.info(f"[Collision Map after action]\n{collision_map}")
elif collision_map:
logger.debug(f"[Collision Map after action]\n{collision_map}")
# Get screenshot from response or fetch it if not included
if "screenshot" in response:
screenshot_b64 = response["screenshot"]
else:
screenshot = self.client.get_screenshot()
screenshot_b64 = get_screenshot_base64(screenshot, upscale=2)
# Build response content based on display configuration
content = [
{"type": "text", "text": f"Pressed buttons: {', '.join(buttons)}"},
{
"type": "text",
"text": "\nHere is a screenshot of the screen after your button presses:",
},
{
"type": "image",
"source": {
"type": "base64",
"media_type": "image/png",
"data": screenshot_b64,
},
},
]
# Add game state to Claude's view if enabled
content.append(
{
"type": "text",
"text": f"\nGame state information from memory after your action:\n{memory_info}",
}
)
# Return tool result as a dictionary
return {
"type": "tool_result",
"tool_use_id": tool_call.id,
"content": content,
}
elif tool_name == "navigate_to":
row = tool_input["row"]
col = tool_input["col"]
# Log the navigation action
if self.display_config["quiet_mode"]:
logger.debug(f"[Navigation] Navigating to: ({row}, {col})")
else:
logger.info(f"[Navigation] Navigating to: ({row}, {col})")
# Use enhanced client method to get result, state, and screenshot in one call
response = self.client.navigate(
row, col, include_state=True, include_screenshot=True
)
# Extract navigation result
status = response.get("status", "Unknown status")
path = response.get("path", [])
if path:
result = f"Navigation successful: followed path with {len(path)} steps"
else:
result = f"Navigation failed: {status}"
# Get game state from response or fetch it if not included
if "game_state" in response:
memory_info = response["game_state"].get("game_state", "")
if self.display_config["show_game_state"]:
logger.info(f"[Memory State from response]")
logger.info(memory_info)
else:
logger.debug(f"[Memory State from response]")
logger.debug(memory_info)
collision_map = response["game_state"].get("collision_map", "")
if collision_map and self.display_config["show_collision_map"]:
logger.info(f"[Collision Map from response]\n{collision_map}")
elif collision_map:
logger.debug(f"[Collision Map from response]\n{collision_map}")
else:
# Fallback to separate calls if state not included
memory_info = self.client.get_state_from_memory()
if self.display_config["show_game_state"]:
logger.info(f"[Memory State after action]")
logger.info(memory_info)
else:
logger.debug(f"[Memory State after action]")
logger.debug(memory_info)
collision_map = self.client.get_collision_map()
if collision_map and self.display_config["show_collision_map"]:
logger.info(f"[Collision Map after action]\n{collision_map}")
elif collision_map:
logger.debug(f"[Collision Map after action]\n{collision_map}")
# Get screenshot from response or fetch it if not included
if "screenshot" in response:
screenshot_b64 = response["screenshot"]
else:
screenshot = self.client.get_screenshot()
screenshot_b64 = get_screenshot_base64(screenshot, upscale=2)
# Build response content based on display configuration
content = [
{"type": "text", "text": f"Navigation result: {result}"},
{
"type": "text",
"text": "\nHere is a screenshot of the screen after navigation:",
},
{
"type": "image",
"source": {
"type": "base64",
"media_type": "image/png",
"data": screenshot_b64,
},
},
]
# Add game state to Claude's view if enabled
content.append(
{
"type": "text",
"text": f"\nGame state information from memory after your action:\n{memory_info}",
}
)
# Return tool result as a dictionary
return {
"type": "tool_result",
"tool_use_id": tool_call.id,
"content": content,
}
else:
logger.error(f"Unknown tool called: {tool_name}")
return {
"type": "tool_result",
"tool_use_id": tool_call.id,
"content": [
{"type": "text", "text": f"Error: Unknown tool '{tool_name}'"}
],
}
async def brainstorm_action_sequences(self) -> List[str]:
"""Use Claude to brainstorm different action sequences to try."""
game_state = self.client.get_game_state()
prompt = f"""Current game state:
{game_state}
Brainstorm 3 different strategic approaches for what to do next in Pokemon Red:
1. A cautious exploration-focused approach
2. A progress-focused approach
3. A resource/preparation-focused approach
For each approach, describe the specific next actions to take in 2-3 sentences.
Be concrete and specific about the immediate next steps, not high-level goals.
"""
response = await self.anthropic.messages.create(
model="claude-3-7-sonnet-20250219",
max_tokens=1000,
messages=[{
"role": "user",
"content": prompt
}]
)
return [response.content[0].text] # Return as a list of strategies
async def evaluate_action_branches(self, instance_id: str, steps_per_branch: int = 5) -> dict:
"""
Create and evaluate multiple branches with different action sequences.
Args:
instance_id: ID of the current instance to branch from
steps_per_branch: Number of steps to simulate for each branch
Returns:
Dictionary containing evaluation results for each branch
"""
# Get brainstormed strategies from Claude
strategies = await self.brainstorm_action_sequences()
# Create branches using MorphCloud
snapshot, clones = self.morph_client.instances.get(instance_id).branch(count=len(strategies))
logger.info(f"Created snapshot {snapshot.id} with {len(clones)} branches")
results = {}
# For each branch, create a new agent instance and let it play through the strategy
for clone, strategy in zip(clones, strategies):
# Create a new agent instance for this branch
branch_agent = PokemonAgent(
server_host=self.client.base_url,
morph_client=self.morph_client,
parent_snapshot_id=clone.id
)
# Initialize the branch agent with the strategy
branch_agent.message_history = [
{"role": "user", "content": f"You are exploring this strategic approach: {strategy}. Take actions aligned with this strategy."}
]
# Let the branch agent play for the specified number of steps
branch_result = await self._evaluate_single_branch(clone.id, branch_agent, steps_per_branch)
results[clone.id] = branch_result
# Clean up the branch agent
branch_agent.stop()
return {
"snapshot_id": snapshot.id,
"branch_results": results
}
async def _evaluate_single_branch(self, instance_id: str, branch_agent: 'PokemonAgent', num_steps: int) -> dict:
"""Evaluate a single branch by letting an agent play through it."""
game_states = []
actions_taken = []
# Let the branch agent play for the specified number of steps
for _ in range(num_steps):
if not branch_agent.running:
break
# Process one step and capture the game state
await branch_agent.process_next_step()
game_state = branch_agent.client.get_game_state()
game_states.append(game_state)
# Record the last action taken (from message history)
if branch_agent.message_history:
last_message = branch_agent.message_history[-1]
if last_message["role"] == "assistant":
actions_taken.append(str(last_message["content"]))
# Get Claude's assessment
assessment = await self._get_branch_assessment(game_states)
return {
"instance_id": instance_id,
"actions_taken": actions_taken,
"game_states": game_states,
"claude_assessment": assessment
}
async def _get_branch_assessment(self, game_states: List[str]) -> str:
"""Get Claude's assessment of the branch outcome."""
prompt = f"""Analyze this sequence of Pokemon game states and actions:
{chr(10).join(game_states)}
Provide a brief assessment of:
1. The effectiveness of this action sequence
2. Any risks or opportunities identified
3. Whether this branch seems promising to pursue
"""
response = await self.anthropic.messages.create(
model="claude-3-7-sonnet-20250219",
max_tokens=300,
messages=[{
"role": "user",
"content": prompt
}]
)
return response.content[0].text
async def run_with_branching(self, num_steps=1000, instance_id=None, snapshot_name_prefix=None, branching_interval=20):
"""
Run the agent with periodic branching to explore different strategies.
Args:
num_steps: Total number of steps to run
instance_id: ID of the current instance to branch from
snapshot_name_prefix: Prefix for naming snapshots
branching_interval: Number of steps between branching evaluations
"""
steps_taken = 0
while steps_taken < num_steps and self.running:
# Run normal steps for a while
for _ in range(branching_interval):
if not self.running or steps_taken >= num_steps:
break
await self.process_next_step()
steps_taken += 1
if self.running and steps_taken < num_steps:
# Evaluate branches to find the best path forward
branch_results = await self.evaluate_action_branches(instance_id, steps_per_branch=5)
# Log branch evaluations
logger.info(f"Branch evaluations at step {steps_taken}:")
for branch_id, result in branch_results["branch_results"].items():
logger.info(f"\nBranch {branch_id}:")
logger.info(f"Assessment: {result['claude_assessment']}")
# Could add logic here to automatically select and switch to most promising branch
return steps_taken
def process_next_step(self):
"""Process the next step in the agent loop."""
messages = copy.deepcopy(self.message_history)
if len(messages) >= 3:
if (
messages[-1]["role"] == "user"
and isinstance(messages[-1]["content"], list)
and messages[-1]["content"]
):
messages[-1]["content"][-1]["cache_control"] = {
"type": "ephemeral"
}
if (
len(messages) >= 5
and messages[-3]["role"] == "user"
and isinstance(messages[-3]["content"], list)
and messages[-3]["content"]
):
messages[-3]["content"][-1]["cache_control"] = {
"type": "ephemeral"
}
# Get model response
response = self.anthropic.messages.create(
model=MODEL_NAME,
max_tokens=MAX_TOKENS,
system=self.SYSTEM_PROMPT,
messages=messages,
tools=self.AVAILABLE_TOOLS,
temperature=TEMPERATURE,
)
# Log token usage
if self.display_config["quiet_mode"]:
logger.debug(f"Response usage: {response.usage}")
else:
logger.info(f"Response usage: {response.usage}")
# Extract tool calls
tool_calls = [
block for block in response.content if block.type == "tool_use"
]
# Display the model's reasoning
for block in response.content:
if block.type == "text":
# Claude's thoughts should always be visible, even in quiet mode
logger.info(f"[Claude] {block.text}")
elif block.type == "tool_use":
# Tool calls should be visible at info level by default
if self.display_config["quiet_mode"]:
logger.info(
f"[Claude Action] Using tool: {block.name} with input: {block.input}"
)
else:
logger.info(
f"[Tool Use] {block.name} with input: {block.input}"
)
# Process tool calls
if tool_calls:
# Add assistant message to history
assistant_content = []
for block in response.content:
if block.type == "text":
assistant_content.append(
{"type": "text", "text": block.text}
)
elif block.type == "tool_use":
assistant_content.append(
{"type": "tool_use", **dict(block)}
)
self.message_history.append(
{"role": "assistant", "content": assistant_content}
)
# Process tool calls and create tool results
tool_results = []
for tool_call in tool_calls:
tool_result = self.process_tool_call(tool_call)
tool_results.append(tool_result)
# Add tool results to message history
self.message_history.append(
{"role": "user", "content": tool_results}
)
# Check if we need to summarize the history
if len(self.message_history) >= self.max_history:
self.summarize_history()
def run(self, num_steps=1000, instance_id=None, snapshot_name_prefix=None):
"""Main agent loop.
Args:
num_steps: Number of steps to run for
instance_id: ID of the current instance for snapshot creation
snapshot_name_prefix: Prefix for naming snapshots
"""
if self.display_config["quiet_mode"]:
logger.debug(f"Starting agent loop for {num_steps} steps")
else:
logger.info(f"Starting agent loop for {num_steps} steps")
steps_completed = 0
snapshots = []
while self.running and steps_completed < num_steps:
try:
messages = copy.deepcopy(self.message_history)
if len(messages) >= 3:
if (
messages[-1]["role"] == "user"
and isinstance(messages[-1]["content"], list)
and messages[-1]["content"]
):
messages[-1]["content"][-1]["cache_control"] = {
"type": "ephemeral"
}
if (
len(messages) >= 5
and messages[-3]["role"] == "user"
and isinstance(messages[-3]["content"], list)
and messages[-3]["content"]
):
messages[-3]["content"][-1]["cache_control"] = {
"type": "ephemeral"
}
# Get model response
response = self.anthropic.messages.create(
model=MODEL_NAME,
max_tokens=MAX_TOKENS,
system=self.SYSTEM_PROMPT,
messages=messages,
tools=self.AVAILABLE_TOOLS,
temperature=TEMPERATURE,
)
# Log token usage
if self.display_config["quiet_mode"]:
logger.debug(f"Response usage: {response.usage}")
else:
logger.info(f"Response usage: {response.usage}")
# Extract tool calls
tool_calls = [
block for block in response.content if block.type == "tool_use"
]
# Display the model's reasoning
for block in response.content:
if block.type == "text":
# Claude's thoughts should always be visible, even in quiet mode
logger.info(f"[Claude] {block.text}")
elif block.type == "tool_use":
# Tool calls should be visible at info level by default
if self.display_config["quiet_mode"]:
logger.info(
f"[Claude Action] Using tool: {block.name} with input: {block.input}"
)
else:
logger.info(
f"[Tool Use] {block.name} with input: {block.input}"
)
# Process tool calls
if tool_calls:
# Add assistant message to history
assistant_content = []
for block in response.content:
if block.type == "text":
assistant_content.append(
{"type": "text", "text": block.text}
)
elif block.type == "tool_use":
assistant_content.append(
{"type": "tool_use", **dict(block)}
)
self.message_history.append(
{"role": "assistant", "content": assistant_content}
)
# Process tool calls and create tool results
tool_results = []
for tool_call in tool_calls:
tool_result = self.process_tool_call(tool_call)
tool_results.append(tool_result)
# Add tool results to message history
self.message_history.append(
{"role": "user", "content": tool_results}
)
# Check if we need to summarize the history
if len(self.message_history) >= self.max_history:
self.summarize_history()
steps_completed += 1
if self.display_config["quiet_mode"]:
logger.debug(f"Completed step {steps_completed}/{num_steps}")
else:
logger.info(f"Completed step {steps_completed}/{num_steps}")
# Create a snapshot after each step if morph_client and instance_id are provided
if self.morph_client and instance_id:
step_num = steps_completed
snapshot_name = f"{snapshot_name_prefix}_step_{step_num}" if snapshot_name_prefix else f"pokemon_step_{step_num}"
logger.info(f"Creating snapshot after step {step_num}...")
try:
# Create metadata dictionary to track lineage
metadata = {
"step_number": str(step_num),
"timestamp": str(int(time.time())),
}
# Add parent_snapshot if we have one
if self.parent_snapshot_id:
metadata["parent_snapshot"] = self.parent_snapshot_id
# Add dashboard_run_id for filtering in dashboard
if self.dashboard_run_id:
metadata["dashboard_run_id"] = self.dashboard_run_id
# Add previous snapshot if we have one
if self.last_snapshot_id:
metadata["prev_snapshot"] = self.last_snapshot_id
# Create the snapshot with metadata
instance = self.morph_client.instances.get(instance_id)
snapshot = instance.snapshot()
snapshot.set_metadata(metadata)
# Update our last snapshot ID
self.last_snapshot_id = snapshot.id
logger.info(f"✅ Snapshot created with ID: {snapshot.id}")
logger.info(f" Metadata: parent={metadata.get('parent_snapshot', 'None')}, prev={metadata.get('prev_snapshot', 'None')}, step={step_num}, dashboard_run_id={metadata.get('dashboard_run_id', 'None')}")
# Keep track of all snapshots
snapshots.append({
'step': step_num,
'snapshot_id': snapshot.id,
'name': snapshot_name,
'metadata': metadata
})
except Exception as e:
logger.error(f"Failed to create snapshot: {e}")
except KeyboardInterrupt:
logger.info("Received keyboard interrupt, stopping")
self.running = False
except Exception as e:
logger.error(f"Error in agent loop: {e}")
logger.exception(e)
raise e
if not self.running:
self.client.stop()
return steps_completed, snapshots
def summarize_history(self):
"""Generate a summary of the conversation history and replace the history with just the summary."""
if self.display_config["quiet_mode"]:
logger.debug(f"[Agent] Generating conversation summary...")
else:
logger.info(f"[Agent] Generating conversation summary...")
# Get a new screenshot for the summary
screenshot = self.client.get_screenshot()
screenshot_b64 = get_screenshot_base64(screenshot, upscale=2)
# Create messages for the summarization request - pass the entire conversation history
messages = copy.deepcopy(self.message_history)
if len(messages) >= 3:
if (
messages[-1]["role"] == "user"
and isinstance(messages[-1]["content"], list)
and messages[-1]["content"]
):
messages[-1]["content"][-1]["cache_control"] = {"type": "ephemeral"}
if (
len(messages) >= 5
and messages[-3]["role"] == "user"
and isinstance(messages[-3]["content"], list)
and messages[-3]["content"]
):
messages[-3]["content"][-1]["cache_control"] = {"type": "ephemeral"}
messages += [
{
"role": "user",
"content": [
{
"type": "text",
"text": self.SUMMARY_PROMPT,
}
],
}
]
# Get summary from Claude
response = self.anthropic.messages.create(
model=MODEL_NAME,
max_tokens=MAX_TOKENS,
system=self.SYSTEM_PROMPT,
messages=messages,
temperature=TEMPERATURE,
)
# Extract the summary text
summary_text = " ".join(
[block.text for block in response.content if block.type == "text"]
)
# Log the summary - use info level even in quiet mode as it's important
logger.info(f"[Claude Summary] Game Progress Summary:")
logger.info(f"{summary_text}")
# Replace message history with just the summary
self.message_history = [
{
"role": "user",
"content": [
{
"type": "text",
"text": f"CONVERSATION HISTORY SUMMARY (representing {self.max_history} previous messages): {summary_text}",
},
{
"type": "text",
"text": "\n\nCurrent game screenshot for reference:",
},
{
"type": "image",
"source": {
"type": "base64",
"media_type": "image/png",
"data": screenshot_b64,
},
},
{
"type": "text",
"text": "You were just asked to summarize your playthrough so far, which is the summary you see above. You may now continue playing by selecting your next action.",
},
],
}
]
if self.display_config["quiet_mode"]:
logger.debug(f"[Agent] Message history condensed into summary.")
else:
logger.info(f"[Agent] Message history condensed into summary.")
def stop(self):
"""Stop the agent."""
self.running = False
self.client.stop()
def parse_arguments():
"""Parse command line arguments"""
parser = argparse.ArgumentParser(description="Run a Pokemon Game Server Agent")
parser.add_argument(
"--snapshot-id", type=str, required=True, help="Morph snapshot ID to run"
)
parser.add_argument(
"--api-key", type=str, help="Morph API key (defaults to MORPH_API_KEY env var)"
)
parser.add_argument(
"--steps", type=int, default=10, help="Number of steps to run (default: 10)"
)
parser.add_argument(
"--max-history",
type=int,
default=30,
help="Maximum history size before summarizing (default: 30)",
)
# Add parent snapshot tracking option
parser.add_argument(
"--parent-snapshot-id",
type=str,
help="Parent snapshot ID for lineage tracking (defaults to the starting snapshot-id)"
)
parser.add_argument(
"--dashboard-run-id",
type=str,
help="Dashboard run ID for grouping snapshots (defaults to parent-snapshot-id)"
)
parser.add_argument(
"--snapshot-prefix",
type=str,
default="pokemon",
help="Prefix for snapshot names (default: 'pokemon')"
)
# Add verbosity and display options
parser.add_argument(
"--verbose",
"-v",
action="count",
default=0,
help="Increase output verbosity (can be used multiple times, e.g. -vv)",
)
parser.add_argument(
"--show-game-state",
action="store_true",
help="Show full game state information in the logs",
)
parser.add_argument(
"--show-collision-map",
action="store_true",
help="Show collision map in the logs",
)
parser.add_argument(
"--log-file",
type=str,
help="Path to log file. If not provided, logs will only go to stderr",
)
parser.add_argument(
"--quiet",
"-q",
action="store_true",
help="Only show Claude's thoughts and actions, minimal logging",
)
parser.add_argument(
"--no-browser",
action="store_true",
help="Suppress auto-opening the browser to display the game",
)
return parser.parse_args()
def main():
args = parse_arguments()
# Configure logging based on command line arguments
log_handlers = []
# Set up console handler with formatting
console_handler = logging.StreamHandler()
if args.quiet:
console_format = "%(message)s" # Minimal format for quiet mode
else:
console_format = "%(asctime)s - %(levelname)s - %(message)s"
console_handler.setFormatter(logging.Formatter(console_format))
log_handlers.append(console_handler)
# Add file handler if log file specified
if args.log_file:
file_handler = logging.FileHandler(args.log_file)
# Full detailed format for log files
file_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
file_handler.setFormatter(logging.Formatter(file_format))
log_handlers.append(file_handler)
# Set log level based on verbosity
if args.quiet:
log_level = logging.WARNING
elif args.verbose == 0:
log_level = logging.INFO
elif args.verbose == 1:
log_level = logging.DEBUG
else: # args.verbose >= 2
log_level = logging.DEBUG # Maximum verbosity
# Configure the root logger
logging.basicConfig(level=log_level, handlers=log_handlers, force=True)
# Create a rich console for nice output
console = Console()
console.print(
f"Starting Pokemon Game Server Agent from snapshot {args.snapshot_id}"
)
console.print(
f"Will run for {args.steps} steps with max history of {args.max_history}"
)
# Set parent snapshot ID (if not provided, use the starting snapshot as parent)
parent_snapshot_id = args.parent_snapshot_id or args.snapshot_id
console.print(f"Parent snapshot ID for lineage tracking: {parent_snapshot_id}")
if not args.quiet:
console.print(
f"Log level: {'QUIET' if args.quiet else logging.getLevelName(log_level)}"
)
if args.show_game_state:
console.print("Game state display: Enabled")
if args.show_collision_map:
console.print("Collision map display: Enabled")
if args.log_file:
console.print(f"Logging to file: {args.log_file}")
console.print("=" * 50)
# Create the MorphCloud client
morph_client = MorphCloudClient(api_key=args.api_key)
# Start instance from snapshot
console.print("Starting instance from snapshot...")
instance = morph_client.instances.start(
snapshot_id=args.snapshot_id, ttl_seconds=60 * 60 * 24 # 24 hours
)
# Wait for instance to be ready
console.print("Waiting for instance to be ready...")
instance.wait_until_ready()
# Get the instance URL
instance_url = next(
service.url
for service in instance.networking.http_services
if service.name == "web"
)
remote_desktop_url = next(
service.url
for service in instance.networking.http_services
if service.name == "novnc"
)
novnc_url = f"{remote_desktop_url}/vnc_lite.html"
console.print(f"Pokemon remote desktop available at: {novnc_url}")
# Open the NoVNC URL automatically in the default browser if not suppressed
if not args.no_browser:
webbrowser.open(novnc_url)
else:
console.print("Browser auto-open suppressed. Use the URL above to view the game.")
# Create a "game display" configuration object to pass to the agent
display_config = {
"show_game_state": args.show_game_state or args.verbose > 0,
"show_collision_map": args.show_collision_map or args.verbose > 1,
"quiet_mode": args.quiet,
}
# Run agent with the instance URL
console.print("Initializing agent...")
try:
agent = PokemonAgent(
server_host=instance_url,
server_port=None, # Not needed since URL already includes the port
max_history=args.max_history,
display_config=display_config,
morph_client=morph_client, # Pass the client for snapshot creation
parent_snapshot_id=parent_snapshot_id, # Pass the parent snapshot ID
dashboard_run_id=args.dashboard_run_id, # Pass the dashboard run ID
)
console.print("✅ Agent initialized successfully!")
console.print("=" * 50)
# Run the agent
console.print(f"Starting agent loop for {args.steps} steps...")
steps_completed, snapshots = agent.run(
num_steps=args.steps,
instance_id=instance.id,
snapshot_name_prefix=args.snapshot_prefix
)
console.print("=" * 50)
console.print(f"✅ Agent completed {steps_completed} steps")
# Display a summary of created snapshots
if snapshots:
console.print(f"\nCreated {len(snapshots)} snapshots:")
for snapshot in snapshots:
console.print(f" - Step {snapshot['step']}: {snapshot['snapshot_id']} ({snapshot['name']})")
except ConnectionError as e:
console.print(f"❌ Connection error: {e}")
console.print(f"Make sure the server is running on the instance")
sys.exit(1)
except KeyboardInterrupt:
console.print("Received keyboard interrupt, stopping agent")
except Exception as e:
console.print(f"❌ Error: {e}")
sys.exit(1)
finally:
if "agent" in locals():
agent.stop()
# Stop the Morph instance
console.print("Stopping Morph instance...")
instance.stop()
if __name__ == "__main__":
main()
#!/usr/bin/env python3
# /// script
# dependencies = [
# "morphcloud",
# "requests",
# "pillow",
# "rich",
# "anthropic",
# ]
# ///
"""Run a non-interactive server agent that plays Pokemon automatically.
This script combines the EmulatorClient and PokemonAgent to set up a basic agent.
"""
import io
import sys
import json
import copy
import typing
import base64
import logging
import argparse
import time
import requests
import webbrowser
from PIL import Image
from anthropic import Anthropic
from rich.console import Console
from morphcloud.api import MorphCloudClient
# Set up logging - this will be configured properly in main() based on command line args
logger = logging.getLogger(__name__)
# Configuration
MAX_TOKENS = 4096
MODEL_NAME = "claude-3-7-sonnet-20250219"
TEMPERATURE = 0.7
USE_NAVIGATOR = True
class EmulatorClient:
def __init__(self, host: str = "127.0.0.1", port: int = 9876):
# Check if host already includes the protocol, if not add http://
if host.startswith("http://") or host.startswith("https://"):
# For MorphVM URLs, don't append port as it's handled by the URL routing
if "cloud.morph.so" in host or port is None:
self.base_url = host
# For other URLs, handle port as before
elif ":" not in host.split("/")[-1]:
self.base_url = f"{host}:{port}"
else:
# Host already has port, use it as is
self.base_url = host
else:
# For MorphVM URLs, don't append port
if "cloud.morph.so" in host:
self.base_url = f"https://{host}"
else:
self.base_url = f"http://{host}:{port}"
logger.info(f"Initialized client connecting to {self.base_url}")
def get_screenshot(self):
"""Get current screenshot as PIL Image"""
response = requests.get(f"{self.base_url}/api/screenshot")
if response.status_code != 200:
logger.error(f"Error getting screenshot: {response.status_code}")
return None
return Image.open(io.BytesIO(response.content))
def get_screenshot_base64(self):
"""Get current screenshot as base64 string"""
response = requests.get(f"{self.base_url}/api/screenshot")
if response.status_code != 200:
logger.error(f"Error getting screenshot: {response.status_code}")
return ""
return base64.b64encode(response.content).decode("utf-8")
def get_game_state(self):
"""Get complete game state from server"""
response = requests.get(f"{self.base_url}/api/game_state")
if response.status_code != 200:
logger.error(
f"Error response from server: {response.status_code} - {response.text}"
)
return {}
try:
return response.json()
except json.JSONDecodeError as e:
logger.error(f"JSON decode error: {e}")
logger.error(f"Response content: {response.text[:100]}...")
return {}
# Compatibility methods to match Emulator interface
def get_state_from_memory(self):
"""Get game state string - mimics Emulator.get_state_from_memory()"""
state_data = self.get_game_state()
return state_data.get("game_state", "")
def get_collision_map(self):
"""Get collision map - mimics Emulator.get_collision_map()"""
state_data = self.get_game_state()
return state_data.get("collision_map", "")
def get_valid_moves(self):
"""Get valid moves - mimics Emulator.get_valid_moves()"""
state_data = self.get_game_state()
return state_data.get("valid_moves", [])
def find_path(self, row, col):
"""Find path to position - mimics Emulator.find_path()"""
result = self.navigate(row, col)
if not isinstance(result, dict):
return "Failed to navigate", []
return result.get("status", "Navigation failed"), result.get("path", [])
def press_buttons(
self, buttons, wait=True, include_state=False, include_screenshot=False
):
"""Press a sequence of buttons on the Game Boy
Args:
buttons: List of buttons to press
wait: Whether to pause briefly after each button press
include_state: Whether to include game state in response
include_screenshot: Whether to include screenshot in response
Returns:
dict: Response data which may include button press result, game state, and screenshot
"""
data = {
"buttons": buttons,
"wait": wait,
"include_state": include_state,
"include_screenshot": include_screenshot,
}
response = requests.post(f"{self.base_url}/api/press_buttons", json=data)
if response.status_code != 200:
logger.error(
f"Error pressing buttons: {response.status_code} - {response.text}"
)
return {"error": f"Error: {response.status_code}"}
return response.json()
def navigate(self, row, col, include_state=False, include_screenshot=False):
"""Navigate to a specific position on the grid
Args:
row: Target row coordinate
col: Target column coordinate
include_state: Whether to include game state in response
include_screenshot: Whether to include screenshot in response
Returns:
dict: Response data which may include navigation result, game state, and screenshot
"""
data = {
"row": row,
"col": col,
"include_state": include_state,
"include_screenshot": include_screenshot,
}
response = requests.post(f"{self.base_url}/api/navigate", json=data)
if response.status_code != 200:
logger.error(f"Error navigating: {response.status_code} - {response.text}")
return {"status": f"Error: {response.status_code}", "path": []}
return response.json()
def read_memory(self, address):
"""Read a specific memory address"""
response = requests.get(f"{self.base_url}/api/memory/{address}")
if response.status_code != 200:
logger.error(
f"Error reading memory: {response.status_code} - {response.text}"
)
return {"error": f"Error: {response.status_code}"}
return response.json()
def load_state(self, state_path):
"""Load a saved state"""
data = {"state_path": state_path}
response = requests.post(f"{self.base_url}/api/load_state", json=data)
if response.status_code != 200:
logger.error(
f"Error loading state: {response.status_code} - {response.text}"
)
return {"error": f"Error: {response.status_code}"}
return response.json()
def save_screenshot(self, filename="screenshot.png"):
"""Save current screenshot to a file"""
screenshot = self.get_screenshot()
if screenshot:
screenshot.save(filename)
logger.info(f"Screenshot saved as {filename}")
return True
return False
def initialize(self, max_retries=5, retry_delay=3):
"""
Initialize method with retry capability for compatibility with Emulator
Args:
max_retries (int): Maximum number of retry attempts
retry_delay (int): Delay between retries in seconds
Returns:
bool: True if server is ready, False otherwise
"""
logger.info(f"Client initialization requested (compatibility method) with {max_retries} retries")
# Implement retry logic
for attempt in range(1, max_retries + 1):
try:
logger.info(f"Checking server status (attempt {attempt}/{max_retries})")
response = requests.get(f"{self.base_url}/api/status", timeout=10)
status = response.json()
ready = status.get("ready", False)
if ready:
logger.info("Server reports ready status")
return True
else:
logger.warning(f"Server reports not ready (attempt {attempt}/{max_retries})")
# If not ready and we have more attempts, wait before trying again
if attempt < max_retries:
logger.info(f"Waiting {retry_delay} seconds before retry...")
time.sleep(retry_delay)
except requests.exceptions.Timeout:
logger.warning(f"Connection timeout (attempt {attempt}/{max_retries})")
if attempt < max_retries:
time.sleep(retry_delay)
except requests.exceptions.ConnectionError as e:
logger.warning(f"Connection error: {e} (attempt {attempt}/{max_retries})")
if attempt < max_retries:
time.sleep(retry_delay)
except Exception as e:
logger.error(f"Error checking server status: {e} (attempt {attempt}/{max_retries})")
if attempt < max_retries:
time.sleep(retry_delay)
logger.error(f"Server not ready after {max_retries} attempts")
return False
def stop(self):
"""Empty stop method for compatibility with Emulator"""
logger.info("Client stop requested (compatibility method)")
# Nothing to do for client
pass
def get_screenshot_base64(screenshot, upscale=1):
"""Convert PIL image to base64 string."""
# Resize if needed
if upscale > 1:
new_size = (screenshot.width * upscale, screenshot.height * upscale)
screenshot = screenshot.resize(new_size)
# Convert to base64
buffered = io.BytesIO()
screenshot.save(buffered, format="PNG")
return base64.standard_b64encode(buffered.getvalue()).decode()
class PokemonAgent:
def __init__(
self,
server_host="127.0.0.1",
server_port: typing.Optional[int] = 9876,
max_history=60,
display_config=None,
morph_client=None, # Add MorphCloudClient as a parameter
parent_snapshot_id=None, # Add parent snapshot ID parameter
dashboard_run_id=None, # Add dashboard run ID parameter
):
"""Initialize the server agent.
Args:
server_host: Host where the game server is running
server_port: Port number of the game server
max_history: Maximum number of messages in history before summarization
display_config: Dictionary with display configuration options
morph_client: Optional MorphCloudClient instance for snapshot creation
parent_snapshot_id: Optional ID of the parent snapshot for lineage tracking
dashboard_run_id: Optional ID for grouping snapshots by dashboard run
"""
self.client = EmulatorClient(host=server_host, port=server_port or 9876)
self.anthropic = Anthropic()
self.running = True
self.message_history = [
{"role": "user", "content": "You may now begin playing."}
]
self.max_history = max_history
# Store the MorphCloud client and snapshot tracking IDs
self.morph_client = morph_client
self.parent_snapshot_id = parent_snapshot_id
self.dashboard_run_id = dashboard_run_id or parent_snapshot_id # Use parent as fallback
self.last_snapshot_id = parent_snapshot_id # Track the last created snapshot ID
# Set display configuration with defaults
self.display_config = display_config or {
"show_game_state": False,
"show_collision_map": False,
"quiet_mode": False,
}
# Log initialization with chosen configuration
logger.debug(f"Agent initialized with display config: {self.display_config}")
if self.morph_client and self.parent_snapshot_id:
logger.info(f"Snapshot tracking enabled. Parent snapshot: {self.parent_snapshot_id}")
if self.dashboard_run_id:
logger.info(f"Dashboard run ID for grouping snapshots: {self.dashboard_run_id}")
# Check if the server is ready
if not self.client.initialize():
logger.error(
"Server not ready - please start the server before running the agent"
)
raise RuntimeError("Server not ready")
SYSTEM_PROMPT = """You are playing Pokemon Red. You can see the game screen and control the game by executing emulator commands.
Your goal is to play through Pokemon Red and eventually defeat the Elite Four. Make decisions based on what you see on the screen.
check your tools! for example, try to use 'navigate_to' to help you move faster and better when looking at the map, with positions from game state.
Before each action, explain your reasoning briefly, plan your immediate next few steps needed (low level tool calls and actions, e.g. 'to reach the Cave from here, I need to go 1. right, 2. right, 3. right, 4. up', not high level goals like '1. explore the Cave 2. ??? 3. win!') to get there, then use the available actions to execute the next step in the game.
The game commands always register perfectly, so if you see no reaction to them, you have made an invalid command and misunderstood the game state. In battles, when you see an attack that isn't effective, you should examine your assumptions and update your beliefs. In general, search the solution space (try different things) before getting stuck in ruts.
Mistakes you have made before:
- do not talk to NPCs
- do not plan with high level goals
- DON'T FIGHT ANY BATTLES IF YOU CAN HELP IT. IF YOU ENCOUNTER A WILD (non trainer) BATTLE - JUST RUN
- do not insist on your prior knowledge about what attacks are strong against what types of Pokemon works when the evidence is the opposite
- you often miss the cave, which is a black hole to the side of the pokemon center.
The conversation history may occasionally be summarized to save context space. If you see a message labeled "CONVERSATION HISTORY SUMMARY", this contains the key information about your progress so far. Use this information to maintain continuity in your gameplay."""
SUMMARY_PROMPT = """I need you to create a detailed summary of our conversation history up to this point. This summary will replace the full conversation history to manage the context window.
Please include:
1. Key game events and milestones you've reached
2. Important decisions you've made
3. Current objectives or goals you're working toward
4. Your current location and Pokémon team status
5. Any strategies or plans you've mentioned
The summary should be comprehensive enough that you can continue gameplay without losing important context about what has happened so far."""
AVAILABLE_TOOLS = [
{
"name": "press_buttons",
"description": "Press a sequence of buttons on the Game Boy.",
"input_schema": {
"type": "object",
"properties": {
"buttons": {
"type": "array",
"items": {
"type": "string",
"enum": [
"a",
"b",
"start",
"select",
"up",
"down",
"left",
"right",
],
},
"description": "List of buttons to press in sequence. Valid buttons: 'a', 'b', 'start', 'select', 'up', 'down', 'left', 'right'",
},
"wait": {
"type": "boolean",
"description": "Whether to wait for a brief period after pressing each button. Defaults to true.",
},
},
"required": ["buttons"],
},
}
]
# Add navigation tool if enabled
if USE_NAVIGATOR:
AVAILABLE_TOOLS.append(
{
"name": "navigate_to",
"description": "Automatically navigate to a position on the map grid. The screen is divided into a 9x10 grid, with the top-left corner as (0, 0). This tool is only available in the overworld.",
"input_schema": {
"type": "object",
"properties": {
"row": {
"type": "integer",
"description": "The row coordinate to navigate to (0-8).",
},
"col": {
"type": "integer",
"description": "The column coordinate to navigate to (0-9).",
},
},
"required": ["row", "col"],
},
}
)
def process_tool_call(self, tool_call):
"""Process a single tool call."""
tool_name = tool_call.name
tool_input = tool_call.input
# In quiet mode, only log at debug level
if self.display_config["quiet_mode"]:
logger.debug(f"Processing tool call: {tool_name}")
else:
logger.info(f"Processing tool call: {tool_name}")
if tool_name == "press_buttons":
buttons = tool_input["buttons"]
wait = tool_input.get("wait", True)
# Log the button press action
if self.display_config["quiet_mode"]:
logger.debug(f"[Buttons] Pressing: {buttons} (wait={wait})")
else:
logger.info(f"[Buttons] Pressing: {buttons} (wait={wait})")
# Use enhanced client method to get result, state, and screenshot in one call
response = self.client.press_buttons(
buttons, wait=wait, include_state=True, include_screenshot=True
)
# Extract results from response
result = response.get("result", f"Pressed buttons: {', '.join(buttons)}")
# Get game state from response or fetch it if not included
if "game_state" in response:
memory_info = response["game_state"].get("game_state", "")
if self.display_config["show_game_state"]:
logger.info(f"[Memory State from response]")
logger.info(memory_info)
else:
logger.debug(f"[Memory State from response]")
logger.debug(memory_info)
collision_map = response["game_state"].get("collision_map", "")
if collision_map and self.display_config["show_collision_map"]:
logger.info(f"[Collision Map from response]\n{collision_map}")
elif collision_map:
logger.debug(f"[Collision Map from response]\n{collision_map}")
else:
# Fallback to separate calls if state not included
memory_info = self.client.get_state_from_memory()
if self.display_config["show_game_state"]:
logger.info(f"[Memory State after action]")
logger.info(memory_info)
else:
logger.debug(f"[Memory State after action]")
logger.debug(memory_info)
collision_map = self.client.get_collision_map()
if collision_map and self.display_config["show_collision_map"]:
logger.info(f"[Collision Map after action]\n{collision_map}")
elif collision_map:
logger.debug(f"[Collision Map after action]\n{collision_map}")
# Get screenshot from response or fetch it if not included
if "screenshot" in response:
screenshot_b64 = response["screenshot"]
else:
screenshot = self.client.get_screenshot()
screenshot_b64 = get_screenshot_base64(screenshot, upscale=2)
# Build response content based on display configuration
content = [
{"type": "text", "text": f"Pressed buttons: {', '.join(buttons)}"},
{
"type": "text",
"text": "\nHere is a screenshot of the screen after your button presses:",
},
{
"type": "image",
"source": {
"type": "base64",
"media_type": "image/png",
"data": screenshot_b64,
},
},
]
# Add game state to Claude's view if enabled
content.append(
{
"type": "text",
"text": f"\nGame state information from memory after your action:\n{memory_info}",
}
)
# Return tool result as a dictionary
return {
"type": "tool_result",
"tool_use_id": tool_call.id,
"content": content,
}
elif tool_name == "navigate_to":
row = tool_input["row"]
col = tool_input["col"]
# Log the navigation action
if self.display_config["quiet_mode"]:
logger.debug(f"[Navigation] Navigating to: ({row}, {col})")
else:
logger.info(f"[Navigation] Navigating to: ({row}, {col})")
# Use enhanced client method to get result, state, and screenshot in one call
response = self.client.navigate(
row, col, include_state=True, include_screenshot=True
)
# Extract navigation result
status = response.get("status", "Unknown status")
path = response.get("path", [])
if path:
result = f"Navigation successful: followed path with {len(path)} steps"
else:
result = f"Navigation failed: {status}"
# Get game state from response or fetch it if not included
if "game_state" in response:
memory_info = response["game_state"].get("game_state", "")
if self.display_config["show_game_state"]:
logger.info(f"[Memory State from response]")
logger.info(memory_info)
else:
logger.debug(f"[Memory State from response]")
logger.debug(memory_info)
collision_map = response["game_state"].get("collision_map", "")
if collision_map and self.display_config["show_collision_map"]:
logger.info(f"[Collision Map from response]\n{collision_map}")
elif collision_map:
logger.debug(f"[Collision Map from response]\n{collision_map}")
else:
# Fallback to separate calls if state not included
memory_info = self.client.get_state_from_memory()
if self.display_config["show_game_state"]:
logger.info(f"[Memory State after action]")
logger.info(memory_info)
else:
logger.debug(f"[Memory State after action]")
logger.debug(memory_info)
collision_map = self.client.get_collision_map()
if collision_map and self.display_config["show_collision_map"]:
logger.info(f"[Collision Map after action]\n{collision_map}")
elif collision_map:
logger.debug(f"[Collision Map after action]\n{collision_map}")
# Get screenshot from response or fetch it if not included
if "screenshot" in response:
screenshot_b64 = response["screenshot"]
else:
screenshot = self.client.get_screenshot()
screenshot_b64 = get_screenshot_base64(screenshot, upscale=2)
# Build response content based on display configuration
content = [
{"type": "text", "text": f"Navigation result: {result}"},
{
"type": "text",
"text": "\nHere is a screenshot of the screen after navigation:",
},
{
"type": "image",
"source": {
"type": "base64",
"media_type": "image/png",
"data": screenshot_b64,
},
},
]
# Add game state to Claude's view if enabled
content.append(
{
"type": "text",
"text": f"\nGame state information from memory after your action:\n{memory_info}",
}
)
# Return tool result as a dictionary
return {
"type": "tool_result",
"tool_use_id": tool_call.id,
"content": content,
}
else:
logger.error(f"Unknown tool called: {tool_name}")
return {
"type": "tool_result",
"tool_use_id": tool_call.id,
"content": [
{"type": "text", "text": f"Error: Unknown tool '{tool_name}'"}
],
}
def run(self, num_steps=1, instance_id=None, snapshot_name_prefix=None):
"""Main agent loop.
Args:
num_steps: Number of steps to run for
instance_id: ID of the current instance for snapshot creation
snapshot_name_prefix: Prefix for naming snapshots
"""
if self.display_config["quiet_mode"]:
logger.debug(f"Starting agent loop for {num_steps} steps")
else:
logger.info(f"Starting agent loop for {num_steps} steps")
steps_completed = 0
snapshots = []
while self.running and steps_completed < num_steps:
try:
messages = copy.deepcopy(self.message_history)
if len(messages) >= 3:
if (
messages[-1]["role"] == "user"
and isinstance(messages[-1]["content"], list)
and messages[-1]["content"]
):
messages[-1]["content"][-1]["cache_control"] = {
"type": "ephemeral"
}
if (
len(messages) >= 5
and messages[-3]["role"] == "user"
and isinstance(messages[-3]["content"], list)
and messages[-3]["content"]
):
messages[-3]["content"][-1]["cache_control"] = {
"type": "ephemeral"
}
# Get model response
response = self.anthropic.messages.create(
model=MODEL_NAME,
max_tokens=MAX_TOKENS,
system=self.SYSTEM_PROMPT,
messages=messages,
tools=self.AVAILABLE_TOOLS,
temperature=TEMPERATURE,
)
# Log token usage
if self.display_config["quiet_mode"]:
logger.debug(f"Response usage: {response.usage}")
else:
logger.info(f"Response usage: {response.usage}")
# Extract tool calls
tool_calls = [
block for block in response.content if block.type == "tool_use"
]
# Display the model's reasoning
for block in response.content:
if block.type == "text":
# Claude's thoughts should always be visible, even in quiet mode
logger.info(f"[Claude] {block.text}")
elif block.type == "tool_use":
# Tool calls should be visible at info level by default
if self.display_config["quiet_mode"]:
logger.info(
f"[Claude Action] Using tool: {block.name} with input: {block.input}"
)
else:
logger.info(
f"[Tool Use] {block.name} with input: {block.input}"
)
# Process tool calls
if tool_calls:
# Add assistant message to history
assistant_content = []
for block in response.content:
if block.type == "text":
assistant_content.append(
{"type": "text", "text": block.text}
)
elif block.type == "tool_use":
assistant_content.append(
{"type": "tool_use", **dict(block)}
)
self.message_history.append(
{"role": "assistant", "content": assistant_content}
)
# Process tool calls and create tool results
tool_results = []
for tool_call in tool_calls:
tool_result = self.process_tool_call(tool_call)
tool_results.append(tool_result)
# Add tool results to message history
self.message_history.append(
{"role": "user", "content": tool_results}
)
# Check if we need to summarize the history
if len(self.message_history) >= self.max_history:
self.summarize_history()
steps_completed += 1
if self.display_config["quiet_mode"]:
logger.debug(f"Completed step {steps_completed}/{num_steps}")
else:
logger.info(f"Completed step {steps_completed}/{num_steps}")
# Create a snapshot after each step if morph_client and instance_id are provided
if self.morph_client and instance_id:
step_num = steps_completed
snapshot_name = f"{snapshot_name_prefix}_step_{step_num}" if snapshot_name_prefix else f"pokemon_step_{step_num}"
logger.info(f"Creating snapshot after step {step_num}...")
try:
# Create metadata dictionary to track lineage
metadata = {
"step_number": str(step_num),
"timestamp": str(int(time.time())),
}
# Add parent_snapshot if we have one
if self.parent_snapshot_id:
metadata["parent_snapshot"] = self.parent_snapshot_id
# Add dashboard_run_id for filtering in dashboard
if self.dashboard_run_id:
metadata["dashboard_run_id"] = self.dashboard_run_id
# Add previous snapshot if we have one
if self.last_snapshot_id:
metadata["prev_snapshot"] = self.last_snapshot_id
# Create the snapshot with metadata
instance = self.morph_client.instances.get(instance_id)
snapshot = instance.snapshot()
snapshot.set_metadata(metadata)
# Update our last snapshot ID
self.last_snapshot_id = snapshot.id
logger.info(f"✅ Snapshot created with ID: {snapshot.id}")
logger.info(f" Metadata: parent={metadata.get('parent_snapshot', 'None')}, prev={metadata.get('prev_snapshot', 'None')}, step={step_num}, dashboard_run_id={metadata.get('dashboard_run_id', 'None')}")
# Keep track of all snapshots
snapshots.append({
'step': step_num,
'snapshot_id': snapshot.id,
'name': snapshot_name,
'metadata': metadata
})
except Exception as e:
logger.error(f"Failed to create snapshot: {e}")
except KeyboardInterrupt:
logger.info("Received keyboard interrupt, stopping")
self.running = False
except Exception as e:
logger.error(f"Error in agent loop: {e}")
logger.exception(e)
raise e
if not self.running:
self.client.stop()
return steps_completed, snapshots
def summarize_history(self):
"""Generate a summary of the conversation history and replace the history with just the summary."""
if self.display_config["quiet_mode"]:
logger.debug(f"[Agent] Generating conversation summary...")
else:
logger.info(f"[Agent] Generating conversation summary...")
# Get a new screenshot for the summary
screenshot = self.client.get_screenshot()
screenshot_b64 = get_screenshot_base64(screenshot, upscale=2)
# Create messages for the summarization request - pass the entire conversation history
messages = copy.deepcopy(self.message_history)
if len(messages) >= 3:
if (
messages[-1]["role"] == "user"
and isinstance(messages[-1]["content"], list)
and messages[-1]["content"]
):
messages[-1]["content"][-1]["cache_control"] = {"type": "ephemeral"}
if (
len(messages) >= 5
and messages[-3]["role"] == "user"
and isinstance(messages[-3]["content"], list)
and messages[-3]["content"]
):
messages[-3]["content"][-1]["cache_control"] = {"type": "ephemeral"}
messages += [
{
"role": "user",
"content": [
{
"type": "text",
"text": self.SUMMARY_PROMPT,
}
],
}
]
# Get summary from Claude
response = self.anthropic.messages.create(
model=MODEL_NAME,
max_tokens=MAX_TOKENS,
system=self.SYSTEM_PROMPT,
messages=messages,
temperature=TEMPERATURE,
)
# Extract the summary text
summary_text = " ".join(
[block.text for block in response.content if block.type == "text"]
)
# Log the summary - use info level even in quiet mode as it's important
logger.info(f"[Claude Summary] Game Progress Summary:")
logger.info(f"{summary_text}")
# Replace message history with just the summary
self.message_history = [
{
"role": "user",
"content": [
{
"type": "text",
"text": f"CONVERSATION HISTORY SUMMARY (representing {self.max_history} previous messages): {summary_text}",
},
{
"type": "text",
"text": "\n\nCurrent game screenshot for reference:",
},
{
"type": "image",
"source": {
"type": "base64",
"media_type": "image/png",
"data": screenshot_b64,
},
},
{
"type": "text",
"text": "You were just asked to summarize your playthrough so far, which is the summary you see above. You may now continue playing by selecting your next action.",
},
],
}
]
if self.display_config["quiet_mode"]:
logger.debug(f"[Agent] Message history condensed into summary.")
else:
logger.info(f"[Agent] Message history condensed into summary.")
def stop(self):
"""Stop the agent."""
self.running = False
self.client.stop()
def parse_arguments():
"""Parse command line arguments"""
parser = argparse.ArgumentParser(description="Run a Pokemon Game Server Agent")
parser.add_argument(
"--snapshot-id", type=str, required=True, help="Morph snapshot ID to run"
)
parser.add_argument(
"--api-key", type=str, help="Morph API key (defaults to MORPH_API_KEY env var)"
)
parser.add_argument(
"--steps", type=int, default=10, help="Number of steps to run (default: 10)"
)
parser.add_argument(
"--max-history",
type=int,
default=30,
help="Maximum history size before summarizing (default: 30)",
)
# Add parent snapshot tracking option
parser.add_argument(
"--parent-snapshot-id",
type=str,
help="Parent snapshot ID for lineage tracking (defaults to the starting snapshot-id)"
)
parser.add_argument(
"--dashboard-run-id",
type=str,
help="Dashboard run ID for grouping snapshots (defaults to parent-snapshot-id)"
)
parser.add_argument(
"--snapshot-prefix",
type=str,
default="pokemon",
help="Prefix for snapshot names (default: 'pokemon')"
)
# Add verbosity and display options
parser.add_argument(
"--verbose",
"-v",
action="count",
default=0,
help="Increase output verbosity (can be used multiple times, e.g. -vv)",
)
parser.add_argument(
"--show-game-state",
action="store_true",
help="Show full game state information in the logs",
)
parser.add_argument(
"--show-collision-map",
action="store_true",
help="Show collision map in the logs",
)
parser.add_argument(
"--log-file",
type=str,
help="Path to log file. If not provided, logs will only go to stderr",
)
parser.add_argument(
"--quiet",
"-q",
action="store_true",
help="Only show Claude's thoughts and actions, minimal logging",
)
parser.add_argument(
"--no-browser",
action="store_true",
help="Suppress auto-opening the browser to display the game",
)
return parser.parse_args()
def main():
args = parse_arguments()
# Configure logging based on command line arguments
log_handlers = []
# Set up console handler with formatting
console_handler = logging.StreamHandler()
if args.quiet:
console_format = "%(message)s" # Minimal format for quiet mode
else:
console_format = "%(asctime)s - %(levelname)s - %(message)s"
console_handler.setFormatter(logging.Formatter(console_format))
log_handlers.append(console_handler)
# Add file handler if log file specified
if args.log_file:
file_handler = logging.FileHandler(args.log_file)
# Full detailed format for log files
file_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
file_handler.setFormatter(logging.Formatter(file_format))
log_handlers.append(file_handler)
# Set log level based on verbosity
if args.quiet:
log_level = logging.WARNING
elif args.verbose == 0:
log_level = logging.INFO
elif args.verbose == 1:
log_level = logging.DEBUG
else: # args.verbose >= 2
log_level = logging.DEBUG # Maximum verbosity
# Configure the root logger
logging.basicConfig(level=log_level, handlers=log_handlers, force=True)
# Create a rich console for nice output
console = Console()
console.print(
f"Starting Pokemon Game Server Agent from snapshot {args.snapshot_id}"
)
console.print(
f"Will run for {args.steps} steps with max history of {args.max_history}"
)
# Set parent snapshot ID (if not provided, use the starting snapshot as parent)
parent_snapshot_id = args.parent_snapshot_id or args.snapshot_id
console.print(f"Parent snapshot ID for lineage tracking: {parent_snapshot_id}")
if not args.quiet:
console.print(
f"Log level: {'QUIET' if args.quiet else logging.getLevelName(log_level)}"
)
if args.show_game_state:
console.print("Game state display: Enabled")
if args.show_collision_map:
console.print("Collision map display: Enabled")
if args.log_file:
console.print(f"Logging to file: {args.log_file}")
console.print("=" * 50)
# Create the MorphCloud client
morph_client = MorphCloudClient(api_key=args.api_key)
# Start instance from snapshot
console.print("Starting instance from snapshot...")
instance = morph_client.instances.start(
snapshot_id=args.snapshot_id, ttl_seconds=60 * 60 * 24 # 24 hours
)
# Wait for instance to be ready
console.print("Waiting for instance to be ready...")
instance.wait_until_ready()
# Get the instance URL
instance_url = next(
service.url
for service in instance.networking.http_services
if service.name == "web"
)
remote_desktop_url = next(
service.url
for service in instance.networking.http_services
if service.name == "novnc"
)
novnc_url = f"{remote_desktop_url}/vnc_lite.html"
console.print(f"Pokemon remote desktop available at: {novnc_url}")
# Open the NoVNC URL automatically in the default browser if not suppressed
if not args.no_browser:
webbrowser.open(novnc_url)
else:
console.print("Browser auto-open suppressed. Use the URL above to view the game.")
# Create a "game display" configuration object to pass to the agent
display_config = {
"show_game_state": args.show_game_state or args.verbose > 0,
"show_collision_map": args.show_collision_map or args.verbose > 1,
"quiet_mode": args.quiet,
}
# Run agent with the instance URL
console.print("Initializing agent...")
try:
agent = PokemonAgent(
server_host=instance_url,
server_port=None, # Not needed since URL already includes the port
max_history=args.max_history,
display_config=display_config,
morph_client=morph_client, # Pass the client for snapshot creation
parent_snapshot_id=parent_snapshot_id, # Pass the parent snapshot ID
dashboard_run_id=args.dashboard_run_id, # Pass the dashboard run ID
)
console.print("✅ Agent initialized successfully!")
console.print("=" * 50)
# Run the agent
console.print(f"Starting agent loop for {args.steps} steps...")
steps_completed, snapshots = agent.run(
num_steps=args.steps,
instance_id=instance.id,
snapshot_name_prefix=args.snapshot_prefix
)
console.print("=" * 50)
console.print(f"✅ Agent completed {steps_completed} steps")
# Display a summary of created snapshots
if snapshots:
console.print(f"\nCreated {len(snapshots)} snapshots:")
for snapshot in snapshots:
console.print(f" - Step {snapshot['step']}: {snapshot['snapshot_id']} ({snapshot['name']})")
except ConnectionError as e:
console.print(f"❌ Connection error: {e}")
console.print(f"Make sure the server is running on the instance")
sys.exit(1)
except KeyboardInterrupt:
console.print("Received keyboard interrupt, stopping agent")
except Exception as e:
console.print(f"❌ Error: {e}")
sys.exit(1)
finally:
if "agent" in locals():
agent.stop()
# Stop the Morph instance
console.print("Stopping Morph instance...")
instance.stop()
if __name__ == "__main__":
main()
annotated-types==0.7.0
anthropic==0.49.0
anyio==4.9.0
bcrypt==4.3.0
certifi==2025.1.31
cffi==1.17.1
charset-normalizer==3.4.1
click==8.1.8
cryptography==44.0.2
distro==1.9.0
dotenv==0.9.9
fastapi==0.115.12
h11==0.14.0
httpcore==1.0.7
httpx==0.28.1
httpx-sse==0.4.0
idna==3.10
jiter==0.9.0
markdown-it-py==3.0.0
mcp==1.6.0
mdurl==0.1.2
morphcloud==0.1.32
paramiko==3.5.1
pathspec==0.12.1
pillow==11.1.0
psutil==7.0.0
pycparser==2.22
pydantic==2.11.1
pydantic-core==2.33.0
pydantic-settings==2.8.1
pygments==2.19.1
pynacl==1.5.0
python-dotenv==1.1.0
requests==2.32.3
rich==13.9.4
sniffio==1.3.1
sse-starlette==2.2.1
starlette==0.46.1
tqdm==4.67.1
typing-extensions==4.13.0
typing-inspection==0.4.0
urllib3==2.3.0
uvicorn==0.34.0
#!/usr/bin/env python
import argparse
import sys
import time
from morphcloud.api import MorphCloudClient
def main():
# Parse command line arguments
parser = argparse.ArgumentParser(description="Create a new snapshot from an existing one")
args = parser.parse_args()
# Initialize the Morph Cloud client
# API key will be read from MORPH_API_KEY environment variable
client = MorphCloudClient()
print(f"Starting instance from snapshot {args.snapshot_id}...")
instance = client.instances.start(args.snapshot_id)
try:
print(f"Instance {instance.id} created, waiting for it to be ready...")
instance.wait_until_ready(timeout=300)
print(f"Instance {instance.id} is now ready")
print("Creating new snapshot...")
new_snapshot = instance.snapshot()
print(f"New snapshot created: {new_snapshot.id}")
# Output just the snapshot ID for easy capture in shell scripts
print(new_snapshot.id)
return new_snapshot.id
finally:
print(f"Stopping instance {instance.id}...")
instance.stop()
print("Instance stopped")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment