import argparse import json import logging import os from contextlib import contextmanager from errno import EINVAL from fcntl import LOCK_EX, flock, ioctl from os import O_CLOEXEC, O_DIRECTORY, O_RDONLY, close, open as os_open from pathlib import Path from shutil import rmtree from signal import SIGKILL from subprocess import CalledProcessError, Popen, TimeoutExpired, call, check_call, check_output from tempfile import TemporaryDirectory from time import monotonic, sleep log = logging.getLogger(__name__) FIFREEZE = 3221510263 FITHAW = 3221510264 @contextmanager def _measure(operation: str): start = monotonic() # log.debug('Measuring "%s" operation.', operation) try: yield finally: log.debug('Operation "%s" completed in %2.2f seconds.', operation, monotonic() - start) @contextmanager def _frozen(path: Path): fd = os_open(path, O_RDONLY | O_DIRECTORY | O_CLOEXEC) def unfreeze() -> None: log.debug('Unfreezing') try: ioctl(fd, FITHAW) except OSError as err: if err.errno != EINVAL: raise try: log.debug('Freezing') # KERNEL BUG:если здесь происходит сигнал (например, SIGINT), FS остается зависшей(!) # вот почему разморозка всегда должна пытаться разморозить даже после неудачной заморозки. ioctl(fd, FIFREEZE) try: yield unfreeze finally: unfreeze() finally: close(fd) _RSYNC_ARGS = [ 'rsync', '-a', # '--checksum-choice=xxh128', # потому что '--only-write-batch' сбрасывается на SLOW MD5! # Алгоритм случайного изменения контрольной суммы не работает. '--inplace', '--hard-links', '--acls', '--xattrs', '--one-file-system', '--delete', '--numeric-ids', '--preallocate', '--trust-sender', ] def _make_reflink_copy(source: Path, destination: Path) -> None: log.debug('Removing snapshot-copy as %s', destination) if destination.exists(): with _measure('unlink destination'): rmtree(destination) destination.mkdir(parents=True) for copy_attempt_count in range(1, 20): # Исходный каталог активно меняется, возможен не нулевой код возврата log.debug('Reflinking') # -u - не делать reflink, если mtime то же самое/ # high_prio - minimize race conditions on high disk load with _measure('reflink copy'): if not call(['cp', '-u', '-a', '--reflink=always', '--no-target-directory', '--one-file-system', source, destination]): break log.info('Reflink failed. Attempt: %d. Retrying', copy_attempt_count) else: log.warning('Reflink copy is not complete. High disk load ?') def _atomic_freeze(source: Path, destination: Path, *, freeze_timeout: int, show_changes: bool) -> None: with TemporaryDirectory() as tmpdir: batch = Path(tmpdir) / 'batch' with _frozen(source) as unfreeze: with _measure('Rsync on frozen FS'): log.debug('Running rsync on frozen FS to create batch') with Popen( # pylint: disable=subprocess-popen-preexec-fn _RSYNC_ARGS + [ *(['--itemize-changes'] if show_changes else []), '--only-write-batch', batch, '--', f'{source}/', f'{destination}/', ], start_new_session=True, ) as proc: try: deadline = monotonic() + freeze_timeout while proc.returncode is None and monotonic() < deadline: try: # proc.wait() may be interrupted by SIGINT. proc.wait(0.1) except TimeoutExpired: if not Path(f'/proc/{proc.pid}/fd/3').exists(): continue # Path().read_text() may raise ENOENT is process die unexpectedly (even successfully) if 'xfs_free_eofblocks' not in Path(f'/proc/{proc.pid}/stack').read_text(): continue # [<0>] percpu_rwsem_wait+0x116/0x140 # [<0>] xfs_trans_alloc+0x20c/0x220 [xfs] # [<0>] xfs_free_eofblocks+0x83/0x120 [xfs] # [<0>] xfs_release+0x143/0x180 [xfs] # [<0>] __fput+0x8e/0x250 # [<0>] task_work_run+0x5a/0x90 # [<0>] exit_to_user_mode_prepare+0x1e6/0x1f0 # [<0>] syscall_exit_to_user_mode+0x1b/0x40 # [<0>] do_syscall_64+0x6b/0x90 # [<0>] entry_SYSCALL_64_after_hwframe+0x72/0xdc log.debug('XFS hang detected') raise RuntimeError('Early DETECTED XFS HANG') from None if proc.returncode is None: log.debug('rsync timed out') batch_size = batch.stat().st_size if batch.is_file() else 0 raise RuntimeError(f'Rsync works too long (more than {freeze_timeout} sec). Batch size is {batch_size}, Aborting.') log.debug('rsync finished with code %d.', proc.returncode) except: # noqa. see code of original check_call log.debug('Killing rsync') # Сначала прибиваем процесс, и только потом расфризиваем. # Если сначала сделать анфриз, то процесс может уже завершиться успехом ДО отправки KILL. # Если к моменту прибития рсинк както магически развис и завершился успехом, # то наше прибитие не сделает ничего ибо процесс уже умер. НО НЕ ЗАВЕЙТИЛСЯ. Поэтому ENOSRCH не будет. os.killpg(proc.pid, SIGKILL) unfreeze() # обязательно ДО .wait() который будет в Popen.__exit__() raise log.debug('rsync finally waited') log.debug('Unfrozen') assert proc.returncode is not None if proc.returncode != 0: log.debug('rsync has failed') raise CalledProcessError(proc.returncode, proc.args) log.debug('Rsync success. Applying batch of size: %2.2f MB', batch.stat().st_size / 1_000_000) with _measure('apply patch'): check_call( _RSYNC_ARGS + [ '--read-batch', batch, '--', f'{destination}/', ], ) log.debug('Patch applied') def _atomic_freeze_wrapper(source: Path, destination: Path, *, freeze_timeout: int, freeze_attempts: int, show_changes: bool) -> None: for attempt in range(1, freeze_attempts + 1): try: _make_reflink_copy(source, destination) _atomic_freeze(source, destination, freeze_timeout=freeze_timeout, show_changes=show_changes) return except Exception as err: log.debug('Freeze copy failure. Attempt: %s. Error: %s', attempt, err) log.debug('Sleeping for %d secs...', freeze_timeout) sleep(freeze_timeout) # give system time to recover after long freeze raise RuntimeError('Failed to create atomic snapshot using FSFREEZE.') _SNAP_LV_NAME = 'atomic_fs_copy' _SNAP_LV_TAG = 'atomic_fs_copy' def _atomic_lvsnap(source: Path, destination: Path, *, show_changes: bool) -> None: match json.loads(check_output(['findmnt', '-o', 'maj:min,fstype,target', '--nofsroot', '--json', '--target', source])): case {'filesystems': [{'maj:min': str() as device_number, 'fstype': str() as fs_type, 'target': str() as root_mount}]}: dev_maj, dev_min = device_number.split(':') case _: raise ValueError('Failed to parse findmnt result') # Actually, should work on any FS. # if fs_type != 'xfs': # raise RuntimeError(f'Filesystem, type is not XFS: {fs_type}. Something went wrong.') match json.loads(check_output([ 'lvs', '--select', f'lv_kernel_major={dev_maj} && lv_kernel_minor={dev_min}', '-o', 'vg_name,lv_name', '--reportformat=json', ])): case {'report': [{'lv': [{'vg_name': str() as vg_name, 'lv_name': str() as src_lv_name}]}]}: pass case _: raise ValueError('Failed to parse lvs result') lvm_snap_mount_dir = Path('/run', _SNAP_LV_NAME) if lvm_snap_mount_dir.is_mount(): log.warning('Snapshot was mounted. unmounting.') check_call(['umount', lvm_snap_mount_dir]) if lvm_snap_mount_dir.exists(): rmtree(lvm_snap_mount_dir) lvm_snap_mount_dir.mkdir() # Will not exit with error if there are no such LVMs. log.debug('Removing temporary LVMs if any.') check_call(['lvremove', '--autobackup', 'n', '-y', '--select', f'lv_tags={_SNAP_LV_TAG}']) check_call([ 'lvcreate', '--snapshot', '--addtag', _SNAP_LV_TAG, '--extents', '100%FREE', '--name', _SNAP_LV_NAME, '--autobackup', 'n', f'{vg_name}/{src_lv_name}', ]) try: snapshot_blockdev = Path(f'/dev/mapper/{vg_name}-{_SNAP_LV_NAME}') log.debug('mounting snapshot %s to %s', snapshot_blockdev, lvm_snap_mount_dir) with _measure('snapshot mounting'): check_call(['mount', '-t', fs_type, '-o', 'ro,nouuid,norecovery', snapshot_blockdev, lvm_snap_mount_dir]) try: src_snap_dir = lvm_snap_mount_dir / source.relative_to(root_mount) if not src_snap_dir.exists(): raise RuntimeError('No same src dir on LVM snap FS. Should never happen.') log.debug('Calling rsync %s -> %s', src_snap_dir, destination) with _measure('rsync from snapshot'): check_call( _RSYNC_ARGS + [ *(['--itemize-changes'] if show_changes else []), '--', f'{src_snap_dir}/', # Закрывающий / в rsync для исходного каталога важен. f'{destination}/', ], ) finally: log.debug('unmounting') check_call(['umount', lvm_snap_mount_dir]) finally: log.debug('lvremove snapshot') check_call(['lvremove', '--autobackup', 'n', '-y', f'{vg_name}/{_SNAP_LV_NAME}']) def _prepare() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Atomic Copy folder") parser.add_argument('--debug', action='store_true', help='Enable debug mode.') parser.add_argument("--method", type=str, choices=['freeze', 'lvmsnap', 'hybrid'], help="Type of the operation") parser.add_argument( "--freeze-timeout", type=int, help="Maximal time under FS freeze in one iteration. Ror 'freeze' or 'hybrid' methods", # ICS-30307 Максимальное время заморозки fs 5 секунд. При изменении учесть _CONNECTION_LOST_DEADLINE. default=5, metavar='SECONDS', ) parser.add_argument( "--freeze-attempts", type=int, help="Max attempts to create snapshot. For 'freeze' or 'hybrid' methods", default=5, metavar='NUMBER', ) parser.add_argument('--show-changes', action='store_true', help='Show changes while rsync is working.') parser.add_argument("source", type=Path, help="Source directory path") parser.add_argument("destination", type=Path, help="Destination directory path") args = parser.parse_args() return args def main() -> None: args = _prepare() logging.basicConfig() logging.getLogger().setLevel(logging.DEBUG if args.debug else logging.INFO) logging.raiseExceptions = False logging.captureWarnings(True) flock(os.open(__file__, O_RDONLY | O_CLOEXEC), LOCK_EX) try: args.source = args.source.resolve() args.destination = args.destination.resolve() if not args.source.is_dir(): raise ValueError(f'Source path {args.source} does not exist or is not a dir.') if args.source.is_relative_to(args.destination): raise ValueError('Impossible combination of dirs') if args.destination.is_relative_to(args.source): raise ValueError('Impossible combination of dirs') # actually does not work between upperdir and overlayfs. Same fsid reported... if os.statvfs(args.source).f_fsid != os.statvfs(args.destination.parent).f_fsid: raise ValueError('Source and destination are on different FS.') # Python does not provide f_type (!) # https://stackoverflow.com/questions/48319246/how-can-i-determine-filesystem-type-name-with-linux-api-for-c # os.statvfs(args.source).f_type # so we can not check that fs is XFS. if args.method != 'lvmsnap': log.info('Using fast FSFREEZE method.') try: _atomic_freeze_wrapper( args.source, args.destination, freeze_timeout=args.freeze_timeout, freeze_attempts=args.freeze_attempts, show_changes=args.show_changes, ) return except Exception as exc: if args.method == 'freeze': raise log.warning('Fast FSFREEZE method failed: %s', exc) log.info('Using slower LVM snap method.') _make_reflink_copy(args.source, args.destination) _atomic_lvsnap(args.source, args.destination, show_changes=args.show_changes) except Exception: if args.destination.exists(): with _measure('destination remove after lvm mount'): args.destination.rmtree() raise if __name__ == '__main__': main()