Skip to content

Instantly share code, notes, and snippets.

@socketpair
Created November 6, 2023 19:14

Revisions

  1. socketpair created this gist Nov 6, 2023.
    340 changes: 340 additions & 0 deletions xfs_snap.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,340 @@
    import argparse
    import json
    import logging
    import os
    from contextlib import contextmanager
    from errno import EINVAL
    from fcntl import LOCK_EX, flock, ioctl
    from os import O_CLOEXEC, O_DIRECTORY, O_RDONLY, close, open as os_open
    from pathlib import Path
    from shutil import rmtree
    from signal import SIGKILL
    from subprocess import CalledProcessError, Popen, TimeoutExpired, call, check_call, check_output
    from tempfile import TemporaryDirectory
    from time import monotonic, sleep

    # Module-level logger; handlers and level are configured in main().
    log = logging.getLogger(__name__)

    # Linux ioctl request numbers (see <linux/fs.h>):
    #   FIFREEZE = _IOWR('X', 119, int), FITHAW = _IOWR('X', 120, int).
    FIFREEZE = 0xC0045877
    FITHAW = 0xC0045878


    @contextmanager
    def _measure(operation: str):
        """Log, at debug level, how long the wrapped ``with`` body took.

        The duration is measured with a monotonic clock and is reported even
        when the body raises; the exception itself is not swallowed.

        Args:
            operation: Human-readable name of the operation, used in the log line.
        """
        began_at = monotonic()
        try:
            yield
        finally:
            elapsed = monotonic() - began_at
            log.debug('Operation "%s" completed in %2.2f seconds.', operation, elapsed)


    @contextmanager
    def _frozen(path: Path):
        """Freeze the filesystem containing *path* for the duration of the ``with`` body.

        Yields a zero-argument callable that thaws the filesystem early. The
        callable may be invoked more than once: a thaw that fails with ``EINVAL``
        is silently ignored, so the unconditional thaw on exit is harmless after
        an early one.

        Args:
            path: A directory on the filesystem to freeze.

        Raises:
            OSError: If *path* cannot be opened, or the freeze/thaw ioctl fails
                (except for ``EINVAL`` on thaw).
        """
        fd = os_open(path, O_RDONLY | O_DIRECTORY | O_CLOEXEC)

        def unfreeze() -> None:
            log.debug('Unfreezing')
            try:
                ioctl(fd, FITHAW)
            except OSError as err:
                # EINVAL is tolerated so that repeated/unneeded thaws are no-ops
                # (presumably the kernel's answer for "not frozen").
                if err.errno != EINVAL:
                    raise

        try:
            log.debug('Freezing')
            try:
                # KERNEL BUG: if a signal (e.g. SIGINT) arrives during FIFREEZE, the FS
                # stays frozen(!) although the call failed. That is why the freeze is
                # issued inside this try: a thaw is always attempted, even after a
                # failed freeze.
                ioctl(fd, FIFREEZE)
                yield unfreeze
            finally:
                unfreeze()
        finally:
            close(fd)


    _RSYNC_ARGS = [
    'rsync',
    '-a',
    # '--checksum-choice=xxh128', # потому что '--only-write-batch' сбрасывается на SLOW MD5!
    # Алгоритм случайного изменения контрольной суммы не работает.
    '--inplace',
    '--hard-links',
    '--acls',
    '--xattrs',
    '--one-file-system',
    '--delete',
    '--numeric-ids',
    '--preallocate',
    '--trust-sender',
    ]


    def _make_reflink_copy(source: Path, destination: Path) -> None:
        """Recreate *destination* as a reflink copy of *source* using ``cp``.

        An existing *destination* is removed and the directory is created anew.
        ``cp --reflink=always`` is then retried while it exits non-zero (the source
        directory is actively changing, so a non-zero exit code is possible). If no
        attempt succeeds, only a warning is logged and the function returns normally.

        Args:
            source: Directory to copy from.
            destination: Directory to (re)create and copy into.
        """
        log.debug('Removing snapshot-copy as %s', destination)
        if destination.exists():
            with _measure('unlink destination'):
                rmtree(destination)
        destination.mkdir(parents=True)

        # -u: do not reflink again when the mtime is the same.
        cp_command = [
            'cp', '-u', '-a', '--reflink=always',
            '--no-target-directory', '--one-file-system',
            source, destination,
        ]
        for attempt_no in range(1, 20):
            log.debug('Reflinking')
            with _measure('reflink copy'):
                exit_code = call(cp_command)
            if exit_code == 0:
                break
            log.info('Reflink failed. Attempt: %d. Retrying', attempt_no)
        else:
            # Every attempt returned non-zero; carry on with a partial copy.
            log.warning('Reflink copy is not complete. High disk load ?')


    def _atomic_freeze(source: Path, destination: Path, *, freeze_timeout: int, show_changes: bool) -> None:
        """Bring *destination* in sync with *source* as of a single frozen instant.

        While the filesystem of *source* is frozen, rsync is run with
        ``--only-write-batch`` so that the differences are recorded into a batch file
        in a temporary directory. After the filesystem is thawed, the batch is
        applied to *destination* with ``--read-batch``.

        Args:
            source: Directory to snapshot.
            destination: Directory to update.
            freeze_timeout: Maximum number of seconds rsync may run under the freeze.
            show_changes: Pass ``--itemize-changes`` to the batch-writing rsync.

        Raises:
            RuntimeError: If an XFS hang is detected in rsync's kernel stack, or rsync
                does not finish within *freeze_timeout* seconds.
            CalledProcessError: If either rsync invocation exits non-zero.
        """
        with TemporaryDirectory() as tmpdir:
            batch = Path(tmpdir) / 'batch'
            with _frozen(source) as unfreeze:
                with _measure('Rsync on frozen FS'):
                    log.debug('Running rsync on frozen FS to create batch')
                    with Popen(  # pylint: disable=subprocess-popen-preexec-fn
                        _RSYNC_ARGS
                        + [
                            *(['--itemize-changes'] if show_changes else []),
                            '--only-write-batch', batch,
                            '--',
                            f'{source}/',
                            f'{destination}/',
                        ],
                        # Own session => own process group, so killpg() below hits rsync only.
                        start_new_session=True,
                    ) as proc:
                        try:
                            deadline = monotonic() + freeze_timeout
                            # Poll in short slices so the deadline and the hang check are evaluated often.
                            while proc.returncode is None and monotonic() < deadline:
                                try:
                                    # proc.wait() may be interrupted by SIGINT.
                                    proc.wait(0.1)
                                except TimeoutExpired:
                                    # NOTE(review): fd 3 is presumably the batch file opened by rsync - confirm.
                                    if not Path(f'/proc/{proc.pid}/fd/3').exists():
                                        continue
                                    # Path().read_text() may raise ENOENT if the process dies unexpectedly (even successfully)
                                    if 'xfs_free_eofblocks' not in Path(f'/proc/{proc.pid}/stack').read_text():
                                        continue
                                    # Kernel stack of an rsync that is stuck on the frozen FS:
                                    # [<0>] percpu_rwsem_wait+0x116/0x140
                                    # [<0>] xfs_trans_alloc+0x20c/0x220 [xfs]
                                    # [<0>] xfs_free_eofblocks+0x83/0x120 [xfs]
                                    # [<0>] xfs_release+0x143/0x180 [xfs]
                                    # [<0>] __fput+0x8e/0x250
                                    # [<0>] task_work_run+0x5a/0x90
                                    # [<0>] exit_to_user_mode_prepare+0x1e6/0x1f0
                                    # [<0>] syscall_exit_to_user_mode+0x1b/0x40
                                    # [<0>] do_syscall_64+0x6b/0x90
                                    # [<0>] entry_SYSCALL_64_after_hwframe+0x72/0xdc
                                    log.debug('XFS hang detected')
                                    raise RuntimeError('Early DETECTED XFS HANG') from None
                            if proc.returncode is None:
                                log.debug('rsync timed out')
                                batch_size = batch.stat().st_size if batch.is_file() else 0
                                raise RuntimeError(f'Rsync works too long (more than {freeze_timeout} sec). Batch size is {batch_size}, Aborting.')
                            log.debug('rsync finished with code %d.', proc.returncode)
                        except:  # noqa. see code of original check_call
                            log.debug('Killing rsync')
                            # Kill the process first, and only then thaw.
                            # If we thawed first, the process could complete successfully BEFORE the KILL is sent.
                            # If by the time of the kill rsync has somehow magically un-hung and finished successfully,
                            # the kill does nothing because the process is already dead. BUT IT HAS NOT BEEN WAITED FOR
                            # yet, so there will be no ESRCH.
                            os.killpg(proc.pid, SIGKILL)
                            unfreeze()  # must happen BEFORE the .wait() performed by Popen.__exit__()
                            raise
                    log.debug('rsync finally waited')
            log.debug('Unfrozen')
            assert proc.returncode is not None
            if proc.returncode != 0:
                log.debug('rsync has failed')
                raise CalledProcessError(proc.returncode, proc.args)

            log.debug('Rsync success. Applying batch of size: %2.2f MB', batch.stat().st_size / 1_000_000)
            with _measure('apply patch'):
                check_call(
                    _RSYNC_ARGS
                    + [
                        '--read-batch', batch,
                        '--',
                        f'{destination}/',
                    ],
                )
            log.debug('Patch applied')


    def _atomic_freeze_wrapper(source: Path, destination: Path, *, freeze_timeout: int, freeze_attempts: int, show_changes: bool) -> None:
        """Try the reflink + FSFREEZE snapshot up to *freeze_attempts* times.

        Each attempt recreates the reflink copy and then runs the frozen rsync.
        After every failed attempt the function sleeps for *freeze_timeout* seconds
        to give the system time to recover after a long freeze.

        Args:
            source: Directory to snapshot.
            destination: Directory that receives the snapshot.
            freeze_timeout: Per-attempt freeze limit in seconds; also the pause
                between attempts.
            freeze_attempts: Maximum number of attempts.
            show_changes: Forwarded to the frozen rsync run.

        Raises:
            RuntimeError: If no attempt succeeds. The error of the last attempt is
                attached as ``__cause__`` so the root failure is not lost.
        """
        last_error = None
        for attempt in range(1, freeze_attempts + 1):
            try:
                _make_reflink_copy(source, destination)
                _atomic_freeze(source, destination, freeze_timeout=freeze_timeout, show_changes=show_changes)
                return
            except Exception as err:
                # 'err' is unbound when the except block ends, so keep our own reference.
                last_error = err
                log.debug('Freeze copy failure. Attempt: %s. Error: %s', attempt, err)
            log.debug('Sleeping for %d secs...', freeze_timeout)
            sleep(freeze_timeout)  # give system time to recover after long freeze
        raise RuntimeError('Failed to create atomic snapshot using FSFREEZE.') from last_error


    _SNAP_LV_NAME = 'atomic_fs_copy'
    _SNAP_LV_TAG = 'atomic_fs_copy'


    def _atomic_lvsnap(source: Path, destination: Path, *, show_changes: bool) -> None:
    match json.loads(check_output(['findmnt', '-o', 'maj:min,fstype,target', '--nofsroot', '--json', '--target', source])):
    case {'filesystems': [{'maj:min': str() as device_number, 'fstype': str() as fs_type, 'target': str() as root_mount}]}:
    dev_maj, dev_min = device_number.split(':')
    case _:
    raise ValueError('Failed to parse findmnt result')

    # Actually, should work on any FS.
    # if fs_type != 'xfs':
    # raise RuntimeError(f'Filesystem, type is not XFS: {fs_type}. Something went wrong.')

    match json.loads(check_output([
    'lvs',
    '--select', f'lv_kernel_major={dev_maj} && lv_kernel_minor={dev_min}',
    '-o', 'vg_name,lv_name',
    '--reportformat=json',
    ])):
    case {'report': [{'lv': [{'vg_name': str() as vg_name, 'lv_name': str() as src_lv_name}]}]}:
    pass
    case _:
    raise ValueError('Failed to parse lvs result')

    lvm_snap_mount_dir = Path('/run', _SNAP_LV_NAME)
    if lvm_snap_mount_dir.is_mount():
    log.warning('Snapshot was mounted. unmounting.')
    check_call(['umount', lvm_snap_mount_dir])
    if lvm_snap_mount_dir.exists():
    rmtree(lvm_snap_mount_dir)
    lvm_snap_mount_dir.mkdir()

    # Will not exit with error if there are no such LVMs.
    log.debug('Removing temporary LVMs if any.')
    check_call(['lvremove', '--autobackup', 'n', '-y', '--select', f'lv_tags={_SNAP_LV_TAG}'])

    check_call([
    'lvcreate',
    '--snapshot',
    '--addtag', _SNAP_LV_TAG,
    '--extents', '100%FREE',
    '--name', _SNAP_LV_NAME,
    '--autobackup', 'n',
    f'{vg_name}/{src_lv_name}',
    ])
    try:
    snapshot_blockdev = Path(f'/dev/mapper/{vg_name}-{_SNAP_LV_NAME}')
    log.debug('mounting snapshot %s to %s', snapshot_blockdev, lvm_snap_mount_dir)
    with _measure('snapshot mounting'):
    check_call(['mount', '-t', fs_type, '-o', 'ro,nouuid,norecovery', snapshot_blockdev, lvm_snap_mount_dir])

    try:
    src_snap_dir = lvm_snap_mount_dir / source.relative_to(root_mount)
    if not src_snap_dir.exists():
    raise RuntimeError('No same src dir on LVM snap FS. Should never happen.')
    log.debug('Calling rsync %s -> %s', src_snap_dir, destination)
    with _measure('rsync from snapshot'):
    check_call(
    _RSYNC_ARGS
    + [
    *(['--itemize-changes'] if show_changes else []),
    '--',
    f'{src_snap_dir}/', # Закрывающий / в rsync для исходного каталога важен.
    f'{destination}/',
    ],
    )
    finally:
    log.debug('unmounting')
    check_call(['umount', lvm_snap_mount_dir])
    finally:
    log.debug('lvremove snapshot')
    check_call(['lvremove', '--autobackup', 'n', '-y', f'{vg_name}/{_SNAP_LV_NAME}'])


    def _prepare() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Atomic Copy folder")
    parser.add_argument('--debug', action='store_true', help='Enable debug mode.')
    parser.add_argument("--method", type=str, choices=['freeze', 'lvmsnap', 'hybrid'], help="Type of the operation")
    parser.add_argument(
    "--freeze-timeout",
    type=int,
    help="Maximal time under FS freeze in one iteration. Ror 'freeze' or 'hybrid' methods",
    # ICS-30307 Максимальное время заморозки fs 5 секунд. При изменении учесть _CONNECTION_LOST_DEADLINE.
    default=5,
    metavar='SECONDS',
    )
    parser.add_argument(
    "--freeze-attempts",
    type=int,
    help="Max attempts to create snapshot. For 'freeze' or 'hybrid' methods",
    default=5,
    metavar='NUMBER',
    )
    parser.add_argument('--show-changes', action='store_true', help='Show changes while rsync is working.')
    parser.add_argument("source", type=Path, help="Source directory path")
    parser.add_argument("destination", type=Path, help="Destination directory path")

    args = parser.parse_args()
    return args


    def main() -> None:
        """Script entry point: create an atomic copy of the source directory.

        Parses the command line, configures logging, takes an exclusive lock on this
        script file (so only one instance runs at a time), validates the paths and
        then runs the requested method. Unless the method is ``lvmsnap``, the fast
        FSFREEZE method is tried first; if it fails and the method is not ``freeze``,
        the slower LVM snapshot method is used as a fallback.

        On any failure the (possibly partial) destination directory is removed and
        the original exception is re-raised.

        Raises:
            ValueError: If the source/destination combination is invalid.
            Exception: Whatever the selected snapshot method raised.
        """
        args = _prepare()

        logging.basicConfig()
        logging.getLogger().setLevel(logging.DEBUG if args.debug else logging.INFO)
        logging.raiseExceptions = False
        logging.captureWarnings(True)
        # The descriptor is deliberately never closed: the lock is held until the process exits.
        flock(os.open(__file__, O_RDONLY | O_CLOEXEC), LOCK_EX)

        try:
            args.source = args.source.resolve()
            args.destination = args.destination.resolve()

            if not args.source.is_dir():
                raise ValueError(f'Source path {args.source} does not exist or is not a dir.')

            # Neither directory may contain the other.
            if args.source.is_relative_to(args.destination):
                raise ValueError('Impossible combination of dirs')
            if args.destination.is_relative_to(args.source):
                raise ValueError('Impossible combination of dirs')

            # actually does not work between upperdir and overlayfs. Same fsid reported...
            if os.statvfs(args.source).f_fsid != os.statvfs(args.destination.parent).f_fsid:
                raise ValueError('Source and destination are on different FS.')

            # Python does not provide f_type (!)
            # https://stackoverflow.com/questions/48319246/how-can-i-determine-filesystem-type-name-with-linux-api-for-c
            # os.statvfs(args.source).f_type
            # so we can not check that fs is XFS.

            if args.method != 'lvmsnap':
                log.info('Using fast FSFREEZE method.')
                try:
                    _atomic_freeze_wrapper(
                        args.source,
                        args.destination,
                        freeze_timeout=args.freeze_timeout,
                        freeze_attempts=args.freeze_attempts,
                        show_changes=args.show_changes,
                    )
                    return
                except Exception as exc:
                    if args.method == 'freeze':
                        raise
                    log.warning('Fast FSFREEZE method failed: %s', exc)
            log.info('Using slower LVM snap method.')
            _make_reflink_copy(args.source, args.destination)
            _atomic_lvsnap(args.source, args.destination, show_changes=args.show_changes)
        except Exception:
            if args.destination.exists():
                with _measure('destination remove after lvm mount'):
                    # pathlib.Path has no rmtree() method; calling it raised AttributeError
                    # here and masked the real failure. shutil.rmtree is the right tool.
                    rmtree(args.destination)
            raise


    # Run main() only when executed as a script, not when imported as a module.
    if __name__ == '__main__':
        main()