Skip to content

Instantly share code, notes, and snippets.

@johnandersen777
Last active March 21, 2025 18:37
Show Gist options
  • Save johnandersen777/9b9b7ba25fc9c5eda445943bd457c87b to your computer and use it in GitHub Desktop.
Save johnandersen777/9b9b7ba25fc9c5eda445943bd457c87b to your computer and use it in GitHub Desktop.
TMUX context as JSON
# /// script
# dependencies = [
# "docker",
# "psutil",
# "langchain",
# "langchain-openai",
# "langchain-community",
# "langchain-faiss",
# "faiss-cpu",
# "kubernetes",
# ]
# ///
import asyncio
import logging
import sys

import docker
import psutil
from kubernetes import client, config
from langchain.docstore.document import Document
from langchain_community.vectorstores import FAISS
from langchain_openai import ChatOpenAI
from langchain_openai.embeddings import OpenAIEmbeddings
# ---------------------------------------------------------------------------
# Helper functions to fetch information from various sources asynchronously
# ---------------------------------------------------------------------------
async def fetch_codebase_docs(question: str) -> list[Document]:
    """
    Retrieve documents relevant to *question* from the codebase FAISS index.

    Loads a FAISS index persisted locally in the "faiss_codebase_index"
    directory and performs a top-5 similarity search.

    Returns:
        Up to five matching Documents, or an empty list when the index
        cannot be loaded (e.g. it has not been built yet).
    """
    embeddings = OpenAIEmbeddings()
    try:
        # allow_dangerous_deserialization is needed for pickle-backed FAISS
        # indexes; acceptable only because the index is produced locally.
        vectorstore = FAISS.load_local(
            "faiss_codebase_index",
            embeddings,
            allow_dangerous_deserialization=True,
        )
    except Exception as error:
        # Was a bare `except:` — narrowed so KeyboardInterrupt/SystemExit
        # propagate, and the failure is logged instead of silently swallowed.
        logging.exception("fetch_codebase_docs: failed to load FAISS index", exc_info=error)
        return []
    retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
    # The retrieval method is synchronous; run it off the event loop.
    return await asyncio.to_thread(retriever.get_relevant_documents, question)
async def fetch_github_issues(question: str) -> list[Document]:
    """
    Retrieve GitHub issues relevant to *question* from a FAISS index.

    Loads a FAISS index persisted locally in the "faiss_github_issues_index"
    directory and performs a top-5 similarity search.

    Returns:
        Up to five matching Documents, or an empty list when the index
        cannot be loaded.
    """
    embeddings = OpenAIEmbeddings()
    try:
        # See fetch_codebase_docs: local index, so dangerous deserialization
        # is an accepted risk.
        vectorstore = FAISS.load_local(
            "faiss_github_issues_index",
            embeddings,
            allow_dangerous_deserialization=True,
        )
    except Exception as error:
        # Narrowed from a bare `except:` and logged; missing index is non-fatal.
        logging.exception("fetch_github_issues: failed to load FAISS index", exc_info=error)
        return []
    retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
    # Synchronous retrieval call — keep the event loop responsive.
    return await asyncio.to_thread(retriever.get_relevant_documents, question)
async def fetch_docker_logs() -> str:
    """
    Return `docker inspect` output for every running container.

    NOTE(review): despite the name, this gathers inspect metadata rather than
    container logs — log fetching was deliberately disabled in the original.

    Returns:
        One section per container, joined with blank lines; empty string
        when no containers are running.
    """
    docker_client = docker.from_env()
    sections = []
    # Docker SDK calls block on the daemon socket; run them in a thread so
    # the async event loop is not stalled.
    containers = await asyncio.to_thread(docker_client.containers.list)
    for container in containers:
        container_inspect = await asyncio.to_thread(
            docker_client.api.inspect_container, container.id
        )
        sections.append(f"Container {container.name} `docker inspect`:\n{container_inspect}")
    return "\n\n".join(sections)
async def fetch_process_logs() -> str:
    """
    Return one "PID <pid> - <name>" line per currently running process.

    Processes that disappear or deny access while being enumerated are
    skipped silently.
    """
    entries = []
    for process in psutil.process_iter(attrs=["pid", "name"]):
        try:
            details = process.info
            entries.append(f"PID {details['pid']} - {details['name']}")
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            # Process state can change mid-iteration; best-effort listing.
            continue
    return "\n".join(entries)
async def fetch_k8s_info() -> str:
    """
    Return one line per Kubernetes deployment across all namespaces.

    Assumes a valid kubeconfig is available.

    Returns:
        Newline-joined deployment descriptions, or an empty string when
        the kubeconfig cannot be loaded.
    """
    try:
        config.load_kube_config()
    except Exception as error:
        # Narrowed from a bare `except:`; a missing kubeconfig is non-fatal
        # but worth logging.
        logging.exception("fetch_k8s_info: could not load kubeconfig", exc_info=error)
        return ""
    apps_v1 = client.AppsV1Api()
    # The API call blocks on the network; keep the event loop free.
    deployments = await asyncio.to_thread(apps_v1.list_deployment_for_all_namespaces)
    return "\n".join(
        f"Deployment '{dep.metadata.name}' in namespace '{dep.metadata.namespace}'"
        for dep in deployments.items
    )
# ---------------------------------------------------------------------------
# Main function that combines all sources and uses LangChain asynchronously
# ---------------------------------------------------------------------------
async def answer_question(question: str, schema: str):
    """
    Gather all context sources concurrently and answer *question* with an LLM.

    Args:
        question: The user's question.
        schema: JSON schema the LLM response must conform to.

    Returns:
        The LLM response message object (callers read ``.content``).
        NOTE: the original annotated this ``-> str``, which was incorrect —
        ``ainvoke`` returns a message, and ``main`` reads ``answer.content``.
    """
    # Run every fetch concurrently instead of creating tasks and awaiting
    # them one by one; gather preserves argument order.
    (
        codebase_docs,
        github_issues,
        docker_logs,
        process_logs,
        k8s_info,
    ) = await asyncio.gather(
        fetch_codebase_docs(question),
        fetch_github_issues(question),
        fetch_docker_logs(),
        fetch_process_logs(),
        fetch_k8s_info(),
    )
    # Combine retrieved documents and logs into one context.
    context = (
        "Codebase Documents:\n" + "\n".join([doc.page_content for doc in codebase_docs]) + "\n\n" +
        "GitHub Issues:\n" + "\n".join([doc.page_content for doc in github_issues]) + "\n\n" +
        "Docker Logs:\n" + docker_logs + "\n\n" +
        "Process Logs:\n" + process_logs + "\n\n" +
        "Kubernetes Deployments:\n" + k8s_info + "\n\n"
    )
    # Create a final prompt that includes the context and the user question.
    final_prompt = (
        "You are an assistant that answers questions about a codebase and its runtime environment. "
        "Your response MUST be formatted aligned with the following schema. Your response should be a JSON object where the top level keys are schema and data. You will return the schema which follows in schema and the data for your response in data:\n\n"
        f"{schema}\n\n"
        "The following information has been gathered:\n\n"
        f"{context}\n\n"
        f"Based on the above, answer the following question:\n{question}"
    )
    # Initialize the async LLM from OpenAI (ensure your API keys are set).
    llm = ChatOpenAI(temperature=0)
    # Generate the answer asynchronously.
    return await llm.ainvoke(final_prompt)
# ---------------------------------------------------------------------------
# Entry point of the application
# ---------------------------------------------------------------------------
async def main():
    """
    Read a question from stdin, have the LLM generate an output schema for
    it, then answer the question constrained to that schema and print the
    answer to stdout.
    """
    question = sys.stdin.read()
    # First pass: ask the LLM for a JSON schema the final answer should follow.
    # (Fixed typo: "approiate" -> "appropriate".)
    schema_prompt = (
        "You are an assistant that outputs a JSON schema appropriate for the question being asked. "
        f"Based on the above, generate an output schema for the answer to the following question:\n{question}"
    )
    llm = ChatOpenAI(temperature=0)
    schema_answer = await llm.ainvoke(schema_prompt)
    # Second pass: answer the question, constrained to the generated schema.
    answer = await answer_question(question, schema_answer.content)
    print(answer.content)


if __name__ == "__main__":
    asyncio.run(main())
r"""
This is free and unencumbered software released into the public domain.
Anyone is free to copy, modify, publish, use, compile, sell, or
distribute this software, either in source code form or as a compiled
binary, for any purpose, commercial or non-commercial, and by any
means.
In jurisdictions that recognize copyright laws, the author or authors
of this software dedicate any and all copyright interest in the
software to the public domain. We make this dedication for the benefit
of the public at large and to the detriment of our heirs and
successors. We intend this dedication to be an overt act of
relinquishment in perpetuity of all present and future rights to this
software under copyright law.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
TODO
- Update agi.py to deploy client side service so we aren't doing things with
bash so much in shell. Sock connects back and all should be API based
- Time tracking and engineering log gen
- Bash hook for new processes
- kubectl exec
  - Capture context and inspect cluster, make available to LLM
- Trust boundaries for code gen
- Metadata classes should be created and loaded dynamically and added to
  vector db or whatever so that LLM can cruise through full context
- Some runs on server with OpenAI API key and some runs on client.
- Ideally we have some sort of proxy which issues scoped tokens so that the
client can talk "directly" to OpenAI API and we keep stuff context local
- https://github.com/slsa-framework/attested-build-environments-demo
- "Clippy"
- Open pane when debugging help is identified based on train of thought
analysis.
- Looks like you're trying to kubectl apply but it's stuck in
ContainerCreating, did you type the image name correctly?
"""
import json
import logging
from typing import Dict, List, Optional, Union
import libtmux
import psutil
from pydantic import BaseModel, Field
# -----------------------------
# Define Pydantic data models
# -----------------------------
class GitRemote(BaseModel):
    """A single named Git remote, e.g. name="origin" with its fetch/push URL."""

    # Remote name as configured in the repository (e.g. "origin", "upstream").
    name: str
    # One URL belonging to that remote; a remote with several URLs yields
    # several GitRemote entries (see get_git_metadata).
    url: str
class GitMetadata(BaseModel):
    """Git repository information discovered from a pane's working directory."""

    # Absolute path of the repository's top-level directory.
    repo_root: str
    # All (remote, url) pairs configured on the repository.
    remotes: List[GitRemote] = Field(default_factory=list)
class ProcessNode(BaseModel):
    """One node of a process tree: a command line plus its child processes."""

    # argv of the process; may be empty if the cmdline could not be read.
    cmd: List[str]
    # default_factory instead of a literal `[]` — consistent with
    # GitMetadata.remotes and the standard way to declare mutable defaults.
    children: List["ProcessNode"] = Field(default_factory=list)


# Resolve the "ProcessNode" forward reference in the recursive field.
ProcessNode.model_rebuild()
class CommandMetadata(BaseModel):
    """Process-tree metadata rooted at a tmux pane's process."""

    # Root of the process tree (the pane's shell process and its descendants).
    tree: ProcessNode
# Wrapper model that tags a metadata payload with the entrypoint string of
# the class that produced it, so consumers can dispatch on the type.
class MetadataWrapper(BaseModel):
    # Dotted-path identifier of the metadata class (e.g.
    # "my_module.git_metadata.GitMetadata").
    metadata_class_entrypoint: str
    # The actual metadata payload; one of the supported metadata models.
    data: Union[GitMetadata, CommandMetadata]
class TmuxContext(BaseModel):
    """Top-level container: session name -> {window name -> pane details}."""

    # Populated in place by get_tmux_window_info.
    sessions: Dict[str, dict] = Field(default_factory=dict)
# -----------------------------
# Helper functions to collect metadata
# -----------------------------
def get_git_metadata(cwd: str) -> Optional[GitMetadata]:
    """
    Return Git metadata for *cwd*, or None if it is not inside a repository.

    The metadata holds the repository's top-level directory and one
    GitRemote per (remote, url) pair. Any failure — including GitPython
    being unavailable — is logged and reported as None.
    """
    try:
        from git import Repo  # GitPython must be installed.

        # search_parent_directories=True lets any subdirectory of a
        # repository resolve to that repository's root.
        repository = Repo(cwd, search_parent_directories=True)
        top_level = repository.git.rev_parse("--show-toplevel")
        # A remote may expose several URLs; flatten to one entry per URL.
        remote_entries = [
            GitRemote(name=remote.name, url=remote_url)
            for remote in repository.remotes
            for remote_url in remote.urls
        ]
        return GitMetadata(repo_root=top_level, remotes=remote_entries)
    except Exception as error:
        logging.exception("get_git_metadata(%s)", cwd, exc_info=error)
        # cwd is likely not in a Git repository or an error occurred.
        return None
def build_process_tree(proc: psutil.Process) -> ProcessNode:
    """
    Recursively build a ProcessNode tree rooted at *proc*.

    Failures to read the command line or the children (process exited,
    access denied) are logged and treated as empty rather than propagated.
    """
    try:
        cmdline = proc.cmdline()
        # Login shells report argv[0] with a leading "-" (e.g. "-bash");
        # strip it so commands compare cleanly. Guard against an empty
        # cmdline first — previously `cmdline[0]` raised IndexError for
        # processes with no cmdline and was only rescued by the broad except.
        if cmdline and cmdline[0].startswith("-"):
            cmdline[0] = cmdline[0][1:]
    except Exception as error:
        logging.exception("build_process_tree(%s)", proc, exc_info=error)
        cmdline = []
    try:
        children = proc.children()
    except Exception as error:
        logging.exception("build_process_tree(%s)", proc, exc_info=error)
        children = []
    child_nodes = [build_process_tree(child) for child in children]
    return ProcessNode(cmd=cmdline, children=child_nodes)
def get_command_metadata(pid: int) -> Optional[CommandMetadata]:
    """
    Return CommandMetadata containing the process tree rooted at *pid*.

    The tree includes the command line (cmd) of the process and all of its
    child processes.

    Returns:
        CommandMetadata on success, or None when the process has exited or
        access is denied. (Annotation fixed: the original claimed a plain
        ``CommandMetadata`` yet returned None on failure.)
    """
    try:
        root_proc = psutil.Process(pid)
        tree = build_process_tree(root_proc)
        return CommandMetadata(tree=tree)
    except Exception as error:
        logging.exception("get_command_metadata(%d)", pid, exc_info=error)
        # Process may have exited or access is denied.
        return None
# -----------------------------
# Main function to gather tmux info with metadata
# -----------------------------
def get_tmux_window_info():
    """
    Collect a TmuxContext describing every session, window, and pane on the
    local tmux server.

    Structure of the result's ``sessions`` dict:
        - outer keys: session names
        - inner keys: window names
        - values: lists of pane dicts, each containing:
            - 'cwd': the pane's current working directory
            - 'pid': the pane's process id (int)
            - 'metadata': a list of MetadataWrapper instances (Git and/or
              command/process-tree metadata, when discoverable)

    Returns:
        TmuxContext — serialize with ``model_dump_json()``.
    """
    server = libtmux.Server()
    tmux_ctx = TmuxContext()
    # Alias into the model's dict so assignments below mutate the context.
    session_info = tmux_ctx.sessions
    for session in server.sessions:
        windows_info = {}
        for window in session.windows:
            pane_details = []
            for pane in window.panes:
                cwd = pane.pane_current_path
                # libtmux exposes the pid as a string; normalize to int.
                pid = int(pane.pane_pid)
                metadata_entries = []
                # Discover Git metadata based on the pane's cwd.
                git_meta = get_git_metadata(cwd)
                if git_meta is not None:
                    git_wrapper = MetadataWrapper(
                        metadata_class_entrypoint="my_module.git_metadata.GitMetadata",
                        data=git_meta
                    )
                    metadata_entries.append(git_wrapper)
                # Discover command metadata based on the pane's pid.
                cmd_meta = get_command_metadata(pid)
                if cmd_meta is not None:
                    cmd_wrapper = MetadataWrapper(
                        metadata_class_entrypoint="my_module.command_metadata.CommandMetadata",
                        data=cmd_meta
                    )
                    metadata_entries.append(cmd_wrapper)
                pane_detail = {
                    "cwd": cwd,
                    "pid": pid,
                    "metadata": metadata_entries,
                }
                pane_details.append(pane_detail)
            windows_info[window.name] = pane_details
        session_info[session.name] = windows_info
    return tmux_ctx
if __name__ == "__main__":
    # Gather the tmux context and emit it as JSON on stdout.
    print(get_tmux_window_info().model_dump_json())
sessions:
"0":
atprotobin:
- cwd: /home/johnandersen777
pid: 8494
metadata:
- metadata_class_entrypoint: my_module.command_metadata.CommandMetadata
data:
tree:
cmd:
- bash
children:
- cmd:
- python
- -m
- atprotobin
children: []
atproto-search-flights-down:
- cwd: /home/johnandersen777/.tmp/tmp.Udu4NeXwc3
pid: 9920
metadata:
- metadata_class_entrypoint: my_module.git_metadata.GitMetadata
data:
repo_root: /home/johnandersen777/.tmp/tmp.Udu4NeXwc3
remotes:
- name: origin
url: https://gist.github.com/johnandersen777/9b9b7ba25fc9c5eda445943bd457c87b
- metadata_class_entrypoint: my_module.command_metadata.CommandMetadata
data:
tree:
cmd:
- bash
children:
- cmd:
- python
- context_tmux.py
children: []
- cmd:
- yq
- -P
children: []
- cmd:
- tee
- zzz_example_output.yaml
children: []
bash:
- cwd: /home/johnandersen777/Documents/python/scitt-api-emulator/github_webhook_events
pid: 11678
metadata:
- metadata_class_entrypoint: my_module.git_metadata.GitMetadata
data:
repo_root: /home/johnandersen777/Documents/python/scitt-api-emulator
remotes:
- name: upstream
url: https://github.com/scitt-community/scitt-api-emulator
- name: origin
url: https://github.com/pdxjohnny/scitt-api-emulator.git
- metadata_class_entrypoint: my_module.command_metadata.CommandMetadata
data:
tree:
cmd:
- bash
children:
- cmd:
- python
- -m
- uvicorn
- agi:app
- --uds
- /tmp/agi.sock
children: []
- cwd: /home/johnandersen777/Documents/python/scitt-api-emulator/github_webhook_events
pid: 11871
metadata:
- metadata_class_entrypoint: my_module.git_metadata.GitMetadata
data:
repo_root: /home/johnandersen777/Documents/python/scitt-api-emulator
remotes:
- name: upstream
url: https://github.com/scitt-community/scitt-api-emulator
- name: origin
url: https://github.com/pdxjohnny/scitt-api-emulator.git
- metadata_class_entrypoint: my_module.command_metadata.CommandMetadata
data:
tree:
cmd:
- bash
children: []
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment