Last active
March 31, 2025 00:56
-
-
Save swyxio/9ab20648acca2d2b9ce5c1e91222fecb to your computer and use it in GitHub Desktop.
pokemon hackathon submission - swyx prompted version
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# /// script | |
# dependencies = [ | |
# "morphcloud", | |
# "requests", | |
# "pillow", | |
# "rich", | |
# "anthropic", | |
# "flask", | |
# ] | |
# /// | |
""" | |
Pokemon Agent Dashboard - Single File Version | |
This script provides a web interface for managing Pokemon agents. | |
Just run this file and open your browser to http://127.0.0.1:5001/.
Dependencies: flask (required); morphcloud (optional, enables the Snapshots tab)
""" | |
import os | |
import sys | |
import time | |
import signal | |
import subprocess | |
import threading | |
import re | |
from collections import deque | |
import webbrowser | |
from flask import Flask, request, jsonify | |
# Try to import MorphCloudClient for snapshot operations | |
try: | |
from morphcloud.api import MorphCloudClient | |
except ImportError: | |
print("Warning: morphcloud package not found. Some snapshot features may not work.") | |
MorphCloudClient = None | |
# The HTML interface with enhanced snapshot viewer tab | |
HTML_TEMPLATE = """<!DOCTYPE html> | |
<html lang="en"> | |
<head> | |
<meta charset="UTF-8"> | |
<meta name="viewport" content="width=device-width, initial-scale=1.0"> | |
<title>Pokemon Agent Dashboard</title> | |
<style> | |
body, html { | |
margin: 0; | |
padding: 0; | |
height: 100%; | |
font-family: Arial, sans-serif; | |
} | |
.container { | |
display: flex; | |
height: 100vh; | |
} | |
.sidebar { | |
width: 320px; | |
background-color: #f5f5f5; | |
padding: 15px; | |
display: flex; | |
flex-direction: column; | |
overflow: hidden; | |
} | |
.controls { | |
flex: 0 0 auto; | |
} | |
.console, .snapshots { | |
flex: 1; | |
margin-top: 15px; | |
overflow: hidden; | |
display: flex; | |
flex-direction: column; | |
} | |
.console-title, .snapshots-title { | |
font-weight: bold; | |
margin-bottom: 5px; | |
display: flex; | |
justify-content: space-between; | |
align-items: center; | |
} | |
.console-output, .snapshots-list { | |
flex: 1; | |
background-color: #222; | |
color: #f0f0f0; | |
font-family: monospace; | |
padding: 10px; | |
overflow-y: auto; | |
white-space: pre-wrap; | |
font-size: 12px; | |
border-radius: 4px; | |
} | |
.tabs { | |
display: flex; | |
margin-bottom: 10px; | |
} | |
.tab { | |
padding: 8px 16px; | |
cursor: pointer; | |
background-color: #e0e0e0; | |
border: 1px solid #ccc; | |
border-bottom: none; | |
border-radius: 4px 4px 0 0; | |
margin-right: 5px; | |
} | |
.tab.active { | |
background-color: #f0f0f0; | |
font-weight: bold; | |
} | |
.tab-content { | |
display: none; | |
flex: 1; | |
overflow: hidden; | |
flex-direction: column; | |
} | |
.tab-content.active { | |
display: flex; | |
} | |
.main-view { | |
flex: 1; | |
display: flex; | |
flex-direction: column; | |
overflow: hidden; | |
} | |
.game-frame { | |
flex: 1; | |
border: none; | |
} | |
label { | |
display: block; | |
margin-top: 10px; | |
font-weight: bold; | |
} | |
input { | |
width: 100%; | |
padding: 8px; | |
margin-top: 4px; | |
box-sizing: border-box; | |
} | |
button { | |
margin-top: 10px; | |
padding: 8px; | |
width: 100%; | |
background-color: #4CAF50; | |
color: white; | |
border: none; | |
cursor: pointer; | |
border-radius: 4px; | |
} | |
button:hover { | |
background-color: #45a049; | |
} | |
button:disabled { | |
background-color: #cccccc; | |
cursor: not-allowed; | |
} | |
button.stop { | |
background-color: #f44336; | |
} | |
button.stop:hover { | |
background-color: #d32f2f; | |
} | |
.status { | |
margin-top: 10px; | |
padding: 8px; | |
background-color: #e0e0e0; | |
border-radius: 4px; | |
} | |
.clear-logs, .refresh-snapshots { | |
margin-top: 5px; | |
font-size: 12px; | |
padding: 4px; | |
background-color: #666; | |
width: auto; | |
} | |
/* Snapshot tree styling */ | |
.snapshot-node { | |
margin-bottom: 8px; | |
padding: 5px; | |
background-color: #333; | |
border-radius: 3px; | |
cursor: pointer; | |
} | |
.snapshot-node:hover { | |
background-color: #444; | |
} | |
.snapshot-node.current { | |
background-color: #1e5922; | |
} | |
.snapshot-node .title { | |
font-weight: bold; | |
color: #4CAF50; | |
} | |
.snapshot-node .id { | |
color: #aaa; | |
font-size: 10px; | |
} | |
.snapshot-node .metadata { | |
color: #888; | |
font-size: 11px; | |
margin-top: 2px; | |
} | |
.snapshot-actions { | |
margin-top: 10px; | |
display: flex; | |
gap: 5px; | |
} | |
.snapshot-action { | |
font-size: 12px; | |
padding: 4px 8px; | |
margin: 0; | |
flex: 1; | |
} | |
.info-panel { | |
margin-top: 10px; | |
padding: 8px; | |
background-color: #444; | |
border-radius: 4px; | |
font-size: 12px; | |
color: #ddd; | |
} | |
</style> | |
</head> | |
<body> | |
<div class="container"> | |
<div class="sidebar"> | |
<div class="controls"> | |
<h2>Pokemon Agent</h2> | |
<form id="agentForm"> | |
<label for="snapshotId">Snapshot ID:</label> | |
<input type="text" id="snapshotId" required> | |
<label for="steps">Steps:</label> | |
<input type="number" id="steps" value="10" min="1"> | |
<div style="display: flex; gap: 10px; margin-top: 10px;"> | |
<div style="flex: 1;"> | |
<button type="submit" id="startButton">Start Agent</button> | |
</div> | |
<div style="flex: 1;"> | |
<button type="button" id="stopButton" class="stop" disabled>Stop Agent</button> | |
</div> | |
</div> | |
</form> | |
<div id="statusDisplay" class="status">Status: Idle</div> | |
<div style="margin-top: 10px; display: flex; gap: 10px; align-items: center;"> | |
<label for="autoRefresh" style="margin-top: 0; display: flex; align-items: center; font-weight: normal;"> | |
<input type="checkbox" id="autoRefresh" checked style="width: auto; margin-right: 5px;"> | |
Auto-refresh | |
</label> | |
<select id="refreshRate" style="width: 100px;"> | |
<option value="500">0.5s</option> | |
<option value="1000" selected>1s</option> | |
<option value="2000">2s</option> | |
<option value="5000">5s</option> | |
<option value="10000">10s</option> | |
</select> | |
<button id="manualRefresh" style="width: auto; margin-top: 0;">Refresh</button> | |
</div> | |
</div> | |
<div class="tabs"> | |
<div class="tab active" data-tab="console">Console</div> | |
<div class="tab" data-tab="snapshots">Snapshots</div> | |
</div> | |
<div class="tab-content active" id="console-tab"> | |
<div class="console"> | |
<div class="console-title"> | |
<span>Agent Output</span> | |
<button class="clear-logs" id="clearLogsButton">Clear</button> | |
</div> | |
<div class="console-output" id="consoleOutput"></div> | |
</div> | |
</div> | |
<div class="tab-content" id="snapshots-tab"> | |
<div class="snapshots"> | |
<div class="snapshots-title"> | |
<span>Snapshots</span> | |
<button class="refresh-snapshots" id="refreshSnapshotsButton">Refresh</button> | |
</div> | |
<div class="snapshots-list" id="snapshotsList"></div> | |
<div class="snapshot-actions"> | |
<button class="snapshot-action" id="loadSnapshotButton" disabled>Load Selected</button> | |
<button class="snapshot-action" id="viewSnapshotButton" disabled>View Details</button> | |
</div> | |
<div class="info-panel" id="snapshotInfoPanel"> | |
Select a snapshot to view details | |
</div> | |
</div> | |
</div> | |
</div> | |
<div class="main-view"> | |
<div style="display: flex; justify-content: flex-end; padding: 5px; background-color: #333;"> | |
<button id="reloadVncButton" style="margin: 0; padding: 5px 10px; width: auto; font-size: 12px; background-color: #666;"> | |
🔄 Reload VNC | |
</button> | |
</div> | |
<iframe id="gameFrame" class="game-frame" src="about:blank"></iframe> | |
</div> | |
</div> | |
<script> | |
let agentRunning = false; | |
let logPollingInterval = null; | |
let logPosition = 0; | |
let refreshRate = 1000; // Default refresh rate in milliseconds | |
let autoRefreshEnabled = true; | |
let currentSnapshotId = null; | |
let selectedSnapshotId = null; | |
let snapshots = []; // Store snapshot data | |
let currentVncUrl = null; | |
// Tab switching | |
document.querySelectorAll('.tab').forEach(tab => { | |
tab.addEventListener('click', function() { | |
// Remove active class from all tabs and contents | |
document.querySelectorAll('.tab').forEach(t => t.classList.remove('active')); | |
document.querySelectorAll('.tab-content').forEach(c => c.classList.remove('active')); | |
// Add active class to clicked tab and corresponding content | |
this.classList.add('active'); | |
document.getElementById(this.dataset.tab + '-tab').classList.add('active'); | |
}); | |
}); | |
// Handle refresh control changes | |
document.getElementById('autoRefresh').addEventListener('change', function(e) { | |
autoRefreshEnabled = e.target.checked; | |
if (autoRefreshEnabled && agentRunning) { | |
startLogPolling(); | |
} else { | |
stopLogPolling(); | |
} | |
}); | |
document.getElementById('refreshRate').addEventListener('change', function(e) { | |
refreshRate = parseInt(e.target.value); | |
if (logPollingInterval) { | |
stopLogPolling(); | |
if (autoRefreshEnabled && agentRunning) { | |
startLogPolling(); | |
} | |
} | |
}); | |
document.getElementById('manualRefresh').addEventListener('click', function() { | |
fetchLogs(); | |
}); | |
// Form submission handler | |
document.getElementById('agentForm').addEventListener('submit', async function(e) { | |
e.preventDefault(); | |
if (agentRunning) return; | |
const snapshotId = document.getElementById('snapshotId').value; | |
const steps = document.getElementById('steps').value; | |
// Store as the current snapshot ID | |
currentSnapshotId = snapshotId; | |
// Update UI | |
document.getElementById('statusDisplay').textContent = "Status: Starting agent..."; | |
document.getElementById('startButton').disabled = true; | |
document.getElementById('stopButton').disabled = false; | |
agentRunning = true; | |
try { | |
// Start the agent | |
const response = await fetch('/start', { | |
method: 'POST', | |
headers: { | |
'Content-Type': 'application/json' | |
}, | |
body: JSON.stringify({ | |
snapshotId, | |
steps | |
}) | |
}); | |
const result = await response.json(); | |
if (result.success) { | |
document.getElementById('statusDisplay').textContent = "Status: Agent running"; | |
// Reset the log position counter | |
logPosition = 0; | |
// Start polling for logs | |
startLogPolling(); | |
// Also fetch snapshots after a short delay to let them start being created | |
setTimeout(fetchSnapshots, 5000); | |
} else { | |
document.getElementById('statusDisplay').textContent = "Status: Error - " + result.error; | |
resetAgentState(); | |
} | |
} catch (error) { | |
document.getElementById('statusDisplay').textContent = "Status: Connection error"; | |
console.error(error); | |
resetAgentState(); | |
} | |
}); | |
// Stop button handler | |
document.getElementById('stopButton').addEventListener('click', async function() { | |
if (!agentRunning) return; | |
document.getElementById('statusDisplay').textContent = "Status: Stopping agent..."; | |
try { | |
const response = await fetch('/stop', { | |
method: 'POST' | |
}); | |
const result = await response.json(); | |
if (result.success) { | |
document.getElementById('statusDisplay').textContent = "Status: Agent stopped"; | |
// Fetch snapshots one more time to ensure we have the final ones | |
fetchSnapshots(); | |
} else { | |
document.getElementById('statusDisplay').textContent = "Status: Failed to stop agent"; | |
} | |
} catch (error) { | |
document.getElementById('statusDisplay').textContent = "Status: Connection error"; | |
console.error(error); | |
} | |
resetAgentState(); | |
}); | |
// Clear logs button handler | |
document.getElementById('clearLogsButton').addEventListener('click', function() { | |
document.getElementById('consoleOutput').textContent = ''; | |
}); | |
// Refresh snapshots button handler | |
document.getElementById('refreshSnapshotsButton').addEventListener('click', function() { | |
fetchSnapshots(); | |
}); | |
// Load snapshot button handler | |
document.getElementById('loadSnapshotButton').addEventListener('click', async function() { | |
if (!selectedSnapshotId) return; | |
// Set the selected snapshot as the current one in the form | |
document.getElementById('snapshotId').value = selectedSnapshotId; | |
// Highlight the selection as current | |
updateSnapshotDisplay(); | |
// Display a message | |
document.getElementById('snapshotInfoPanel').textContent = | |
`Snapshot ${selectedSnapshotId} loaded into form. Click 'Start Agent' to begin from this snapshot.`; | |
}); | |
// View snapshot details button handler | |
document.getElementById('viewSnapshotButton').addEventListener('click', function() { | |
if (!selectedSnapshotId) return; | |
// Find the selected snapshot | |
const snapshot = snapshots.find(s => s.id === selectedSnapshotId); | |
if (!snapshot) return; | |
// Display detailed information | |
const infoPanel = document.getElementById('snapshotInfoPanel'); | |
let details = `Snapshot: ${snapshot.name}\n`; | |
details += `ID: ${snapshot.id}\n`; | |
details += `Created: ${new Date(snapshot.created * 1000).toLocaleString()}\n\n`; | |
details += `Lineage:\n`; | |
details += `- Parent: ${snapshot.metadata?.parent_snapshot || 'None'}\n`; | |
details += `- Previous: ${snapshot.metadata?.prev_snapshot || 'None'}\n`; | |
details += `- Step: ${snapshot.metadata?.step_number || 'Unknown'}\n`; | |
details += `- Dashboard Run: ${snapshot.metadata?.dashboard_run_id || 'None'}\n`; | |
infoPanel.textContent = details; | |
}); | |
// Helper function to reset UI state | |
function resetAgentState() { | |
agentRunning = false; | |
document.getElementById('startButton').disabled = false; | |
document.getElementById('stopButton').disabled = true; | |
// Stop polling for logs | |
stopLogPolling(); | |
} | |
// Snapshot selection handler | |
function handleSnapshotClick(snapshotId) { | |
selectedSnapshotId = snapshotId; | |
updateSnapshotDisplay(); | |
// Enable action buttons | |
document.getElementById('loadSnapshotButton').disabled = false; | |
document.getElementById('viewSnapshotButton').disabled = false; | |
// Show basic info | |
const snapshot = snapshots.find(s => s.id === snapshotId); | |
if (snapshot) { | |
const infoPanel = document.getElementById('snapshotInfoPanel'); | |
infoPanel.textContent = `Selected: ${snapshot.name} (${snapshot.id})\nStep: ${snapshot.metadata?.step_number || 'Unknown'}\nRun: ${snapshot.metadata?.dashboard_run_id ? snapshot.metadata.dashboard_run_id.substring(0, 8) + '...' : 'None'}\n\nClick 'View Details' for more information.`; | |
} | |
} | |
// Update snapshot display highlighting | |
function updateSnapshotDisplay() { | |
// Remove highlighting from all nodes | |
document.querySelectorAll('.snapshot-node').forEach(node => { | |
node.classList.remove('current'); | |
}); | |
// Add current class to current snapshot | |
if (currentSnapshotId) { | |
const currentNode = document.querySelector(`.snapshot-node[data-id="${currentSnapshotId}"]`); | |
if (currentNode) { | |
currentNode.classList.add('current'); | |
} | |
} | |
} | |
// Start polling for new log entries | |
function startLogPolling() { | |
if (logPollingInterval) { | |
clearInterval(logPollingInterval); | |
} | |
if (autoRefreshEnabled) { | |
logPollingInterval = setInterval(fetchLogs, refreshRate); | |
} | |
} | |
// Stop polling for logs | |
function stopLogPolling() { | |
if (logPollingInterval) { | |
clearInterval(logPollingInterval); | |
logPollingInterval = null; | |
} | |
} | |
// Fetch new log entries from the server | |
async function fetchLogs() { | |
try { | |
const response = await fetch(`/logs?position=${logPosition}`); | |
const data = await response.json(); | |
if (data.logs) { | |
appendLogs(data.logs); | |
logPosition = data.nextPosition; | |
} | |
// Check if the agent is still running | |
if (data.agentRunning === false && agentRunning) { | |
document.getElementById('statusDisplay').textContent = "Status: Agent finished"; | |
resetAgentState(); | |
// Fetch snapshots one more time to get final state | |
fetchSnapshots(); | |
} | |
// Check for VNC URL | |
if (data.vncUrl && data.vncUrl !== 'null' && data.vncUrl !== '') { | |
const gameFrame = document.getElementById('gameFrame'); | |
// Store the URL for reload button usage | |
currentVncUrl = data.vncUrl; | |
// Only update the iframe if it's not already showing this URL | |
if (gameFrame.src !== data.vncUrl) { | |
console.log("Setting game frame URL to: " + data.vncUrl); | |
gameFrame.src = data.vncUrl; | |
} | |
} | |
} catch (error) { | |
console.error("Error fetching logs:", error); | |
} | |
} | |
// Fetch snapshots from the server | |
async function fetchSnapshots() { | |
try { | |
const response = await fetch('/snapshots'); | |
const data = await response.json(); | |
if (data.snapshots) { | |
snapshots = data.snapshots; | |
renderSnapshots(data.snapshots); | |
} | |
} catch (error) { | |
console.error("Error fetching snapshots:", error); | |
} | |
} | |
// Render snapshots in the UI | |
function renderSnapshots(snapshotsList) {
    // Rebuild the snapshot list: parent snapshots (no step number) first,
    // then the rest in ascending step order.
    const snapshotsContainer = document.getElementById('snapshotsList');
    snapshotsContainer.innerHTML = '';
    if (!snapshotsList || snapshotsList.length === 0) {
        snapshotsContainer.textContent = 'No snapshots available for this session';
        return;
    }

    // Build a text-only child element. Every server-supplied value goes through
    // textContent (never innerHTML) so ids, names and metadata cannot inject markup.
    const makeLine = (className, text) => {
        const el = document.createElement('div');
        el.className = className;
        el.textContent = text;
        return el;
    };

    // Parse the step number; null marks a "parent" snapshot without a step.
    // An explicit null/empty check keeps a numeric step 0 from being treated as a parent.
    const stepOf = snapshot => {
        const raw = snapshot.metadata?.step_number;
        if (raw === undefined || raw === null || raw === '') return null;
        const parsed = parseInt(raw, 10);
        return Number.isNaN(parsed) ? null : parsed;
    };

    // Array.prototype.sort is stable, so snapshots that share a step keep
    // the order in which the server returned them.
    const ordered = snapshotsList
        .map(snapshot => ({ snapshot, step: stepOf(snapshot) }))
        .sort((a, b) => {
            if (a.step === null) return b.step === null ? 0 : -1;
            if (b.step === null) return 1;
            return a.step - b.step;
        });

    ordered.forEach(({ snapshot, step }) => {
        const node = document.createElement('div');
        node.className = 'snapshot-node';
        node.dataset.id = snapshot.id;
        // Mark current snapshot
        if (snapshot.id === currentSnapshotId) {
            node.classList.add('current');
        }

        // Title (step number or parent) and ID
        node.appendChild(makeLine('title', step === null ? 'Parent' : `Step ${step}`));
        node.appendChild(makeLine('id', snapshot.id));

        // Metadata summary line
        const metadata = snapshot.metadata || {};
        const metadataText = [];
        if (snapshot.name) {
            metadataText.push(`Name: ${snapshot.name}`);
        }
        if (metadata.timestamp) {
            const date = new Date(parseInt(metadata.timestamp, 10) * 1000);
            metadataText.push(`Time: ${date.toLocaleTimeString()}`);
        }
        if (metadata.dashboard_run_id) {
            metadataText.push(`Run: ${String(metadata.dashboard_run_id).substring(0, 8)}...`);
        }
        node.appendChild(makeLine('metadata', metadataText.join(' | ')));

        // Add click handler
        node.addEventListener('click', () => handleSnapshotClick(snapshot.id));
        snapshotsContainer.appendChild(node);
    });
}
// Append new log entries to the console output | |
function appendLogs(logs) { | |
if (!logs || logs.length === 0) return; | |
const consoleOutput = document.getElementById('consoleOutput'); | |
// Add each log line | |
logs.forEach(line => { | |
// Create a new div for each line | |
const logLine = document.createElement('div'); | |
logLine.textContent = line; | |
// Add color based on log content | |
if (line.includes('[ERROR]') || line.includes('Error')) { | |
logLine.style.color = '#ff5252'; | |
} else if (line.includes('[WARNING]') || line.includes('Warning')) { | |
logLine.style.color = '#ffb142'; | |
} else if (line.includes('[Claude]')) { | |
logLine.style.color = '#4fc3f7'; | |
} else if (line.includes('[Tool Use]') || line.includes('[Claude Action]')) { | |
logLine.style.color = '#66bb6a'; | |
} else if (line.includes('Snapshot created')) { | |
logLine.style.color = '#ba68c8'; | |
} | |
consoleOutput.appendChild(logLine); | |
}); | |
// Auto-scroll to bottom | |
consoleOutput.scrollTop = consoleOutput.scrollHeight; | |
} | |
// Fetch snapshots periodically when agent is running | |
if (autoRefreshEnabled) { | |
setInterval(() => { | |
if (agentRunning) { | |
fetchSnapshots(); | |
} | |
}, 5000); // Check for new snapshots every 5 seconds | |
} | |
// Reload VNC button handler | |
document.getElementById('reloadVncButton').addEventListener('click', function() { | |
const gameFrame = document.getElementById('gameFrame'); | |
// Store the current URL | |
if (gameFrame.src && gameFrame.src !== 'about:blank') { | |
currentVncUrl = gameFrame.src; | |
} | |
// If we have a VNC URL, reload it | |
if (currentVncUrl) { | |
console.log("Reloading VNC iframe with URL: " + currentVncUrl); | |
// First clear the frame | |
gameFrame.src = 'about:blank'; | |
// Then after a brief delay, set it back to the VNC URL | |
setTimeout(() => { | |
gameFrame.src = currentVncUrl; | |
}, 500); | |
// Log to console | |
const timestamp = new Date().toLocaleTimeString(); | |
const consoleOutput = document.getElementById('consoleOutput'); | |
const logLine = document.createElement('div'); | |
logLine.textContent = `[${timestamp}] VNC display manually reloaded`; | |
logLine.style.color = '#ffb142'; | |
consoleOutput.appendChild(logLine); | |
consoleOutput.scrollTop = consoleOutput.scrollHeight; | |
} else { | |
console.log("No VNC URL available to reload"); | |
} | |
}); | |
</script> | |
</body> | |
</html> | |
""" | |
# Flask application object; the route handlers below register themselves on it
app = Flask(__name__)

# --- Shared agent state (module-level; mutated by the routes and the log thread) ---
# Lock taken around every access to agent_logs
log_lock = threading.Lock()
# Rolling buffer that keeps only the newest 1000 log lines
agent_logs = deque(maxlen=1000)
# True from a successful /start until the agent exits or /stop is called
agent_running = False
# subprocess.Popen handle of the running agent, or None
agent_process = None
# Most recent noVNC URL seen in the agent output, or None
vnc_url = None
# Despite the name this holds the per-run id "<snapshot id>_<unix timestamp>"
parent_snapshot_id = None
# MorphCloudClient instance once initialize_morph_client() has succeeded
morph_client = None
def extract_vnc_url(line):
    """Return the first Morph noVNC https URL found in ``line``, or None."""
    found = re.search(r"(https://novnc-[^\s]*\.http\.cloud\.morph\.so[^\s]*)", line)
    return found.group(1) if found else None
def extract_snapshot_id(line):
    """Return the id following "Snapshot created with ID: " in ``line``, or None.

    Only letters, digits and underscores are captured as part of the id.
    """
    found = re.search(r"Snapshot created with ID: ([a-zA-Z0-9_]+)", line)
    if found is None:
        return None
    return found.group(1)
def log_reader(process):
    """Stream the agent's piped output into the shared log buffer.

    Intended to run on a background thread. Reads ``process.stdout`` line by
    line until EOF, timestamps each line, appends it to ``agent_logs`` and
    echoes it to this process's stdout. Any noVNC URL found in a line is
    stored in the global ``vnc_url``.

    Args:
        process: a ``subprocess.Popen`` whose ``stdout`` is a binary pipe.
    """
    global agent_running, vnc_url

    for line in iter(process.stdout.readline, b''):
        try:
            # errors='replace': one stray non-UTF-8 byte must not drop the whole line
            decoded_line = line.decode('utf-8', errors='replace').rstrip()

            # Check if this line contains the VNC URL
            extracted_url = extract_vnc_url(decoded_line)
            if extracted_url:
                print(f"Found VNC URL: {extracted_url}")
                vnc_url = extracted_url

            # Add timestamp to the log line
            timestamp = time.strftime("%H:%M:%S", time.localtime())
            log_line = f"[{timestamp}] {decoded_line}"

            # Add to log buffer with thread safety
            with log_lock:
                agent_logs.append(log_line)
            print(log_line)
        except Exception as e:
            print(f"Error processing log line: {e}")

    # Process has ended. Only touch the shared state if this reader still
    # belongs to the current run: after a stop followed by a quick restart,
    # agent_process already points at the new process and must stay "running".
    if agent_process is None or agent_process is process:
        with log_lock:
            agent_logs.append(
                f"[{time.strftime('%H:%M:%S', time.localtime())}] Agent process terminated"
            )
        agent_running = False

    # Release the pipe and reap the child so it does not linger as a zombie.
    process.stdout.close()
    process.wait()
def initialize_morph_client():
    """Create the global MorphCloud client.

    Returns:
        True when a client was constructed and stored in ``morph_client``;
        False when the morphcloud import failed or construction raised.
    """
    global morph_client

    # The package import at the top of the file failed; nothing to build.
    if MorphCloudClient is None:
        return False

    try:
        morph_client = MorphCloudClient()
    except Exception as e:
        print(f"Error initializing MorphCloud client: {e}")
        return False
    else:
        print("MorphCloud client initialized successfully")
        return True
@app.route("/")
def index():
    """Return the single-page dashboard (the HTML_TEMPLATE string) for the root URL."""
    page = HTML_TEMPLATE
    return page
@app.route('/logs')
def get_logs():
    """Return log lines added since the client's last known position.

    Query parameters:
        position: index into the current log buffer from which to return
            lines. Missing, non-numeric, negative or out-of-range values are
            treated as 0 so the client resynchronises from the start of the
            buffer instead of erroring or stalling.

    Returns:
        JSON with ``logs`` (list of lines), ``nextPosition`` (value to send on
        the next poll), ``agentRunning`` and ``vncUrl``.
    """
    # type=int makes Werkzeug fall back to the default on a non-numeric value
    # rather than raising, which previously surfaced as a 500.
    position = request.args.get('position', 0, type=int)

    with log_lock:
        # Snapshot the deque as a list for slicing outside the lock
        all_logs = list(agent_logs)
    total = len(all_logs)

    # A negative index would slice from the tail; an index past the end means
    # the buffer was cleared since the client last polled. Restart from 0.
    if position < 0 or position > total:
        position = 0

    # NOTE(review): positions are indexes into the current buffer. Once the
    # bounded buffer is full, older lines are evicted and a client already at
    # the end cannot see newer ones; fixing that needs an absolute line
    # counter maintained by the writer.
    new_logs = all_logs[position:]

    return jsonify({
        "logs": new_logs,
        "nextPosition": total,
        "agentRunning": agent_running,
        "vncUrl": vnc_url
    })
@app.route('/snapshots')
def get_snapshots():
    """List the snapshots tagged with the current dashboard run id.

    Returns JSON with a ``snapshots`` list; an ``error`` or ``message`` key is
    added when the client is unavailable, no run has been started yet, or the
    lookup raised.
    """
    if morph_client is None:
        return jsonify({"snapshots": [], "error": "MorphCloud client not available"})
    if parent_snapshot_id is None:
        return jsonify({"snapshots": [], "message": "No parent snapshot set for this session"})

    try:
        # Filter on the run id stored in each snapshot's metadata
        run_snapshots = morph_client.snapshots.list(
            metadata={"dashboard_run_id": parent_snapshot_id}
        )
        # Flatten the snapshot objects into JSON-serialisable dictionaries
        payload = [
            {
                "id": snap.id,
                "name": getattr(snap, 'name', None),
                "created": getattr(snap, 'created', 0),
                "metadata": getattr(snap, 'metadata', {}),
            }
            for snap in run_snapshots
        ]
        return jsonify({"snapshots": payload})
    except Exception as e:
        print(f"Error fetching snapshots: {e}")
        return jsonify({"snapshots": [], "error": str(e)})
@app.route('/start', methods=['POST'])
def start_agent():
    """Start the Pokemon agent as a subprocess for a new dashboard run.

    Expects a JSON body with ``snapshotId`` (required) and ``steps``
    (optional positive integer, default 10). Every start gets a fresh run id
    ``"<snapshotId>_<unix timestamp>"`` stored in the global
    ``parent_snapshot_id`` and passed to the agent as ``--dashboard-run-id``,
    so earlier runs' snapshots are not listed for this run.

    Returns:
        JSON ``{"success": True, "message": ...}`` on success, otherwise
        ``{"success": False, "error": ...}``.
    """
    global agent_process, agent_running, vnc_url, parent_snapshot_id

    # Check if agent is already running
    if agent_running:
        return jsonify({"success": False, "error": "Agent is already running"})

    try:
        # silent=True: a missing or malformed JSON body yields None instead of raising
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        # Validate everything before touching shared state, so a rejected
        # request cannot wipe the previous run's logs or run id.
        snapshot_id = str(data.get('snapshotId') or '').strip()
        if not snapshot_id:
            return jsonify({"success": False, "error": "snapshotId is required"})

        try:
            steps = int(data.get('steps', 10))
        except (TypeError, ValueError):
            steps = 0
        if steps < 1:
            return jsonify({"success": False, "error": "steps must be a positive integer"})

        # Check if the agent script exists
        if not os.path.exists("minimal_agent.py"):
            return jsonify({
                "success": False,
                "error": "minimal_agent.py not found. Please ensure the agent file is in the current directory."
            })

        # Always create a new run ID for each agent start
        # This ensures previous snapshots don't appear in the current run's view
        run_timestamp = int(time.time())
        parent_snapshot_id = f"{snapshot_id}_{run_timestamp}"
        print(f"Setting new run ID for this session: {parent_snapshot_id}")

        # Clear previous logs
        with log_lock:
            agent_logs.clear()
        vnc_url = None

        # Build command. The lineage parent is the snapshot id itself; it must
        # not be recovered by splitting the run id on '_' because snapshot ids
        # may contain underscores.
        cmd = [
            sys.executable,
            "-u",  # unbuffered child output so log lines reach the dashboard promptly
            "minimal_agent.py",
            "--snapshot-id", snapshot_id,
            "--steps", str(steps),
            "--no-browser",  # Suppress browser auto-open since we're using the dashboard
            "--parent-snapshot-id", snapshot_id,  # The actual snapshot for lineage
            "--dashboard-run-id", parent_snapshot_id,  # The combined run ID for filtering
            "--snapshot-prefix", f"dash_{run_timestamp}",
        ]

        # Start the process with stderr merged into the stdout pipe. The
        # default buffering is used: bufsize=1 (line buffering) is only valid
        # for text-mode pipes and this pipe is read as bytes.
        print(f"Starting agent with command: {' '.join(cmd)}")
        agent_process = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
        agent_running = True

        # Start a thread to read the logs
        log_thread = threading.Thread(target=log_reader, args=(agent_process,), daemon=True)
        log_thread.start()

        # Add initial log entry
        with log_lock:
            timestamp = time.strftime("%H:%M:%S", time.localtime())
            agent_logs.append(f"[{timestamp}] Started agent with snapshot {snapshot_id} for {steps} steps")
            agent_logs.append(f"[{timestamp}] Using parent snapshot {snapshot_id} for lineage tracking")
            agent_logs.append(f"[{timestamp}] All snapshots will be tagged with dashboard_run_id={parent_snapshot_id}")

        return jsonify({
            "success": True,
            "message": "Agent started"
        })
    except Exception as e:
        print(f"Error starting agent: {e}")
        agent_running = False
        return jsonify({"success": False, "error": str(e)})
@app.route('/stop', methods=['POST'])
def stop_agent():
    """Stop the running agent subprocess.

    Sends terminate, waits up to two seconds, then falls back to kill. The
    global run state is cleared before the process is signalled.

    Returns:
        JSON ``{"success": True, "message": ...}`` when an agent was stopped,
        otherwise ``{"success": False, "error": ...}``.
    """
    global agent_process, agent_running

    # Wrap everything in try/except so this handler always returns JSON
    try:
        if not agent_running or agent_process is None:
            return jsonify({"success": False, "error": "No agent is running"})

        # Log that we're attempting to stop
        print(f"Attempting to stop agent process (PID: {agent_process.pid})")

        # Keep a local reference, then clear the globals first so other
        # requests immediately see the agent as stopped.
        process_to_stop = agent_process
        agent_running = False
        agent_process = None

        # Then terminate the process
        try:
            process_to_stop.terminate()
            process_to_stop.wait(timeout=2)
        except Exception as inner_e:
            # Includes subprocess.TimeoutExpired when the agent ignores terminate
            print(f"Error during graceful termination: {inner_e}")
            try:
                process_to_stop.kill()
                # Reap the killed child so it does not linger as a zombie
                process_to_stop.wait(timeout=2)
            except (OSError, subprocess.TimeoutExpired) as kill_error:
                # Already dead or can't be killed; report instead of hiding it
                print(f"Could not kill agent process: {kill_error}")

        # Add log entry
        with log_lock:
            timestamp = time.strftime("%H:%M:%S", time.localtime())
            agent_logs.append(f"[{timestamp}] Agent stopped")

        return jsonify({
            "success": True,
            "message": "Agent stopped"
        })
    except Exception as e:
        # Critical error handling - log it but don't crash
        print(f"CRITICAL ERROR in stop_agent: {e}")
        import traceback
        traceback.print_exc()

        # Reset state to be safe
        agent_running = False
        agent_process = None

        # Always return a response
        return jsonify({"success": False, "error": f"Server error: {str(e)}"})
def main(host=None, port=5001):
    """Start the dashboard web server and open it in the default browser.

    Args:
        host: interface to bind. Defaults to the ``DASHBOARD_HOST``
            environment variable, or ``127.0.0.1`` when that is unset. The
            dashboard has no authentication and can start and stop local
            processes, so it is only exposed on other interfaces when asked.
        port: TCP port to listen on.
    """
    if host is None:
        host = os.environ.get("DASHBOARD_HOST", "127.0.0.1")
    url = f"http://127.0.0.1:{port}/"

    print("Pokemon Agent Dashboard")
    print("======================")
    print("1. Make sure your minimal_agent.py file is in the current directory")
    print(f"2. Opening browser to {url}")
    print("3. Press Ctrl+C to stop the server")

    # Initialize MorphCloud client
    initialize_morph_client()

    # Open the browser shortly after startup; app.run() blocks, and opening
    # before it is listening can show a connection error.
    opener = threading.Timer(1.0, webbrowser.open, args=(url,))
    opener.daemon = True
    opener.start()

    # Run the Flask app
    app.run(host=host, port=port, threaded=True)


if __name__ == '__main__':
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# /// script | |
# dependencies = [ | |
# "morphcloud", | |
# "requests", | |
# "pillow", | |
# "rich", | |
# "anthropic", | |
# ] | |
# /// | |
"""Run a non-interactive server agent that plays Pokemon automatically. | |
This script combines the EmulatorClient and PokemonAgent to set up a basic agent. | |
""" | |
import io | |
import sys | |
import json | |
import copy | |
import typing | |
from typing import Dict, List, Optional, Any | |
import base64 | |
import logging | |
import argparse | |
import time | |
import requests | |
import webbrowser | |
from PIL import Image | |
from anthropic import Anthropic | |
from rich.console import Console | |
from morphcloud.api import MorphCloudClient | |
# Set up logging - this will be configured properly in main() based on command line args
logger = logging.getLogger(__name__)

# Configuration
# Upper bound on generated tokens; passed as `max_tokens` on gameplay and summary requests.
MAX_TOKENS = 4096
# Model identifier; passed as `model` on gameplay and summary requests.
MODEL_NAME = "claude-3-7-sonnet-20250219"
# Sampling temperature; passed as `temperature` on those same requests.
TEMPERATURE = 0.7
# When True, the `navigate_to` tool is appended to PokemonAgent.AVAILABLE_TOOLS.
USE_NAVIGATOR = True
class EmulatorClient:
    """HTTP client for the game server's ``/api/*`` endpoints.

    Several methods are named after their ``Emulator`` counterparts so this
    client can stand in for that interface. Apart from :meth:`initialize`,
    requests are issued without a timeout.
    """

    def __init__(self, host: str = "127.0.0.1", port: int = 9876):
        """Store the server base URL derived from ``host`` and ``port``.

        Args:
            host: Bare host name, ``host:port`` pair, or full ``http(s)://`` URL.
            port: Port to add when ``host`` does not already carry one. Ignored
                for ``cloud.morph.so`` hosts and when it is ``None``.
        """
        self.base_url = self._build_base_url(host, port)
        logger.info(f"Initialized client connecting to {self.base_url}")

    @staticmethod
    def _build_base_url(host, port):
        """Return the base URL (no trailing slash) for ``host`` and ``port``.

        Rules:
            * A missing scheme defaults to ``https`` for ``cloud.morph.so``
              hosts and ``http`` otherwise.
            * ``cloud.morph.so`` URLs never get a port appended.
            * ``port`` is inserted into the authority part only when the URL
              has no explicit port yet, so paths and trailing slashes cannot
              be mistaken for (or corrupted by) a port.
        """
        # Local import keeps this block self-contained.
        from urllib.parse import urlsplit, urlunsplit

        is_morph = "cloud.morph.so" in host
        if not host.startswith(("http://", "https://")):
            scheme = "https" if is_morph else "http"
            host = f"{scheme}://{host}"

        # Endpoints are appended as "/api/...", so a trailing slash would
        # produce "//api/...".
        url = host.rstrip("/")
        if is_morph or port is None:
            return url

        try:
            parts = urlsplit(url)
            has_port = parts.port is not None
        except ValueError:
            # Malformed authority or port: leave the caller's URL untouched.
            return url
        if has_port or not parts.netloc:
            return url
        with_port = parts._replace(netloc=f"{parts.netloc}:{port}")
        return urlunsplit(with_port).rstrip("/")

    def get_screenshot(self):
        """Get current screenshot as PIL Image, or ``None`` on a non-200 reply."""
        response = requests.get(f"{self.base_url}/api/screenshot")
        if response.status_code != 200:
            logger.error(f"Error getting screenshot: {response.status_code}")
            return None
        return Image.open(io.BytesIO(response.content))

    def get_screenshot_base64(self):
        """Get current screenshot as base64 string, or ``""`` on a non-200 reply."""
        response = requests.get(f"{self.base_url}/api/screenshot")
        if response.status_code != 200:
            logger.error(f"Error getting screenshot: {response.status_code}")
            return ""
        return base64.b64encode(response.content).decode("utf-8")

    def get_game_state(self):
        """Get complete game state from server.

        Returns:
            The decoded JSON body, or ``{}`` when the server answers with a
            non-200 status or a body that is not valid JSON.
        """
        response = requests.get(f"{self.base_url}/api/game_state")
        if response.status_code != 200:
            logger.error(
                f"Error response from server: {response.status_code} - {response.text}"
            )
            return {}
        try:
            return response.json()
        except json.JSONDecodeError as e:
            logger.error(f"JSON decode error: {e}")
            logger.error(f"Response content: {response.text[:100]}...")
            return {}

    # Compatibility methods to match Emulator interface

    def get_state_from_memory(self):
        """Get game state string - mimics Emulator.get_state_from_memory()"""
        return self.get_game_state().get("game_state", "")

    def get_collision_map(self):
        """Get collision map - mimics Emulator.get_collision_map()"""
        return self.get_game_state().get("collision_map", "")

    def get_valid_moves(self):
        """Get valid moves - mimics Emulator.get_valid_moves()"""
        return self.get_game_state().get("valid_moves", [])

    def find_path(self, row, col):
        """Find path to position - mimics Emulator.find_path()

        Returns:
            A ``(status, path)`` tuple taken from the :meth:`navigate` reply.
        """
        result = self.navigate(row, col)
        if not isinstance(result, dict):
            return "Failed to navigate", []
        return result.get("status", "Navigation failed"), result.get("path", [])

    def press_buttons(
        self, buttons, wait=True, include_state=False, include_screenshot=False
    ):
        """Press a sequence of buttons on the Game Boy

        Args:
            buttons: List of buttons to press
            wait: Whether to pause briefly after each button press
            include_state: Whether to include game state in response
            include_screenshot: Whether to include screenshot in response

        Returns:
            dict: Response data which may include button press result, game
            state, and screenshot; ``{"error": ...}`` on a non-200 reply.
        """
        data = {
            "buttons": buttons,
            "wait": wait,
            "include_state": include_state,
            "include_screenshot": include_screenshot,
        }
        response = requests.post(f"{self.base_url}/api/press_buttons", json=data)
        if response.status_code != 200:
            logger.error(
                f"Error pressing buttons: {response.status_code} - {response.text}"
            )
            return {"error": f"Error: {response.status_code}"}
        return response.json()

    def navigate(self, row, col, include_state=False, include_screenshot=False):
        """Navigate to a specific position on the grid

        Args:
            row: Target row coordinate
            col: Target column coordinate
            include_state: Whether to include game state in response
            include_screenshot: Whether to include screenshot in response

        Returns:
            dict: Response data which may include navigation result, game
            state, and screenshot; a ``status``/empty-``path`` dict on a
            non-200 reply.
        """
        data = {
            "row": row,
            "col": col,
            "include_state": include_state,
            "include_screenshot": include_screenshot,
        }
        response = requests.post(f"{self.base_url}/api/navigate", json=data)
        if response.status_code != 200:
            logger.error(f"Error navigating: {response.status_code} - {response.text}")
            return {"status": f"Error: {response.status_code}", "path": []}
        return response.json()

    def read_memory(self, address):
        """Read a specific memory address"""
        response = requests.get(f"{self.base_url}/api/memory/{address}")
        if response.status_code != 200:
            logger.error(
                f"Error reading memory: {response.status_code} - {response.text}"
            )
            return {"error": f"Error: {response.status_code}"}
        return response.json()

    def load_state(self, state_path):
        """Load a saved state"""
        data = {"state_path": state_path}
        response = requests.post(f"{self.base_url}/api/load_state", json=data)
        if response.status_code != 200:
            logger.error(
                f"Error loading state: {response.status_code} - {response.text}"
            )
            return {"error": f"Error: {response.status_code}"}
        return response.json()

    def save_screenshot(self, filename="screenshot.png"):
        """Save current screenshot to a file; return whether one was saved."""
        screenshot = self.get_screenshot()
        if screenshot:
            screenshot.save(filename)
            logger.info(f"Screenshot saved as {filename}")
            return True
        return False

    def initialize(self, max_retries=5, retry_delay=3):
        """
        Initialize method with retry capability for compatibility with Emulator

        Polls ``/api/status`` until the reply's ``ready`` field is truthy.

        Args:
            max_retries (int): Maximum number of retry attempts
            retry_delay (int): Delay between retries in seconds

        Returns:
            bool: True if server is ready, False otherwise
        """
        logger.info(f"Client initialization requested (compatibility method) with {max_retries} retries")

        for attempt in range(1, max_retries + 1):
            try:
                logger.info(f"Checking server status (attempt {attempt}/{max_retries})")
                response = requests.get(f"{self.base_url}/api/status", timeout=10)
                if response.json().get("ready", False):
                    logger.info("Server reports ready status")
                    return True
                logger.warning(f"Server reports not ready (attempt {attempt}/{max_retries})")
                if attempt < max_retries:
                    logger.info(f"Waiting {retry_delay} seconds before retry...")
            except requests.exceptions.Timeout:
                logger.warning(f"Connection timeout (attempt {attempt}/{max_retries})")
            except requests.exceptions.ConnectionError as e:
                logger.warning(f"Connection error: {e} (attempt {attempt}/{max_retries})")
            except Exception as e:
                logger.error(f"Error checking server status: {e} (attempt {attempt}/{max_retries})")

            # Every unsuccessful attempt except the last waits before retrying.
            if attempt < max_retries:
                time.sleep(retry_delay)

        logger.error(f"Server not ready after {max_retries} attempts")
        return False

    def stop(self):
        """Empty stop method for compatibility with Emulator"""
        logger.info("Client stop requested (compatibility method)")
        # Nothing to do for client
def get_screenshot_base64(screenshot, upscale=1):
    """Encode a PIL image as a base64 PNG string.

    Args:
        screenshot: Image exposing ``width``, ``height``, ``resize`` and ``save``.
        upscale: Factor applied to both dimensions; values of 1 or less leave
            the image at its original size.

    Returns:
        The PNG bytes, base64-encoded and decoded to ``str``.
    """
    image = screenshot
    if upscale > 1:
        image = image.resize((image.width * upscale, image.height * upscale))

    # Render into memory and encode whatever was written.
    png_buffer = io.BytesIO()
    image.save(png_buffer, format="PNG")
    encoded = base64.standard_b64encode(png_buffer.getvalue())
    return encoded.decode()
class PokemonAgent: | |
def __init__(
    self,
    server_host="127.0.0.1",
    server_port: typing.Optional[int] = 9876,
    max_history=60,
    display_config=None,
    morph_client=None,  # Add MorphCloudClient as a parameter
    parent_snapshot_id=None,  # Add parent snapshot ID parameter
    dashboard_run_id=None,  # Add dashboard run ID parameter
):
    """Initialize the server agent.

    Args:
        server_host: Host where the game server is running
        server_port: Port number of the game server; a falsy value falls
            back to 9876
        max_history: Maximum number of messages in history before summarization
        display_config: Dictionary with display configuration options; keys
            that are not supplied keep their default (``False``)
        morph_client: Optional MorphCloudClient instance for snapshot creation
        parent_snapshot_id: Optional ID of the parent snapshot for lineage tracking
        dashboard_run_id: Optional ID for grouping snapshots by dashboard run

    Raises:
        RuntimeError: If the game server does not report ready.
    """
    self.client = EmulatorClient(host=server_host, port=server_port or 9876)
    self.anthropic = Anthropic()
    self.running = True
    self.message_history = [
        {"role": "user", "content": "You may now begin playing."}
    ]
    self.max_history = max_history

    # Store the MorphCloud client and snapshot tracking IDs
    self.morph_client = morph_client
    self.parent_snapshot_id = parent_snapshot_id
    self.dashboard_run_id = dashboard_run_id or parent_snapshot_id  # Use parent as fallback
    self.last_snapshot_id = parent_snapshot_id  # Track the last created snapshot ID

    # Merge the caller's options over the defaults so that a partial dict
    # cannot leave out a key that other methods index directly.
    default_display_config = {
        "show_game_state": False,
        "show_collision_map": False,
        "quiet_mode": False,
    }
    self.display_config = {**default_display_config, **(display_config or {})}

    # Log initialization with chosen configuration
    logger.debug(f"Agent initialized with display config: {self.display_config}")
    if self.morph_client and self.parent_snapshot_id:
        logger.info(f"Snapshot tracking enabled. Parent snapshot: {self.parent_snapshot_id}")
    if self.dashboard_run_id:
        logger.info(f"Dashboard run ID for grouping snapshots: {self.dashboard_run_id}")

    # Check if the server is ready
    if not self.client.initialize():
        logger.error(
            "Server not ready - please start the server before running the agent"
        )
        raise RuntimeError("Server not ready")
# System prompt passed as `system=` on every gameplay and summarization request.
SYSTEM_PROMPT = """You are playing Pokemon Red. You can see the game screen and control the game by executing emulator commands.
Your goal is to play through Pokemon Red and eventually defeat the Elite Four. Make decisions based on what you see on the screen.
check your tools! for example, try to use 'navigate_to' to help you move faster and better when looking at the map, with positions from game state.
Before each action, explain your reasoning briefly, plan your immediate next few steps needed (low level tool calls and actions, e.g. 'to reach the Cave from here, I need to go 1. right, 2. right, 3. right, 4. up', not high level goals like '1. explore the Cave 2. ??? 3. win!') to get there, then use the available actions to execute the next step in the game.
The game commands always register perfectly, so if you see no reaction to them, you have made an invalid command and misunderstood the game state. In battles, when you see an attack that isn't effective, you should examine your assumptions and update your beliefs. In general, search the solution space (try different things) before getting stuck in ruts.
Mistakes you have made before:
- do not talk to NPCs
- do not plan with high level goals
- DON'T FIGHT ANY BATTLES IF YOU CAN HELP IT. IF YOU ENCOUNTER A WILD (non trainer) BATTLE - JUST RUN. JUST RUN.
- do not insist on your prior knowledge about what attacks are strong against what types of Pokemon works when the evidence is the opposite
The conversation history may occasionally be summarized to save context space. If you see a message labeled "CONVERSATION HISTORY SUMMARY", this contains the key information about your progress so far. Use this information to maintain continuity in your gameplay."""
# Final user message appended by summarize_history() when asking the model to
# condense the conversation.
SUMMARY_PROMPT = """I need you to create a detailed summary of our conversation history up to this point. This summary will replace the full conversation history to manage the context window.
Please include:
1. Key game events and milestones you've reached
2. Important decisions you've made
3. Current objectives or goals you're working toward
4. Your current location and Pokémon team status
5. Any strategies or plans you've mentioned
The summary should be comprehensive enough that you can continue gameplay without losing important context about what has happened so far."""
# Tool definitions passed as `tools=` on gameplay requests. Each entry gives
# the tool name, a description for the model, and a JSON-schema `input_schema`.
AVAILABLE_TOOLS = [
    {
        "name": "press_buttons",
        "description": "Press a sequence of buttons on the Game Boy.",
        "input_schema": {
            "type": "object",
            "properties": {
                "buttons": {
                    "type": "array",
                    "items": {
                        "type": "string",
                        "enum": [
                            "a",
                            "b",
                            "start",
                            "select",
                            "up",
                            "down",
                            "left",
                            "right",
                        ],
                    },
                    "description": "List of buttons to press in sequence. Valid buttons: 'a', 'b', 'start', 'select', 'up', 'down', 'left', 'right'",
                },
                "wait": {
                    "type": "boolean",
                    "description": "Whether to wait for a brief period after pressing each button. Defaults to true.",
                },
            },
            "required": ["buttons"],
        },
    }
]

# Add navigation tool if enabled
# (evaluated once, when the class body runs; USE_NAVIGATOR is the module-level flag)
if USE_NAVIGATOR:
    AVAILABLE_TOOLS.append(
        {
            "name": "navigate_to",
            "description": "Automatically navigate to a position on the map grid. The screen is divided into a 9x10 grid, with the top-left corner as (0, 0). This tool is only available in the overworld.",
            "input_schema": {
                "type": "object",
                "properties": {
                    "row": {
                        "type": "integer",
                        "description": "The row coordinate to navigate to (0-8).",
                    },
                    "col": {
                        "type": "integer",
                        "description": "The column coordinate to navigate to (0-9).",
                    },
                },
                "required": ["row", "col"],
            },
        }
    )
def process_tool_call(self, tool_call):
    """Process a single tool call.

    Runs the requested tool against the game server, then reports back with
    a short outcome line, a screenshot and the game state read from memory.

    Args:
        tool_call: Tool-use block from the model response; its ``name``,
            ``input`` and ``id`` attributes are read.

    Returns:
        dict: A ``tool_result`` message part carrying ``tool_use_id`` and a
        ``content`` list. Unknown tool names yield an error text block.
    """
    tool_name = tool_call.name
    tool_input = tool_call.input

    # In quiet mode, routine action logs drop to debug level.
    action_log = logger.debug if self.display_config["quiet_mode"] else logger.info
    action_log(f"Processing tool call: {tool_name}")

    if tool_name == "press_buttons":
        buttons = tool_input["buttons"]
        wait = tool_input.get("wait", True)
        action_log(f"[Buttons] Pressing: {buttons} (wait={wait})")

        # One request returns the result, state and screenshot together.
        response = self.client.press_buttons(
            buttons, wait=wait, include_state=True, include_screenshot=True
        )
        outcome = f"Pressed buttons: {', '.join(buttons)}"
        caption = "\nHere is a screenshot of the screen after your button presses:"
    elif tool_name == "navigate_to":
        row = tool_input["row"]
        col = tool_input["col"]
        action_log(f"[Navigation] Navigating to: ({row}, {col})")

        response = self.client.navigate(
            row, col, include_state=True, include_screenshot=True
        )
        status = response.get("status", "Unknown status")
        path = response.get("path", [])
        # An empty path is reported as a failed navigation.
        if path:
            result = f"Navigation successful: followed path with {len(path)} steps"
        else:
            result = f"Navigation failed: {status}"
        outcome = f"Navigation result: {result}"
        caption = "\nHere is a screenshot of the screen after navigation:"
    else:
        logger.error(f"Unknown tool called: {tool_name}")
        return {
            "type": "tool_result",
            "tool_use_id": tool_call.id,
            "content": [
                {"type": "text", "text": f"Error: Unknown tool '{tool_name}'"}
            ],
        }

    memory_info = self._state_after_action(response)

    content = [{"type": "text", "text": outcome}]
    content.extend(self._screenshot_blocks(response, caption))
    # The memory-derived game state is always included for the model.
    content.append(
        {
            "type": "text",
            "text": f"\nGame state information from memory after your action:\n{memory_info}",
        }
    )

    return {
        "type": "tool_result",
        "tool_use_id": tool_call.id,
        "content": content,
    }

def _state_after_action(self, response):
    """Return the game-state text for ``response``, logging it and the collision map."""
    if "game_state" in response:
        state = response["game_state"]
        memory_info = state.get("game_state", "")
        collision_map = state.get("collision_map", "")
        origin = "from response"
    else:
        # Fallback to separate calls if state not included
        memory_info = self.client.get_state_from_memory()
        collision_map = self.client.get_collision_map()
        origin = "after action"

    # The display flags only choose the log level; the text is logged either way.
    state_log = logger.info if self.display_config["show_game_state"] else logger.debug
    state_log(f"[Memory State {origin}]")
    state_log(memory_info)

    if collision_map:
        map_log = logger.info if self.display_config["show_collision_map"] else logger.debug
        map_log(f"[Collision Map {origin}]\n{collision_map}")

    return memory_info

def _screenshot_blocks(self, response, caption):
    """Return the caption and image blocks, or a text note when no image exists."""
    if "screenshot" in response:
        screenshot_b64 = response["screenshot"]
    else:
        # get_screenshot() returns None when the server does not answer 200.
        screenshot = self.client.get_screenshot()
        screenshot_b64 = (
            get_screenshot_base64(screenshot, upscale=2)
            if screenshot is not None
            else ""
        )

    if not screenshot_b64:
        # An image block with empty data would be an invalid request.
        logger.warning("No screenshot available after tool call")
        return [
            {
                "type": "text",
                "text": "\nNo screenshot could be retrieved after this action.",
            }
        ]

    return [
        {"type": "text", "text": caption},
        {
            "type": "image",
            "source": {
                "type": "base64",
                "media_type": "image/png",
                "data": screenshot_b64,
            },
        },
    ]
async def brainstorm_action_sequences(self) -> List[str]:
    """Use Claude to brainstorm different action sequences to try.

    Returns:
        A single-element list holding the complete response text; the three
        approaches the prompt asks for are not split into separate items.
    """
    # Local import keeps this block self-contained.
    import inspect

    game_state = self.client.get_game_state()
    prompt = (
        "Current game state:\n"
        f"{game_state}\n"
        "Brainstorm 3 different strategic approaches for what to do next in Pokemon Red:\n"
        "1. A cautious exploration-focused approach\n"
        "2. A progress-focused approach\n"
        "3. A resource/preparation-focused approach\n"
        "For each approach, describe the specific next actions to take in 2-3 sentences.\n"
        "Be concrete and specific about the immediate next steps, not high-level goals.\n"
    )

    response = self.anthropic.messages.create(
        model="claude-3-7-sonnet-20250219",
        max_tokens=1000,
        messages=[{"role": "user", "content": prompt}],
    )
    # __init__ creates a synchronous client, whose result cannot be awaited;
    # only await when an async client handed back an awaitable.
    if inspect.isawaitable(response):
        response = await response

    return [response.content[0].text]  # Return as a list of strategies
async def evaluate_action_branches(self, instance_id: str, steps_per_branch: int = 5) -> dict:
    """
    Create and evaluate multiple branches with different action sequences.

    Args:
        instance_id: ID of the current instance to branch from
        steps_per_branch: Number of steps to simulate for each branch

    Returns:
        Dictionary with the ``snapshot_id`` the branches were created from
        and ``branch_results`` keyed by clone ID

    Raises:
        RuntimeError: If the agent was created without a MorphCloud client.
    """
    # Fail before spending a model call when branching cannot work at all.
    if self.morph_client is None:
        raise RuntimeError("Branch evaluation requires a MorphCloud client")

    # Get brainstormed strategies from Claude
    strategies = await self.brainstorm_action_sequences()

    # Create branches using MorphCloud: one clone per strategy
    snapshot, clones = self.morph_client.instances.get(instance_id).branch(count=len(strategies))
    logger.info(f"Created snapshot {snapshot.id} with {len(clones)} branches")

    results = {}
    # For each branch, create a new agent instance and let it play through the strategy
    for clone, strategy in zip(clones, strategies):
        # NOTE(review): the branch agent is pointed at this agent's own server
        # URL, not at an endpoint of `clone` - confirm the branches really act
        # on the cloned instances.
        branch_agent = PokemonAgent(
            server_host=self.client.base_url,
            morph_client=self.morph_client,
            parent_snapshot_id=clone.id
        )
        try:
            # Initialize the branch agent with the strategy
            branch_agent.message_history = [
                {"role": "user", "content": f"You are exploring this strategic approach: {strategy}. Take actions aligned with this strategy."}
            ]
            # Let the branch agent play for the specified number of steps
            results[clone.id] = await self._evaluate_single_branch(
                clone.id, branch_agent, steps_per_branch
            )
        finally:
            # Clean up the branch agent even when its evaluation raised.
            # NOTE(review): PokemonAgent.stop is not defined in the visible
            # part of this class - confirm it exists.
            branch_agent.stop()

    return {
        "snapshot_id": snapshot.id,
        "branch_results": results
    }
async def _evaluate_single_branch(self, instance_id: str, branch_agent: 'PokemonAgent', num_steps: int) -> dict:
    """Evaluate a single branch by letting an agent play through it.

    Args:
        instance_id: Identifier echoed back in the result.
        branch_agent: Agent whose ``process_next_step`` is driven.
        num_steps: Maximum number of steps; stops early once the agent is
            no longer running.

    Returns:
        dict with ``instance_id``, ``actions_taken`` (stringified assistant
        messages added during the steps), ``game_states`` (one per step) and
        ``claude_assessment``.
    """
    # Local import keeps this block self-contained.
    import inspect

    game_states = []
    actions_taken = []

    for _ in range(num_steps):
        if not branch_agent.running:
            break

        # Remember where the history ended. The same list object is kept so
        # the messages appended by this step remain reachable even if the
        # agent swaps in a summarized history afterwards.
        history = branch_agent.message_history
        already_seen = len(history)

        # process_next_step is a plain method here; await only if an
        # awaitable comes back.
        outcome = branch_agent.process_next_step()
        if inspect.isawaitable(outcome):
            await outcome

        game_states.append(branch_agent.client.get_game_state())

        # A step appends the assistant message followed by the user tool
        # results, so the action is not the last entry of the history.
        actions_taken.extend(
            str(message["content"])
            for message in history[already_seen:]
            if message["role"] == "assistant"
        )

    # Get Claude's assessment
    assessment = await self._get_branch_assessment(game_states)

    return {
        "instance_id": instance_id,
        "actions_taken": actions_taken,
        "game_states": game_states,
        "claude_assessment": assessment
    }
async def _get_branch_assessment(self, game_states: List[str]) -> str:
    """Get Claude's assessment of the branch outcome.

    Args:
        game_states: Game states collected during the branch, one per step.
            Items that are not strings (for example decoded JSON dicts) are
            converted with ``str``.

    Returns:
        The text of the first content block of the model response.
    """
    # Local import keeps this block self-contained.
    import inspect

    # str.join only accepts strings, so stringify each state first.
    states_text = "\n".join(str(state) for state in game_states)
    prompt = (
        "Analyze this sequence of Pokemon game states and actions:\n"
        f"{states_text}\n"
        "Provide a brief assessment of:\n"
        "1. The effectiveness of this action sequence\n"
        "2. Any risks or opportunities identified\n"
        "3. Whether this branch seems promising to pursue\n"
    )

    response = self.anthropic.messages.create(
        model="claude-3-7-sonnet-20250219",
        max_tokens=300,
        messages=[{"role": "user", "content": prompt}],
    )
    # __init__ creates a synchronous client, whose result cannot be awaited;
    # only await when an async client handed back an awaitable.
    if inspect.isawaitable(response):
        response = await response

    return response.content[0].text
async def run_with_branching(self, num_steps=1000, instance_id=None, snapshot_name_prefix=None, branching_interval=20):
    """
    Run the agent with periodic branching to explore different strategies.

    Args:
        num_steps: Total number of steps to run
        instance_id: ID of the current instance to branch from
        snapshot_name_prefix: Prefix for naming snapshots (accepted but not
            used by this method)
        branching_interval: Number of steps between branching evaluations;
            must be at least 1

    Returns:
        Number of steps taken.

    Raises:
        ValueError: If ``branching_interval`` is smaller than 1.
    """
    # Local import keeps this block self-contained.
    import inspect

    # With an interval below 1 no step would ever run between evaluations,
    # so the loop would branch forever without making progress.
    if branching_interval < 1:
        raise ValueError("branching_interval must be at least 1")

    steps_taken = 0
    while steps_taken < num_steps and self.running:
        # Run normal steps for a while
        for _ in range(branching_interval):
            if not self.running or steps_taken >= num_steps:
                break
            # process_next_step is a plain method here; await only if an
            # awaitable comes back.
            outcome = self.process_next_step()
            if inspect.isawaitable(outcome):
                await outcome
            steps_taken += 1

        if self.running and steps_taken < num_steps:
            # Evaluate branches to find the best path forward
            branch_results = await self.evaluate_action_branches(instance_id, steps_per_branch=5)

            # Log branch evaluations
            logger.info(f"Branch evaluations at step {steps_taken}:")
            for branch_id, result in branch_results["branch_results"].items():
                logger.info(f"\nBranch {branch_id}:")
                logger.info(f"Assessment: {result['claude_assessment']}")
            # Could add logic here to automatically select and switch to most promising branch

    return steps_taken
def process_next_step(self):
    """Process the next step in the agent loop.

    Sends the conversation to the model, logs its text and tool requests,
    and, when tools were requested, records the assistant turn plus the tool
    results in ``message_history``. The history is summarized once it
    reaches ``max_history`` messages.
    """
    outgoing = copy.deepcopy(self.message_history)

    # Mark the newest user turn (and, from five messages on, the one two
    # places earlier) with an ephemeral cache_control entry on its last part.
    if len(outgoing) >= 3:
        cache_targets = [outgoing[-1]]
        if len(outgoing) >= 5:
            cache_targets.append(outgoing[-3])
        for turn in cache_targets:
            parts = turn["content"]
            if turn["role"] == "user" and isinstance(parts, list) and parts:
                parts[-1]["cache_control"] = {"type": "ephemeral"}

    # Get model response
    response = self.anthropic.messages.create(
        model=MODEL_NAME,
        max_tokens=MAX_TOKENS,
        system=self.SYSTEM_PROMPT,
        messages=outgoing,
        tools=self.AVAILABLE_TOOLS,
        temperature=TEMPERATURE,
    )

    # Log token usage
    usage_log = logger.debug if self.display_config["quiet_mode"] else logger.info
    usage_log(f"Response usage: {response.usage}")

    # A single pass shows the model's output and collects what goes into history.
    tool_calls = []
    assistant_content = []
    for block in response.content:
        if block.type == "text":
            # Claude's thoughts should always be visible, even in quiet mode
            logger.info(f"[Claude] {block.text}")
            assistant_content.append({"type": "text", "text": block.text})
        elif block.type == "tool_use":
            # Both modes log at info level; only the wording differs.
            if self.display_config["quiet_mode"]:
                logger.info(
                    f"[Claude Action] Using tool: {block.name} with input: {block.input}"
                )
            else:
                logger.info(
                    f"[Tool Use] {block.name} with input: {block.input}"
                )
            tool_calls.append(block)
            assistant_content.append({"type": "tool_use", **dict(block)})

    # Without tool requests nothing is recorded for this step.
    if not tool_calls:
        return

    # The assistant turn goes into history before the tools are executed.
    self.message_history.append(
        {"role": "assistant", "content": assistant_content}
    )
    tool_results = [self.process_tool_call(call) for call in tool_calls]
    self.message_history.append({"role": "user", "content": tool_results})

    # Check if we need to summarize the history
    if len(self.message_history) >= self.max_history:
        self.summarize_history()
def run(self, num_steps=1000, instance_id=None, snapshot_name_prefix=None):
    """Main agent loop.

    Each iteration delegates to :meth:`process_next_step` and, when both a
    MorphCloud client and ``instance_id`` are available, snapshots the
    instance afterwards.

    Args:
        num_steps: Number of steps to run for
        instance_id: ID of the current instance for snapshot creation
        snapshot_name_prefix: Prefix for naming snapshots

    Returns:
        tuple: ``(steps_completed, snapshots)`` where ``snapshots`` is a list
        of dicts with ``step``, ``snapshot_id``, ``name`` and ``metadata``.

    Raises:
        Exception: Any error raised while processing a step is logged and
            re-raised. A keyboard interrupt stops the loop instead.
    """
    # In quiet mode, progress messages drop to debug level.
    progress_log = logger.debug if self.display_config["quiet_mode"] else logger.info
    progress_log(f"Starting agent loop for {num_steps} steps")

    steps_completed = 0
    snapshots = []
    while self.running and steps_completed < num_steps:
        try:
            self.process_next_step()
            steps_completed += 1
            progress_log(f"Completed step {steps_completed}/{num_steps}")

            # Create a snapshot after each step if morph_client and instance_id are provided
            if self.morph_client and instance_id:
                record = self._snapshot_after_step(
                    instance_id, steps_completed, snapshot_name_prefix
                )
                if record is not None:
                    snapshots.append(record)
        except KeyboardInterrupt:
            logger.info("Received keyboard interrupt, stopping")
            self.running = False
        except Exception as e:
            logger.exception(f"Error in agent loop: {e}")
            # Bare raise keeps the original traceback intact.
            raise

    if not self.running:
        self.client.stop()

    return steps_completed, snapshots

def _snapshot_after_step(self, instance_id, step_num, snapshot_name_prefix=None):
    """Snapshot the instance after a step; return its record, or None on failure."""
    if snapshot_name_prefix:
        snapshot_name = f"{snapshot_name_prefix}_step_{step_num}"
    else:
        snapshot_name = f"pokemon_step_{step_num}"
    logger.info(f"Creating snapshot after step {step_num}...")

    try:
        # Create metadata dictionary to track lineage
        metadata = {
            "step_number": str(step_num),
            "timestamp": str(int(time.time())),
        }
        # Optional lineage fields are only written when they are known.
        if self.parent_snapshot_id:
            metadata["parent_snapshot"] = self.parent_snapshot_id
        if self.dashboard_run_id:
            metadata["dashboard_run_id"] = self.dashboard_run_id
        if self.last_snapshot_id:
            metadata["prev_snapshot"] = self.last_snapshot_id

        # Create the snapshot with metadata
        instance = self.morph_client.instances.get(instance_id)
        snapshot = instance.snapshot()
        snapshot.set_metadata(metadata)

        # The next snapshot links back to this one through prev_snapshot.
        self.last_snapshot_id = snapshot.id

        logger.info(f"✅ Snapshot created with ID: {snapshot.id}")
        logger.info(f" Metadata: parent={metadata.get('parent_snapshot', 'None')}, prev={metadata.get('prev_snapshot', 'None')}, step={step_num}, dashboard_run_id={metadata.get('dashboard_run_id', 'None')}")

        # The name is only recorded here; it is not passed to the snapshot call.
        return {
            'step': step_num,
            'snapshot_id': snapshot.id,
            'name': snapshot_name,
            'metadata': metadata
        }
    except Exception as e:
        # A failed snapshot must not stop the game loop.
        logger.error(f"Failed to create snapshot: {e}")
        return None
def summarize_history(self):
    """Condense the conversation into a single summary message.

    Sends a copy of the full message history followed by ``SUMMARY_PROMPT``
    to the model, then replaces ``self.message_history`` with one user
    message holding the returned summary, the current screenshot (when one
    could be captured) and an instruction to keep playing.
    """
    # Routine progress messages drop to DEBUG in quiet mode.
    log_progress = (
        logger.debug if self.display_config["quiet_mode"] else logger.info
    )
    log_progress("[Agent] Generating conversation summary...")

    # Capture the screen up front. The client returns None when the fetch
    # fails; in that case the summary is built without an image instead of
    # crashing inside the encoder.
    screenshot = self.client.get_screenshot()
    if screenshot is None:
        logger.warning(
            "[Agent] Could not capture a screenshot for the summary; continuing without it."
        )
        screenshot_b64 = None
    else:
        screenshot_b64 = get_screenshot_base64(screenshot, upscale=2)

    # Work on a copy so the cache markers never leak into the stored history.
    messages = copy.deepcopy(self.message_history)

    # Tag the last block of the final user turn (histories of 3+ messages)
    # and of the user turn two messages earlier (histories of 5+ messages)
    # with an ephemeral cache_control marker.
    for offset, min_length in ((-1, 3), (-3, 5)):
        if len(messages) < min_length:
            continue
        candidate = messages[offset]
        if (
            candidate["role"] == "user"
            and isinstance(candidate["content"], list)
            and candidate["content"]
        ):
            candidate["content"][-1]["cache_control"] = {"type": "ephemeral"}

    messages.append(
        {
            "role": "user",
            "content": [{"type": "text", "text": self.SUMMARY_PROMPT}],
        }
    )

    # Get summary from Claude
    response = self.anthropic.messages.create(
        model=MODEL_NAME,
        max_tokens=MAX_TOKENS,
        system=self.SYSTEM_PROMPT,
        messages=messages,
        temperature=TEMPERATURE,
    )
    summary_text = " ".join(
        block.text for block in response.content if block.type == "text"
    )

    # Log the summary - use info level even in quiet mode as it's important
    logger.info("[Claude Summary] Game Progress Summary:")
    logger.info(f"{summary_text}")

    summary_content = [
        {
            "type": "text",
            "text": f"CONVERSATION HISTORY SUMMARY (representing {self.max_history} previous messages): {summary_text}",
        }
    ]
    if screenshot_b64 is not None:
        summary_content.append(
            {
                "type": "text",
                "text": "\n\nCurrent game screenshot for reference:",
            }
        )
        summary_content.append(
            {
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": "image/png",
                    "data": screenshot_b64,
                },
            }
        )
    summary_content.append(
        {
            "type": "text",
            "text": "You were just asked to summarize your playthrough so far, which is the summary you see above. You may now continue playing by selecting your next action.",
        }
    )

    # Replace message history with just the summary
    self.message_history = [{"role": "user", "content": summary_content}]
    log_progress("[Agent] Message history condensed into summary.")
def stop(self):
    """Request shutdown: clear the run flag, then stop the emulator client."""
    # Clear the flag first; the agent loop checks it before each step.
    self.running = False
    emulator_client = self.client
    emulator_client.stop()
def parse_arguments(argv=None):
    """Parse the agent's command line options.

    Args:
        argv: Optional list of argument strings to parse. When omitted,
            argparse reads ``sys.argv[1:]`` as before, so existing callers
            are unaffected; passing a list lets other code and tests drive
            the parser directly.

    Returns:
        argparse.Namespace with ``snapshot_id``, ``api_key``, ``steps``,
        ``max_history``, ``parent_snapshot_id``, ``dashboard_run_id``,
        ``snapshot_prefix``, ``verbose``, ``show_game_state``,
        ``show_collision_map``, ``log_file``, ``quiet`` and ``no_browser``.
    """
    parser = argparse.ArgumentParser(description="Run a Pokemon Game Server Agent")

    # Instance / run configuration
    parser.add_argument(
        "--snapshot-id", type=str, required=True, help="Morph snapshot ID to run"
    )
    parser.add_argument(
        "--api-key", type=str, help="Morph API key (defaults to MORPH_API_KEY env var)"
    )
    parser.add_argument(
        "--steps", type=int, default=10, help="Number of steps to run (default: 10)"
    )
    parser.add_argument(
        "--max-history",
        type=int,
        default=30,
        help="Maximum history size before summarizing (default: 30)",
    )

    # Snapshot lineage tracking
    parser.add_argument(
        "--parent-snapshot-id",
        type=str,
        help="Parent snapshot ID for lineage tracking (defaults to the starting snapshot-id)",
    )
    parser.add_argument(
        "--dashboard-run-id",
        type=str,
        help="Dashboard run ID for grouping snapshots (defaults to parent-snapshot-id)",
    )
    parser.add_argument(
        "--snapshot-prefix",
        type=str,
        default="pokemon",
        help="Prefix for snapshot names (default: 'pokemon')",
    )

    # Verbosity and display options
    parser.add_argument(
        "--verbose",
        "-v",
        action="count",
        default=0,
        help="Increase output verbosity (can be used multiple times, e.g. -vv)",
    )
    parser.add_argument(
        "--show-game-state",
        action="store_true",
        help="Show full game state information in the logs",
    )
    parser.add_argument(
        "--show-collision-map",
        action="store_true",
        help="Show collision map in the logs",
    )
    parser.add_argument(
        "--log-file",
        type=str,
        help="Path to log file. If not provided, logs will only go to stderr",
    )
    parser.add_argument(
        "--quiet",
        "-q",
        action="store_true",
        help="Only show Claude's thoughts and actions, minimal logging",
    )
    parser.add_argument(
        "--no-browser",
        action="store_true",
        help="Suppress auto-opening the browser to display the game",
    )
    return parser.parse_args(argv)
def main():
    """Command-line entry point: boot a Morph instance and run the agent on it.

    Parses the CLI arguments, configures logging, starts an instance from the
    requested snapshot, runs ``PokemonAgent`` for the requested number of
    steps and prints the snapshots it created. Once the instance has been
    started it is stopped on every exit path, including failures while
    waiting for it or while looking up its services.

    Exits with status 1 on a connection error or any other unexpected error.
    """
    args = parse_arguments()

    # Console output: bare messages in quiet mode, timestamped otherwise.
    console_handler = logging.StreamHandler()
    console_format = (
        "%(message)s" if args.quiet else "%(asctime)s - %(levelname)s - %(message)s"
    )
    console_handler.setFormatter(logging.Formatter(console_format))
    log_handlers = [console_handler]

    # Optional log file with the full detailed format.
    if args.log_file:
        file_handler = logging.FileHandler(args.log_file)
        file_handler.setFormatter(
            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        )
        log_handlers.append(file_handler)

    # Root log level: -v (any count) enables DEBUG, --quiet limits to WARNING.
    if args.quiet:
        log_level = logging.WARNING
    elif args.verbose >= 1:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO
    logging.basicConfig(level=log_level, handlers=log_handlers, force=True)
    if args.quiet:
        # The agent logs Claude's thoughts, actions and summaries at INFO and
        # expects them to stay visible in quiet mode. Keep the root logger at
        # WARNING (silencing other libraries) but let this module's own
        # logger emit INFO records.
        logger.setLevel(logging.INFO)

    # Create a rich console for nice output
    console = Console()
    console.print(
        f"Starting Pokemon Game Server Agent from snapshot {args.snapshot_id}"
    )
    console.print(
        f"Will run for {args.steps} steps with max history of {args.max_history}"
    )

    # Without an explicit parent, the starting snapshot is the lineage root.
    parent_snapshot_id = args.parent_snapshot_id or args.snapshot_id
    console.print(f"Parent snapshot ID for lineage tracking: {parent_snapshot_id}")

    if not args.quiet:
        console.print(f"Log level: {logging.getLevelName(log_level)}")
        if args.show_game_state:
            console.print("Game state display: Enabled")
        if args.show_collision_map:
            console.print("Collision map display: Enabled")
        if args.log_file:
            console.print(f"Logging to file: {args.log_file}")
        console.print("=" * 50)

    # Create the MorphCloud client and start the instance.
    morph_client = MorphCloudClient(api_key=args.api_key)
    console.print("Starting instance from snapshot...")
    instance = morph_client.instances.start(
        snapshot_id=args.snapshot_id, ttl_seconds=60 * 60 * 24  # 24 hours
    )

    def service_url(name):
        """Return the URL of the instance's HTTP service called ``name``."""
        for service in instance.networking.http_services:
            if service.name == name:
                return service.url
        raise RuntimeError(
            f"Instance {instance.id} does not expose an HTTP service named '{name}'"
        )

    agent = None
    try:
        # Everything after the instance exists runs inside this try so the
        # finally clause below can always shut the instance down.
        console.print("Waiting for instance to be ready...")
        instance.wait_until_ready()

        instance_url = service_url("web")
        novnc_url = f"{service_url('novnc')}/vnc_lite.html"
        console.print(f"Pokemon remote desktop available at: {novnc_url}")

        # Open the NoVNC URL automatically in the default browser if not suppressed
        if not args.no_browser:
            webbrowser.open(novnc_url)
        else:
            console.print(
                "Browser auto-open suppressed. Use the URL above to view the game."
            )

        # Display options handed to the agent; -v / -vv imply the state and
        # collision-map displays respectively.
        display_config = {
            "show_game_state": args.show_game_state or args.verbose > 0,
            "show_collision_map": args.show_collision_map or args.verbose > 1,
            "quiet_mode": args.quiet,
        }

        console.print("Initializing agent...")
        agent = PokemonAgent(
            server_host=instance_url,
            server_port=None,  # Not needed since URL already includes the port
            max_history=args.max_history,
            display_config=display_config,
            morph_client=morph_client,  # Pass the client for snapshot creation
            parent_snapshot_id=parent_snapshot_id,  # Pass the parent snapshot ID
            dashboard_run_id=args.dashboard_run_id,  # Pass the dashboard run ID
        )
        console.print("✅ Agent initialized successfully!")
        console.print("=" * 50)

        # Run the agent
        console.print(f"Starting agent loop for {args.steps} steps...")
        steps_completed, snapshots = agent.run(
            num_steps=args.steps,
            instance_id=instance.id,
            snapshot_name_prefix=args.snapshot_prefix,
        )
        console.print("=" * 50)
        console.print(f"✅ Agent completed {steps_completed} steps")

        # Display a summary of created snapshots
        if snapshots:
            console.print(f"\nCreated {len(snapshots)} snapshots:")
            for snapshot in snapshots:
                console.print(
                    f" - Step {snapshot['step']}: {snapshot['snapshot_id']} ({snapshot['name']})"
                )
    except ConnectionError as e:
        console.print(f"❌ Connection error: {e}")
        console.print("Make sure the server is running on the instance")
        sys.exit(1)
    except KeyboardInterrupt:
        console.print("Received keyboard interrupt, stopping agent")
    except Exception as e:
        console.print(f"❌ Error: {e}")
        sys.exit(1)
    finally:
        try:
            if agent is not None:
                agent.stop()
        finally:
            # Stop the Morph instance even if stopping the agent failed.
            console.print("Stopping Morph instance...")
            instance.stop()
# Script entry point: run the CLI only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# /// script | |
# dependencies = [ | |
# "morphcloud", | |
# "requests", | |
# "pillow", | |
# "rich", | |
# "anthropic", | |
# ] | |
# /// | |
"""Run a non-interactive server agent that plays Pokemon automatically. | |
This script combines the EmulatorClient and PokemonAgent to set up a basic agent. | |
""" | |
import io | |
import sys | |
import json | |
import copy | |
import typing | |
import base64 | |
import logging | |
import argparse | |
import time | |
import requests | |
import webbrowser | |
from PIL import Image | |
from anthropic import Anthropic | |
from rich.console import Console | |
from morphcloud.api import MorphCloudClient | |
# Set up logging - this will be configured properly in main() based on command line args
logger = logging.getLogger(__name__)

# Configuration
# Passed as `max_tokens` on every model request made by the agent.
MAX_TOKENS = 4096
# Anthropic model identifier passed as `model` on every request.
MODEL_NAME = "claude-3-7-sonnet-20250219"
# Sampling temperature passed as `temperature` on every request.
TEMPERATURE = 0.7
# When true, the `navigate_to` tool is added to the agent's tool list.
USE_NAVIGATOR = True
class EmulatorClient:
    """HTTP client for the emulator server's ``/api`` endpoints.

    Several methods (``get_state_from_memory``, ``get_collision_map``,
    ``get_valid_moves``, ``find_path``, ``initialize``, ``stop``) exist for
    compatibility with the ``Emulator`` interface.

    Non-200 responses are logged and reported through a fallback return value
    (``None``, ``""``, ``{}`` or an error dict) rather than raised.
    """

    def __init__(self, host: str = "127.0.0.1", port: typing.Optional[int] = 9876):
        """Create a client for the server at ``host``.

        Args:
            host: Host name or full ``http(s)://`` URL of the server.
            port: Port to use when ``host`` does not already carry one.
                Ignored for MorphVM (``cloud.morph.so``) hosts, whose URL
                routing already selects the service; ``None`` means "do not
                add a port".
        """
        self.base_url = self._build_base_url(host, port)
        logger.info(f"Initialized client connecting to {self.base_url}")

    @staticmethod
    def _build_base_url(host, port):
        """Normalise ``host``/``port`` into a base URL without a trailing slash."""
        from urllib.parse import urlsplit

        is_morph = "cloud.morph.so" in host
        if host.startswith(("http://", "https://")):
            url = host
        else:
            # MorphVM hosts are served over HTTPS; anything else is plain HTTP.
            url = f"{'https' if is_morph else 'http'}://{host}"
        # Endpoint paths start with "/api/...", so drop any trailing slash.
        url = url.rstrip("/")

        if is_morph or port is None:
            return url

        parts = urlsplit(url)
        try:
            has_port = parts.port is not None
        except ValueError:
            # Malformed port in the given URL: leave it untouched.
            has_port = True
        if has_port:
            return url
        # Add the port to the authority, not to the end of the URL, so hosts
        # given with a path ("http://host/base") still produce a valid URL.
        return parts._replace(netloc=f"{parts.netloc}:{port}").geturl()

    def _fetch_screenshot_bytes(self):
        """Return the raw screenshot bytes, or None on a non-200 response."""
        response = requests.get(f"{self.base_url}/api/screenshot")
        if response.status_code != 200:
            logger.error(f"Error getting screenshot: {response.status_code}")
            return None
        return response.content

    def get_screenshot(self):
        """Get current screenshot as PIL Image (None if the request failed)."""
        raw = self._fetch_screenshot_bytes()
        if raw is None:
            return None
        return Image.open(io.BytesIO(raw))

    def get_screenshot_base64(self):
        """Get current screenshot as base64 string ("" if the request failed)."""
        raw = self._fetch_screenshot_bytes()
        if raw is None:
            return ""
        return base64.b64encode(raw).decode("utf-8")

    def get_game_state(self):
        """Get complete game state from server ({} on any failure)."""
        response = requests.get(f"{self.base_url}/api/game_state")
        if response.status_code != 200:
            logger.error(
                f"Error response from server: {response.status_code} - {response.text}"
            )
            return {}
        try:
            return response.json()
        except ValueError as e:
            # json.JSONDecodeError is a ValueError; catching the base class
            # also covers the simplejson variant requests may raise.
            logger.error(f"JSON decode error: {e}")
            logger.error(f"Response content: {response.text[:100]}...")
            return {}

    # Compatibility methods to match Emulator interface
    def get_state_from_memory(self):
        """Get game state string - mimics Emulator.get_state_from_memory()"""
        return self.get_game_state().get("game_state", "")

    def get_collision_map(self):
        """Get collision map - mimics Emulator.get_collision_map()"""
        return self.get_game_state().get("collision_map", "")

    def get_valid_moves(self):
        """Get valid moves - mimics Emulator.get_valid_moves()"""
        return self.get_game_state().get("valid_moves", [])

    def find_path(self, row, col):
        """Find path to position - mimics Emulator.find_path()

        Returns:
            tuple: ``(status, path)`` taken from the navigate response.
        """
        result = self.navigate(row, col)
        if not isinstance(result, dict):
            return "Failed to navigate", []
        return result.get("status", "Navigation failed"), result.get("path", [])

    def press_buttons(
        self, buttons, wait=True, include_state=False, include_screenshot=False
    ):
        """Press a sequence of buttons on the Game Boy

        Args:
            buttons: List of buttons to press
            wait: Whether to pause briefly after each button press
            include_state: Whether to include game state in response
            include_screenshot: Whether to include screenshot in response

        Returns:
            dict: Response data which may include button press result, game
            state, and screenshot; ``{"error": ...}`` on a non-200 response.
        """
        data = {
            "buttons": buttons,
            "wait": wait,
            "include_state": include_state,
            "include_screenshot": include_screenshot,
        }
        response = requests.post(f"{self.base_url}/api/press_buttons", json=data)
        if response.status_code != 200:
            logger.error(
                f"Error pressing buttons: {response.status_code} - {response.text}"
            )
            return {"error": f"Error: {response.status_code}"}
        return response.json()

    def navigate(self, row, col, include_state=False, include_screenshot=False):
        """Navigate to a specific position on the grid

        Args:
            row: Target row coordinate
            col: Target column coordinate
            include_state: Whether to include game state in response
            include_screenshot: Whether to include screenshot in response

        Returns:
            dict: Response data which may include navigation result, game
            state, and screenshot; ``{"status": ..., "path": []}`` on a
            non-200 response.
        """
        data = {
            "row": row,
            "col": col,
            "include_state": include_state,
            "include_screenshot": include_screenshot,
        }
        response = requests.post(f"{self.base_url}/api/navigate", json=data)
        if response.status_code != 200:
            logger.error(f"Error navigating: {response.status_code} - {response.text}")
            return {"status": f"Error: {response.status_code}", "path": []}
        return response.json()

    def read_memory(self, address):
        """Read a specific memory address"""
        response = requests.get(f"{self.base_url}/api/memory/{address}")
        if response.status_code != 200:
            logger.error(
                f"Error reading memory: {response.status_code} - {response.text}"
            )
            return {"error": f"Error: {response.status_code}"}
        return response.json()

    def load_state(self, state_path):
        """Load a saved state"""
        response = requests.post(
            f"{self.base_url}/api/load_state", json={"state_path": state_path}
        )
        if response.status_code != 200:
            logger.error(
                f"Error loading state: {response.status_code} - {response.text}"
            )
            return {"error": f"Error: {response.status_code}"}
        return response.json()

    def save_screenshot(self, filename="screenshot.png"):
        """Save current screenshot to a file; return True if it was saved."""
        screenshot = self.get_screenshot()
        if not screenshot:
            return False
        screenshot.save(filename)
        logger.info(f"Screenshot saved as {filename}")
        return True

    def initialize(self, max_retries=5, retry_delay=3):
        """Poll ``/api/status`` until the server reports it is ready.

        Provided for compatibility with Emulator.

        Args:
            max_retries (int): Maximum number of attempts
            retry_delay (int): Delay between attempts in seconds

        Returns:
            bool: True if server is ready, False otherwise
        """
        logger.info(
            f"Client initialization requested (compatibility method) with {max_retries} retries"
        )
        for attempt in range(1, max_retries + 1):
            try:
                logger.info(f"Checking server status (attempt {attempt}/{max_retries})")
                response = requests.get(f"{self.base_url}/api/status", timeout=10)
                if response.json().get("ready", False):
                    logger.info("Server reports ready status")
                    return True
                logger.warning(
                    f"Server reports not ready (attempt {attempt}/{max_retries})"
                )
            except requests.exceptions.Timeout:
                logger.warning(f"Connection timeout (attempt {attempt}/{max_retries})")
            except requests.exceptions.ConnectionError as e:
                logger.warning(
                    f"Connection error: {e} (attempt {attempt}/{max_retries})"
                )
            except Exception as e:
                logger.error(
                    f"Error checking server status: {e} (attempt {attempt}/{max_retries})"
                )
            # Every unsuccessful attempt waits before the next one; there is
            # no point sleeping after the final attempt.
            if attempt < max_retries:
                logger.info(f"Waiting {retry_delay} seconds before retry...")
                time.sleep(retry_delay)
        logger.error(f"Server not ready after {max_retries} attempts")
        return False

    def stop(self):
        """Empty stop method for compatibility with Emulator"""
        # Nothing to release on the client side; only log the request.
        logger.info("Client stop requested (compatibility method)")
def get_screenshot_base64(screenshot, upscale=1):
    """Encode a PIL image as a base64 PNG string, optionally enlarged.

    Args:
        screenshot: Image to encode.
        upscale: Factor applied to both width and height when greater than 1.

    Returns:
        str: Base64 text of the PNG-encoded image.
    """
    image = screenshot
    if upscale > 1:
        # Scale both dimensions by the same factor.
        image = image.resize((image.width * upscale, image.height * upscale))

    # Serialise to PNG in memory, then base64-encode the bytes.
    png_buffer = io.BytesIO()
    image.save(png_buffer, format="PNG")
    encoded = base64.standard_b64encode(png_buffer.getvalue())
    return encoded.decode()
class PokemonAgent: | |
def __init__(
    self,
    server_host="127.0.0.1",
    server_port: typing.Optional[int] = 9876,
    max_history=60,
    display_config=None,
    morph_client=None,  # Add MorphCloudClient as a parameter
    parent_snapshot_id=None,  # Add parent snapshot ID parameter
    dashboard_run_id=None,  # Add dashboard run ID parameter
):
    """Initialize the server agent.

    Args:
        server_host: Host where the game server is running
        server_port: Port number of the game server (9876 is used when None)
        max_history: Maximum number of messages in history before summarization
        display_config: Dictionary with display configuration options
            (``show_game_state``, ``show_collision_map``, ``quiet_mode``).
            Missing keys fall back to False.
        morph_client: Optional MorphCloudClient instance for snapshot creation
        parent_snapshot_id: Optional ID of the parent snapshot for lineage tracking
        dashboard_run_id: Optional ID for grouping snapshots by dashboard run
            (defaults to ``parent_snapshot_id``)

    Raises:
        RuntimeError: If the game server does not report ready.
    """
    self.client = EmulatorClient(host=server_host, port=server_port or 9876)
    self.anthropic = Anthropic()
    self.running = True
    self.message_history = [
        {"role": "user", "content": "You may now begin playing."}
    ]
    self.max_history = max_history

    # Store the MorphCloud client and snapshot tracking IDs
    self.morph_client = morph_client
    self.parent_snapshot_id = parent_snapshot_id
    self.dashboard_run_id = dashboard_run_id or parent_snapshot_id  # Use parent as fallback
    self.last_snapshot_id = parent_snapshot_id  # Track the last created snapshot ID

    # Merge the caller's options over the defaults so a partial dict cannot
    # cause a KeyError when another method reads one of the three keys.
    default_display = {
        "show_game_state": False,
        "show_collision_map": False,
        "quiet_mode": False,
    }
    self.display_config = {**default_display, **(display_config or {})}

    # Log initialization with chosen configuration
    logger.debug(f"Agent initialized with display config: {self.display_config}")
    if self.morph_client and self.parent_snapshot_id:
        logger.info(
            f"Snapshot tracking enabled. Parent snapshot: {self.parent_snapshot_id}"
        )
    if self.dashboard_run_id:
        logger.info(
            f"Dashboard run ID for grouping snapshots: {self.dashboard_run_id}"
        )

    # Check if the server is ready
    if not self.client.initialize():
        logger.error(
            "Server not ready - please start the server before running the agent"
        )
        raise RuntimeError("Server not ready")
# System prompt passed as `system=` on the agent's model requests.
SYSTEM_PROMPT = """You are playing Pokemon Red. You can see the game screen and control the game by executing emulator commands.
Your goal is to play through Pokemon Red and eventually defeat the Elite Four. Make decisions based on what you see on the screen.
check your tools! for example, try to use 'navigate_to' to help you move faster and better when looking at the map, with positions from game state.
Before each action, explain your reasoning briefly, plan your immediate next few steps needed (low level tool calls and actions, e.g. 'to reach the Cave from here, I need to go 1. right, 2. right, 3. right, 4. up', not high level goals like '1. explore the Cave 2. ??? 3. win!') to get there, then use the available actions to execute the next step in the game.
The game commands always register perfectly, so if you see no reaction to them, you have made an invalid command and misunderstood the game state. In battles, when you see an attack that isn't effective, you should examine your assumptions and update your beliefs. In general, search the solution space (try different things) before getting stuck in ruts.
Mistakes you have made before:
- do not talk to NPCs
- do not plan with high level goals
- DON'T FIGHT ANY BATTLES IF YOU CAN HELP IT. IF YOU ENCOUNTER A WILD (non trainer) BATTLE - JUST RUN
- do not insist on your prior knowledge about what attacks are strong against what types of Pokemon works when the evidence is the opposite
- you often miss the cave, which is a black hole to the side of the pokemon center.
The conversation history may occasionally be summarized to save context space. If you see a message labeled "CONVERSATION HISTORY SUMMARY", this contains the key information about your progress so far. Use this information to maintain continuity in your gameplay."""

# Instruction for the history-summarization request; the summary it asks for
# is meant to replace the full conversation history.
SUMMARY_PROMPT = """I need you to create a detailed summary of our conversation history up to this point. This summary will replace the full conversation history to manage the context window.
Please include:
1. Key game events and milestones you've reached
2. Important decisions you've made
3. Current objectives or goals you're working toward
4. Your current location and Pokémon team status
5. Any strategies or plans you've mentioned
The summary should be comprehensive enough that you can continue gameplay without losing important context about what has happened so far."""

# Tool definitions passed as `tools=` on the agent's model requests. Each
# entry has a name, a description and a JSON-schema `input_schema`.
AVAILABLE_TOOLS = [
    {
        "name": "press_buttons",
        "description": "Press a sequence of buttons on the Game Boy.",
        "input_schema": {
            "type": "object",
            "properties": {
                "buttons": {
                    "type": "array",
                    "items": {
                        "type": "string",
                        "enum": [
                            "a",
                            "b",
                            "start",
                            "select",
                            "up",
                            "down",
                            "left",
                            "right",
                        ],
                    },
                    "description": "List of buttons to press in sequence. Valid buttons: 'a', 'b', 'start', 'select', 'up', 'down', 'left', 'right'",
                },
                "wait": {
                    "type": "boolean",
                    "description": "Whether to wait for a brief period after pressing each button. Defaults to true.",
                },
            },
            "required": ["buttons"],
        },
    }
]

# Add navigation tool if enabled
# (the check runs once, when these statements are first executed)
if USE_NAVIGATOR:
    AVAILABLE_TOOLS.append(
        {
            "name": "navigate_to",
            "description": "Automatically navigate to a position on the map grid. The screen is divided into a 9x10 grid, with the top-left corner as (0, 0). This tool is only available in the overworld.",
            "input_schema": {
                "type": "object",
                "properties": {
                    "row": {
                        "type": "integer",
                        "description": "The row coordinate to navigate to (0-8).",
                    },
                    "col": {
                        "type": "integer",
                        "description": "The column coordinate to navigate to (0-9).",
                    },
                },
                "required": ["row", "col"],
            },
        }
    )
def process_tool_call(self, tool_call):
    """Execute one model tool call and package the outcome as a tool result.

    Args:
        tool_call: Tool-use block from the model response; its ``name``,
            ``input`` and ``id`` attributes are read.

    Returns:
        dict: ``{"type": "tool_result", "tool_use_id": ..., "content": [...]}``.
        For ``press_buttons`` and ``navigate_to`` the content holds a
        one-line outcome, the screenshot taken after the action (when one
        is available) and the game state read from memory. Any other tool
        name yields an error text block.
    """
    tool_name = tool_call.name
    tool_input = tool_call.input

    # In quiet mode, routine action logs drop to debug level.
    log_action = logger.debug if self.display_config["quiet_mode"] else logger.info
    log_action(f"Processing tool call: {tool_name}")

    if tool_name == "press_buttons":
        buttons = tool_input["buttons"]
        wait = tool_input.get("wait", True)
        log_action(f"[Buttons] Pressing: {buttons} (wait={wait})")

        # One request returns the result, game state and screenshot together.
        response = self.client.press_buttons(
            buttons, wait=wait, include_state=True, include_screenshot=True
        )
        if "error" in response:
            # The client reports a failed request as {"error": ...}; tell
            # the model instead of claiming the presses happened.
            headline = f"Failed to press buttons: {response['error']}"
        else:
            headline = f"Pressed buttons: {', '.join(buttons)}"
        return self._tool_result_with_observation(
            tool_call,
            response,
            headline,
            "\nHere is a screenshot of the screen after your button presses:",
        )

    if tool_name == "navigate_to":
        row = tool_input["row"]
        col = tool_input["col"]
        log_action(f"[Navigation] Navigating to: ({row}, {col})")

        # One request returns the result, game state and screenshot together.
        response = self.client.navigate(
            row, col, include_state=True, include_screenshot=True
        )
        status = response.get("status", "Unknown status")
        path = response.get("path", [])
        # A non-empty path is treated as success.
        if path:
            result = f"Navigation successful: followed path with {len(path)} steps"
        else:
            result = f"Navigation failed: {status}"
        return self._tool_result_with_observation(
            tool_call,
            response,
            f"Navigation result: {result}",
            "\nHere is a screenshot of the screen after navigation:",
        )

    logger.error(f"Unknown tool called: {tool_name}")
    return {
        "type": "tool_result",
        "tool_use_id": tool_call.id,
        "content": [
            {"type": "text", "text": f"Error: Unknown tool '{tool_name}'"}
        ],
    }

def _tool_result_with_observation(self, tool_call, response, headline, screenshot_caption):
    """Build a tool result from the headline plus the post-action screenshot and state."""
    memory_info = self._read_and_log_game_state(response)

    # Prefer the screenshot bundled with the response; otherwise fetch one.
    screenshot_b64 = response.get("screenshot") or None
    if screenshot_b64 is None:
        screenshot = self.client.get_screenshot()
        if screenshot is not None:
            screenshot_b64 = get_screenshot_base64(screenshot, upscale=2)

    content = [{"type": "text", "text": headline}]
    if screenshot_b64:
        content.append({"type": "text", "text": screenshot_caption})
        content.append(
            {
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": "image/png",
                    "data": screenshot_b64,
                },
            }
        )
    else:
        # The screenshot fetch failed; say so rather than send an empty image.
        logger.warning("No screenshot available after tool call")
        content.append(
            {"type": "text", "text": "\n(No screenshot is available for this step.)"}
        )

    # The memory state is always included in what the model sees.
    content.append(
        {
            "type": "text",
            "text": f"\nGame state information from memory after your action:\n{memory_info}",
        }
    )
    return {
        "type": "tool_result",
        "tool_use_id": tool_call.id,
        "content": content,
    }

def _read_and_log_game_state(self, response):
    """Return the memory-state text for ``response``, logging it and the collision map."""
    if "game_state" in response:
        origin = "from response"
        memory_info = response["game_state"].get("game_state", "")
        collision_map = response["game_state"].get("collision_map", "")
    else:
        # Fallback to separate calls if state not included
        origin = "after action"
        memory_info = self.client.get_state_from_memory()
        collision_map = self.client.get_collision_map()

    # The display flags only choose the log level; the state is always logged.
    log_state = logger.info if self.display_config["show_game_state"] else logger.debug
    log_state(f"[Memory State {origin}]")
    log_state(memory_info)

    if collision_map:
        log_map = (
            logger.info if self.display_config["show_collision_map"] else logger.debug
        )
        log_map(f"[Collision Map {origin}]\n{collision_map}")
    return memory_info
def run(self, num_steps=1, instance_id=None, snapshot_name_prefix=None):
    """Run the agent loop for up to ``num_steps`` model turns.

    Each turn sends a copy of the message history to the model, logs the
    returned text and tool-use blocks, executes any requested tools and
    appends the exchange to the history (summarizing it once it reaches
    ``max_history`` entries). A turn counts as a completed step whether or
    not the model requested a tool. After each step a snapshot of the
    instance is attempted when both ``self.morph_client`` and
    ``instance_id`` are set.

    Args:
        num_steps: Number of steps to run for.
        instance_id: ID of the current instance for snapshot creation.
        snapshot_name_prefix: Prefix for naming snapshots.

    Returns:
        A ``(steps_completed, snapshots)`` tuple. ``snapshots`` is a list of
        dicts with the keys ``step``, ``snapshot_id``, ``name`` and
        ``metadata``, one per snapshot that was created successfully.

    Raises:
        Exception: Any error raised during a step (other than
            ``KeyboardInterrupt``, which just stops the loop) is logged with
            its traceback and re-raised unchanged.
    """
    if self.display_config["quiet_mode"]:
        logger.debug(f"Starting agent loop for {num_steps} steps")
    else:
        logger.info(f"Starting agent loop for {num_steps} steps")

    steps_completed = 0
    snapshots = []

    while self.running and steps_completed < num_steps:
        try:
            # Work on a copy so the cache markers never leak into the
            # persistent history.
            messages = copy.deepcopy(self.message_history)
            if len(messages) >= 3:
                # Tag the last content block of the two most recent user
                # turns with an ephemeral cache_control marker.
                if (
                    messages[-1]["role"] == "user"
                    and isinstance(messages[-1]["content"], list)
                    and messages[-1]["content"]
                ):
                    messages[-1]["content"][-1]["cache_control"] = {
                        "type": "ephemeral"
                    }
                if (
                    len(messages) >= 5
                    and messages[-3]["role"] == "user"
                    and isinstance(messages[-3]["content"], list)
                    and messages[-3]["content"]
                ):
                    messages[-3]["content"][-1]["cache_control"] = {
                        "type": "ephemeral"
                    }

            # Get model response
            response = self.anthropic.messages.create(
                model=MODEL_NAME,
                max_tokens=MAX_TOKENS,
                system=self.SYSTEM_PROMPT,
                messages=messages,
                tools=self.AVAILABLE_TOOLS,
                temperature=TEMPERATURE,
            )

            # Log token usage
            if self.display_config["quiet_mode"]:
                logger.debug(f"Response usage: {response.usage}")
            else:
                logger.info(f"Response usage: {response.usage}")

            # Extract tool calls
            tool_calls = [
                block for block in response.content if block.type == "tool_use"
            ]

            # Display the model's reasoning
            for block in response.content:
                if block.type == "text":
                    # Claude's thoughts should always be visible, even in quiet mode
                    logger.info(f"[Claude] {block.text}")
                elif block.type == "tool_use":
                    # Both modes log at info level; only the wording differs.
                    if self.display_config["quiet_mode"]:
                        logger.info(
                            f"[Claude Action] Using tool: {block.name} with input: {block.input}"
                        )
                    else:
                        logger.info(
                            f"[Tool Use] {block.name} with input: {block.input}"
                        )

            if tool_calls:
                # Record the assistant turn (text and tool_use blocks only).
                assistant_content = []
                for block in response.content:
                    if block.type == "text":
                        assistant_content.append(
                            {"type": "text", "text": block.text}
                        )
                    elif block.type == "tool_use":
                        assistant_content.append(
                            {"type": "tool_use", **dict(block)}
                        )
                self.message_history.append(
                    {"role": "assistant", "content": assistant_content}
                )

                # Execute every requested tool and answer in one user turn.
                tool_results = [
                    self.process_tool_call(tool_call) for tool_call in tool_calls
                ]
                self.message_history.append(
                    {"role": "user", "content": tool_results}
                )

                # Check if we need to summarize the history
                if len(self.message_history) >= self.max_history:
                    self.summarize_history()

            steps_completed += 1
            if self.display_config["quiet_mode"]:
                logger.debug(f"Completed step {steps_completed}/{num_steps}")
            else:
                logger.info(f"Completed step {steps_completed}/{num_steps}")

            # Create a snapshot after each step if morph_client and
            # instance_id are provided. Failures are logged, not raised.
            if self.morph_client and instance_id:
                record = self._create_step_snapshot(
                    instance_id, steps_completed, snapshot_name_prefix
                )
                if record is not None:
                    snapshots.append(record)
        except KeyboardInterrupt:
            logger.info("Received keyboard interrupt, stopping")
            self.running = False
        except Exception as e:
            # One record with the traceback attached; a bare ``raise`` keeps
            # the original traceback intact for the caller.
            logger.exception(f"Error in agent loop: {e}")
            raise

    if not self.running:
        self.client.stop()
    return steps_completed, snapshots


def _create_step_snapshot(self, instance_id, step_num, snapshot_name_prefix=None):
    """Snapshot the instance after a step; return its record or None on failure.

    Args:
        instance_id: ID of the instance to snapshot.
        step_num: Number of the step that was just completed.
        snapshot_name_prefix: Optional prefix for the recorded snapshot name.

    Returns:
        A dict with the keys ``step``, ``snapshot_id``, ``name`` and
        ``metadata``, or ``None`` when snapshot creation raised.
    """
    # NOTE(review): the name is only stored in the returned record; it is
    # never passed to the snapshot call itself.
    if snapshot_name_prefix:
        snapshot_name = f"{snapshot_name_prefix}_step_{step_num}"
    else:
        snapshot_name = f"pokemon_step_{step_num}"

    logger.info(f"Creating snapshot after step {step_num}...")
    try:
        # Metadata dictionary used to track snapshot lineage.
        metadata = {
            "step_number": str(step_num),
            "timestamp": str(int(time.time())),
        }
        if self.parent_snapshot_id:
            metadata["parent_snapshot"] = self.parent_snapshot_id
        # dashboard_run_id lets the dashboard filter snapshots by run.
        if self.dashboard_run_id:
            metadata["dashboard_run_id"] = self.dashboard_run_id
        if self.last_snapshot_id:
            metadata["prev_snapshot"] = self.last_snapshot_id

        instance = self.morph_client.instances.get(instance_id)
        snapshot = instance.snapshot()
        snapshot.set_metadata(metadata)

        # Only advance the chain once the snapshot carries its metadata.
        self.last_snapshot_id = snapshot.id
        logger.info(f"✅ Snapshot created with ID: {snapshot.id}")
        logger.info(f" Metadata: parent={metadata.get('parent_snapshot', 'None')}, prev={metadata.get('prev_snapshot', 'None')}, step={step_num}, dashboard_run_id={metadata.get('dashboard_run_id', 'None')}")
        return {
            'step': step_num,
            'snapshot_id': snapshot.id,
            'name': snapshot_name,
            'metadata': metadata,
        }
    except Exception as e:
        # Best effort: a failed snapshot must not abort the agent loop.
        logger.error(f"Failed to create snapshot: {e}")
        return None
def summarize_history(self):
    """Replace the message history with a single model-written summary.

    Takes a fresh screenshot, sends the whole history plus
    ``SUMMARY_PROMPT`` to the model, and then resets
    ``self.message_history`` to one user message holding the summary text,
    the screenshot and an instruction to continue playing.
    """
    start_log = logger.debug if self.display_config["quiet_mode"] else logger.info
    start_log("[Agent] Generating conversation summary...")

    # Capture the current screen up front so it can accompany the summary.
    current_frame = self.client.get_screenshot()
    screenshot_b64 = get_screenshot_base64(current_frame, upscale=2)

    # The summarization request carries the entire conversation so far.
    messages = copy.deepcopy(self.message_history)

    # (index from the end, minimum history length required to mark it)
    history_len = len(messages)
    for position, required_len in ((-1, 3), (-3, 5)):
        if history_len < required_len:
            continue
        turn = messages[position]
        if (
            turn["role"] == "user"
            and isinstance(turn["content"], list)
            and turn["content"]
        ):
            turn["content"][-1]["cache_control"] = {"type": "ephemeral"}

    summary_request = {
        "role": "user",
        "content": [
            {
                "type": "text",
                "text": self.SUMMARY_PROMPT,
            }
        ],
    }
    messages.append(summary_request)

    # Ask the model for the summary (no tools are offered for this call).
    response = self.anthropic.messages.create(
        model=MODEL_NAME,
        max_tokens=MAX_TOKENS,
        system=self.SYSTEM_PROMPT,
        messages=messages,
        temperature=TEMPERATURE,
    )

    # Concatenate every text block of the reply.
    text_parts = []
    for block in response.content:
        if block.type == "text":
            text_parts.append(block.text)
    summary_text = " ".join(text_parts)

    # The summary is logged at info level even in quiet mode.
    logger.info("[Claude Summary] Game Progress Summary:")
    logger.info(f"{summary_text}")

    summary_message = {
        "role": "user",
        "content": [
            {
                "type": "text",
                "text": f"CONVERSATION HISTORY SUMMARY (representing {self.max_history} previous messages): {summary_text}",
            },
            {
                "type": "text",
                "text": "\n\nCurrent game screenshot for reference:",
            },
            {
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": "image/png",
                    "data": screenshot_b64,
                },
            },
            {
                "type": "text",
                "text": "You were just asked to summarize your playthrough so far, which is the summary you see above. You may now continue playing by selecting your next action.",
            },
        ],
    }
    self.message_history = [summary_message]

    done_log = logger.debug if self.display_config["quiet_mode"] else logger.info
    done_log("[Agent] Message history condensed into summary.")
def stop(self):
    """Flag the agent as no longer running and stop its client."""
    # Clearing the flag makes the ``while self.running`` loop in ``run``
    # exit at its next check.
    self.running = False
    self.client.stop()
def parse_arguments(argv=None):
    """Parse the agent's command line arguments.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse reads ``sys.argv[1:]``, so existing
            callers are unaffected.

    Returns:
        The ``argparse.Namespace`` holding the parsed options.

    Raises:
        SystemExit: Raised by argparse on invalid input or when the
            required ``--snapshot-id`` is missing.
    """
    parser = argparse.ArgumentParser(description="Run a Pokemon Game Server Agent")
    parser.add_argument(
        "--snapshot-id", type=str, required=True, help="Morph snapshot ID to run"
    )
    parser.add_argument(
        "--api-key", type=str, help="Morph API key (defaults to MORPH_API_KEY env var)"
    )
    parser.add_argument(
        "--steps", type=int, default=10, help="Number of steps to run (default: 10)"
    )
    parser.add_argument(
        "--max-history",
        type=int,
        default=30,
        help="Maximum history size before summarizing (default: 30)",
    )

    # Snapshot lineage / grouping options
    parser.add_argument(
        "--parent-snapshot-id",
        type=str,
        help="Parent snapshot ID for lineage tracking (defaults to the starting snapshot-id)",
    )
    parser.add_argument(
        "--dashboard-run-id",
        type=str,
        help="Dashboard run ID for grouping snapshots (defaults to parent-snapshot-id)",
    )
    parser.add_argument(
        "--snapshot-prefix",
        type=str,
        default="pokemon",
        help="Prefix for snapshot names (default: 'pokemon')",
    )

    # Verbosity and display options
    parser.add_argument(
        "--verbose",
        "-v",
        action="count",
        default=0,
        help="Increase output verbosity (can be used multiple times, e.g. -vv)",
    )
    parser.add_argument(
        "--show-game-state",
        action="store_true",
        help="Show full game state information in the logs",
    )
    parser.add_argument(
        "--show-collision-map",
        action="store_true",
        help="Show collision map in the logs",
    )
    parser.add_argument(
        "--log-file",
        type=str,
        help="Path to log file. If not provided, logs will only go to stderr",
    )
    parser.add_argument(
        "--quiet",
        "-q",
        action="store_true",
        help="Only show Claude's thoughts and actions, minimal logging",
    )
    parser.add_argument(
        "--no-browser",
        action="store_true",
        help="Suppress auto-opening the browser to display the game",
    )
    return parser.parse_args(argv)
def _get_service_url(instance, service_name):
    """Return the URL of the instance's HTTP service named ``service_name``.

    Raises:
        RuntimeError: If the instance exposes no HTTP service of that name.
    """
    for service in instance.networking.http_services:
        if service.name == service_name:
            return service.url
    raise RuntimeError(
        f"Instance {instance.id} exposes no HTTP service named '{service_name}'"
    )


def main():
    """Start a Morph instance from a snapshot and run the Pokemon agent on it.

    Configures logging from the command line flags, starts the instance,
    optionally opens the remote desktop in a browser, runs the agent for the
    requested number of steps and prints the snapshots it created. The
    instance is stopped on every exit path once it has been started.

    Raises:
        SystemExit: With status 1 when setup or the agent run fails with a
            non-interrupt error.
    """
    args = parse_arguments()

    # Console handler: bare messages in quiet mode, timestamped otherwise.
    console_handler = logging.StreamHandler()
    if args.quiet:
        console_format = "%(message)s"
    else:
        console_format = "%(asctime)s - %(levelname)s - %(message)s"
    console_handler.setFormatter(logging.Formatter(console_format))
    log_handlers = [console_handler]

    # Optional file handler with the full detailed format.
    if args.log_file:
        file_handler = logging.FileHandler(args.log_file)
        file_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        file_handler.setFormatter(logging.Formatter(file_format))
        log_handlers.append(file_handler)

    # NOTE(review): WARNING also filters INFO records; if INFO-level agent
    # messages are meant to stay visible in quiet mode this needs revisiting.
    if args.quiet:
        log_level = logging.WARNING
    elif args.verbose == 0:
        log_level = logging.INFO
    else:  # one or more -v flags
        log_level = logging.DEBUG

    # force=True replaces any handlers configured earlier on the root logger.
    logging.basicConfig(level=log_level, handlers=log_handlers, force=True)

    console = Console()
    console.print(
        f"Starting Pokemon Game Server Agent from snapshot {args.snapshot_id}"
    )
    console.print(
        f"Will run for {args.steps} steps with max history of {args.max_history}"
    )

    # Without an explicit parent, the starting snapshot is the lineage root.
    parent_snapshot_id = args.parent_snapshot_id or args.snapshot_id
    console.print(f"Parent snapshot ID for lineage tracking: {parent_snapshot_id}")

    if not args.quiet:
        console.print(f"Log level: {logging.getLevelName(log_level)}")
        if args.show_game_state:
            console.print("Game state display: Enabled")
        if args.show_collision_map:
            console.print("Collision map display: Enabled")
        if args.log_file:
            console.print(f"Logging to file: {args.log_file}")
    console.print("=" * 50)

    morph_client = MorphCloudClient(api_key=args.api_key)

    console.print("Starting instance from snapshot...")
    instance = morph_client.instances.start(
        snapshot_id=args.snapshot_id, ttl_seconds=60 * 60 * 24  # 24 hours
    )

    # Everything after a successful start runs under try/finally so the
    # instance is stopped even when readiness or service lookup fails.
    agent = None
    try:
        console.print("Waiting for instance to be ready...")
        instance.wait_until_ready()

        instance_url = _get_service_url(instance, "web")
        remote_desktop_url = _get_service_url(instance, "novnc")
        novnc_url = f"{remote_desktop_url}/vnc_lite.html"
        console.print(f"Pokemon remote desktop available at: {novnc_url}")

        if not args.no_browser:
            webbrowser.open(novnc_url)
        else:
            console.print("Browser auto-open suppressed. Use the URL above to view the game.")

        # Display configuration handed to the agent; -v / -vv imply the
        # game-state / collision-map displays respectively.
        display_config = {
            "show_game_state": args.show_game_state or args.verbose > 0,
            "show_collision_map": args.show_collision_map or args.verbose > 1,
            "quiet_mode": args.quiet,
        }

        console.print("Initializing agent...")
        agent = PokemonAgent(
            server_host=instance_url,
            server_port=None,  # Not needed since URL already includes the port
            max_history=args.max_history,
            display_config=display_config,
            morph_client=morph_client,  # Used for snapshot creation
            parent_snapshot_id=parent_snapshot_id,
            dashboard_run_id=args.dashboard_run_id,
        )
        console.print("✅ Agent initialized successfully!")
        console.print("=" * 50)

        console.print(f"Starting agent loop for {args.steps} steps...")
        steps_completed, snapshots = agent.run(
            num_steps=args.steps,
            instance_id=instance.id,
            snapshot_name_prefix=args.snapshot_prefix,
        )
        console.print("=" * 50)
        console.print(f"✅ Agent completed {steps_completed} steps")

        if snapshots:
            console.print(f"\nCreated {len(snapshots)} snapshots:")
            for snapshot in snapshots:
                console.print(f" - Step {snapshot['step']}: {snapshot['snapshot_id']} ({snapshot['name']})")
    except ConnectionError as e:
        console.print(f"❌ Connection error: {e}")
        console.print("Make sure the server is running on the instance")
        sys.exit(1)
    except KeyboardInterrupt:
        console.print("Received keyboard interrupt, stopping agent")
    except Exception as e:
        console.print(f"❌ Error: {e}")
        sys.exit(1)
    finally:
        try:
            if agent is not None:
                agent.stop()
        finally:
            # Runs even if agent.stop() raises, so the instance never leaks.
            console.print("Stopping Morph instance...")
            instance.stop()


if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
annotated-types==0.7.0 | |
anthropic==0.49.0 | |
anyio==4.9.0 | |
bcrypt==4.3.0 | |
certifi==2025.1.31 | |
cffi==1.17.1 | |
charset-normalizer==3.4.1 | |
click==8.1.8 | |
cryptography==44.0.2 | |
distro==1.9.0 | |
dotenv==0.9.9 | |
fastapi==0.115.12 | |
h11==0.14.0 | |
httpcore==1.0.7 | |
httpx==0.28.1 | |
httpx-sse==0.4.0 | |
idna==3.10 | |
jiter==0.9.0 | |
markdown-it-py==3.0.0 | |
mcp==1.6.0 | |
mdurl==0.1.2 | |
morphcloud==0.1.32 | |
paramiko==3.5.1 | |
pathspec==0.12.1 | |
pillow==11.1.0 | |
psutil==7.0.0 | |
pycparser==2.22 | |
pydantic==2.11.1 | |
pydantic-core==2.33.0 | |
pydantic-settings==2.8.1 | |
pygments==2.19.1 | |
pynacl==1.5.0 | |
python-dotenv==1.1.0 | |
requests==2.32.3 | |
rich==13.9.4 | |
sniffio==1.3.1 | |
sse-starlette==2.2.1 | |
starlette==0.46.1 | |
tqdm==4.67.1 | |
typing-extensions==4.13.0 | |
typing-inspection==0.4.0 | |
urllib3==2.3.0 | |
uvicorn==0.34.0 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import argparse | |
import sys | |
import time | |
from morphcloud.api import MorphCloudClient | |
def main(argv=None):
    """Start an instance from a snapshot, snapshot it again and stop it.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default)
            argparse reads ``sys.argv[1:]``.

    Returns:
        The ID of the newly created snapshot.

    Raises:
        SystemExit: Raised by argparse when the snapshot ID is missing.
    """
    parser = argparse.ArgumentParser(description="Create a new snapshot from an existing one")
    # The body reads args.snapshot_id, so the argument has to be declared.
    parser.add_argument(
        "snapshot_id",
        help="ID of the snapshot to start the instance from",
    )
    args = parser.parse_args(argv)

    # Initialize the Morph Cloud client
    # API key will be read from MORPH_API_KEY environment variable
    client = MorphCloudClient()

    print(f"Starting instance from snapshot {args.snapshot_id}...")
    instance = client.instances.start(args.snapshot_id)
    try:
        print(f"Instance {instance.id} created, waiting for it to be ready...")
        instance.wait_until_ready(timeout=300)
        print(f"Instance {instance.id} is now ready")

        print("Creating new snapshot...")
        new_snapshot = instance.snapshot()
        print(f"New snapshot created: {new_snapshot.id}")

        # Bare ID on its own line for shell capture.
        # NOTE(review): status messages share stdout and the "finally" block
        # prints after this line, so callers must pick this line out.
        print(new_snapshot.id)
        return new_snapshot.id
    finally:
        # Always stop the instance, whether or not the snapshot succeeded.
        print(f"Stopping instance {instance.id}...")
        instance.stop()
        print("Instance stopped")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment