A PyTorch training script that finds an available GPU to train on (or waits until one becomes free), with file-lock protection and command-line arguments for the training parameters. https://chatgpt.com/share/4c82e79d-c80b-4d06-b802-fa7a8cf34ee8 https://chatgpt.com/share/1d02b8c2-78c5-44cc-8db5-b1cad0c4e191
from typing import Tuple

import GPUtil
import torch
import time
import os
from filelock import FileLock, Timeout
from tap import Tap

# pip install gputil torch filelock typed-argument-parser

# LOCK_DIR = "/tmp/gpu_locks"  # Might have permission issues
LOCK_DIR = os.path.expanduser('~/.gpu_locks')
LOCK_EXTENSION = ".lock"
WAIT_TIME = 10


def create_lock_dir() -> None:
    if not os.path.exists(LOCK_DIR):
        os.makedirs(LOCK_DIR)


def get_lock_file_path(gpu_id: int) -> str:
    return os.path.join(LOCK_DIR, f"gpu_{gpu_id}{LOCK_EXTENSION}")


def get_available_gpu() -> Tuple[int, FileLock]:
    create_lock_dir()
    while True:
        available_gpus = GPUtil.getAvailable(
            order="first", limit=1, maxLoad=0.05, maxMemory=0.05, includeNan=False
        )
        for gpu_id in available_gpus:
            lock_file = get_lock_file_path(gpu_id)
            lock = FileLock(lock_file)
            try:
                lock.acquire(timeout=0)  # Try to acquire the lock without waiting
                return gpu_id, lock
            except Timeout:
                continue
        print("No available GPUs. Waiting...")
        time.sleep(WAIT_TIME)


class Args(Tap):
    learning_rate: float = 0.01  # Learning rate for the optimizer
    epochs: int = 10  # Number of epochs to train
    gpu_id: int = -1  # GPU ID to use, -1 for automatic allocation


class DummyContextManager:
    def __enter__(self):
        pass  # No setup needed

    def __exit__(self, exc_type, exc_value, traceback):
        pass  # No cleanup needed

    def __bool__(self) -> bool:
        return False  # Falsy object signals "no lock" (CPU mode)


def main():
    args = Args().parse_args()

    if torch.cuda.is_available():
        if args.gpu_id == -1:
            gpu_id, lock = get_available_gpu()
        else:
            gpu_id = args.gpu_id
            lock_file = get_lock_file_path(gpu_id)
            lock = FileLock(lock_file)
            try:
                lock.acquire(timeout=0)
            except Timeout:
                print(f"GPU {gpu_id} is currently occupied. Waiting...")
                while True:
                    try:
                        lock.acquire(timeout=0)
                        break
                    except Timeout:
                        time.sleep(WAIT_TIME)
        device = torch.device(f"cuda:{gpu_id}")
    else:
        device = torch.device("cpu")
        lock = DummyContextManager()

    print(f"Using device {device}")

    # NOTE: since we "with lock" we don't need to call `lock.release()` manually
    with lock:
        try:
            # Example model and training loop
            model = torch.nn.Linear(10, 1).to(device)
            optimizer = torch.optim.SGD(model.parameters(), lr=args.learning_rate)
            criterion = torch.nn.MSELoss()

            # Dummy data
            data = torch.randn(100, 10).to(device)
            target = torch.randn(100, 1).to(device)

            # Training loop
            for epoch in range(args.epochs):
                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()
                print(f"Epoch {epoch + 1}, Loss: {loss.item()}")
        except Exception as e:
            print(f"An error occurred: {e}")
        finally:
            if lock:
                print("Released lock for GPU")
            else:
                print("No lock to release; no lock is created when running on CPU.")


if __name__ == "__main__":
    main()
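Whether a GPU counts as "free" here is decided purely by whether its lock file can be grabbed: filelock raises a Timeout when acquire(timeout=0) cannot take the lock immediately, which is what the except branches above rely on. As an illustration only (this helper is not part of the script), a non-blocking probe for a single GPU could be built on the same primitives:

from filelock import FileLock, Timeout


def gpu_is_free(gpu_id: int) -> bool:
    """Hypothetical helper: probe the per-GPU lock without blocking."""
    lock = FileLock(get_lock_file_path(gpu_id))  # reuses the script's path helper
    try:
        lock.acquire(timeout=0)
    except Timeout:
        return False  # another training process currently holds this GPU's lock
    lock.release()
    return True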
from typing import Tuple

import GPUtil
import torch
import time
import os
from filelock import FileLock, Timeout
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel

# pip install gputil torch filelock typed-argument-parser fastapi uvicorn

# Constants
LOCK_DIR = os.path.expanduser('~/.gpu_locks')
LOCK_EXTENSION = ".lock"
WAIT_TIME = 10

app = FastAPI()


def create_lock_dir() -> None:
    if not os.path.exists(LOCK_DIR):
        os.makedirs(LOCK_DIR)


def get_lock_file_path(gpu_id: int) -> str:
    return os.path.join(LOCK_DIR, f"gpu_{gpu_id}{LOCK_EXTENSION}")


def get_available_gpu() -> Tuple[int, FileLock]:
    create_lock_dir()
    while True:
        available_gpus = GPUtil.getAvailable(
            order="first", limit=1, maxLoad=0.05, maxMemory=0.05, includeNan=False
        )
        for gpu_id in available_gpus:
            lock_file = get_lock_file_path(gpu_id)
            lock = FileLock(lock_file)
            try:
                lock.acquire(timeout=0)  # Try to acquire the lock without waiting
                return gpu_id, lock
            except Timeout:
                continue
        print("No available GPUs. Waiting...")
        time.sleep(WAIT_TIME)


class TrainTask(BaseModel):
    learning_rate: float = 0.01  # Learning rate for the optimizer
    epochs: int = 10  # Number of epochs to train
    gpu_id: int = -1  # GPU ID to use, -1 for automatic allocation


class DummyContextManager:
    def __enter__(self):
        pass  # No setup needed

    def __exit__(self, exc_type, exc_value, traceback):
        pass  # No cleanup needed

    def __bool__(self) -> bool:
        return False  # Falsy object signals "no lock" (CPU mode)


def train_model(task: TrainTask):
    args = task
    if torch.cuda.is_available():
        if args.gpu_id == -1:
            gpu_id, lock = get_available_gpu()
        else:
            gpu_id = args.gpu_id
            lock_file = get_lock_file_path(gpu_id)
            lock = FileLock(lock_file)
            try:
                lock.acquire(timeout=0)
            except Timeout:
                print(f"GPU {gpu_id} is currently occupied. Waiting...")
                while True:
                    try:
                        lock.acquire(timeout=0)
                        break
                    except Timeout:
                        time.sleep(WAIT_TIME)
        device = torch.device(f"cuda:{gpu_id}")
    else:
        device = torch.device("cpu")
        lock = DummyContextManager()

    print(f"Using device {device}")

    with lock:
        try:
            # Example model and training loop
            model = torch.nn.Linear(10, 1).to(device)
            optimizer = torch.optim.SGD(model.parameters(), lr=args.learning_rate)
            criterion = torch.nn.MSELoss()

            # Dummy data
            data = torch.randn(100, 10).to(device)
            target = torch.randn(100, 1).to(device)

            # Training loop
            for epoch in range(args.epochs):
                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()
                print(f"Epoch {epoch + 1}, Loss: {loss.item()}")
        except Exception as e:
            print(f"An error occurred: {e}")
        finally:
            if lock:
                print("Released lock for GPU")
            else:
                print("No lock to release; no lock is created when running on CPU.")


@app.post("/train")
def submit_training(task: TrainTask, background_tasks: BackgroundTasks):
    background_tasks.add_task(train_model, task)
    return {"message": "Training task has been submitted", "task": task}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
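Once this app is running (it serves on port 8000 by default), a task can be submitted with a plain HTTP POST. A minimal client sketch using requests, with field names taken from the TrainTask model above and the default localhost:8000 address assumed:

import requests

payload = {"learning_rate": 0.001, "epochs": 5, "gpu_id": -1}
response = requests.post("http://localhost:8000/train", json=payload)
print(response.json())  # {"message": "Training task has been submitted", "task": {...}}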
from typing import Tuple, Dict

import GPUtil
import torch
import time
import os
import uuid
from filelock import FileLock, Timeout
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from concurrent.futures import ThreadPoolExecutor, as_completed

# pip install gputil torch filelock typed-argument-parser fastapi uvicorn

# Constants
LOCK_DIR = os.path.expanduser("~/.gpu_locks")
LOCK_EXTENSION = ".lock"
WAIT_TIME = 10

app = FastAPI()
executor = ThreadPoolExecutor(max_workers=4)  # Limit the number of concurrent tasks
tasks_status: Dict[str, str] = {}  # Dictionary to track task status


def create_lock_dir() -> None:
    if not os.path.exists(LOCK_DIR):
        os.makedirs(LOCK_DIR)


def get_lock_file_path(gpu_id: int) -> str:
    return os.path.join(LOCK_DIR, f"gpu_{gpu_id}{LOCK_EXTENSION}")


def get_available_gpu() -> Tuple[int, FileLock]:
    create_lock_dir()
    while True:
        available_gpus = GPUtil.getAvailable(
            order="first", limit=1, maxLoad=0.05, maxMemory=0.05, includeNan=False
        )
        for gpu_id in available_gpus:
            lock_file = get_lock_file_path(gpu_id)
            lock = FileLock(lock_file)
            try:
                lock.acquire(timeout=0)  # Try to acquire the lock without waiting
                return gpu_id, lock
            except Timeout:
                continue
        print("No available GPUs. Waiting...")
        time.sleep(WAIT_TIME)


class TrainTask(BaseModel):
    learning_rate: float = 0.01  # Learning rate for the optimizer
    epochs: int = 10  # Number of epochs to train
    gpu_id: int = -1  # GPU ID to use, -1 for automatic allocation


class DummyContextManager:
    def __enter__(self):
        pass  # No setup needed

    def __exit__(self, exc_type, exc_value, traceback):
        pass  # No cleanup needed

    def __bool__(self) -> bool:
        return False  # Falsy object signals "no lock" (CPU mode)


def train_model(task_id: str, task: TrainTask):
    tasks_status[task_id] = "running"
    try:
        args = task
        if torch.cuda.is_available():
            if args.gpu_id == -1:
                gpu_id, lock = get_available_gpu()
            else:
                gpu_id = args.gpu_id
                lock_file = get_lock_file_path(gpu_id)
                lock = FileLock(lock_file)
                try:
                    lock.acquire(timeout=0)
                except Timeout:
                    print(f"GPU {gpu_id} is currently occupied. Waiting...")
                    while True:
                        try:
                            lock.acquire(timeout=0)
                            break
                        except Timeout:
                            time.sleep(WAIT_TIME)
            device = torch.device(f"cuda:{gpu_id}")
        else:
            device = torch.device("cpu")
            lock = DummyContextManager()

        print(f"Using device {device}")

        with lock:
            try:
                # Example model and training loop
                model = torch.nn.Linear(10, 1).to(device)
                optimizer = torch.optim.SGD(model.parameters(), lr=args.learning_rate)
                criterion = torch.nn.MSELoss()

                # Dummy data
                data = torch.randn(100, 10).to(device)
                target = torch.randn(100, 1).to(device)

                # Training loop
                for epoch in range(args.epochs):
                    optimizer.zero_grad()
                    output = model(data)
                    loss = criterion(output, target)
                    loss.backward()
                    optimizer.step()
                    print(f"Epoch {epoch + 1}, Loss: {loss.item()}")
            except Exception as e:
                print(f"An error occurred: {e}")
            finally:
                if lock:
                    print("Released lock for GPU")
                else:
                    print(
                        "No lock to release; no lock is created when running on CPU."
                    )
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        tasks_status[task_id] = "completed"


@app.post("/train")
def submit_training(task: TrainTask):
    task_id = str(uuid.uuid4())
    tasks_status[task_id] = "pending"
    executor.submit(train_model, task_id, task)
    return {"message": "Training task has been submitted", "task_id": task_id}


@app.get("/status/{task_id}")
def get_task_status(task_id: str):
    if task_id not in tasks_status:
        raise HTTPException(status_code=404, detail="Task ID not found")
    return {"task_id": task_id, "status": tasks_status[task_id]}


if __name__ == "__main__":
    import uvicorn

    # Go to http://localhost:8000/docs for the interactive API docs
    uvicorn.run(app, host="0.0.0.0", port=8000)
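Since this version returns a task_id and exposes /status/{task_id}, a caller can poll until the worker thread finishes. A small sketch, again assuming the default localhost:8000 address:

import time
import requests

API_URL = "http://localhost:8000"

resp = requests.post(
    f"{API_URL}/train", json={"learning_rate": 0.01, "epochs": 10, "gpu_id": -1}
)
task_id = resp.json()["task_id"]

# Poll until the ThreadPoolExecutor worker marks the task as completed
while True:
    status = requests.get(f"{API_URL}/status/{task_id}").json()["status"]
    print(f"Task {task_id}: {status}")
    if status == "completed":
        break
    time.sleep(5)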
from typing import Tuple, Dict, Optional

import GPUtil
import torch
import time
import os
from filelock import FileLock, Timeout
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from concurrent.futures import ThreadPoolExecutor
import mlflow

# pip install gputil torch filelock typed-argument-parser fastapi uvicorn mlflow

# Constants
LOCK_DIR = os.path.expanduser("~/.gpu_locks")
LOCK_EXTENSION = ".lock"
WAIT_TIME = 10

app = FastAPI()
executor = ThreadPoolExecutor(max_workers=4)  # Limit the number of concurrent tasks
tasks_status: Dict[str, str] = {}  # Dictionary to track task status


def create_lock_dir() -> None:
    if not os.path.exists(LOCK_DIR):
        os.makedirs(LOCK_DIR)


def get_lock_file_path(gpu_id: int) -> str:
    return os.path.join(LOCK_DIR, f"gpu_{gpu_id}{LOCK_EXTENSION}")


def get_available_gpu() -> Tuple[int, FileLock]:
    create_lock_dir()
    while True:
        available_gpus = GPUtil.getAvailable(
            order="first", limit=1, maxLoad=0.05, maxMemory=0.05, includeNan=False
        )
        for gpu_id in available_gpus:
            lock_file = get_lock_file_path(gpu_id)
            lock = FileLock(lock_file)
            try:
                lock.acquire(timeout=0)  # Try to acquire the lock without waiting
                return gpu_id, lock
            except Timeout:
                continue
        print("No available GPUs. Waiting...")
        time.sleep(WAIT_TIME)


class TrainTask(BaseModel):
    learning_rate: float = 0.01  # Learning rate for the optimizer
    epochs: int = 10  # Number of epochs to train
    gpu_id: int = -1  # GPU ID to use, -1 for automatic allocation
    run_name: Optional[str] = None  # Optional run name for MLflow


class DummyContextManager:
    def __enter__(self):
        pass  # No setup needed

    def __exit__(self, exc_type, exc_value, traceback):
        pass  # No cleanup needed

    def __bool__(self) -> bool:
        return False  # Falsy object signals "no lock" (CPU mode)


def train_model(run_id: str, task: TrainTask):
    tasks_status[run_id] = "running"
    try:
        args = task
        if torch.cuda.is_available():
            if args.gpu_id == -1:
                gpu_id, lock = get_available_gpu()
            else:
                gpu_id = args.gpu_id
                lock_file = get_lock_file_path(gpu_id)
                lock = FileLock(lock_file)
                try:
                    lock.acquire(timeout=0)
                except Timeout:
                    print(f"GPU {gpu_id} is currently occupied. Waiting...")
                    while True:
                        try:
                            lock.acquire(timeout=0)
                            break
                        except Timeout:
                            time.sleep(WAIT_TIME)
            device = torch.device(f"cuda:{gpu_id}")
        else:
            device = torch.device("cpu")
            lock = DummyContextManager()

        print(f"Using device {device}")

        with lock:
            try:
                with mlflow.start_run(
                    run_id=run_id,
                    tags={
                        "Device": str(device),
                    },
                ):
                    # Example model and training loop
                    model = torch.nn.Linear(10, 1).to(device)
                    optimizer = torch.optim.SGD(
                        model.parameters(), lr=args.learning_rate
                    )
                    criterion = torch.nn.MSELoss()

                    # Log parameters
                    mlflow.log_param("learning_rate", args.learning_rate)
                    mlflow.log_param("epochs", args.epochs)

                    # Dummy data
                    data = torch.randn(100, 10).to(device)
                    target = torch.randn(100, 1).to(device)

                    # Training loop
                    for epoch in range(args.epochs):
                        optimizer.zero_grad()
                        output = model(data)
                        loss = criterion(output, target)
                        loss.backward()
                        optimizer.step()
                        print(f"Epoch {epoch + 1}, Loss: {loss.item()}")

                        # Log metrics
                        mlflow.log_metric("loss", loss.item(), step=epoch)
            except Exception as e:
                print(f"An error occurred: {e}")
                mlflow.log_param("error", str(e))
            finally:
                if lock:
                    print("Released lock for GPU")
                else:
                    print(
                        "No lock to release; no lock is created when running on CPU."
                    )
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        tasks_status[run_id] = "completed"


@app.post("/train")
def submit_training(task: TrainTask):
    # Alternatives
    # client = mlflow.MlflowClient()
    # run = client.create_run(experiment_id=mlflow.tracking.fluent._get_experiment_id(), run_name=task.run_name)
    with mlflow.start_run(run_name=task.run_name) as run:
        run_id = run.info.run_id
    tasks_status[run_id] = "pending"
    executor.submit(train_model, run_id, task)
    return {"message": "Training task has been submitted", "run_id": run_id}


@app.get("/status/{run_id}")
def get_task_status(run_id: str):
    if run_id not in tasks_status:
        raise HTTPException(status_code=404, detail="Run ID not found")
    try:
        run = mlflow.get_run(run_id)
        return {
            "run_id": run_id,
            "status": run.info.status,
            "start_time": run.info.start_time,
            "end_time": run.info.end_time,
            "metrics": run.data.metrics,
            "params": run.data.params,
            "tags": run.data.tags,
        }
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Could not retrieve run status: {e}"
        )


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
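Because the run_id returned by /train is a real MLflow run ID, /status/{run_id} returns the run's MLflow status, metrics, params, and tags rather than just a string. A minimal polling client might look like the sketch below; localhost:8000 is assumed, and the server uses whatever MLflow tracking store is configured in its environment (e.g. via MLFLOW_TRACKING_URI). Note that MLflow returns logged params as strings.

import requests

API_URL = "http://localhost:8000"

submit = requests.post(
    f"{API_URL}/train",
    json={"learning_rate": 0.01, "epochs": 10, "gpu_id": -1, "run_name": "demo-run"},
)
run_id = submit.json()["run_id"]

info = requests.get(f"{API_URL}/status/{run_id}").json()
print(info["status"])   # MLflow run status, e.g. RUNNING or FINISHED
print(info["metrics"])  # e.g. {"loss": ...} once the training loop has logged it
print(info["params"])   # e.g. {"learning_rate": "0.01", "epochs": "10"}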
import streamlit as st
import requests
import json
import time

# pip install streamlit
# streamlit run gpu_lock_with_arguments_fastapi_with_status_using_mlflow_streamlit.py
# This works with gpu_lock_with_arguments_fastapi_with_status_using_mlflow.py

# FastAPI server URL
API_URL = "http://localhost:8000"

# A dictionary to keep track of submitted tasks and their statuses
if "submitted_tasks" not in st.session_state:
    st.session_state["submitted_tasks"] = {}

# Streamlit UI
st.title("MLflow Training Task Manager")

# Training parameters input form
st.header("Submit a Training Task")

run_name = st.text_input("Run Name", value="", help="optional")
learning_rate = st.number_input(
    "Learning Rate", min_value=0.0001, max_value=1.0, value=0.01, step=0.001
)
epochs = st.number_input(
    "Number of Epochs", min_value=1, max_value=100, value=10, step=1
)
gpu_id = st.number_input(
    "GPU ID (-1 for automatic allocation)", min_value=-1, max_value=10, value=-1, step=1
)

if st.button("Submit Training Task"):
    # Prepare the payload
    payload = {
        "learning_rate": learning_rate,
        "epochs": epochs,
        "gpu_id": gpu_id,
        "run_name": run_name if run_name else None,
    }
    # Send POST request to submit the training task
    response = requests.post(f"{API_URL}/train", json=payload)
    if response.status_code == 200:
        response_data = response.json()
        run_id = response_data["run_id"]
        st.success(f"Training task has been submitted. Run ID: {run_id}")
        st.session_state["submitted_tasks"][run_id] = "pending"
    else:
        st.error("Failed to submit the training task.")


# Display submitted tasks and their statuses
@st.experimental_fragment(run_every="30s")
def display_status():
    st.header("Submitted Tasks")
    for run_id, status in st.session_state["submitted_tasks"].items():
        st.write(f"Run ID: {run_id}, Status: {status}")
        # Check the status of the task
        status_response = requests.get(f"{API_URL}/status/{run_id}")
        if status_response.status_code == 200:
            status_data = status_response.json()
            st.session_state["submitted_tasks"][run_id] = status_data["status"]
            st.write(status_data)
        else:
            st.error(f"Failed to retrieve the status of task {run_id}")


display_status()
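To run the full stack described in the comments at the top of this file: start the MLflow-enabled FastAPI server first (python gpu_lock_with_arguments_fastapi_with_status_using_mlflow.py, which serves on localhost:8000), then launch this UI with streamlit run gpu_lock_with_arguments_fastapi_with_status_using_mlflow_streamlit.py. The fragment above re-polls /status/{run_id} for every submitted run roughly every 30 seconds.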