"""Benchmark two ways of emptying an output directory.

``clean_output_dir_original`` walks the directory with ``os.listdir`` plus
``os.path`` checks; ``clean_output_dir_new`` uses ``os.scandir``.  Running this
file as a script populates ``TEST_DIR`` with generated files, times each
implementation once and prints the results.  Importing the module only
configures logging and defines the functions; it does not touch the disk.
"""
from __future__ import annotations

import logging
import os
import pathlib
import shutil
from collections.abc import Callable, Iterable
from datetime import datetime as dt

LOG_FORMAT = "%(name)s: %(message)s"
level = logging.INFO  # numeric value 20
# No custom handler is installed: for this benchmark it is not worth the effort.
logging.basicConfig(
    level=level,
    format=LOG_FORMAT,
    datefmt="[%H:%M:%S]",
)
logger = logging.getLogger('os-listdir')
logger.setLevel(level)


def _is_existing_directory(path: str, log_missing: Callable[..., None]) -> bool:
    """Return True when *path* is a directory whose content should be cleaned.

    A missing path is reported through *log_missing*.  A path that exists but
    is not a directory is deleted (failures are logged, never raised).  In both
    of those cases False is returned because there is nothing left to clean.
    """
    if not os.path.exists(path):
        log_missing("Directory already removed: %s", path)
        return False
    if not os.path.isdir(path):
        try:
            os.remove(path)
        except Exception as e:
            logger.error("Unable to delete file %s; %s", path, e)
        return False
    return True


def _delete_directory(file: str, log_success: Callable[..., None]) -> None:
    """Recursively delete the directory *file*; log instead of raising."""
    try:
        shutil.rmtree(file)
    except Exception as e:
        logger.error("Unable to delete directory %s; %s", file, e)
    else:
        log_success("Deleted directory %s", file)


def _delete_file_or_link(file: str) -> None:
    """Delete the file or symbolic link *file*; log instead of raising."""
    try:
        os.remove(file)
    except Exception as e:
        logger.error("Unable to delete file %s; %s", file, e)
    else:
        logger.debug("Deleted file/link %s", file)


def clean_output_dir_original(path: str, retention: Iterable[str]) -> None:
    """Remove all files from output directory except those in retention list.

    Implementation based on ``os.listdir``.

    Args:
        path: Directory to empty.  If it does not exist nothing happens; if it
            is not a directory it is deleted itself.
        retention: Entry names (not paths) directly under *path* to keep.  Any
            iterable is accepted, including one-shot generators.

    Deletion failures are logged at error level and never raised.
    """
    if not _is_existing_directory(path, logger.info):
        return

    # Materialize once: a generator would otherwise be exhausted after the
    # first entry, and set membership avoids rescanning the list per entry.
    retained = frozenset(retention)

    # remove existing content from output folder unless in retention list
    for filename in os.listdir(path):
        file = os.path.join(path, filename)
        if filename in retained:
            logger.info(
                "Skipping deletion; %s is on retention list: %s", filename, file
            )
        elif os.path.isdir(file) and not os.path.islink(file):
            # Only real directories: shutil.rmtree refuses symbolic links, so
            # a link to a directory has to go through os.remove below.
            _delete_directory(file, logger.info)
        elif os.path.isfile(file) or os.path.islink(file):
            _delete_file_or_link(file)
        else:
            logger.error("Unable to delete %s, file type unknown", file)


def clean_output_dir_new(path: str, retention: Iterable[str]) -> None:
    """Remove all files from output directory except those in retention list.

    Implementation based on ``os.scandir``.

    Args:
        path: Directory to empty.  If it does not exist nothing happens; if it
            is not a directory it is deleted itself.
        retention: Entry names (not paths) directly under *path* to keep.  Any
            iterable is accepted, including one-shot generators.

    Deletion failures are logged at error level and never raised.
    """
    if not _is_existing_directory(path, logger.debug):
        return

    # Materialize once: a generator would otherwise be exhausted after the
    # first entry, and set membership avoids rescanning the list per entry.
    retained = frozenset(retention)

    # remove existing content from output folder unless in retention list
    # (the context manager closes the scandir iterator deterministically)
    with os.scandir(path) as entries:
        for dir_entry in entries:
            file = dir_entry.path
            if dir_entry.name in retained:
                logger.debug(
                    "Skipping deletion; %s is on retention list: %s",
                    dir_entry.name,
                    file,
                )
            elif dir_entry.is_dir(follow_symlinks=False):
                # Only real directories: shutil.rmtree refuses symbolic links,
                # so a link to a directory is handled by the next branch.
                _delete_directory(file, logger.debug)
            elif dir_entry.is_file(follow_symlinks=False) or dir_entry.is_symlink():
                _delete_file_or_link(file)
            else:
                logger.error("Unable to delete %s, file type unknown", file)


# Performance test

alphanum = 'abcdefghijklmnopqrstuvwxyz0123456789'
DIR_SEPARATOR = '/'  # kept for compatibility; paths are built with os.path.join
TEST_DIR = '/tmp/output'

# Cumulative counters updated by populate_files() across all of its calls.
total_files = 0
total_dirs = 0
total_inodes = 0


def populate_files(test_path):
    """Create the benchmark fixture under *test_path* and update the counters.

    For every character ``a`` of ``alphanum`` this touches the files ``a``,
    ``ab`` and ``abc`` for all characters ``b`` and ``c`` of ``alphanum``,
    i.e. 36 + 36**2 + 36**3 = 47,988 files per call.

    The module-level counters ``total_files``, ``total_dirs`` and
    ``total_inodes`` are incremented for each file or directory created;
    ``total_inodes`` counts both kinds.  An already existing *test_path* is
    reused and not counted as a new directory.
    """
    global total_dirs, total_files, total_inodes

    def touch(name):
        # Create one file directly under test_path and account for it.
        global total_files, total_inodes
        pathlib.Path(os.path.join(test_path, name)).touch()
        total_files += 1
        total_inodes += 1

    def add_char_then_touch(prefix):
        # Create every two- and three-character name starting with prefix.
        for first_chr in alphanum:
            two_chars = prefix + first_chr
            touch(two_chars)
            for second_chr in alphanum:
                touch(two_chars + second_chr)

    # establish Test directory
    try:
        os.mkdir(test_path)
    except FileExistsError:
        # The cleaners leave the directory itself in place, so the second
        # population run legitimately finds it already there.
        pass
    else:
        total_dirs += 1
        total_inodes += 1

    # populate test directory
    for first_char in alphanum:
        touch(first_char)
        add_char_then_touch(first_char)


def main() -> None:
    """Run the benchmark once for each implementation and print the timings."""
    # Start from a clean slate; a missing directory (first run) is fine.
    try:
        shutil.rmtree(TEST_DIR)
    except FileNotFoundError:
        pass

    # prepare test area
    populate_files(TEST_DIR)

    # start performance test: exercise the original routine
    old_start_dt = dt.now()
    clean_output_dir_original(TEST_DIR, [])
    old_end_dt = dt.now()

    # prepare test area again for the new routine
    populate_files(TEST_DIR)

    new_start_dt = dt.now()
    clean_output_dir_new(TEST_DIR, [])
    # end performance test
    new_end_dt = dt.now()

    # test results
    old_delta_utc = old_end_dt - old_start_dt
    new_delta_utc = new_end_dt - new_start_dt

    print(f'Original Delta time {old_delta_utc}')
    print(f'Latest Delta time {new_delta_utc}')
    print(f'Total inodes: {total_inodes}')
    print(f'Total Files {total_files}')
    print(f'Total Dirs {total_dirs}')


if __name__ == "__main__":
    main()