Last active
August 9, 2024 20:51
-
-
Save tinycrops/4886cd9b248958a92d01a7d057178e4d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pyperclip | |
import time | |
import os | |
import shutil | |
from PIL import Image | |
import hashlib | |
import base64 | |
import json | |
from datetime import datetime | |
from pathlib import Path | |
from openai import OpenAI | |
import anthropic | |
# --- Storage locations ------------------------------------------------------
# Directory that receives hash-named copies of captured screenshots.
ASSETS_DIR = "/home/path/assets"
# Plain-text event log; entries are appended, never rewritten.
LOG_FILE = "/home/path/clipboard_log.txt"
# JSON Lines dataset: one record is appended per saved AI exchange.
DATASET_FILE = "/home/path/dataset.jsonl"
# Folder scanned for the newest ``*.png``; "~" is expanded once, at import.
SCREENSHOTS_DIR = os.path.expanduser("~/Pictures/Screenshots")

# --- AI settings -------------------------------------------------------------
AI_PROVIDER = "openai"
AI_MODEL = "gpt-4o-mini"
SYSTEM_MESSAGE = "You are a poetic assistant, skilled in explaining complex programming concepts with creative flair."

# Create the working directories up front so later copies and globs never hit
# a missing folder.
for _required_dir in (ASSETS_DIR, SCREENSHOTS_DIR):
    os.makedirs(_required_dir, exist_ok=True)
# Build the API client for the configured provider. An unknown provider aborts
# at import time rather than failing later on the first query.
# NOTE(review): both constructors are called without arguments, so credentials
# presumably come from the environment -- confirm against the SDK docs.
if AI_PROVIDER not in ("openai", "anthropic"):
    raise ValueError(f"Unsupported AI provider: {AI_PROVIDER}")
client = OpenAI() if AI_PROVIDER == "openai" else anthropic.Anthropic()
def log_event(event_type, content):
    """Append one timestamped entry to the log file at ``LOG_FILE``.

    The entry has the form ``<ISO timestamp> - <event_type>: <content>``
    followed by a newline.

    Args:
        event_type: Short label written before the colon.
        content: Text written after the colon.
    """
    timestamp = datetime.now().isoformat()
    # Pin the encoding: logged AI responses may contain non-ASCII characters,
    # and the platform default encoding is not guaranteed to represent them.
    with open(LOG_FILE, "a", encoding="utf-8") as f:
        f.write(f"{timestamp} - {event_type}: {content}\n")
def get_image_hash(image_path):
    """Return the hexadecimal MD5 digest of the file at ``image_path``.

    The file is opened in binary mode and read in full before hashing.
    """
    with open(image_path, "rb") as image_file:
        raw_bytes = image_file.read()
    digest = hashlib.md5()
    digest.update(raw_bytes)
    return digest.hexdigest()
def save_image(image_path):
    """Copy an image into ``ASSETS_DIR`` under a content-derived file name.

    The destination name is ``<hash>.png``, where the hash comes from
    ``get_image_hash(image_path)``.

    Args:
        image_path: Path of the image file to store.

    Returns:
        The destination path inside ``ASSETS_DIR``.
    """
    image_hash = get_image_hash(image_path)
    new_path = os.path.join(ASSETS_DIR, f"{image_hash}.png")
    # The name is derived from the file's content, so a file already sitting
    # at that name is a previous copy of the same image. Skipping the copy
    # keeps the polling loop from rewriting it on every pass.
    if not os.path.exists(new_path):
        shutil.copy(image_path, new_path)
    return new_path
def encode_image(image_path):
    """Return the contents of the file at ``image_path`` as a Base64 string.

    The file is read in binary mode; the Base64 bytes are decoded as UTF-8 so
    the result is a ``str``.
    """
    with open(image_path, "rb") as image_file:
        raw_bytes = image_file.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode("utf-8")
def get_latest_screenshot():
    """Return the most recently modified ``*.png`` in ``SCREENSHOTS_DIR``.

    Returns:
        A ``Path`` for the file with the greatest modification time, or
        ``None`` when the directory contains no ``.png`` files.
    """
    candidates = Path(SCREENSHOTS_DIR).glob("*.png")
    # ``default=None`` covers the empty-directory case without a separate check.
    return max(candidates, key=os.path.getmtime, default=None)
def process_clipboard_content():
    """Return the current item to send to the AI, preferring clipboard text.

    Returns:
        ``{"type": "text", "content": <clipboard text>}`` when the clipboard
        holds non-empty text; otherwise
        ``{"type": "image", "content": <saved copy path>}`` for the newest
        screenshot; otherwise ``None``.
    """
    text_content = pyperclip.paste()
    if text_content:
        return {"type": "text", "content": text_content}

    # Only scan the screenshot directory when there is no clipboard text:
    # this function runs on every poll, and the scan result would otherwise
    # be discarded.
    latest_screenshot = get_latest_screenshot()
    if latest_screenshot is not None:
        file_path = save_image(latest_screenshot)
        return {"type": "image", "content": file_path}
    return None
def _openai_user_content(content):
    """Build the OpenAI chat ``content`` parts for one clipboard item."""
    if content["type"] == "text":
        return [{"type": "text", "text": f"This is my most recently copied clipboard content: {content['content']}\nWhat do you think?"}]
    if content["type"] == "image":
        base64_image = encode_image(content["content"])
        return [
            {"type": "text", "text": "This is my most recently saved screenshot. What do you think?"},
            {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}", "detail": "high"}},
        ]
    raise ValueError(f"Unsupported content type: {content['type']}")


def _anthropic_user_content(content):
    """Build the Anthropic Messages ``content`` blocks for one clipboard item."""
    if content["type"] == "text":
        return [{"type": "text", "text": f"This is my most recently copied clipboard content: {content['content']}\nWhat do you think?"}]
    if content["type"] == "image":
        base64_image = encode_image(content["content"])
        # Images go in a dedicated base64 image block; pasting the base64
        # string into the text prompt only sends the model a wall of text.
        return [
            {"type": "image", "source": {"type": "base64", "media_type": "image/png", "data": base64_image}},
            {"type": "text", "text": "This is my most recently saved screenshot. What do you think?"},
        ]
    raise ValueError(f"Unsupported content type: {content['type']}")


def query_ai(content):
    """Send one clipboard item to the configured AI provider.

    Args:
        content: Dict with ``"type"`` (``"text"`` or ``"image"``) and
            ``"content"`` (the text itself, or the path of a saved image).

    Returns:
        The model's reply as a string, or ``None`` when the request fails
        for any reason (the error is printed, not raised).
    """
    try:
        if AI_PROVIDER == "openai":
            completion = client.chat.completions.create(
                model=AI_MODEL,
                messages=[
                    {"role": "system", "content": SYSTEM_MESSAGE},
                    {"role": "user", "content": _openai_user_content(content)},
                ],
                max_tokens=999,
            )
            response = completion.choices[0].message.content
        elif AI_PROVIDER == "anthropic":
            # The Messages API takes the system prompt as a top-level
            # parameter; a "system" role inside ``messages`` is rejected.
            message = client.messages.create(
                model=AI_MODEL,
                max_tokens=999,
                system=SYSTEM_MESSAGE,
                messages=[
                    {"role": "user", "content": _anthropic_user_content(content)},
                ],
            )
            # ``message.content`` is a list of content blocks; callers slice
            # the reply and serialise it to JSON, so flatten the text blocks
            # into one string.
            response = "".join(
                part.text
                for part in message.content
                if getattr(part, "type", None) == "text"
            )
        else:
            raise ValueError(f"Unsupported AI provider: {AI_PROVIDER}")
        print("AI says:", response)
        return response
    except Exception as e:
        # Deliberate best-effort boundary: the monitoring loop must keep
        # running when a single request fails.
        print("Error querying AI:", str(e))
        return None
def save_to_dataset(content, ai_response):
    """Append one exchange to the JSON Lines dataset at ``DATASET_FILE``.

    Each record stores the timestamp, the submitted content, the AI reply,
    and the provider, model and system message in effect.

    Args:
        content: The item that was sent to the AI.
        ai_response: The reply returned for it.

    Raises:
        TypeError: If the record cannot be serialised to JSON. Nothing is
            written to the dataset in that case.
    """
    data = {
        "timestamp": datetime.now().isoformat(),
        "content": content,
        "ai_response": ai_response,
        "ai_provider": AI_PROVIDER,
        "ai_model": AI_MODEL,
        "system_message": SYSTEM_MESSAGE,
    }
    # Serialise before opening the file: ``json.dump`` streams chunks to the
    # handle, so a failure part-way through would leave a truncated line that
    # corrupts the JSONL file for every later reader.
    line = json.dumps(data)
    with open(DATASET_FILE, "a", encoding="utf-8") as f:
        f.write(line + "\n")
def main():
    """Poll for new clipboard or screenshot content once per second.

    Each time the polled item differs from the previous one it is logged and
    sent to the AI; a non-empty reply is logged (truncated to 50 characters
    plus an ellipsis) and appended to the dataset. Ctrl+C stops the loop.
    """
    print("Monitoring clipboard and screenshots... Press Ctrl+C to exit.")
    previous = None
    try:
        while True:
            snapshot = process_clipboard_content()
            has_changed = bool(snapshot) and snapshot != previous
            if has_changed:
                log_event("Content Changed", f"Type: {snapshot['type']}")
                reply = query_ai(snapshot)
                if reply:
                    if len(reply) > 50:
                        preview = reply[:50] + "..."
                    else:
                        preview = reply
                    log_event("AI Response", preview)
                    save_to_dataset(snapshot, reply)
                # NOTE(review): the source's indentation was lost; this assumes
                # the item is remembered even when the query fails, so a
                # failing request is not retried every second -- confirm.
                previous = snapshot
            time.sleep(1)
    except KeyboardInterrupt:
        print("\nMonitoring stopped.")


# Run the monitor only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment