Last active
January 27, 2024 03:50
-
-
Save Saik0s/43209cc3ce41d0abe260a4edf2921055 to your computer and use it in GitHub Desktop.
Realtime Screen Analyzer using cogvlm
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import csv | |
import io | |
import os | |
import time | |
from datetime import datetime, timedelta | |
from typing import Tuple | |
import numpy as np | |
import pyautogui | |
import requests | |
import sieve | |
from PIL import Image | |
from rich import print | |
# Thin wrappers around the hosted Sieve functions used for captioning and summarizing
def sieve_caption(image_array):
    """Ask the hosted ``sieve/cogvlm-chat`` function to caption a screenshot.

    Args:
        image_array: Pixel data of the screenshot; it is handed to
            ``sieve.Image`` through its ``array`` argument.

    Returns:
        Whatever ``run`` on the ``sieve/cogvlm-chat`` function returns for
        the captioning prompt. The value is also printed.
    """
    print("Getting caption...")
    started_at = time.time()

    sieve_image = sieve.Image(array=image_array)
    captioner = sieve.function.get("sieve/cogvlm-chat")
    # Positional arguments are: image, prompt, vqa_mode (disabled here).
    result = captioner.run(
        sieve_image,
        "Caption this screenshot in extremely high detail and summarize the content of each window separately.",
        False,
    )

    finished_at = time.time()
    print(f"Time taken: {finished_at - started_at} seconds")
    print(result)
    return result
def sieve_summary(captions):
    """Summarize a batch of captions with the hosted ``sieve/mixtral`` function.

    Args:
        captions: Iterable of caption strings; they are joined with newlines
            and sent as the user prompt.

    Returns:
        Whatever ``run`` on the ``sieve/mixtral`` function returns. The value
        is also printed.
    """
    print("Getting summary...")
    started_at = time.time()

    joined_captions = "\n".join(captions)
    summarizer = sieve.function.get("sieve/mixtral")
    # Positional arguments are: user prompt, system prompt.
    result = summarizer.run(
        joined_captions,
        "Summarize the captions and describe what user was doing in the last 15 minutes.",
    )

    finished_at = time.time()
    print(f"Time taken: {finished_at - started_at} seconds")
    print(result)
    return result
# Start every run with a fresh CSV log that contains only the header row.
csv_filename = "screenshot_data.csv"
with open(csv_filename, "w", newline="") as header_file:
    csv.writer(header_file).writerow(
        ["Timestamp", "Caption", "Run time", "Summary"]
    )
def take_screenshot_and_get_caption() -> Tuple[str, str]:
    """Capture the screen, caption it, and append the result to the CSV log.

    The screenshot is converted to RGB, shrunk in place to fit within
    1256x1256, written to ``~/Desktop/latest_screenshot.jpg`` (overwriting
    the previous one), and passed to ``sieve_caption`` as a NumPy array.
    A row ``[timestamp, caption, run time, ""]`` is appended to
    ``csv_filename``.

    Returns:
        A ``(timestamp, caption)`` pair, where ``timestamp`` is the ISO-8601
        local time at which the capture started and ``caption`` is the value
        returned by ``sieve_caption``.
    """
    print("Taking screenshot...")
    start = time.time()
    timestamp = datetime.now().isoformat()

    # Take a screenshot
    screenshot: Image.Image = pyautogui.screenshot()

    # Normalise to RGB (JPEG has no alpha channel) and shrink in place so the
    # image fits within 1256x1256 while keeping its aspect ratio.
    screenshot = screenshot.convert("RGB")
    screenshot.thumbnail((1256, 1256))

    # Keep a copy of the most recent screenshot on the Desktop for inspection.
    # (The previous in-memory JPEG encode was never read and has been removed.)
    desktop_path = os.path.join(
        os.path.expanduser("~"), "Desktop", "latest_screenshot.jpg"
    )
    screenshot.save(desktop_path, format="JPEG", optimize=True, quality=100)

    # Send request to sieve and get caption
    image_array: np.ndarray = np.array(screenshot)
    caption = sieve_caption(image_array)

    end = time.time()
    print(f"Time taken: {end - start} seconds")

    # Save to CSV with timestamp; the Summary column stays empty for
    # per-screenshot rows.
    with open(csv_filename, "a", newline="") as file:
        writer = csv.writer(file)
        writer.writerow([timestamp, caption, end - start, ""])

    return timestamp, caption
def get_summary_of_captions():
    """Summarize the captions logged during the last 15 minutes.

    Reads ``csv_filename``, collects the ``Caption`` of every row whose
    ``Timestamp`` is newer than 15 minutes ago, sends them to
    ``sieve_summary``, and appends a row ``["", "", run time, summary]`` to
    the CSV.

    Rows with a blank or unparsable ``Timestamp`` are skipped. This matters
    because the summary rows appended by this very function carry an empty
    timestamp; without the skip, ``datetime.fromisoformat("")`` raises
    ``ValueError`` and every call after the first summary fails.

    Returns:
        The value returned by ``sieve_summary``, or ``None`` when there are
        no captions from the last 15 minutes.
    """
    # Filter entries from the last 15 minutes and combine captions
    time_threshold = datetime.now() - timedelta(minutes=15)
    captions = []
    # newline="" lets the csv module handle line endings inside quoted fields.
    with open(csv_filename, "r", newline="") as file:
        for row in csv.DictReader(file):
            raw_timestamp = row["Timestamp"]
            if not raw_timestamp:
                # Summary rows (and short rows) have no timestamp.
                continue
            try:
                taken_at = datetime.fromisoformat(raw_timestamp)
            except ValueError:
                # Ignore malformed rows instead of aborting the whole summary.
                continue
            if taken_at > time_threshold:
                captions.append(row["Caption"])

    if not captions:
        return None

    # Send combined captions to another sieve endpoint to get a summary
    print(f"Captions: {captions}")
    start = time.time()
    summary = sieve_summary(captions)
    end = time.time()
    print(f"Summary: {summary}")
    print(f"Time taken: {end - start} seconds")

    with open(csv_filename, "a", newline="") as file:
        writer = csv.writer(file)
        writer.writerow(["", "", end - start, summary])
    return summary
# Infinite loop: capture and caption roughly every 30 seconds, and request a
# summary after every 5th successful screenshot. Stop with Ctrl+C
# (KeyboardInterrupt is not an Exception subclass, so it is not swallowed).
screenshot_count = 0
while True:
    start = time.time()
    try:
        print(f"Screenshot #{screenshot_count}")
        take_screenshot_and_get_caption()
        screenshot_count += 1
        # Every 5 screenshots, get summary
        if screenshot_count % 5 == 0:
            get_summary_of_captions()
    except Exception as e:
        # Best-effort: report the failure and carry on with the next cycle.
        print(e)

    # Pace the loop outside the try block so a failing iteration also waits;
    # otherwise repeated errors would retry immediately in a tight loop.
    elapsed_time: float = time.time() - start
    sleep_duration: float = max(0, 30 - elapsed_time)
    if sleep_duration > 0:
        print(f"Sleeping for {sleep_duration} seconds...")
        time.sleep(sleep_duration)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment