-
-
Save vi/b6e7b8503c19abe69305249c060883ec to your computer and use it in GitHub Desktop.
A script to list cgroup v2 hierarchy
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
cgls - list cgroup v2 hierarchy with task counts and resource tags.

Usage: cgls [ROOT]

ROOT defaults to /sys/fs/cgroup. Output is a tree of cgroups, each annotated
with the number of direct tasks and tags indicating active resource constraints
or states.

Tags:
  FROZEN  - cgroup.freeze is 1
  CPU     - cpu.max quota is not "max" (i.e. CPU is limited)
  MEM     - memory.max is set (not "max")
  MEMHI   - memory.high is set (not "max")
  IO      - io.max has entries
  PIDS    - pids.max is set (not "max")
  CPUSET  - cpuset.cpus is restricted (non-empty and not equal to root's)
"""

import os
import sys

# Root of the cgroup hierarchy: used as the default ROOT and as the reference
# point for the CPUSET comparison and for the printed root label.
CGROUP_ROOT = "/sys/fs/cgroup"
def read_file(path):
    """Return the whitespace-stripped text of *path*, or None if unreadable.

    Any OSError (missing file, permission denied, path is a directory, ...)
    and content that is not valid UTF-8 are reported as None, so callers can
    probe optional interface files without special-casing each failure.
    """
    try:
        # PermissionError is a subclass of OSError, so OSError alone covers it.
        # The encoding is pinned so behavior does not depend on the locale.
        with open(path, encoding="utf-8") as f:
            return f.read().strip()
    except (OSError, UnicodeDecodeError):
        return None
def task_count(cg_path):
    """Return the number of distinct PIDs listed in cg_path's cgroup.procs.

    Returns 0 when the file is missing, unreadable, or empty.
    """
    content = read_file(os.path.join(cg_path, "cgroup.procs"))
    if not content:
        # None (unreadable) and "" (no processes) both mean zero.
        return 0
    # The kernel documents that the same PID may show up more than once in
    # cgroup.procs, so count unique entries rather than raw lines.
    return len(set(content.split()))
def get_tags(cg_path):
    """Return the resource/state tags that apply to the cgroup at cg_path.

    Tags are appended in a fixed order: FROZEN, CPU, MEM, MEMHI, IO, PIDS,
    CPUSET. An interface file that is missing or unreadable never yields a tag.
    """

    def read(name):
        # Read one interface file of this cgroup; None if absent/unreadable.
        return read_file(os.path.join(cg_path, name))

    def is_limited(name):
        # Files such as memory.max hold the literal "max" when unlimited.
        value = read(name)
        return value is not None and value != "max"

    tags = []

    # FROZEN - cgroup.freeze is 1
    if read("cgroup.freeze") == "1":
        tags.append("FROZEN")

    # CPU - cpu.max is "<quota> <period>"; any quota other than "max" is a limit
    cpu_max = read("cpu.max")
    if cpu_max is not None and not cpu_max.startswith("max "):
        tags.append("CPU")

    # MEM - memory.max set
    if is_limited("memory.max"):
        tags.append("MEM")

    # MEMHI - memory.high set
    if is_limited("memory.high"):
        tags.append("MEMHI")

    # IO - io.max has content
    if read("io.max"):
        tags.append("IO")

    # PIDS - pids.max set
    if is_limited("pids.max"):
        tags.append("PIDS")

    # CPUSET - cpuset.cpus restricted relative to the root cgroup
    cpus = read("cpuset.cpus")
    if cpus:
        # The root cgroup normally has no cpuset.cpus file (the kernel only
        # creates it on non-root cgroups), so fall back to the root's
        # cpuset.cpus.effective; otherwise the comparison is always against
        # None and every explicitly configured cpuset would be tagged.
        root_cpus = (
            read_file(os.path.join(CGROUP_ROOT, "cpuset.cpus"))
            or read_file(os.path.join(CGROUP_ROOT, "cpuset.cpus.effective"))
        )
        if cpus != root_cpus:
            tags.append("CPUSET")

    return tags
def walk_cgroups(root, prefix=""):
    """Recursively print the child cgroups of *root* as a tree.

    root:   directory whose sub-cgroups are listed (root itself is not printed).
    prefix: string printed before each connector; grows by one 4-column
            segment per nesting level.

    A subdirectory counts as a cgroup when it contains cgroup.procs or
    cgroup.controllers. Each printed line is annotated with the direct task
    count (when non-zero) and the tags from get_tags(). Returns None.
    """
    try:
        names = sorted(os.listdir(root))
    except OSError:
        # Not only permission errors: a cgroup may be removed while the walk is
        # in progress, which surfaces as FileNotFoundError. Skip it quietly.
        return

    children = []
    for name in names:
        if name.startswith("."):
            continue
        path = os.path.join(root, name)
        if not os.path.isdir(path):
            continue
        # Verify it's a cgroup (has cgroup.procs or cgroup.controllers)
        if any(
            os.path.exists(os.path.join(path, marker))
            for marker in ("cgroup.procs", "cgroup.controllers")
        ):
            children.append((name, path))

    last_index = len(children) - 1
    for index, (name, path) in enumerate(children):
        is_last = index == last_index
        connector = "└── " if is_last else "├── "
        # Continuation segments are as wide as the connectors (4 columns) so
        # nested levels line up under their parent.
        child_prefix = prefix + ("    " if is_last else "│   ")

        tasks = task_count(path)
        tags = get_tags(path)
        parts = []
        if tasks > 0:
            parts.append(f"{tasks} task{'s' if tasks != 1 else ''}")
        if tags:
            parts.append(" ".join(tags))
        annotation = f" [{', '.join(parts)}]" if parts else ""

        print(f"{prefix}{connector}{name}{annotation}")
        walk_cgroups(path, child_prefix)
def main():
    """Entry point: print ROOT with its annotation, then the tree below it.

    ROOT is taken from the first command-line argument and defaults to
    CGROUP_ROOT. Exits with status 1 (message on stderr) when ROOT is not a
    directory.
    """
    root = sys.argv[1] if len(sys.argv) > 1 else CGROUP_ROOT
    if not os.path.isdir(root):
        print(f"error: {root} is not a directory", file=sys.stderr)
        sys.exit(1)

    # Print root info
    tasks = task_count(root)
    tags = get_tags(root)
    parts = []
    if tasks > 0:
        parts.append(f"{tasks} task{'s' if tasks != 1 else ''}")
    if tags:
        parts.append(" ".join(tags))
    annotation = f" [{', '.join(parts)}]" if parts else ""

    # Label the root relative to the cgroup mount point when it lies inside it.
    rel = os.path.relpath(root, CGROUP_ROOT)
    if rel == ".":
        label = "/"
    elif rel == os.pardir or rel.startswith(os.pardir + os.sep):
        # ROOT is outside CGROUP_ROOT: a "/../.."-style label would be
        # misleading, so show the absolute path instead.
        label = os.path.abspath(root)
    else:
        label = f"/{rel}"

    print(f"{label}{annotation}")
    walk_cgroups(root)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment