Skip to content

Instantly share code, notes, and snippets.

@egberts
Created July 4, 2024 18:29
Show Gist options
  • Save egberts/f9f9f1f3156bc17a1c8718b3dfd068d3 to your computer and use it in GitHub Desktop.
Save egberts/f9f9f1f3156bc17a1c8718b3dfd068d3 to your computer and use it in GitHub Desktop.

Revisions

  1. egberts created this gist Jul 4, 2024.
    168 changes: 168 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,168 @@
    from __future__ import annotations

    import logging
    import os
    import pathlib
    import shutil
    import time
    from collections.abc import Iterable
    from datetime import datetime as dt
    from datetime import timedelta

    # Severity threshold shared by the root configuration and the benchmark
    # logger; logging.INFO is the named constant for the numeric value 20.
    level = logging.INFO
    LOG_FORMAT = "%(name)s: %(message)s"

    # Configure the root logger.  No handlers are passed in, so basicConfig()
    # is left to install its own default handler.
    # NOTE(review): LOG_FORMAT contains no %(asctime)s field, so datefmt has
    # no visible effect on the emitted records.
    logging.basicConfig(format=LOG_FORMAT, datefmt="[%H:%M:%S]", level=level)

    # Logger used by both clean_output_dir_* implementations below.
    logger = logging.getLogger('os-listdir')
    logger.setLevel(level)


    def clean_output_dir_original(path: str, retention: Iterable[str]) -> None:
        """Remove everything inside *path* except entries named in *retention*.

        This is the ``os.listdir()``-based implementation.

        Args:
            path: Output directory to empty.  If it does not exist, nothing is
                done.  If it exists but is not a directory, it is deleted with
                ``os.remove`` and the function returns.
            retention: Names (not full paths) of direct children of *path*
                that must be kept.  The iterable is consumed exactly once, so
                one-shot iterators and generators are handled correctly.

        Failures to delete an individual entry are logged, not raised.
        """
        if not os.path.exists(path):
            logger.info("Directory already removed: %s", path)
            return

        if not os.path.isdir(path):
            try:
                os.remove(path)
            except Exception as e:
                logger.error("Unable to delete file %s; %s", path, e)
            return

        # Materialise the retention names once.  Re-iterating `retention` for
        # every directory entry would exhaust a one-shot iterator on the first
        # entry and silently stop protecting the remaining names.
        retained = frozenset(retention)

        # remove existing content from output folder unless in retention list
        for filename in os.listdir(path):
            file = os.path.join(path, filename)
            if filename in retained:
                logger.info(
                    "Skipping deletion; %s is on retention list: %s", filename, file
                )
            elif os.path.islink(file) or os.path.isfile(file):
                # Symlinks are tested first: os.path.isdir() follows links and
                # shutil.rmtree() refuses to operate on a symlink, so a link
                # to a directory must be unlinked, never recursed into.
                try:
                    os.remove(file)
                    logger.debug("Deleted file/link %s", file)
                except Exception as e:
                    logger.error("Unable to delete file %s; %s", file, e)
            elif os.path.isdir(file):
                try:
                    shutil.rmtree(file)
                    logger.info("Deleted directory %s", file)
                except Exception as e:
                    logger.error("Unable to delete directory %s; %s", file, e)
            else:
                logger.error("Unable to delete %s, file type unknown", file)


    def clean_output_dir_new(path: str, retention: Iterable[str]) -> None:
        """Remove everything inside *path* except entries named in *retention*.

        This is the ``os.scandir()``-based implementation; entry types are
        read from the ``os.DirEntry`` objects instead of separate
        ``os.path`` calls.

        Args:
            path: Output directory to empty.  If it does not exist, nothing is
                done.  If it exists but is not a directory, it is deleted with
                ``os.remove`` and the function returns.
            retention: Names (not full paths) of direct children of *path*
                that must be kept.  The iterable is consumed exactly once, so
                one-shot iterators and generators are handled correctly.

        Failures to delete an individual entry are logged, not raised.
        """
        if not os.path.exists(path):
            logger.debug("Directory already removed: %s", path)
            return

        if not os.path.isdir(path):
            try:
                os.remove(path)
            except Exception as e:
                logger.error("Unable to delete file %s; %s", path, e)
            return

        # Materialise the retention names once.  Re-iterating `retention` for
        # every directory entry would exhaust a one-shot iterator on the first
        # entry and silently stop protecting the remaining names.
        retained = frozenset(retention)

        # remove existing content from output folder unless in retention list
        # The `with` block closes the scandir iterator (and its underlying
        # directory handle) deterministically, even if an error propagates.
        with os.scandir(path) as entries:
            for dir_entry in entries:
                file = dir_entry.path
                if dir_entry.name in retained:
                    logger.debug(
                        "Skipping deletion; %s is on retention list: %s",
                        dir_entry.name,
                        file,
                    )
                elif dir_entry.is_symlink() or dir_entry.is_file(follow_symlinks=False):
                    # Symlinks are tested first: DirEntry.is_dir() follows
                    # links by default and shutil.rmtree() refuses to operate
                    # on a symlink, so a link to a directory must be unlinked.
                    try:
                        os.remove(file)
                        logger.debug("Deleted file/link %s", file)
                    except Exception as e:
                        logger.error("Unable to delete file %s; %s", file, e)
                elif dir_entry.is_dir(follow_symlinks=False):
                    try:
                        shutil.rmtree(file)
                        logger.debug("Deleted directory %s", file)
                    except Exception as e:
                        logger.error("Unable to delete directory %s; %s", file, e)
                else:
                    logger.error("Unable to delete %s, file type unknown", file)
    # Performance test


    # Alphabet from which the benchmark fixture file names are built.
    alphanum = 'abcdefghijklmnopqrstuvwxyz0123456789'
    DIR_SEPARATOR = '/'
    # WARNING: this directory is deleted recursively every time the script runs.
    TEST_DIR = '/tmp/output'

    # Start from a clean slate.  A missing directory (e.g. on the very first
    # run) already is the desired state, so only that case is tolerated; any
    # other failure (permissions, TEST_DIR being a regular file, ...) still
    # surfaces because ignore_errors stays False.
    try:
        shutil.rmtree(path=TEST_DIR, ignore_errors=False)
    except FileNotFoundError:
        pass

    # Running totals maintained by populate_files() and printed in the report.
    total_files = 0
    total_dirs = 0
    total_inodes = 0


    def populate_files(test_path):
        """Create the benchmark fixture: one flat directory full of empty files.

        One empty file is created for every 1-, 2- and 3-character name that
        can be formed from ``alphanum``.  With the 36-character alphabet that
        is 36 + 36**2 + 36**3 = 47,988 files per call.

        Args:
            test_path: Directory to populate.  It is created (including any
                missing parents) when absent; an existing directory is reused.

        Raises:
            OSError: If the directory or one of the files cannot be created.

        Side effects:
            Updates the module-level counters ``total_files`` (one per file
            touched), ``total_dirs`` (one per call, whether or not the
            directory already existed) and ``total_inodes`` (files + dirs).
        """
        global total_dirs
        global total_files
        global total_inodes

        def fixture_names():
            # Yield names in the order the files are created:
            # "a", "aa", "aaa", "aab", ..., "ab", "aba", ...
            for first_char in alphanum:
                yield first_char
                for second_char in alphanum:
                    yield first_char + second_char
                    for third_char in alphanum:
                        yield first_char + second_char + third_char

        # establish Test directory
        # exist_ok tolerates only the "already there" case; every other
        # failure is raised instead of being silently swallowed.
        root = pathlib.Path(test_path)
        root.mkdir(parents=True, exist_ok=True)
        total_dirs += 1
        total_inodes += 1

        # populate test directory
        for name in fixture_names():
            (root / name).touch()
            total_files += 1
            total_inodes += 1


    # prepare test area
    populate_files(TEST_DIR)

    # Time the os.listdir()-based implementation.  time.perf_counter() is a
    # monotonic, high-resolution clock intended for measuring short durations;
    # wall-clock datetime.now() can jump when the system clock is adjusted.
    old_start = time.perf_counter()
    clean_output_dir_original(TEST_DIR, [])
    old_end = time.perf_counter()

    # Re-create the same fixture so both implementations delete identical content.
    populate_files(TEST_DIR)

    # Time the os.scandir()-based implementation.
    new_start = time.perf_counter()
    clean_output_dir_new(TEST_DIR, [])
    new_end = time.perf_counter()

    # test results
    # Durations are wrapped in timedelta so the report keeps its H:MM:SS.ffffff
    # formatting.
    old_elapsed = timedelta(seconds=old_end - old_start)
    new_elapsed = timedelta(seconds=new_end - new_start)
    print(f'Original Delta time {old_elapsed}')
    print(f'Latest Delta time {new_elapsed}')
    # The totals below accumulate over BOTH populate_files() calls.
    print(f'Total inodes: {total_inodes}')
    print(f'Total Files {total_files}')
    print(f'Total Dirs {total_dirs}')