Skip to content

Instantly share code, notes, and snippets.

@huww98
Created January 14, 2022 16:06
Show Gist options
  • Save huww98/91cbff0782ad4f6673dcffccce731c05 to your computer and use it in GitHub Desktop.
Save huww98/91cbff0782ad4f6673dcffccce731c05 to your computer and use it in GitHub Desktop.
Reintegrate stray files in cephfs in conda envs
'''
Reintegrate CephFS stray files that belong to conda installations and envs.

Usage:
    For each MDS rank, run the following shell script to dump its stray
    directories (set RANK, and replace <name-of-rank0> with the name of the
    MDS daemon that holds that rank):

        RANK=0
        mkdir mds${RANK}
        for i in {0..9}; do
            ceph tell mds.<name-of-rank0> dump tree "~mdsdir/stray${i}" > mds${RANK}/stray${i}.json;
            sleep 2;
            echo $i;
        done

    Then run this script in the same working directory; it reads every
    mds*/stray*.json file found there.

Dependencies:
    sudo apt install python3-cephfs
'''
import re
import json
from pathlib import Path
import logging
from typing import Dict, List
from cephfs import LibCephFS, ObjectNotFound
# Module-level logger; the __main__ guard configures it at INFO level.
logger = logging.getLogger(__name__)

# Patterns used to split a stray file's prior path into a conda installation
# prefix (group 1) and a path relative to a package / env / the installation.
# Raw strings are required: '\d' in a plain string literal is an invalid
# escape sequence and triggers a warning on current Python versions.
#
# <prefix>/pkgs/<package>/<rest>                      -> group 3 = <rest>
CONDA_PKGS_RE = re.compile(r'^(/.+/(anaconda|miniconda)3?)/pkgs/[^/]+/(.+)$')
# <prefix>/envs/<env>/lib/pythonX.Y/site-packages/<pkg>/<rest>
#                                  -> group 3 = <pkg>, group 4 = <rest>
CONDA_ENVS_PYTHON_RE = re.compile(
    r'^(/.+/(anaconda|miniconda)3?)/envs/[^/]+/lib/python[\d.]+/site-packages/([^/]+)/(.+)$'
)
# <prefix>/envs/<env>/<rest>                          -> group 3 = <rest>
CONDA_ENVS_RE = re.compile(r'^(/.+/(anaconda|miniconda)3?)/envs/[^/]+/(.+)$')
# <prefix>/<rest>                                     -> group 3 = <rest>
CONDA_ROOT_RE = re.compile(r'^(/.+/(anaconda|miniconda)3?)/(.+)$')

# Cache filled by guess_path(): installation prefix -> result of find_envs().
PREFIX_ENVS: Dict[bytes, List[bytes]] = {}

# Real names of site-packages directories that may show up with their first
# character replaced by '~' (e.g. b'~orch' for b'torch').
# NOTE(review): presumably these are leftovers of an interrupted pip
# uninstall/upgrade -- confirm.
ORIG_PY_PKG_NAMES = [b'caffe2', b'torch']

# Name kept as-is (sic) because find_envs() refers to it.
ALERATE_ENVS_DIR: Dict[bytes, List[bytes]] = {
    # Where to find extra envs for one conda installation
    # e.g.
    # b'/home/username/anaconda3': [b'/home/username/another_envs']
}
def create_cephfs_client(app=None):
    """Build a ``LibCephFS`` handle that authenticates as ``admin``.

    The handle is configured with ``client_mount_uid`` and
    ``client_mount_gid`` both set to ``'0'``.

    Args:
        app: Optional application name. When given, it is passed to Ceph as
            the ``client_metadata`` option in the form ``app=<name>``.

    Returns:
        The constructed ``LibCephFS`` object (``main`` uses it as a context
        manager).
    """
    settings = dict(client_mount_uid='0', client_mount_gid='0')
    # To use an explicit keyring, additionally set e.g.:
    #     settings['keyring'] = '/PATH/TO/ceph.client.admin.keyring'
    if app is not None:
        settings['client_metadata'] = f'app={app}'
    return LibCephFS(conf=settings, auth_id='admin')
def find_envs(cephfs, prefix: bytes):
    """List candidate env directories of one conda installation, newest first.

    Candidates are every sub-directory of ``<prefix>/envs`` and of each extra
    envs directory registered for *prefix* in ``ALERATE_ENVS_DIR``, plus
    *prefix* itself. A candidate that can no longer be stat'ed (for example
    the whole installation was deleted, or an env vanished during the scan)
    is skipped instead of aborting the run.

    Args:
        cephfs: CephFS handle providing ``opendir`` and ``stat``.
        prefix: Path of the conda installation root.

    Returns:
        The candidate paths (bytes), sorted by ``(st_ctime, path)`` in
        descending order.
    """
    logger.info('Find envs in %s', prefix)

    def envs_bases():
        # The standard envs directory first, then any configured extras.
        yield prefix + b'/envs'
        if prefix in ALERATE_ENVS_DIR:
            yield from ALERATE_ENVS_DIR[prefix]

    def env_dirs():
        for envs_base in envs_bases():
            try:
                envs_dir = cephfs.opendir(envs_base)
            except ObjectNotFound:
                logger.info(' No envs found in %s', envs_base)
                continue
            with envs_dir:
                while True:
                    entry = envs_dir.readdir()
                    if entry is None:
                        # readdir() signals the end of the directory with None.
                        break
                    if entry.d_name in (b'.', b'..') or not entry.is_dir():
                        continue
                    yield envs_base + b'/' + entry.d_name
        # The installation root is itself a candidate.
        yield prefix

    all_envs = []
    for env_dir in env_dirs():
        try:
            stat = cephfs.stat(env_dir)
        except ObjectNotFound:
            # The prefix is yielded unconditionally and directory entries can
            # disappear after readdir(); neither should crash the scan.
            logger.info(' Skipping missing env %s', env_dir)
            continue
        logger.info(' Found env %s', env_dir)
        all_envs.append((stat.st_ctime, env_dir))
    return [env_dir for _, env_dir in sorted(all_envs, reverse=True)]
def guess_path(cephfs, stray_prior_path: str):
    """Yield candidate current locations of a stray file.

    *stray_prior_path* is matched against the conda path patterns (package
    cache, env site-packages, env, installation root -- in that order) to
    split it into an installation prefix and a relative suffix. The suffix is
    then re-rooted under every env that ``find_envs`` reports for the prefix;
    that env list is cached in ``PREFIX_ENVS``. Nothing is yielded when no
    pattern matches.

    Args:
        cephfs: CephFS handle; ``stat`` is called on it here and it is passed
            on to ``find_envs``.
        stray_prior_path: Path the stray inode was previously linked at.

    Yields:
        Candidate paths as bytes. For a candidate ending in ``.c~`` the path
        without that ending is yielded first, then the candidate itself.

    Raises:
        NotImplementedError: The site-packages directory name starts with
            ``~`` but matches no entry of ``ORIG_PY_PKG_NAMES``.
    """
    logger.info('reintegration %s', stray_prior_path)

    match = CONDA_PKGS_RE.match(stray_prior_path)
    if match is not None:
        suffix = match.group(3).encode()
    else:
        match = CONDA_ENVS_PYTHON_RE.match(stray_prior_path)
        if match is not None:
            pkg_dir = match.group(3).encode()
            if pkg_dir.startswith(b'~'):
                # The directory name has '~' in place of its first character;
                # restore the real character when the rest matches a known
                # package name.
                for real_name in ORIG_PY_PKG_NAMES:
                    if pkg_dir.startswith(b'~' + real_name[1:]):
                        pkg_dir = real_name[:1] + pkg_dir[1:]
                        break
                if pkg_dir.startswith(b'~'):
                    raise NotImplementedError(f'fake name {pkg_dir} not known')
            suffix = b'/'.join((b'site-packages', pkg_dir, match.group(4).encode()))
        else:
            # Match objects are always truthy, so `or` falls through to the
            # root pattern only when the envs pattern did not match.
            match = CONDA_ENVS_RE.match(stray_prior_path) or CONDA_ROOT_RE.match(stray_prior_path)
            if match is None:
                logger.info(' not recognized.')
                return
            suffix = match.group(3).encode()

    prefix = match.group(1).encode()
    envs = PREFIX_ENVS.get(prefix)
    if envs is None:
        envs = find_envs(cephfs, prefix)
        PREFIX_ENVS[prefix] = envs

    # A suffix rooted at site-packages/ has to be placed below the env's
    # lib/pythonX.Y directory, whose version is probed per env.
    needs_python_dir = suffix.startswith(b'site-packages/')
    for env in envs:
        base = env
        if needs_python_dir:
            for version in (b'3.6', b'3.7', b'3.8', b'3.9', b'3.10'):
                candidate = env + b'/lib/python' + version
                try:
                    cephfs.stat(candidate)
                except ObjectNotFound:
                    continue
                base = candidate
                break
        guessed = base + b'/' + suffix
        if guessed.endswith(b'.c~'):
            yield guessed[:-3]
        yield guessed
def main():
    """Look up every guessed path for each stray inode in the dump files.

    Reads all ``mds*/stray*.json`` files below the current directory. Entries
    with ``nlink == 0`` or an empty ``stray_prior_path`` are skipped. For the
    rest, the candidates from ``guess_path`` are stat'ed in order until one
    resolves to the stray's own inode number.

    NOTE(review): presumably the successful lookup is what prompts the MDS to
    reintegrate the stray -- the script performs no other operation on the
    file. Confirm against Ceph documentation.
    """
    with create_cephfs_client('reintegrate') as cephfs:
        for dump_file in Path('').glob('mds*/stray*.json'):
            with dump_file.open('r') as fp:
                inodes = json.load(fp)
            for inode in inodes:
                if inode['nlink'] == 0 or not inode['stray_prior_path']:
                    continue
                for candidate in guess_path(cephfs, inode['stray_prior_path']):
                    try:
                        found = cephfs.stat(candidate)
                    except ObjectNotFound:
                        logger.debug(' not found %s', candidate)
                        continue
                    if found.st_ino != inode['ino']:
                        # Something else lives at this path now.
                        logger.debug(' wrong inode %s', candidate)
                        continue
                    logger.info(' reintegrated into %s', candidate)
                    break
if __name__ == '__main__':
    # Script entry point: show INFO-level progress messages, then process
    # the stray dumps found in the working directory.
    logging.basicConfig(level=logging.INFO)
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment