import os
import re
import time
import random
from enum import Enum

from lib.common import bootstrap
from lib.common.logger import global_logger
from lib.common.config import StrPathExpanded
from lib.common.base_scenario import BaseScenario
from lib.common.errors import SACError
from lib.utils import KNOWN_ARCHIVE_EXTENSIONS, splitext_archive, \
    unpack_archive, pack_archive
from lib.utils.fs import copy_file_or_directory, remove_file_or_directory


## Enum with type of sources.
class PathType(Enum):
    FILE = "file"
    DIRECTORY = "directory"
    ARCHIVE = "archive"

    ## Get source type by path.
    #  @param cls Class.
    #  @param path Path to source.
    #  @return Enumeration member (DIRECTORY, ARCHIVE or FILE).
    #  @exception SACError ARGS_ERROR if path is neither a directory nor a
    #             regular file.
    @classmethod
    def detect(cls, path):
        # a directory is reported as such without looking at its name
        if os.path.isdir(path):
            return cls.DIRECTORY

        # anything that is not a directory must at least be a regular file
        if not os.path.isfile(path):
            raise SACError(
                "ARGS_ERROR",
                "Specified path not a directory nor a file",
                path=path
            )

        # split file name to name and extension
        head, tail = splitext_archive(os.path.basename(path))

        # the path is an archive when the extension is a known archive
        # extension, or when the extension is empty and the name itself is
        # a known archive extension; it is a plain file otherwise
        tail_is_archive = tail in KNOWN_ARCHIVE_EXTENSIONS
        head_is_archive = head in KNOWN_ARCHIVE_EXTENSIONS and tail == ""
        if tail_is_archive or head_is_archive:
            return cls.ARCHIVE
        return cls.FILE


## Scenario that copies, packs or unpacks files into a destination folder.
class CopyFilesScenario(BaseScenario):
    ## Available object properties:
    #  self.config - ScenarioConfiguration object.
    #  self.tmp - Path to temporary folder.

    ## Validate scenario-specific configuration values.
    #  @param self Pointer to object.
    #  @exception SACError ARGS_ERROR if "pattern" is set but is not a valid
    #             regular expression.
    def _validate_specific_data(self):
        validate_data = [
            ["source", StrPathExpanded],
            ["destination", StrPathExpanded],
            ["copy-with-root", bool],
            ["replace", bool],
            ["remove-source", bool],
            ["archive-type", str, str,
             ["zip", "tar", "tar.gz", "tar.xz", "tar.bz2"]],
            ["archive-action", str, str, ["pack", "unpack", "none"]],
            ["max-delay", int, int, lambda x: x >= 0],
            ["pattern", str],
            ["invert-pattern", bool]
        ]
        self.config.validate(validate_data)

        # check regex: compile it once here so that a broken pattern is
        # reported as an argument error before any copy starts
        if self.config["pattern"]:
            try:
                re.compile(self.config["pattern"])
            except re.error:
                raise SACError("ARGS_ERROR", "Invalid regular expression",
                               regex=self.config["pattern"])

    ## Check destination location.
    #  Creates the destination directory when it does not exist yet;
    #  otherwise rejects an existing non-directory path unless "replace"
    #  is enabled.
    #  @param self Pointer to object.
    #  @exception SACError ARGS_ERROR if the directory cannot be created or
    #             the path exists, is not a directory and replace is off.
    def __check_destination_folder(self):
        destination = self.config["destination"]

        # first, if destination directory doesn't exist, try to create it
        if not os.path.exists(destination):
            try:
                os.makedirs(destination)
            except (OSError, ValueError):
                # only filesystem failures and malformed paths are argument
                # errors; KeyboardInterrupt/SystemExit must propagate
                raise SACError(
                    "ARGS_ERROR",
                    "Cannot create directory at specified location",
                    location=destination
                )
        elif not os.path.isdir(destination) and not self.config["replace"]:
            raise SACError(
                "ARGS_ERROR",
                "Destination path already exists and replace is not "
                "allowed",
                destination_path=destination
            )

    ## Check source location existence.
    #  Also remembers the detected source type in self.__source_type.
    #  @param self Pointer to object.
    #  @exception SACError ARGS_ERROR if the source path does not exist or
    #             is neither a directory nor a file.
    def __check_source_file(self):
        if not os.path.exists(self.config["source"]):
            raise SACError("ARGS_ERROR", "Source path not exists",
                           location=self.config["source"])
        self.__source_type = PathType.detect(self.config["source"])

    ## List the tests provided by this scenario.
    #  @param self Pointer to object.
    #  @return List of [name, callable, flag] entries.
    #  NOTE(review): the meaning of the boolean flag is defined by
    #  BaseScenario, which is not visible here - confirm before changing.
    def _get_available_tests(self):
        return [
            ["check-destination-folder",
             self.__check_destination_folder, False],
            ["check-source", self.__check_source_file, True]
        ]

    ## Default files copy.
    #  Copies source to destination and, when "remove-source" is set,
    #  removes the source using the same pattern filter.
    #  @param self Pointer to object.
    def __default_copy(self):
        copy_file_or_directory(self.config["source"],
                               self.config["destination"],
                               self.config["copy-with-root"],
                               self.config["pattern"],
                               self.config["invert-pattern"],
                               self.config["replace"])
        if self.config["remove-source"]:
            remove_file_or_directory(self.config["source"],
                                     self.config["pattern"],
                                     self.config["invert-pattern"])

    ## Unpack source and copy it to destination folder.
    #  When "remove-source" is set the source archive is removed as a whole
    #  (no pattern filter is passed to the removal).
    #  @param self Pointer to object.
    def __unpack_copy(self):
        unpack_archive(self.config["source"],
                       self.config["destination"],
                       self.config["copy-with-root"],
                       self.config["pattern"],
                       self.config["invert-pattern"],
                       self.config["replace"])
        if self.config["remove-source"]:
            remove_file_or_directory(self.config["source"])

    ## Pack required files to archive and place it in destination folder.
    #  When "remove-source" is set the source is removed using the same
    #  pattern filter that was used for packing.
    #  @param self Pointer to object.
    def __pack_copy(self):
        pack_archive(self.config["source"],
                     self.config["destination"],
                     self.config["archive-type"],
                     self.config["copy-with-root"],
                     self.config["pattern"],
                     self.config["invert-pattern"],
                     self.config["replace"])
        if self.config["remove-source"]:
            remove_file_or_directory(self.config["source"],
                                     self.config["pattern"],
                                     self.config["invert-pattern"])

    ## Run the scenario: optional random delay, prepare the destination
    #  folder, then copy, pack or unpack depending on "archive-action".
    #  @param self Pointer to object.
    #  @exception SACError ARGS_ERROR if destination exists, is not a
    #             directory and replace is not allowed.
    def _real(self):
        # random delay before copy
        if self.config["max-delay"] > 0:
            delay = random.uniform(0, self.config["max-delay"])
            global_logger.info(message="Selected delay", delay=delay)
            time.sleep(delay)

        global_logger.info(message="Starting copy")

        # create destination folder
        destination = self.config["destination"]
        if os.path.exists(destination) and not os.path.isdir(destination):
            if not self.config["replace"]:
                raise SACError(
                    "ARGS_ERROR",
                    "Destination path exists and not a directory",
                    path=destination
                )
            remove_file_or_directory(destination)
        # also reached after a non-directory destination has just been
        # removed, so the helpers below always get an existing directory
        if not os.path.exists(destination):
            os.makedirs(destination)

        # depends on source type and archive action, perform action
        action = self.config["archive-action"]
        if action == "none":
            self.__default_copy()
        elif action == "pack":
            self.__pack_copy()
        else:
            # "unpack": only real archives are unpacked, anything else
            # falls back to a plain copy
            source_type = PathType.detect(self.config["source"])
            if source_type != PathType.ARCHIVE:
                self.__default_copy()
            else:
                self.__unpack_copy()


if __name__ == "__main__":
    CopyFilesScenario.main()