Last active
March 21, 2025 18:37
-
-
Save johnandersen777/9b9b7ba25fc9c5eda445943bd457c87b to your computer and use it in GitHub Desktop.
TMUX context as JSON
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# /// script | |
# dependencies = [ | |
# "docker", | |
# "psutil", | |
# "langchain", | |
# "langchain-openai", | |
# "langchain-community", | |
# "langchain-faiss", | |
# "faiss-cpu", | |
# "kubernetes", | |
# ] | |
# /// | |
import sys | |
import asyncio | |
import docker | |
import psutil | |
from kubernetes import client, config | |
from langchain_openai import ChatOpenAI | |
from langchain_openai.embeddings import OpenAIEmbeddings | |
from langchain_community.vectorstores import FAISS | |
from langchain.docstore.document import Document | |
# --------------------------------------------------------------------------- | |
# Helper functions to fetch information from various sources asynchronously | |
# --------------------------------------------------------------------------- | |
async def fetch_codebase_docs(question: str) -> list[Document]:
    """
    Retrieve documents relevant to *question* from the codebase FAISS index.

    Assumes the FAISS index is persisted locally in the
    "faiss_codebase_index" directory. Returns an empty list when the index
    cannot be loaded (e.g. it has not been built yet).
    """
    embeddings = OpenAIEmbeddings()
    # Load the existing FAISS index for the codebase.
    try:
        vectorstore = FAISS.load_local(
            "faiss_codebase_index",
            embeddings,
            allow_dangerous_deserialization=True,
        )
    except Exception as error:
        # A bare ``except:`` would also swallow KeyboardInterrupt/SystemExit
        # and hide the failure entirely; catch Exception and report it.
        print(f"fetch_codebase_docs: could not load index: {error}", file=sys.stderr)
        return []
    retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
    # Since the retrieval method is synchronous, run it in a thread.
    docs = await asyncio.to_thread(retriever.get_relevant_documents, question)
    return docs
async def fetch_github_issues(question: str) -> list[Document]:
    """
    Retrieve GitHub issues relevant to *question* from the issues FAISS index.

    Assumes the FAISS index is persisted locally in the
    "faiss_github_issues_index" directory. Returns an empty list when the
    index cannot be loaded (e.g. it has not been built yet).
    """
    embeddings = OpenAIEmbeddings()
    # Load the existing FAISS index for GitHub issues.
    try:
        vectorstore = FAISS.load_local(
            "faiss_github_issues_index",
            embeddings,
            allow_dangerous_deserialization=True,
        )
    except Exception as error:
        # A bare ``except:`` would also swallow KeyboardInterrupt/SystemExit
        # and hide the failure entirely; catch Exception and report it.
        print(f"fetch_github_issues: could not load index: {error}", file=sys.stderr)
        return []
    retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
    # Since the retrieval method is synchronous, run it in a thread.
    docs = await asyncio.to_thread(retriever.get_relevant_documents, question)
    return docs
async def fetch_docker_logs() -> str:
    """
    Return ``docker inspect`` output for every running Docker container.

    The Docker SDK calls are blocking, so each one is run in a worker
    thread to avoid stalling the event loop.
    """
    docker_client = await asyncio.to_thread(docker.from_env)
    containers = await asyncio.to_thread(docker_client.containers.list)
    logs = []
    for container in containers:
        # inspect_container performs a blocking HTTP request to the daemon.
        container_inspect = await asyncio.to_thread(
            docker_client.api.inspect_container, container.id
        )
        logs.append(f"Container {container.name} `docker inspect`:\n{container_inspect}")
    return "\n\n".join(logs)
async def fetch_process_logs() -> str:
    """
    Return one "PID <pid> - <name>" line per currently running process.

    Processes that exit, deny access, or are zombies while being inspected
    are skipped. The psutil iteration is blocking, so it is offloaded to a
    worker thread rather than run directly on the event loop.
    """
    def _collect() -> str:
        logs = []
        for proc in psutil.process_iter(attrs=['pid', 'name']):
            try:
                logs.append(f"PID {proc.info['pid']} - {proc.info['name']}")
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                continue
        return "\n".join(logs)

    return await asyncio.to_thread(_collect)
async def fetch_k8s_info() -> str:
    """
    Return one line per Kubernetes deployment across all namespaces.

    Assumes a valid kubeconfig is available; returns an empty string when
    it cannot be loaded.
    """
    try:
        config.load_kube_config()
    except Exception as error:
        # A bare ``except:`` would also swallow KeyboardInterrupt/SystemExit;
        # catch Exception and report why cluster info is unavailable.
        print(f"fetch_k8s_info: could not load kubeconfig: {error}", file=sys.stderr)
        return ""
    apps_v1 = client.AppsV1Api()
    # The kubernetes client is blocking; run the list call in a thread.
    response = await asyncio.to_thread(apps_v1.list_deployment_for_all_namespaces)
    info = []
    for dep in response.items:
        info.append(f"Deployment '{dep.metadata.name}' in namespace '{dep.metadata.namespace}'")
    return "\n".join(info)
# --------------------------------------------------------------------------- | |
# Main function that combines all sources and uses LangChain asynchronously | |
# --------------------------------------------------------------------------- | |
async def answer_question(question: str, schema: str):
    """
    Concurrently fetch relevant documents and runtime logs, combine the
    context, and ask the LLM to answer *question* following *schema*.

    Returns the LLM response message object (callers read ``.content``).
    The original ``-> str`` annotation was wrong: ``ainvoke`` returns a
    message, not a string.
    """
    # Fetch every data source concurrently; gather preserves argument order.
    (
        codebase_docs,
        github_issues,
        docker_logs,
        process_logs,
        k8s_info,
    ) = await asyncio.gather(
        fetch_codebase_docs(question),
        fetch_github_issues(question),
        fetch_docker_logs(),
        fetch_process_logs(),
        fetch_k8s_info(),
    )
    # Combine retrieved documents and logs into one context.
    context = (
        "Codebase Documents:\n" + "\n".join([doc.page_content for doc in codebase_docs]) + "\n\n" +
        "GitHub Issues:\n" + "\n".join([doc.page_content for doc in github_issues]) + "\n\n" +
        "Docker Logs:\n" + docker_logs + "\n\n" +
        "Process Logs:\n" + process_logs + "\n\n" +
        "Kubernetes Deployments:\n" + k8s_info + "\n\n"
    )
    # Create a final prompt that includes the context and the user question.
    final_prompt = (
        "You are an assistant that answers questions about a codebase and its runtime environment. "
        "Your response MUST be formatted aligned with the following schema. Your response should be a JSON object where the top level keys are schema and data. You will return the schema which follows in schema and the data for your response in data:\n\n"
        f"{schema}\n\n"
        "The following information has been gathered:\n\n"
        f"{context}\n\n"
        f"Based on the above, answer the following question:\n{question}"
    )
    # Initialize the async LLM from OpenAI (ensure your API keys are set).
    llm = ChatOpenAI(temperature=0)
    # Generate the answer asynchronously.
    answer = await llm.ainvoke(final_prompt)
    return answer
# --------------------------------------------------------------------------- | |
# Entry point of the application | |
# --------------------------------------------------------------------------- | |
async def main():
    """
    Read a question from stdin, ask the LLM for an output schema appropriate
    to that question, then answer the question constrained to that schema.
    """
    question = sys.stdin.read()
    # First pass: generate a JSON schema tailored to the question.
    # (Fixed typo "approiate" -> "appropriate" in the prompt.)
    schema_prompt = (
        "You are an assistant that outputs a JSON schema appropriate for the question being asked. "
        f"Based on the above, generate an output schema for the answer to the following question:\n{question}"
    )
    llm = ChatOpenAI(temperature=0)
    schema_response = await llm.ainvoke(schema_prompt)
    # Second pass: answer the question, constrained to the generated schema.
    answer = await answer_question(question, schema_response.content)
    print(answer.content)
# Run the async entry point only when executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
r""" | |
This is free and unencumbered software released into the public domain. | |
Anyone is free to copy, modify, publish, use, compile, sell, or | |
distribute this software, either in source code form or as a compiled | |
binary, for any purpose, commercial or non-commercial, and by any | |
means. | |
In jurisdictions that recognize copyright laws, the author or authors | |
of this software dedicate any and all copyright interest in the | |
software to the public domain. We make this dedication for the benefit | |
of the public at large and to the detriment of our heirs and | |
successors. We intend this dedication to be an overt act of | |
relinquishment in perpetuity of all present and future rights to this | |
software under copyright law. | |
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, | |
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF | |
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. | |
IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR | |
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, | |
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR | |
OTHER DEALINGS IN THE SOFTWARE. | |
TODO | |
- Update agi.py to deploy client side service so we aren't doing things with | |
bash so much in shell. Sock connects back and all should be API based | |
- Time tracking and engineering log gen | |
- Bash hook for new processes | |
- kubectl exec | |
  - Capture context and inspect cluster, make available to LLM
- Trust boundaries for code gen
- Metadata classes should be created and loaded dynamically and added to | |
  vector db or whatever so that LLM can cruise through full context
- Some runs on server with OpenAI API key and some runs on client. | |
- Ideally we have some sort of proxy which issues scoped tokens so that the | |
client can talk "directly" to OpenAI API and we keep stuff context local | |
- https://github.com/slsa-framework/attested-build-environments-demo | |
- "Clippy" | |
- Open pane when debugging help is identified based on train of thought | |
analysis. | |
- Looks like you're trying to kubectl apply but it's stuck in | |
ContainerCreating, did you type the image name correctly? | |
""" | |
import json | |
import logging | |
from typing import Dict, List, Optional, Union | |
import libtmux | |
import psutil | |
from pydantic import BaseModel, Field | |
# ----------------------------- | |
# Define Pydantic data models | |
# ----------------------------- | |
class GitRemote(BaseModel):
    """A single Git remote: its short name (e.g. "origin") and one URL."""
    name: str
    url: str
class GitMetadata(BaseModel):
    """Git repository info for a pane: top-level directory and its remotes."""
    repo_root: str
    remotes: List[GitRemote] = Field(default_factory=list)
class ProcessNode(BaseModel):
    """One node in a process tree: the process's argv and its children."""
    cmd: List[str]
    # Use default_factory instead of a mutable ``[]`` default, matching the
    # Field(default_factory=list) convention used by GitMetadata.
    children: List["ProcessNode"] = Field(default_factory=list)
# Allow recursive models: resolve the "ProcessNode" forward reference.
ProcessNode.model_rebuild()
class CommandMetadata(BaseModel):
    """Process metadata for a pane: the full process tree rooted at its PID."""
    tree: ProcessNode
# This is the wrapper model that will be used to hold any metadata type.
class MetadataWrapper(BaseModel):
    """Tags a metadata payload with the entrypoint path of its class."""
    # Dotted import path identifying which metadata class ``data`` holds.
    metadata_class_entrypoint: str
    data: Union[GitMetadata, CommandMetadata]
class TmuxContext(BaseModel):
    """Top-level context: session name -> window name -> list of pane dicts."""
    sessions: Dict[str, dict] = Field(default_factory=dict)
# ----------------------------- | |
# Helper functions to collect metadata | |
# ----------------------------- | |
def get_git_metadata(cwd: str) -> Optional[GitMetadata]:
    """
    Return Git metadata for *cwd* when it lies inside a Git repository.

    The result carries the repository's top-level directory plus one
    GitRemote entry per remote URL. Returns None when *cwd* is not inside
    a repository or anything else goes wrong (logged, best effort).
    """
    try:
        from git import Repo  # GitPython must be installed.

        # search_parent_directories=True locates the repository root even
        # when cwd is a nested subdirectory.
        repository = Repo(cwd, search_parent_directories=True)
        top_level = repository.git.rev_parse("--show-toplevel")
        # Each remote can expose several URLs; emit one GitRemote per URL.
        remote_entries = [
            GitRemote(name=remote.name, url=remote_url)
            for remote in repository.remotes
            for remote_url in remote.urls
        ]
        return GitMetadata(repo_root=top_level, remotes=remote_entries)
    except Exception as error:
        logging.exception("get_git_metadata(%s)", cwd, exc_info=error)
        # cwd is likely not in a Git repository or an error occurred.
        return None
def build_process_tree(proc: psutil.Process) -> ProcessNode:
    """
    Recursively build a ProcessNode tree rooted at *proc*.

    Processes that vanish or deny access while being inspected contribute
    an empty cmd / no children (logged, best effort).
    """
    try:
        cmdline = proc.cmdline()
        # NOTE Top level PIDs of bash processes seem to be "-bash" (login
        # shells); strip the leading "-". Guard against empty cmdlines
        # (e.g. kernel threads), which previously raised IndexError here
        # and produced a spurious exception log per such process.
        if cmdline and cmdline[0].startswith("-"):
            cmdline[0] = cmdline[0][1:]
    except Exception as error:
        logging.exception("build_process_tree(%s)", proc, exc_info=error)
        cmdline = []
    try:
        children = proc.children()
    except Exception as error:
        logging.exception("build_process_tree(%s)", proc, exc_info=error)
        children = []
    child_nodes = [build_process_tree(child) for child in children]
    return ProcessNode(cmd=cmdline, children=child_nodes)
def get_command_metadata(pid: int) -> Optional[CommandMetadata]:
    """
    Return CommandMetadata holding the process tree rooted at *pid*.

    Returns None when the process has exited or access is denied.
    (The original ``-> CommandMetadata`` annotation was wrong: the failure
    path returns None, and the caller checks for it.)
    """
    try:
        root_proc = psutil.Process(pid)
        tree = build_process_tree(root_proc)
        return CommandMetadata(tree=tree)
    except Exception as error:
        logging.exception("get_command_metadata(%d)", pid, exc_info=error)
        # Process may have exited or access is denied.
        return None
# ----------------------------- | |
# Main function to gather tmux info with metadata | |
# ----------------------------- | |
def _collect_pane_metadata(cwd: str, pid: int) -> List[MetadataWrapper]:
    """Gather the available metadata (Git repo, process tree) for one pane."""
    entries: List[MetadataWrapper] = []
    # Discover Git metadata based on the pane's cwd.
    git_meta = get_git_metadata(cwd)
    if git_meta is not None:
        entries.append(
            MetadataWrapper(
                metadata_class_entrypoint="my_module.git_metadata.GitMetadata",
                data=git_meta,
            )
        )
    # Discover command metadata based on the pane's pid.
    cmd_meta = get_command_metadata(pid)
    if cmd_meta is not None:
        entries.append(
            MetadataWrapper(
                metadata_class_entrypoint="my_module.command_metadata.CommandMetadata",
                data=cmd_meta,
            )
        )
    return entries


def get_tmux_window_info() -> TmuxContext:
    """
    Build a TmuxContext describing every session/window/pane on the server.

    ``sessions`` maps session name -> window name -> list of pane dicts,
    where each pane dict contains:
      - 'cwd': current working directory of the pane.
      - 'pid': pane's process id.
      - 'metadata': MetadataWrapper instances providing additional info.
    """
    server = libtmux.Server()
    tmux_ctx = TmuxContext()
    for session in server.sessions:
        windows_info = {}
        for window in session.windows:
            pane_details = []
            for pane in window.panes:
                cwd = pane.pane_current_path
                pid = int(pane.pane_pid)
                pane_details.append({
                    "cwd": cwd,
                    "pid": pid,
                    "metadata": _collect_pane_metadata(cwd, pid),
                })
            windows_info[window.name] = pane_details
        tmux_ctx.sessions[session.name] = windows_info
    return tmux_ctx
# When executed as a script, dump the tmux context as JSON to stdout.
if __name__ == "__main__":
    info = get_tmux_window_info()
    print(info.model_dump_json())
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
sessions: | |
"0": | |
atprotobin: | |
- cwd: /home/johnandersen777 | |
pid: 8494 | |
metadata: | |
- metadata_class_entrypoint: my_module.command_metadata.CommandMetadata | |
data: | |
tree: | |
cmd: | |
- bash | |
children: | |
- cmd: | |
- python | |
- -m | |
- atprotobin | |
children: [] | |
atproto-search-flights-down: | |
- cwd: /home/johnandersen777/.tmp/tmp.Udu4NeXwc3 | |
pid: 9920 | |
metadata: | |
- metadata_class_entrypoint: my_module.git_metadata.GitMetadata | |
data: | |
repo_root: /home/johnandersen777/.tmp/tmp.Udu4NeXwc3 | |
remotes: | |
- name: origin | |
url: https://gist.github.com/johnandersen777/9b9b7ba25fc9c5eda445943bd457c87b | |
- metadata_class_entrypoint: my_module.command_metadata.CommandMetadata | |
data: | |
tree: | |
cmd: | |
- bash | |
children: | |
- cmd: | |
- python | |
- context_tmux.py | |
children: [] | |
- cmd: | |
- yq | |
- -P | |
children: [] | |
- cmd: | |
- tee | |
- zzz_example_output.yaml | |
children: [] | |
bash: | |
- cwd: /home/johnandersen777/Documents/python/scitt-api-emulator/github_webhook_events | |
pid: 11678 | |
metadata: | |
- metadata_class_entrypoint: my_module.git_metadata.GitMetadata | |
data: | |
repo_root: /home/johnandersen777/Documents/python/scitt-api-emulator | |
remotes: | |
- name: upstream | |
url: https://github.com/scitt-community/scitt-api-emulator | |
- name: origin | |
url: https://github.com/pdxjohnny/scitt-api-emulator.git | |
- metadata_class_entrypoint: my_module.command_metadata.CommandMetadata | |
data: | |
tree: | |
cmd: | |
- bash | |
children: | |
- cmd: | |
- python | |
- -m | |
- uvicorn | |
- agi:app | |
- --uds | |
- /tmp/agi.sock | |
children: [] | |
- cwd: /home/johnandersen777/Documents/python/scitt-api-emulator/github_webhook_events | |
pid: 11871 | |
metadata: | |
- metadata_class_entrypoint: my_module.git_metadata.GitMetadata | |
data: | |
repo_root: /home/johnandersen777/Documents/python/scitt-api-emulator | |
remotes: | |
- name: upstream | |
url: https://github.com/scitt-community/scitt-api-emulator | |
- name: origin | |
url: https://github.com/pdxjohnny/scitt-api-emulator.git | |
- metadata_class_entrypoint: my_module.command_metadata.CommandMetadata | |
data: | |
tree: | |
cmd: | |
- bash | |
children: [] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment