Created
January 14, 2022 16:06
-
-
Save huww98/91cbff0782ad4f6673dcffccce731c05 to your computer and use it in GitHub Desktop.
Reintegrate stray files in cephfs in conda envs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
Reintegrate stray files in cephfs in conda envs

Usage:
    For each MDS rank, run the following shell script to extract strays:

        RANK=0
        mkdir mds${RANK}
        for i in {0..9}; do
            ceph tell mds.<name-of-rank0> dump tree "~mdsdir/stray${i}" > mds${RANK}/stray${i}.json;
            sleep 2;
            echo $i;
        done

    Then run this script in the same working directory.

Dependencies:
    sudo apt install python3-cephfs
'''
import re | |
import json | |
from pathlib import Path | |
import logging | |
from typing import Dict, List | |
from cephfs import LibCephFS, ObjectNotFound | |
# Module-level logger; the script entry point configures the logging level.
logger = logging.getLogger(__name__)

# Patterns used to classify the prior path of a stray inode. In every pattern
# group(1) is the conda installation root (a directory named anaconda or
# miniconda, optionally followed by "3") and the last group is the remainder
# of the path below the matched directory.
#
# The patterns are raw strings so that `\d` is handed to the regex engine
# verbatim instead of relying on Python passing an invalid string escape
# through unchanged (which emits a warning on current interpreters).
CONDA_PKGS_RE = re.compile(r'^(/.+/(anaconda|miniconda)3?)/pkgs/[^/]+/(.+)$')
CONDA_ENVS_PYTHON_RE = re.compile(
    r'^(/.+/(anaconda|miniconda)3?)/envs/[^/]+/lib/python[\d.]+/site-packages/([^/]+)/(.+)$'
)
CONDA_ENVS_RE = re.compile(r'^(/.+/(anaconda|miniconda)3?)/envs/[^/]+/(.+)$')
CONDA_ROOT_RE = re.compile(r'^(/.+/(anaconda|miniconda)3?)/(.+)$')

# Cache mapping a conda installation root to its list of candidate env
# directories, so each installation is scanned only once.
PREFIX_ENVS: Dict[bytes, List[bytes]] = {}

# Real names of site-packages entries that may appear in a prior path with
# their first character replaced by b'~'.
ORIG_PY_PKG_NAMES = [b'caffe2', b'torch']

# NOTE: the spelling of this name is kept as-is because other code in this
# file looks it up under exactly this name.
ALERATE_ENVS_DIR: Dict[bytes, List[bytes]] = {
    # Where to find extra envs for one conda installation
    # e.g.
    # b'/home/username/anaconda3': [b'/home/username/another_envs']
}
def create_cephfs_client(app=None):
    """Construct a ``LibCephFS`` object using the ``admin`` auth id.

    The configuration sets ``client_mount_uid`` and ``client_mount_gid`` to
    ``'0'``.

    Args:
        app: Optional application name. When it is not ``None`` the
            configuration additionally carries ``client_metadata`` set to
            ``app=<app>``.

    Returns:
        The newly constructed ``LibCephFS`` object.
    """
    # Only attach client metadata when the caller supplied an app name.
    metadata = {} if app is None else {'client_metadata': f'app={app}'}
    return LibCephFS(
        conf={
            # 'keyring': '/PATH/TO/ceph.client.admin.keyring',
            'client_mount_uid': '0',
            'client_mount_gid': '0',
            **metadata,
        },
        auth_id='admin',
    )
def find_envs(cephfs, prefix: bytes):
    """List the candidate environment directories of one conda installation.

    Candidates are every sub-directory of ``<prefix>/envs``, every
    sub-directory of the extra directories registered for ``prefix`` in
    ``ALERATE_ENVS_DIR``, and finally ``prefix`` itself.

    Args:
        cephfs: Object providing ``opendir(path)`` and ``stat(path)``.
        prefix: Root directory of the conda installation.

    Returns:
        The candidate directories as ``bytes`` paths, ordered by descending
        ``st_ctime`` (ties fall back to descending path order). Candidates
        that cannot be stat'ed because they do not exist are left out.
    """
    logger.info('Find envs in %s', prefix)

    def envs_bases():
        # The default envs directory first, then any configured extras.
        yield prefix + b'/envs'
        yield from ALERATE_ENVS_DIR.get(prefix, ())

    def env_dirs():
        for envs_base in envs_bases():
            try:
                envs_dir = cephfs.opendir(envs_base)
            except ObjectNotFound:
                logger.info(' No envs found in %s', envs_base)
                continue
            with envs_dir:
                # readdir() returns None once the directory is exhausted.
                for entry in iter(envs_dir.readdir, None):
                    if entry.d_name in (b'.', b'..'):
                        continue
                    if not entry.is_dir():
                        continue
                    yield envs_base + b'/' + entry.d_name
        # The installation root is itself a candidate.
        yield prefix

    all_envs = []
    for env_dir in env_dirs():
        try:
            stat = cephfs.stat(env_dir)
        except ObjectNotFound:
            # The installation root may no longer exist while extra env
            # directories still do, and a directory can disappear between
            # readdir() and stat(); neither should abort the whole scan.
            logger.info(' Skip missing env %s', env_dir)
            continue
        logger.info(' Found env %s', env_dir)
        all_envs.append((stat.st_ctime, env_dir))
    return [path for _, path in sorted(all_envs, reverse=True)]
def _split_prior_path(stray_prior_path: str):
    """Split a prior path into ``(prefix, suffix)`` bytes, or return ``None``.

    ``prefix`` is the conda installation root and ``suffix`` the part of the
    path to look for below a candidate directory. ``None`` means none of the
    known path patterns matched.

    Raises:
        NotImplementedError: the site-packages entry starts with ``~`` and
            cannot be mapped back to a name in ``ORIG_PY_PKG_NAMES``.
    """
    m = CONDA_PKGS_RE.match(stray_prior_path)
    if m is not None:
        return m.group(1).encode(), m.group(3).encode()

    m = CONDA_ENVS_PYTHON_RE.match(stray_prior_path)
    if m is not None:
        py_pkg_name = m.group(3).encode()
        if py_pkg_name.startswith(b'~'):
            # Restore the first character of a known package name.
            for orig_name in ORIG_PY_PKG_NAMES:
                if py_pkg_name.startswith(b'~' + orig_name[1:]):
                    py_pkg_name = orig_name[:1] + py_pkg_name[1:]
                    break
            if py_pkg_name.startswith(b'~'):
                raise NotImplementedError(f'fake name {py_pkg_name} not known')
        tail = m.group(4).encode()
        return m.group(1).encode(), b'site-packages/' + py_pkg_name + b'/' + tail

    # Match objects are always truthy, so `or` falls through only on None.
    m = CONDA_ENVS_RE.match(stray_prior_path) or CONDA_ROOT_RE.match(stray_prior_path)
    if m is None:
        return None
    return m.group(1).encode(), m.group(3).encode()


def guess_path(cephfs, stray_prior_path: str):
    """Yield paths at which the inode behind a stray entry may still be linked.

    The prior path is classified with the module's conda path patterns; for
    every candidate directory of the matching installation (looked up in, or
    added to, ``PREFIX_ENVS``) the corresponding path is yielded. Nothing is
    yielded when the prior path is not recognised.

    Args:
        cephfs: Object providing ``stat(path)``; also handed to ``find_envs``.
        stray_prior_path: The path the stray entry used to have.

    Yields:
        ``bytes`` paths. For a guess ending in ``.c~`` the path without that
        ending is yielded first, then the guess itself.

    Raises:
        NotImplementedError: a ``~``-prefixed site-packages entry is not
            covered by ``ORIG_PY_PKG_NAMES``.
    """
    logger.info('reintegration %s', stray_prior_path)
    parts = _split_prior_path(stray_prior_path)
    if parts is None:
        logger.info(' not recognized.')
        return
    prefix, suffix = parts

    if prefix not in PREFIX_ENVS:
        PREFIX_ENVS[prefix] = find_envs(cephfs, prefix)

    in_site_packages = suffix.startswith(b'site-packages/')
    for env_root in PREFIX_ENVS[prefix]:
        base = env_root
        if in_site_packages:
            # Use the first lib/python<version> directory that exists here.
            for version in [b'3.6', b'3.7', b'3.8', b'3.9', b'3.10']:
                candidate = env_root + b'/lib/python' + version
                try:
                    cephfs.stat(candidate)
                except ObjectNotFound:
                    continue
                base = candidate
                break
        guessed = base + b'/' + suffix
        if guessed.endswith(b'.c~'):
            yield guessed[:-3]
        yield guessed
def _probe_candidates(cephfs, inode):
    """Stat each guessed path for one dumped inode until its inode number matches.

    NOTE(review): presumably the successful lookup is what makes the MDS
    reintegrate the stray -- confirm against the Ceph documentation.
    """
    for candidate in guess_path(cephfs, inode['stray_prior_path']):
        try:
            found_ino = cephfs.stat(candidate).st_ino
        except ObjectNotFound:
            logger.debug(' not found %s', candidate)
            continue
        if found_ino != inode['ino']:
            logger.debug(' wrong inode %s', candidate)
            continue
        logger.info(' reintegrated into %s', candidate)
        return


def main():
    """Process every ``mds*/stray*.json`` dump below the working directory.

    Each dump is loaded as JSON and iterated; entries whose ``nlink`` is 0 or
    whose ``stray_prior_path`` is empty are skipped, all others are probed
    through the guessed paths.
    """
    with create_cephfs_client('reintegrate') as cephfs:
        for dump_file in Path('').glob('mds*/stray*.json'):
            with dump_file.open('r') as fp:
                inodes = json.load(fp)
            for inode in inodes:
                if inode['nlink'] != 0 and inode['stray_prior_path']:
                    _probe_candidates(cephfs, inode)
# Script entry point: enable INFO-level logging, then run main().
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment