Created
July 4, 2024 18:29
-
-
Save egberts/f9f9f1f3156bc17a1c8718b3dfd068d3 to your computer and use it in GitHub Desktop.
Revisions
-
egberts created this gist
Jul 4, 2024. There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,168 @@ from __future__ import annotations import logging import os import pathlib import shutil from datetime import datetime as dt from collections.abc import Iterable LOG_FORMAT = "%(name)s: %(message)s" level = 20 logging.basicConfig( level=level, format=LOG_FORMAT, datefmt="[%H:%M:%S]", #handlers=[handler] if handler else [], # for our UT benchmark, this is a pesky IT effort ) logger = logging.getLogger('os-listdir') logger.setLevel(level) def clean_output_dir_original(path: str, retention: Iterable[str]) -> None: """Remove all files from output directory except those in retention list""" if not os.path.exists(path): logger.info("Directory already removed: %s", path) return if not os.path.isdir(path): try: os.remove(path) except Exception as e: logger.error("Unable to delete file %s; %s", path, e) return # remove existing content from output folder unless in retention list for filename in os.listdir(path): file = os.path.join(path, filename) if any(filename == retain for retain in retention): logger.info( "Skipping deletion; %s is on retention list: %s", filename, file ) elif os.path.isdir(file): try: shutil.rmtree(file) logger.info("Deleted directory %s", file) except Exception as e: logger.error("Unable to delete directory %s; %s", file, e) elif os.path.isfile(file) or os.path.islink(file): try: os.remove(file) logger.debug("Deleted file/link %s", file) except Exception as e: logger.error("Unable to delete file %s; %s", file, e) else: logger.error("Unable to delete %s, file type unknown", file) def clean_output_dir_new(path: str, retention: Iterable[str]) -> None: """Remove all files from output directory except those in retention list""" 
if not os.path.exists(path): logger.debug("Directory already removed: %s", path) return if not os.path.isdir(path): try: os.remove(path) except Exception as e: logger.error("Unable to delete file %s; %s", path, e) return # remove existing content from output folder unless in retention list for dir_entry in os.scandir(path): file = dir_entry.path if any(dir_entry.name == retain for retain in retention): logger.debug( "Skipping deletion; %s is on retention list: %s", dir_entry.name, file ) elif dir_entry.is_dir(): try: shutil.rmtree(file) logger.debug("Deleted directory %s", file) except Exception as e: logger.error("Unable to delete directory %s; %s", file, e) elif dir_entry.is_file(follow_symlinks=False) or dir_entry.is_symlink(): try: os.remove(file) logger.debug("Deleted file/link %s", file) except Exception as e: logger.error("Unable to delete file %s; %s", file, e) else: logger.error("Unable to delete %s, file type unknown", file) # Performance test alphanum = 'abcdefghijklmnopqrstuvwxyz0123456789' DIR_SEPARATOR = '/' TEST_DIR = '/tmp/output' shutil.rmtree(path=TEST_DIR, ignore_errors=False) total_files = 0 total_dirs = 0 total_inodes = 0 def populate_files(test_path): global total_dirs global total_files global total_inodes def add_char_then_touch(test_dir, suffix_filename): global total_files for first_chr in alphanum: this_name = test_dir + DIR_SEPARATOR + suffix_filename + first_chr pathlib.Path(this_name).touch() total_files += 1 for second_chr in alphanum: that_name = this_name + second_chr pathlib.Path(that_name).touch() total_files += 1 # establish Test directory current_template = test_path try: os.mkdir(current_template) except: pass total_dirs += 1 # populate test directory for first_char in alphanum: this_file = current_template + DIR_SEPARATOR + first_char pathlib.Path(this_file).touch() total_files += 1 add_char_then_touch(current_template, first_char) # prepare test area populate_files(TEST_DIR) # start performance test old_start_dt = dt.now() # 
Exercise target routine clean_output_dir_original(TEST_DIR, []) old_end_dt = dt.now() # prepare test area populate_files(TEST_DIR) new_start_dt = dt.now() clean_output_dir_new(TEST_DIR, []) # end performance test new_end_dt = dt.now() # test results old_delta_utc = old_end_dt - old_start_dt new_delta_utc = new_end_dt - new_start_dt print(f'Original Delta time {old_delta_utc}') print(f'Latest Delta time {new_delta_utc}') print(f'Total inodes: {total_inodes}') print(f'Total Files {total_files}') print(f'Total Dirs {total_dirs}')