Last active
September 2, 2024 15:52
-
-
Save habedi/effb3632a9e8a203a0613176eb2b57cf to your computer and use it in GitHub Desktop.
Revisions
-
habedi revised this gist
Sep 2, 2024 . No changes.There are no files selected for viewing
-
habedi created this gist
Sep 2, 2024 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,384 @@ import argparse import json import re from pathlib import Path import gspread import pandas as pd from getfilelistpy import getfilelist from google.oauth2 import service_account from tqdm import tqdm def str2bool(v): """Convert a string representation of truth to true or false. Args: v (str): The string to convert. Returns: bool: The boolean value corresponding to the string. Raises: argparse.ArgumentTypeError: If the string is not a valid boolean representation. """ if v.lower() in ('yes', 'true', 't', 'y', '1'): return True elif v.lower() in ('no', 'false', 'f', 'n', '0'): return False else: raise argparse.ArgumentTypeError('Boolean value expected.') class DuplicateHeaderError(Exception): """Custom exception for duplicate headers after normalization.""" def __init__(self, message): """Initialize the DuplicateHeaderError with a message. Args: message (str): The error message. """ self.message = message super().__init__(self.message) class HeadersNormalizer: """Normalize the headers of a DataFrame by replacing spaces with underscores and converting to lowercase.""" def __init__(self, dataframe: pd.DataFrame, handle_duplicates='suffix'): """Initialize the HeadersNormalizer with a DataFrame and a method to handle duplicates. Args: dataframe (pd.DataFrame): The DataFrame whose headers are to be normalized. handle_duplicates (str): Method to handle duplicate headers ('raise' or 'suffix'). """ self.dataframe = dataframe self.original_headers = dataframe.columns.tolist() self.handle_duplicates = handle_duplicates self.normalized_headers = self._normalize_headers() self._check_for_duplicates() def _normalize_headers(self): """Normalize the headers of the DataFrame. Returns: list: The normalized headers. """ return [self.normalize_header(header) for header in self.original_headers] def normalize_header(self, header): """Normalize a single header. Args: header (str): The header to normalize. Returns: str: The normalized header. """ # Replace spaces with underscores, convert to lowercase, and remove invalid characters normalized = header.strip().replace(' ', '_').lower() # Remove invalid characters (keep only letters, digits, and underscores) normalized = re.sub(r'[^a-zA-Z0-9_]', '', normalized) return normalized def _check_for_duplicates(self): """Check for duplicate normalized headers and handle them based on the specified method.""" if len(set(self.normalized_headers)) != len(self.normalized_headers): if self.handle_duplicates == 'raise': duplicates = [header for header in set(self.normalized_headers) if self.normalized_headers.count(header) > 1] raise DuplicateHeaderError(f"Duplicate normalized headers found: {duplicates}") elif self.handle_duplicates == 'suffix': self._resolve_duplicates() def _resolve_duplicates(self): """Add a suffix to the duplicate normalized headers.""" header_count = {} for i, header in enumerate(self.normalized_headers): if header not in header_count: header_count[header] = 1 else: header_count[header] += 1 self.normalized_headers[i] = f"{header}_{header_count[header]}" def get_original_headers(self): """Return the original headers of the DataFrame. Returns: list: The original headers. """ return self.original_headers def get_normalized_headers(self): """Return the normalized headers of the DataFrame. Returns: list: The normalized headers. """ return self.normalized_headers class DataDownloader: """A class to download data from Google Drive using the Google Drive API.""" def __init__(self, credentials_file, top_folder_id): """Initialize the DataDownloader class with credentials and top folder ID. Args: credentials_file (str): Path to the Google service account credentials file. top_folder_id (str): The ID of the top folder in Google Drive. """ self.credentials = service_account.Credentials.from_service_account_file( credentials_file, scopes=['https://www.googleapis.com/auth/drive'] ) self.credentials_file = credentials_file self.top_folder_id = top_folder_id def _create_resource(self): """Create the resource to get the file list from Google Drive. Returns: dict: The resource dictionary for the Google Drive API. """ return { "service_account": self.credentials, "id": self.top_folder_id, "fields": "files(name, id, mimeType, parents, createdTime, modifiedTime)", } @staticmethod def _filter_google_sheets_files(folder_contents): """Filter the Google Sheets files from the folder contents. Args: folder_contents (dict): The contents of the folder from Google Drive. Returns: list: A list of Google Sheets files. """ contents_list = [] for item in folder_contents['fileList']: for file in item['files']: if file['mimeType'] == 'application/vnd.google-apps.spreadsheet': contents_list.append(file) return contents_list def _scan_folder(self): """Scan the folder and get the Google Sheets files. Returns: list: A list of Google Sheets files in the folder. """ resource = self._create_resource() folder_contents = getfilelist.GetFileList(resource) return self._filter_google_sheets_files(folder_contents) def scan_drive(self): """Scan the Google Drive and get the Google Sheets files. Returns: list: A list of Google Sheets files in the Google Drive. """ return self._scan_folder() @staticmethod def _write_to_json_file(result, output_file): """Write the result to a JSON file. Args: result (list): The result to be written to the JSON file. output_file (str): The path to the output JSON file. """ output_file = Path(output_file) if not output_file.parent.exists(): output_file.parent.mkdir(parents=True, exist_ok=True) with open(output_file, 'w') as f: json.dump(result, f, indent=4) def scan_drive_to_json(self, output_file): """Scan the Google Drive and write the result to a JSON file. Args: output_file (str): The path to the output JSON file. """ result = self.scan_drive() self._write_to_json_file(result, output_file) @staticmethod def create_dataframe(data, skip_empty_headers=True): """Create a pandas DataFrame from the data extracted from the Google Sheet. Args: data (list): The data from the Google Sheet. skip_empty_headers (bool): Whether to skip columns with empty headers. Returns: pd.DataFrame: The resulting pandas DataFrame. """ if skip_empty_headers: # Filter out columns with empty string headers header_indices = [i for i, c in enumerate(data[0]) if c != ''] else: header_indices = range(len(data[0])) # Extract data for the selected columns new_data = [[row[i] for i in header_indices] for row in data[1:]] # Convert to DataFrame df = pd.DataFrame(new_data, columns=[data[0][i] for i in header_indices]) return df @staticmethod def sanitize_filename(filename: str) -> str: """Sanitize the filename by removing invalid characters. Args: filename (str): The original filename. Returns: str: The sanitized filename. """ # Remove leading and trailing whitespaces from the filename filename = filename.strip() # Remove invalid characters (?, :, /, \, *, ", <, >, |, &, . and space) from the filename return re.sub(r'[?/:\\*"<>&.\s]', '_', filename) def download_sheet_data(self, file_info, output_dir, sheet_id=0): """Download the data from the Google Sheet to a CSV file. Args: file_info (dict): Information about the file to be downloaded. output_dir (str): The directory where the downloaded data will be stored. sheet_id (int): The index of the sheet to download from the Google Sheet file. """ # Use the credentials to authorize the Google Sheets API gc = gspread.service_account(filename=self.credentials_file) # Access the Google Sheet sheet = gc.open_by_key(file_info['id']) # Download data from the Google Sheet sheet_data = sheet.get_worksheet(sheet_id).get_all_values() # print(sheet_data[:5]) # Sanitize the filename and create the output file sanitized_filename = self.sanitize_filename(file_info['name']) output_file = Path(output_dir) / (sanitized_filename + '_' + str(sheet_id) + '.csv') output_file.parent.mkdir(parents=True, exist_ok=True) # Use pandas to write the data to a CSV file sheet_data_df = self.create_dataframe(sheet_data) # Normalize the headers headers_normalizer = HeadersNormalizer(sheet_data_df) sheet_data_df.columns = headers_normalizer.get_normalized_headers() # Write the data to a CSV file sheet_data_df.to_csv(output_file, index=False) def load_exclusion_patterns(exclusion_patterns_file): """Load the exclusion patterns from a JSON file. Args: exclusion_patterns_file (str): Path to the exclusion patterns JSON file. Returns: dict: The loaded exclusion patterns. """ with open(exclusion_patterns_file) as f: exclusion_patterns = json.load(f) return exclusion_patterns def load_categories(categories_file): """Load the categories from a JSON file. Args: categories_file (str): Path to the categories JSON file. Returns: dict: The loaded categories. """ with open(categories_file) as f: categories = json.load(f) return categories def main(): """Download data from Google Drive using the Google Drive API.""" # Create an argument parser to get the credentials file and the top folder id parser = argparse.ArgumentParser(description="Download data from Google Drive") # Arguments for the parser parser.add_argument('--credentials_file', type=str, default='secrets/credentials.json', help='The credentials file for the Google Drive API') parser.add_argument('--top_folder_id', type=str, default='1X8WNjmAzcPoe8xnCDXxrHmFoYWE1ToYz', help='The id of the top folder in the Google Drive') parser.add_argument('--output_dir', type=str, default='/tmp', help='The directory where the downloaded data will be stored') parser.add_argument('--sheet_index', type=int, default=1, help='The index of the sheet to download from the Google Sheet file') parser.add_argument('--organise_by_prefix', type=str2bool, default=True, help='Organise the downloaded files by their prefix into folders') parser.add_argument('--exclusion_patterns_file', type=str, default='data/conf/exclusion_patterns.json', help='The file containing the exclusion patterns to ignore files using regex') parser.add_argument('--categories_file', type=str, default='data/conf/categories.json', help='The file containing the categories to organize the files') # Parse the arguments from the command line args = parser.parse_args() # Load the exclusion patterns from the JSON file exclusion_patterns = load_exclusion_patterns(args.exclusion_patterns_file) # Create a DataDownloader object and scan the Google Drive to get the Google Sheets files and download the data scanner = DataDownloader(args.credentials_file, args.top_folder_id) scanner.scan_drive_to_json(args.output_dir + '/drive_contents.json') # Load the categories from the JSON file categories = load_categories(args.categories_file) # Iterate over the files and download the data from the Google Sheets files for file in tqdm(scanner.scan_drive()): if any([pattern in file['name'] for pattern in exclusion_patterns]): print( f"Not downloading any data from {file['name']} based on the exclusion patterns in {args.exclusion_patterns_file}") continue try: if args.organise_by_prefix and '_' in file['name'] and file[ 'mimeType'] == 'application/vnd.google-apps.spreadsheet': # Find the prefix for the file based on the categories in the JSON file and create a directory for it prefix = None for c in categories: if file['name'].lower().startswith(c.lower()): prefix = c.lower() # Exit the loop if a prefix is found for the file break if prefix is None: print(f"Could not find a category for file {file['name']}") raise ValueError(f"Could not find a category for file {file['name']}") # Create a directory for the prefix if it does not exist storage_dir = Path(args.output_dir) / prefix if not storage_dir.exists(): storage_dir.mkdir(parents=True, exist_ok=True) scanner.download_sheet_data(file, storage_dir, args.sheet_index) else: scanner.download_sheet_data(file, args.output_dir, args.sheet_index) except Exception as e: print(f"Error downloading sheet {args.sheet_index} data from file {file['name']}: {e}") if __name__ == "__main__": main()