Created
November 6, 2023 19:14
Revisions
-
socketpair created this gist
Nov 6, 2023 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,340 @@ import argparse import json import logging import os from contextlib import contextmanager from errno import EINVAL from fcntl import LOCK_EX, flock, ioctl from os import O_CLOEXEC, O_DIRECTORY, O_RDONLY, close, open as os_open from pathlib import Path from shutil import rmtree from signal import SIGKILL from subprocess import CalledProcessError, Popen, TimeoutExpired, call, check_call, check_output from tempfile import TemporaryDirectory from time import monotonic, sleep log = logging.getLogger(__name__) FIFREEZE = 3221510263 FITHAW = 3221510264 @contextmanager def _measure(operation: str): start = monotonic() # log.debug('Measuring "%s" operation.', operation) try: yield finally: log.debug('Operation "%s" completed in %2.2f seconds.', operation, monotonic() - start) @contextmanager def _frozen(path: Path): fd = os_open(path, O_RDONLY | O_DIRECTORY | O_CLOEXEC) def unfreeze() -> None: log.debug('Unfreezing') try: ioctl(fd, FITHAW) except OSError as err: if err.errno != EINVAL: raise try: log.debug('Freezing') # KERNEL BUG:если здесь происходит сигнал (например, SIGINT), FS остается зависшей(!) # вот почему разморозка всегда должна пытаться разморозить даже после неудачной заморозки. ioctl(fd, FIFREEZE) try: yield unfreeze finally: unfreeze() finally: close(fd) _RSYNC_ARGS = [ 'rsync', '-a', # '--checksum-choice=xxh128', # потому что '--only-write-batch' сбрасывается на SLOW MD5! # Алгоритм случайного изменения контрольной суммы не работает. '--inplace', '--hard-links', '--acls', '--xattrs', '--one-file-system', '--delete', '--numeric-ids', '--preallocate', '--trust-sender', ] def _make_reflink_copy(source: Path, destination: Path) -> None: log.debug('Removing snapshot-copy as %s', destination) if destination.exists(): with _measure('unlink destination'): rmtree(destination) destination.mkdir(parents=True) for copy_attempt_count in range(1, 20): # Исходный каталог активно меняется, возможен не нулевой код возврата log.debug('Reflinking') # -u - не делать reflink, если mtime то же самое/ # high_prio - minimize race conditions on high disk load with _measure('reflink copy'): if not call(['cp', '-u', '-a', '--reflink=always', '--no-target-directory', '--one-file-system', source, destination]): break log.info('Reflink failed. Attempt: %d. Retrying', copy_attempt_count) else: log.warning('Reflink copy is not complete. High disk load ?') def _atomic_freeze(source: Path, destination: Path, *, freeze_timeout: int, show_changes: bool) -> None: with TemporaryDirectory() as tmpdir: batch = Path(tmpdir) / 'batch' with _frozen(source) as unfreeze: with _measure('Rsync on frozen FS'): log.debug('Running rsync on frozen FS to create batch') with Popen( # pylint: disable=subprocess-popen-preexec-fn _RSYNC_ARGS + [ *(['--itemize-changes'] if show_changes else []), '--only-write-batch', batch, '--', f'{source}/', f'{destination}/', ], start_new_session=True, ) as proc: try: deadline = monotonic() + freeze_timeout while proc.returncode is None and monotonic() < deadline: try: # proc.wait() may be interrupted by SIGINT. proc.wait(0.1) except TimeoutExpired: if not Path(f'/proc/{proc.pid}/fd/3').exists(): continue # Path().read_text() may raise ENOENT is process die unexpectedly (even successfully) if 'xfs_free_eofblocks' not in Path(f'/proc/{proc.pid}/stack').read_text(): continue # [<0>] percpu_rwsem_wait+0x116/0x140 # [<0>] xfs_trans_alloc+0x20c/0x220 [xfs] # [<0>] xfs_free_eofblocks+0x83/0x120 [xfs] # [<0>] xfs_release+0x143/0x180 [xfs] # [<0>] __fput+0x8e/0x250 # [<0>] task_work_run+0x5a/0x90 # [<0>] exit_to_user_mode_prepare+0x1e6/0x1f0 # [<0>] syscall_exit_to_user_mode+0x1b/0x40 # [<0>] do_syscall_64+0x6b/0x90 # [<0>] entry_SYSCALL_64_after_hwframe+0x72/0xdc log.debug('XFS hang detected') raise RuntimeError('Early DETECTED XFS HANG') from None if proc.returncode is None: log.debug('rsync timed out') batch_size = batch.stat().st_size if batch.is_file() else 0 raise RuntimeError(f'Rsync works too long (more than {freeze_timeout} sec). Batch size is {batch_size}, Aborting.') log.debug('rsync finished with code %d.', proc.returncode) except: # noqa. see code of original check_call log.debug('Killing rsync') # Сначала прибиваем процесс, и только потом расфризиваем. # Если сначала сделать анфриз, то процесс может уже завершиться успехом ДО отправки KILL. # Если к моменту прибития рсинк както магически развис и завершился успехом, # то наше прибитие не сделает ничего ибо процесс уже умер. НО НЕ ЗАВЕЙТИЛСЯ. Поэтому ENOSRCH не будет. os.killpg(proc.pid, SIGKILL) unfreeze() # обязательно ДО .wait() который будет в Popen.__exit__() raise log.debug('rsync finally waited') log.debug('Unfrozen') assert proc.returncode is not None if proc.returncode != 0: log.debug('rsync has failed') raise CalledProcessError(proc.returncode, proc.args) log.debug('Rsync success. Applying batch of size: %2.2f MB', batch.stat().st_size / 1_000_000) with _measure('apply patch'): check_call( _RSYNC_ARGS + [ '--read-batch', batch, '--', f'{destination}/', ], ) log.debug('Patch applied') def _atomic_freeze_wrapper(source: Path, destination: Path, *, freeze_timeout: int, freeze_attempts: int, show_changes: bool) -> None: for attempt in range(1, freeze_attempts + 1): try: _make_reflink_copy(source, destination) _atomic_freeze(source, destination, freeze_timeout=freeze_timeout, show_changes=show_changes) return except Exception as err: log.debug('Freeze copy failure. Attempt: %s. Error: %s', attempt, err) log.debug('Sleeping for %d secs...', freeze_timeout) sleep(freeze_timeout) # give system time to recover after long freeze raise RuntimeError('Failed to create atomic snapshot using FSFREEZE.') _SNAP_LV_NAME = 'atomic_fs_copy' _SNAP_LV_TAG = 'atomic_fs_copy' def _atomic_lvsnap(source: Path, destination: Path, *, show_changes: bool) -> None: match json.loads(check_output(['findmnt', '-o', 'maj:min,fstype,target', '--nofsroot', '--json', '--target', source])): case {'filesystems': [{'maj:min': str() as device_number, 'fstype': str() as fs_type, 'target': str() as root_mount}]}: dev_maj, dev_min = device_number.split(':') case _: raise ValueError('Failed to parse findmnt result') # Actually, should work on any FS. # if fs_type != 'xfs': # raise RuntimeError(f'Filesystem, type is not XFS: {fs_type}. Something went wrong.') match json.loads(check_output([ 'lvs', '--select', f'lv_kernel_major={dev_maj} && lv_kernel_minor={dev_min}', '-o', 'vg_name,lv_name', '--reportformat=json', ])): case {'report': [{'lv': [{'vg_name': str() as vg_name, 'lv_name': str() as src_lv_name}]}]}: pass case _: raise ValueError('Failed to parse lvs result') lvm_snap_mount_dir = Path('/run', _SNAP_LV_NAME) if lvm_snap_mount_dir.is_mount(): log.warning('Snapshot was mounted. unmounting.') check_call(['umount', lvm_snap_mount_dir]) if lvm_snap_mount_dir.exists(): rmtree(lvm_snap_mount_dir) lvm_snap_mount_dir.mkdir() # Will not exit with error if there are no such LVMs. log.debug('Removing temporary LVMs if any.') check_call(['lvremove', '--autobackup', 'n', '-y', '--select', f'lv_tags={_SNAP_LV_TAG}']) check_call([ 'lvcreate', '--snapshot', '--addtag', _SNAP_LV_TAG, '--extents', '100%FREE', '--name', _SNAP_LV_NAME, '--autobackup', 'n', f'{vg_name}/{src_lv_name}', ]) try: snapshot_blockdev = Path(f'/dev/mapper/{vg_name}-{_SNAP_LV_NAME}') log.debug('mounting snapshot %s to %s', snapshot_blockdev, lvm_snap_mount_dir) with _measure('snapshot mounting'): check_call(['mount', '-t', fs_type, '-o', 'ro,nouuid,norecovery', snapshot_blockdev, lvm_snap_mount_dir]) try: src_snap_dir = lvm_snap_mount_dir / source.relative_to(root_mount) if not src_snap_dir.exists(): raise RuntimeError('No same src dir on LVM snap FS. Should never happen.') log.debug('Calling rsync %s -> %s', src_snap_dir, destination) with _measure('rsync from snapshot'): check_call( _RSYNC_ARGS + [ *(['--itemize-changes'] if show_changes else []), '--', f'{src_snap_dir}/', # Закрывающий / в rsync для исходного каталога важен. f'{destination}/', ], ) finally: log.debug('unmounting') check_call(['umount', lvm_snap_mount_dir]) finally: log.debug('lvremove snapshot') check_call(['lvremove', '--autobackup', 'n', '-y', f'{vg_name}/{_SNAP_LV_NAME}']) def _prepare() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Atomic Copy folder") parser.add_argument('--debug', action='store_true', help='Enable debug mode.') parser.add_argument("--method", type=str, choices=['freeze', 'lvmsnap', 'hybrid'], help="Type of the operation") parser.add_argument( "--freeze-timeout", type=int, help="Maximal time under FS freeze in one iteration. Ror 'freeze' or 'hybrid' methods", # ICS-30307 Максимальное время заморозки fs 5 секунд. При изменении учесть _CONNECTION_LOST_DEADLINE. default=5, metavar='SECONDS', ) parser.add_argument( "--freeze-attempts", type=int, help="Max attempts to create snapshot. For 'freeze' or 'hybrid' methods", default=5, metavar='NUMBER', ) parser.add_argument('--show-changes', action='store_true', help='Show changes while rsync is working.') parser.add_argument("source", type=Path, help="Source directory path") parser.add_argument("destination", type=Path, help="Destination directory path") args = parser.parse_args() return args def main() -> None: args = _prepare() logging.basicConfig() logging.getLogger().setLevel(logging.DEBUG if args.debug else logging.INFO) logging.raiseExceptions = False logging.captureWarnings(True) flock(os.open(__file__, O_RDONLY | O_CLOEXEC), LOCK_EX) try: args.source = args.source.resolve() args.destination = args.destination.resolve() if not args.source.is_dir(): raise ValueError(f'Source path {args.source} does not exist or is not a dir.') if args.source.is_relative_to(args.destination): raise ValueError('Impossible combination of dirs') if args.destination.is_relative_to(args.source): raise ValueError('Impossible combination of dirs') # actually does not work between upperdir and overlayfs. Same fsid reported... if os.statvfs(args.source).f_fsid != os.statvfs(args.destination.parent).f_fsid: raise ValueError('Source and destination are on different FS.') # Python does not provide f_type (!) # https://stackoverflow.com/questions/48319246/how-can-i-determine-filesystem-type-name-with-linux-api-for-c # os.statvfs(args.source).f_type # so we can not check that fs is XFS. if args.method != 'lvmsnap': log.info('Using fast FSFREEZE method.') try: _atomic_freeze_wrapper( args.source, args.destination, freeze_timeout=args.freeze_timeout, freeze_attempts=args.freeze_attempts, show_changes=args.show_changes, ) return except Exception as exc: if args.method == 'freeze': raise log.warning('Fast FSFREEZE method failed: %s', exc) log.info('Using slower LVM snap method.') _make_reflink_copy(args.source, args.destination) _atomic_lvsnap(args.source, args.destination, show_changes=args.show_changes) except Exception: if args.destination.exists(): with _measure('destination remove after lvm mount'): args.destination.rmtree() raise if __name__ == '__main__': main()