@marcan
Last active September 2, 2025 14:08
Transcoder (layout migrator) for CephFS
# CephFS pool/layout migration tool ("transcoder")
#
# Loosely inspired by:
# https://git.sr.ht/~pjjw/cephfs-layout-tool/tree/master/item/cephfs_layout_tool/migrate_pools.py
# https://gist.github.com/ervwalter/5ff6632c930c27a1eb6b07c986d7439b
#
# MIT license (https://opensource.org/license/mit)
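#
# Example invocations (a sketch only: the script file name, directory paths, and
# option values below are illustrative; the flags match the argparse options
# defined in main()):
#
#   python3 cephfs_transcode.py --tmpdir /data/tmp --threads 8 --min-age 7 /data/media
#   python3 cephfs_transcode.py -n /data/media    # dry run: transcode but never replace files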
import os, stat, time, signal, shutil, logging, sys
from concurrent.futures import ThreadPoolExecutor
import threading, uuid, argparse
replace_lock = threading.Lock()
do_exit = threading.Event()
thread_count = None
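
# CephLayout parses a CephFS layout xattr string of the form
# "stripe_unit=... stripe_count=... object_size=... pool=..." and can read it
# from a directory/file or apply it to a newly created (still empty) file.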
class CephLayout:
    def __init__(self, layout):
        vals = {}
        for s in layout.split():
            k, v = s.split("=", 1)
            vals[k] = v
        self.stripe_unit = int(vals["stripe_unit"])
        self.stripe_count = int(vals["stripe_count"])
        self.object_size = int(vals["object_size"])
        self.pool = vals["pool"]
        self.layout = layout

    @classmethod
    def from_dir(cls, path):
        try:
            return CephLayout(os.getxattr(path, "ceph.dir.layout", follow_symlinks=False).decode("utf-8"))
        except OSError as e:
            assert e.errno == 61
            return None

    @classmethod
    def from_file(cls, path):
        try:
            return CephLayout(os.getxattr(path, "ceph.file.layout", follow_symlinks=False).decode("utf-8"))
        except OSError as e:
            assert e.errno == 61
            return None

    def apply_file(self, path):
        os.setxattr(path, "ceph.file.layout", self.layout.encode("utf-8"), follow_symlinks=False)

    def __str__(self):
        return self.layout

    def __eq__(self, other):
        return self.layout == other.layout

    def diff(self, other):
        diff = []
        for i in ("stripe_unit", "stripe_count", "object_size", "pool"):
            a = getattr(self, i)
            b = getattr(other, i)
            if a != b:
                diff.append(f"{i}=[{a} -> {b}]")
        return " ".join(diff)
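
# Copy one file (and all of its hardlinked paths) into the temporary directory with
# the target layout applied, then atomically rename the copy back over the original
# path(s), bailing out if the source was modified in the meantime.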
def process_file(args, filepaths, st, layout, file_layout):
    if do_exit.is_set():
        return
    tmp_file = os.path.join(args.tmpdir, uuid.uuid4().hex)
    if len(filepaths) == 1:
        logging.info(f"Transcoding {filepaths[0]} [{st.st_size} bytes]: {file_layout.diff(layout)}")
    else:
        logging.info(f"Transcoding {filepaths[0]} [{st.st_size} bytes] (+ {len(filepaths) - 1} hardlink(s)): {file_layout.diff(layout)} [{tmp_file}]")
    with open(tmp_file, "wb") as ofd:
        layout.apply_file(tmp_file)
        with open(filepaths[0], "rb") as ifd:
            shutil.copyfileobj(ifd, ofd, layout.object_size)
    shutil.copystat(filepaths[0], tmp_file)
    os.chown(tmp_file, st.st_uid, st.st_gid)
    if args.dry_run or do_exit.is_set():
        os.unlink(tmp_file)
        return
    with replace_lock:
        try:
            signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGINT])
            st2 = os.stat(filepaths[0], follow_symlinks=False)
            if st2.st_mtime != st.st_mtime:
                logging.error(f"Failed to replace {filepaths[0]} (+ {len(filepaths) - 1} hardlink(s)): Source file changed")
                os.unlink(tmp_file)
                return
            for i, path in enumerate(filepaths):
                parent_path = os.path.split(path)[0]
                parent_st = os.stat(parent_path, follow_symlinks=False)
                if i == 0:
                    logging.info(f"Renaming {tmp_file} -> {path}")
                    os.rename(tmp_file, path)
                else:
                    logging.info(f"Linking {filepaths[0]} -> {path}")
                    os.link(filepaths[0], tmp_file, follow_symlinks=False)
                    os.rename(tmp_file, path)
                os.utime(parent_path, ns=(parent_st.st_atime_ns, parent_st.st_mtime_ns), follow_symlinks=False)
        finally:
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGINT])

def handler(future):
    future.result()
    thread_count.release()
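
# Walk a directory tree, tracking the effective directory layout as it is inherited,
# and queue every regular file whose file layout differs from its directory layout.
# Hardlinked files are deferred until all of their paths have been seen.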
def process_dir(args, start_dir, hard_links, executor, mountpoints, dir_layouts):
    for dirpath, dirnames, filenames in os.walk(start_dir, True):
        if do_exit.is_set():
            return
        if dirpath in mountpoints:
            logging.warning(f"Skipping {dirpath}: path is a mountpoint")
            del dirnames[:]
            continue
        if dirpath == args.tmpdir:
            logging.info(f"Skipping {dirpath}: path is the temporary dir")
            del dirnames[:]
            continue
        layout = dir_layouts.get(dirpath, None)
        if layout is None:
            layout = CephLayout.from_dir(dirpath)
        if layout is None:
            layout = dir_layouts[os.path.split(dirpath)[0]]
        dirnames.sort()
        filenames.sort()
        logging.debug(f"Scanning {dirpath} ({layout}): {len(dirnames)} dirs and {len(filenames)} files")
        dir_layouts[dirpath] = layout

        def submit(filepaths, st, file_layout):
            if do_exit.is_set():
                return
            thread_count.acquire()
            future = executor.submit(process_file, args, filepaths, st, layout, file_layout)
            future.add_done_callback(handler)

        for filename in filenames:
            if do_exit.is_set():
                return
            filepath = os.path.join(dirpath, filename)
            st = os.stat(filepath, follow_symlinks=False)
            if not stat.S_ISREG(st.st_mode):
                logging.debug(f"Skipping {filepath}: not a regular file")
                continue
            if st.st_mtime > (time.time() - 86400 * args.min_age):
                logging.info(f"Skipping {filepath}: modified too recently")
                continue
            file_layout = CephLayout.from_file(filepath)
            assert file_layout is not None
            if file_layout == layout:
                continue
            if st.st_nlink == 1:
                submit([filepath], st, file_layout)
            else:
                file_id = (st.st_dev, st.st_ino)
                if file_id not in hard_links:
                    hard_links[file_id] = ([filepath], [layout])
                else:
                    hard_links[file_id][0].append(filepath)
                    hard_links[file_id][1].append(layout)
                if len(hard_links[file_id][0]) == st.st_nlink:
                    filepaths = hard_links[file_id][0]
                    layouts = hard_links[file_id][1]
                    del hard_links[file_id]
                    if not all(i == layouts[0] for i in layouts[1:]):
                        logging.error("Hardlinked file has inconsistent directory layouts:")
                        for fp, ly in zip(filepaths, layouts):
                            logging.error(f"  [{ly}]: {fp}")
                    else:
                        submit(filepaths, st, file_layout)
                else:
                    logging.info(f"Deferring {filepath} due to hardlinks ({st.st_nlink - len(hard_links[file_id][0])} link(s) left)")
def process_files(args):
    args.tmpdir = os.path.abspath(args.tmpdir)
    if not os.path.exists(args.tmpdir):
        os.makedirs(args.tmpdir)
    hard_links = {}
    dir_layouts = {}
    lock = threading.Lock()
    mountpoints = set()
    for line in open("/proc/self/mounts", "r"):
        mountpoints.add(line.split()[1])
    with ThreadPoolExecutor(max_workers=args.threads) as executor:
        for start_dir in args.dirs:
            start_dir = os.path.abspath(start_dir)
            if start_dir in mountpoints:
                mountpoints.remove(start_dir)
            layout = CephLayout.from_dir(start_dir)
            parent = start_dir
            while layout is None and parent != "/":
                parent = os.path.split(parent)[0]
                layout = CephLayout.from_dir(parent)
            assert layout
            dir_layouts[start_dir] = layout
            logging.info(f"Starting at {start_dir} ({layout})")
            process_dir(args, start_dir, hard_links, executor, mountpoints, dir_layouts)
            if do_exit.is_set():
                break
    if hard_links and not do_exit.is_set():
        logging.warning(f"Some hard links could not be located. Refusing to transcode these inodes:")
        for file_id, v in hard_links.items():
            dev, inode = file_id
            st = os.stat(v[0][0], follow_symlinks=False)
            logging.warning(f"  Inode {dev}:{inode} ({len(v[0])}/{st.st_nlink} links):")
            for path in v[0]:
                logging.warning(f"    - {path}")

def main():
    global thread_count
    parser = argparse.ArgumentParser(
        description="Transcode cephfs files to their directory layout"
    )
    parser.add_argument("dirs", help="Directories to scan", nargs="+")
    parser.add_argument("--tmpdir", default="/data/tmp", help="Temporary directory to copy files to. Important: This directory should have its layout set to the *default* data pool for the FS, to avoid excess backtrace objects.")
    parser.add_argument("--debug", "-d", action="store_true")
    parser.add_argument("--min-age", "-m", default=1, type=int, help="Minimum age of file before transcoding, in days")
    parser.add_argument("--threads", "-t", default=4, type=int, help="Number of threads for data copying")
    parser.add_argument("--dry-run", "-n", action="store_true", help="Perform transcode but do not replace files")
    args = parser.parse_args()
    thread_count = threading.BoundedSemaphore(args.threads)
    layout = CephLayout.from_dir(args.tmpdir)
    parent = args.tmpdir
    while layout is None and parent != "/":
        parent = os.path.split(parent)[0]
        layout = CephLayout.from_dir(parent)
    if args.debug:
        logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
    else:
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    logging.warning(f"Temporary directory is {args.tmpdir} with pool {layout.pool}. "
                    f"This should be the *default* data pool for the FS (NOT the target pool for your files). "
                    f"If it is not, abort this script with ^C and configure it with `setfattr -n ceph.dir.layout.pool -v default_data_pool_name {args.tmpdir}`.")

    def signal_handler(sig, frame):
        logging.error("SIGINT received, exiting cleanly...")
        do_exit.set()

    signal.signal(signal.SIGINT, signal_handler)
    process_files(args)

if __name__ == "__main__":
    main()
inDane commented Aug 25, 2025

@marcan
Thanks for this script! I've got one question though:
Does this delete the old objects/files from the primary pool?
i.e. when transcoding from Pool A (primary) to Pool B (as set in the xattr), does Pool A eventually get smaller?

I'm asking because I don't see my primary Pool A shrinking, while Pool B is expanding.

marcan commented Aug 26, 2025

CephFS always has a head object in the primary pool for every file. If your files are smaller than the object size, then in general, the object count in the primary pool will remain unchanged. However, since the head objects are zero-sized, the data usage of the primary pool will shrink, as those objects only have metadata. If the files are on average larger than the object size, then you will see a reduction to one object per file.

If you are not seeing the data usage of the primary pool shrink, perhaps you have snapshots?
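
If you want to spot-check a single file, you can read the same xattr the script itself parses (the mount path and pool name here are just examples):

    import os
    print(os.getxattr("/mnt/cephfs/some/file", "ceph.file.layout").decode())
    # e.g. stripe_unit=4194304 stripe_count=1 object_size=4194304 pool=poolB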

inDane commented Aug 26, 2025

Thanks for your answer. Turns out I was just too impatient. Pool A is slowly getting smaller! 👍
