@daviddwlee84
Last active July 23, 2024 10:09
A PyTorch training script that finds an available GPU to train on (or waits until one is free), with file-lock protection and command-line arguments for hyperparameters. https://chatgpt.com/share/4c82e79d-c80b-4d06-b802-fa7a8cf34ee8 https://chatgpt.com/share/1d02b8c2-78c5-44cc-8db5-b1cad0c4e191
from typing import Tuple
import GPUtil
import torch
import time
import os
from filelock import FileLock, Timeout
from tap import Tap

# pip install gputil torch filelock typed-argument-parser

# LOCK_DIR = "/tmp/gpu_locks"  # Might have permission issues
LOCK_DIR = os.path.expanduser("~/.gpu_locks")
LOCK_EXTENSION = ".lock"
WAIT_TIME = 10


def create_lock_dir() -> None:
    if not os.path.exists(LOCK_DIR):
        os.makedirs(LOCK_DIR)


def get_lock_file_path(gpu_id: int) -> str:
    return os.path.join(LOCK_DIR, f"gpu_{gpu_id}{LOCK_EXTENSION}")


def get_available_gpu() -> Tuple[int, FileLock]:
    create_lock_dir()
    while True:
        available_gpus = GPUtil.getAvailable(
            order="first", limit=1, maxLoad=0.05, maxMemory=0.05, includeNan=False
        )
        for gpu_id in available_gpus:
            lock_file = get_lock_file_path(gpu_id)
            lock = FileLock(lock_file)
            try:
                lock.acquire(timeout=0)  # Try to acquire the lock without waiting
                return gpu_id, lock
            except Timeout:
                continue
        print("No available GPUs. Waiting...")
        time.sleep(WAIT_TIME)


class Args(Tap):
    learning_rate: float = 0.01  # Learning rate for the optimizer
    epochs: int = 10  # Number of epochs to train
    gpu_id: int = -1  # GPU ID to use, -1 for automatic allocation


class DummyContextManager:
    def __enter__(self):
        pass  # No setup needed

    def __exit__(self, exc_type, exc_value, traceback):
        pass  # No cleanup needed

    def __bool__(self) -> bool:
        return False  # Falsy: signals "no lock" (the CPU path)


def main():
    args = Args().parse_args()
    if torch.cuda.is_available():
        if args.gpu_id == -1:
            gpu_id, lock = get_available_gpu()
        else:
            gpu_id = args.gpu_id
            create_lock_dir()  # Make sure the lock directory exists before locking
            lock_file = get_lock_file_path(gpu_id)
            lock = FileLock(lock_file)
            try:
                lock.acquire(timeout=0)
            except Timeout:
                print(f"GPU {gpu_id} is currently occupied. Waiting...")
                while True:
                    try:
                        lock.acquire(timeout=0)
                        break
                    except Timeout:
                        time.sleep(WAIT_TIME)
        device = torch.device(f"cuda:{gpu_id}")
    else:
        device = torch.device("cpu")
        lock = DummyContextManager()

    print(f"Using device {device}")
    # NOTE: since we "with lock", we don't need to call `lock.release()` manually
    with lock:
        try:
            # Example model and training loop
            model = torch.nn.Linear(10, 1).to(device)
            optimizer = torch.optim.SGD(model.parameters(), lr=args.learning_rate)
            criterion = torch.nn.MSELoss()

            # Dummy data
            data = torch.randn(100, 10).to(device)
            target = torch.randn(100, 1).to(device)

            # Training loop
            for epoch in range(args.epochs):
                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()
                print(f"Epoch {epoch + 1}, Loss: {loss.item()}")
        except Exception as e:
            print(f"An error occurred: {e}")
        finally:
            if lock:
                print("Released lock for GPU")
            else:
                print("No lock to be released; no lock is created when running on the CPU.")


if __name__ == "__main__":
    main()
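
Because every process claims a GPU through a per-GPU FileLock under LOCK_DIR, the lock files double as a simple "who holds which GPU" registry. Below is a minimal sketch that probes which GPU locks are currently held without keeping them; the helper name locked_gpus is mine, and it only assumes the LOCK_DIR and gpu_<id>.lock naming convention used above.

import os
from filelock import FileLock, Timeout

LOCK_DIR = os.path.expanduser("~/.gpu_locks")


def locked_gpus() -> list:
    """Return the GPU IDs whose lock file is currently held by some process."""
    if not os.path.isdir(LOCK_DIR):
        return []
    held = []
    for name in sorted(os.listdir(LOCK_DIR)):
        if not (name.startswith("gpu_") and name.endswith(".lock")):
            continue
        gpu_id = int(name[len("gpu_"):-len(".lock")])
        lock = FileLock(os.path.join(LOCK_DIR, name))
        try:
            # If this non-blocking acquire succeeds, nobody holds the lock right now
            lock.acquire(timeout=0)
            lock.release()
        except Timeout:
            held.append(gpu_id)
    return held


if __name__ == "__main__":
    print("Currently locked GPUs:", locked_gpus())
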
from typing import Tuple
import GPUtil
import torch
import time
import os
from filelock import FileLock, Timeout
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel

# pip install gputil torch filelock fastapi uvicorn

# Constants
LOCK_DIR = os.path.expanduser("~/.gpu_locks")
LOCK_EXTENSION = ".lock"
WAIT_TIME = 10

app = FastAPI()


def create_lock_dir() -> None:
    if not os.path.exists(LOCK_DIR):
        os.makedirs(LOCK_DIR)


def get_lock_file_path(gpu_id: int) -> str:
    return os.path.join(LOCK_DIR, f"gpu_{gpu_id}{LOCK_EXTENSION}")


def get_available_gpu() -> Tuple[int, FileLock]:
    create_lock_dir()
    while True:
        available_gpus = GPUtil.getAvailable(
            order="first", limit=1, maxLoad=0.05, maxMemory=0.05, includeNan=False
        )
        for gpu_id in available_gpus:
            lock_file = get_lock_file_path(gpu_id)
            lock = FileLock(lock_file)
            try:
                lock.acquire(timeout=0)  # Try to acquire the lock without waiting
                return gpu_id, lock
            except Timeout:
                continue
        print("No available GPUs. Waiting...")
        time.sleep(WAIT_TIME)


class TrainTask(BaseModel):
    learning_rate: float = 0.01  # Learning rate for the optimizer
    epochs: int = 10  # Number of epochs to train
    gpu_id: int = -1  # GPU ID to use, -1 for automatic allocation


class DummyContextManager:
    def __enter__(self):
        pass  # No setup needed

    def __exit__(self, exc_type, exc_value, traceback):
        pass  # No cleanup needed

    def __bool__(self) -> bool:
        return False  # Falsy: signals "no lock" (the CPU path)


def train_model(task: TrainTask):
    args = task
    if torch.cuda.is_available():
        if args.gpu_id == -1:
            gpu_id, lock = get_available_gpu()
        else:
            gpu_id = args.gpu_id
            create_lock_dir()  # Make sure the lock directory exists before locking
            lock_file = get_lock_file_path(gpu_id)
            lock = FileLock(lock_file)
            try:
                lock.acquire(timeout=0)
            except Timeout:
                print(f"GPU {gpu_id} is currently occupied. Waiting...")
                while True:
                    try:
                        lock.acquire(timeout=0)
                        break
                    except Timeout:
                        time.sleep(WAIT_TIME)
        device = torch.device(f"cuda:{gpu_id}")
    else:
        device = torch.device("cpu")
        lock = DummyContextManager()

    print(f"Using device {device}")
    with lock:
        try:
            # Example model and training loop
            model = torch.nn.Linear(10, 1).to(device)
            optimizer = torch.optim.SGD(model.parameters(), lr=args.learning_rate)
            criterion = torch.nn.MSELoss()

            # Dummy data
            data = torch.randn(100, 10).to(device)
            target = torch.randn(100, 1).to(device)

            # Training loop
            for epoch in range(args.epochs):
                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()
                print(f"Epoch {epoch + 1}, Loss: {loss.item()}")
        except Exception as e:
            print(f"An error occurred: {e}")
        finally:
            if lock:
                print("Released lock for GPU")
            else:
                print("No lock to be released; no lock is created when running on the CPU.")


@app.post("/train")
def submit_training(task: TrainTask, background_tasks: BackgroundTasks):
    background_tasks.add_task(train_model, task)
    return {"message": "Training task has been submitted", "task": task}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
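
A minimal client sketch for the FastAPI version above. The /train path and port 8000 come from the code; running the server on localhost is an assumption.

import requests

resp = requests.post(
    "http://localhost:8000/train",
    json={"learning_rate": 0.001, "epochs": 5, "gpu_id": -1},
)
resp.raise_for_status()
print(resp.json())  # {"message": "Training task has been submitted", "task": {...}}
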
from typing import Tuple, Dict
import GPUtil
import torch
import time
import os
import uuid
from filelock import FileLock, Timeout
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from concurrent.futures import ThreadPoolExecutor, as_completed

# pip install gputil torch filelock fastapi uvicorn

# Constants
LOCK_DIR = os.path.expanduser("~/.gpu_locks")
LOCK_EXTENSION = ".lock"
WAIT_TIME = 10

app = FastAPI()
executor = ThreadPoolExecutor(max_workers=4)  # Limit the number of concurrent tasks
tasks_status: Dict[str, str] = {}  # Dictionary to track task status


def create_lock_dir() -> None:
    if not os.path.exists(LOCK_DIR):
        os.makedirs(LOCK_DIR)


def get_lock_file_path(gpu_id: int) -> str:
    return os.path.join(LOCK_DIR, f"gpu_{gpu_id}{LOCK_EXTENSION}")


def get_available_gpu() -> Tuple[int, FileLock]:
    create_lock_dir()
    while True:
        available_gpus = GPUtil.getAvailable(
            order="first", limit=1, maxLoad=0.05, maxMemory=0.05, includeNan=False
        )
        for gpu_id in available_gpus:
            lock_file = get_lock_file_path(gpu_id)
            lock = FileLock(lock_file)
            try:
                lock.acquire(timeout=0)  # Try to acquire the lock without waiting
                return gpu_id, lock
            except Timeout:
                continue
        print("No available GPUs. Waiting...")
        time.sleep(WAIT_TIME)


class TrainTask(BaseModel):
    learning_rate: float = 0.01  # Learning rate for the optimizer
    epochs: int = 10  # Number of epochs to train
    gpu_id: int = -1  # GPU ID to use, -1 for automatic allocation


class DummyContextManager:
    def __enter__(self):
        pass  # No setup needed

    def __exit__(self, exc_type, exc_value, traceback):
        pass  # No cleanup needed

    def __bool__(self) -> bool:
        return False  # Falsy: signals "no lock" (the CPU path)


def train_model(task_id: str, task: TrainTask):
    tasks_status[task_id] = "running"
    try:
        args = task
        if torch.cuda.is_available():
            if args.gpu_id == -1:
                gpu_id, lock = get_available_gpu()
            else:
                gpu_id = args.gpu_id
                create_lock_dir()  # Make sure the lock directory exists before locking
                lock_file = get_lock_file_path(gpu_id)
                lock = FileLock(lock_file)
                try:
                    lock.acquire(timeout=0)
                except Timeout:
                    print(f"GPU {gpu_id} is currently occupied. Waiting...")
                    while True:
                        try:
                            lock.acquire(timeout=0)
                            break
                        except Timeout:
                            time.sleep(WAIT_TIME)
            device = torch.device(f"cuda:{gpu_id}")
        else:
            device = torch.device("cpu")
            lock = DummyContextManager()

        print(f"Using device {device}")
        with lock:
            try:
                # Example model and training loop
                model = torch.nn.Linear(10, 1).to(device)
                optimizer = torch.optim.SGD(model.parameters(), lr=args.learning_rate)
                criterion = torch.nn.MSELoss()

                # Dummy data
                data = torch.randn(100, 10).to(device)
                target = torch.randn(100, 1).to(device)

                # Training loop
                for epoch in range(args.epochs):
                    optimizer.zero_grad()
                    output = model(data)
                    loss = criterion(output, target)
                    loss.backward()
                    optimizer.step()
                    print(f"Epoch {epoch + 1}, Loss: {loss.item()}")
            except Exception as e:
                print(f"An error occurred: {e}")
            finally:
                if lock:
                    print("Released lock for GPU")
                else:
                    print(
                        "No lock to be released; no lock is created when running on the CPU."
                    )
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        tasks_status[task_id] = "completed"


@app.post("/train")
def submit_training(task: TrainTask):
    task_id = str(uuid.uuid4())
    tasks_status[task_id] = "pending"
    executor.submit(train_model, task_id, task)
    return {"message": "Training task has been submitted", "task_id": task_id}


@app.get("/status/{task_id}")
def get_task_status(task_id: str):
    if task_id not in tasks_status:
        raise HTTPException(status_code=404, detail="Task ID not found")
    return {"task_id": task_id, "status": tasks_status[task_id]}


if __name__ == "__main__":
    import uvicorn

    # Go to http://localhost:8000/docs
    uvicorn.run(app, host="0.0.0.0", port=8000)
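
A client sketch for this variant (again assuming the server runs on localhost:8000): submit a task, then poll /status/{task_id} until the in-memory status reaches "completed".

import time
import requests

API = "http://localhost:8000"

task_id = requests.post(f"{API}/train", json={"epochs": 5}).json()["task_id"]

while True:
    status = requests.get(f"{API}/status/{task_id}").json()["status"]
    print("status:", status)
    if status == "completed":
        break
    time.sleep(5)
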
from typing import Tuple, Dict, Optional
import GPUtil
import torch
import time
import os
from filelock import FileLock, Timeout
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from concurrent.futures import ThreadPoolExecutor
import mlflow

# pip install gputil torch filelock fastapi uvicorn mlflow

# Constants
LOCK_DIR = os.path.expanduser("~/.gpu_locks")
LOCK_EXTENSION = ".lock"
WAIT_TIME = 10

app = FastAPI()
executor = ThreadPoolExecutor(max_workers=4)  # Limit the number of concurrent tasks
tasks_status: Dict[str, str] = {}  # Dictionary to track task status


def create_lock_dir() -> None:
    if not os.path.exists(LOCK_DIR):
        os.makedirs(LOCK_DIR)


def get_lock_file_path(gpu_id: int) -> str:
    return os.path.join(LOCK_DIR, f"gpu_{gpu_id}{LOCK_EXTENSION}")


def get_available_gpu() -> Tuple[int, FileLock]:
    create_lock_dir()
    while True:
        available_gpus = GPUtil.getAvailable(
            order="first", limit=1, maxLoad=0.05, maxMemory=0.05, includeNan=False
        )
        for gpu_id in available_gpus:
            lock_file = get_lock_file_path(gpu_id)
            lock = FileLock(lock_file)
            try:
                lock.acquire(timeout=0)  # Try to acquire the lock without waiting
                return gpu_id, lock
            except Timeout:
                continue
        print("No available GPUs. Waiting...")
        time.sleep(WAIT_TIME)


class TrainTask(BaseModel):
    learning_rate: float = 0.01  # Learning rate for the optimizer
    epochs: int = 10  # Number of epochs to train
    gpu_id: int = -1  # GPU ID to use, -1 for automatic allocation
    run_name: Optional[str] = None  # Optional run name for MLFlow


class DummyContextManager:
    def __enter__(self):
        pass  # No setup needed

    def __exit__(self, exc_type, exc_value, traceback):
        pass  # No cleanup needed

    def __bool__(self) -> bool:
        return False  # Falsy: signals "no lock" (the CPU path)


def train_model(run_id: str, task: TrainTask):
    tasks_status[run_id] = "running"
    try:
        args = task
        if torch.cuda.is_available():
            if args.gpu_id == -1:
                gpu_id, lock = get_available_gpu()
            else:
                gpu_id = args.gpu_id
                create_lock_dir()  # Make sure the lock directory exists before locking
                lock_file = get_lock_file_path(gpu_id)
                lock = FileLock(lock_file)
                try:
                    lock.acquire(timeout=0)
                except Timeout:
                    print(f"GPU {gpu_id} is currently occupied. Waiting...")
                    while True:
                        try:
                            lock.acquire(timeout=0)
                            break
                        except Timeout:
                            time.sleep(WAIT_TIME)
            device = torch.device(f"cuda:{gpu_id}")
        else:
            device = torch.device("cpu")
            lock = DummyContextManager()

        print(f"Using device {device}")
        with lock:
            try:
                with mlflow.start_run(
                    run_id=run_id,
                    tags={
                        "Device": str(device),
                    },
                ):
                    # Example model and training loop
                    model = torch.nn.Linear(10, 1).to(device)
                    optimizer = torch.optim.SGD(
                        model.parameters(), lr=args.learning_rate
                    )
                    criterion = torch.nn.MSELoss()

                    # Log parameters
                    mlflow.log_param("learning_rate", args.learning_rate)
                    mlflow.log_param("epochs", args.epochs)

                    # Dummy data
                    data = torch.randn(100, 10).to(device)
                    target = torch.randn(100, 1).to(device)

                    # Training loop
                    for epoch in range(args.epochs):
                        optimizer.zero_grad()
                        output = model(data)
                        loss = criterion(output, target)
                        loss.backward()
                        optimizer.step()
                        print(f"Epoch {epoch + 1}, Loss: {loss.item()}")
                        # Log metrics
                        mlflow.log_metric("loss", loss.item(), step=epoch)
            except Exception as e:
                print(f"An error occurred: {e}")
                mlflow.log_param("error", str(e))
            finally:
                if lock:
                    print("Released lock for GPU")
                else:
                    print(
                        "No lock to be released; no lock is created when running on the CPU."
                    )
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        tasks_status[run_id] = "completed"


@app.post("/train")
def submit_training(task: TrainTask):
    # Alternatives:
    # client = mlflow.MlflowClient()
    # run = client.create_run(experiment_id=mlflow.tracking.fluent._get_experiment_id(), run_name=task.run_name)
    with mlflow.start_run(run_name=task.run_name) as run:
        run_id = run.info.run_id
    tasks_status[run_id] = "pending"
    executor.submit(train_model, run_id, task)
    return {"message": "Training task has been submitted", "run_id": run_id}


@app.get("/status/{run_id}")
def get_task_status(run_id: str):
    if run_id not in tasks_status:
        raise HTTPException(status_code=404, detail="Run ID not found")
    try:
        run = mlflow.get_run(run_id)
        return {
            "run_id": run_id,
            "status": run.info.status,
            "start_time": run.info.start_time,
            "end_time": run.info.end_time,
            "metrics": run.data.metrics,
            "params": run.data.params,
            "tags": run.data.tags,
        }
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Could not retrieve run status: {e}"
        )


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
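
A client sketch for the MLflow-backed server. It assumes localhost:8000 and the default local tracking store (an ./mlruns directory next to the server process); set MLFLOW_TRACKING_URI if a separate tracking server is used. Note that /status here returns the MLflow run record (status, metrics, params, tags), not the in-memory "pending/running/completed" string used by the previous variant.

import time
import requests

API = "http://localhost:8000"

run_id = requests.post(
    f"{API}/train",
    json={"learning_rate": 0.005, "epochs": 10, "run_name": "demo"},
).json()["run_id"]

# Give the background task a moment to log something, then inspect the run
time.sleep(10)
info = requests.get(f"{API}/status/{run_id}").json()
print(info["status"], info["params"], info["metrics"])
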
import streamlit as st
import requests
import json
import time

# pip install streamlit
# streamlit run gpu_lock_with_arguments_fastapi_with_status_using_mlflow_streamlit.py
# This works with gpu_lock_with_arguments_fastapi_with_status_using_mlflow.py

# FastAPI server URL
API_URL = "http://localhost:8000"

# A dictionary to keep track of submitted tasks and their statuses
if "submitted_tasks" not in st.session_state:
    st.session_state["submitted_tasks"] = {}

# Streamlit UI
st.title("MLFlow Training Task Manager")

# Training parameters input form
st.header("Submit a Training Task")
run_name = st.text_input("Run Name", value="", help="optional")
learning_rate = st.number_input(
    "Learning Rate", min_value=0.0001, max_value=1.0, value=0.01, step=0.001
)
epochs = st.number_input(
    "Number of Epochs", min_value=1, max_value=100, value=10, step=1
)
gpu_id = st.number_input(
    "GPU ID (-1 for automatic allocation)", min_value=-1, max_value=10, value=-1, step=1
)

if st.button("Submit Training Task"):
    # Prepare the payload
    payload = {
        "learning_rate": learning_rate,
        "epochs": epochs,
        "gpu_id": gpu_id,
        "run_name": run_name if run_name else None,
    }
    # Send POST request to submit the training task
    response = requests.post(f"{API_URL}/train", json=payload)
    if response.status_code == 200:
        response_data = response.json()
        run_id = response_data["run_id"]
        st.success(f"Training task has been submitted. Run ID: {run_id}")
        st.session_state["submitted_tasks"][run_id] = "pending"
    else:
        st.error("Failed to submit the training task.")


# Display submitted tasks and their statuses
@st.experimental_fragment(run_every="30s")
def display_status():
    st.header("Submitted Tasks")
    for run_id, status in st.session_state["submitted_tasks"].items():
        st.write(f"Run ID: {run_id}, Status: {status}")
        # Check the status of the task
        status_response = requests.get(f"{API_URL}/status/{run_id}")
        if status_response.status_code == 200:
            status_data = status_response.json()
            st.session_state["submitted_tasks"][run_id] = status_data["status"]
            st.write(status_data)
        else:
            st.error(f"Failed to retrieve the status of task {run_id}")


display_status()