gdrive_downloader.py
import argparse
import json
import re
from pathlib import Path

import gspread
import pandas as pd
from getfilelistpy import getfilelist
from google.oauth2 import service_account
from tqdm import tqdm


def str2bool(v):
    """Convert a string representation of truth to True or False.

    Args:
        v (str): The string to convert.

    Returns:
        bool: The boolean value corresponding to the string.

    Raises:
        argparse.ArgumentTypeError: If the string is not a valid boolean representation.
    """
    if v.lower() in ('yes', 'true', 't', 'y', '1'):
        return True
    elif v.lower() in ('no', 'false', 'f', 'n', '0'):
        return False
    else:
        raise argparse.ArgumentTypeError('Boolean value expected.')
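# Usage sketch (illustrative): str2bool is meant to be passed as an argparse `type`,
# so flags like `--organise_by_prefix yes` or `--organise_by_prefix 0` parse cleanly:
#   >>> str2bool('Yes')
#   True
#   >>> str2bool('0')
#   False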


class DuplicateHeaderError(Exception):
    """Custom exception for duplicate headers after normalization."""

    def __init__(self, message):
        """Initialize the DuplicateHeaderError with a message.

        Args:
            message (str): The error message.
        """
        self.message = message
        super().__init__(self.message)


class HeadersNormalizer:
    """Normalize the headers of a DataFrame by replacing spaces with underscores and converting to lowercase."""

    def __init__(self, dataframe: pd.DataFrame, handle_duplicates='suffix'):
        """Initialize the HeadersNormalizer with a DataFrame and a method to handle duplicates.

        Args:
            dataframe (pd.DataFrame): The DataFrame whose headers are to be normalized.
            handle_duplicates (str): Method to handle duplicate headers ('raise' or 'suffix').
        """
        self.dataframe = dataframe
        self.original_headers = dataframe.columns.tolist()
        self.handle_duplicates = handle_duplicates
        self.normalized_headers = self._normalize_headers()
        self._check_for_duplicates()

    def _normalize_headers(self):
        """Normalize the headers of the DataFrame.

        Returns:
            list: The normalized headers.
        """
        return [self.normalize_header(header) for header in self.original_headers]

    def normalize_header(self, header):
        """Normalize a single header.

        Args:
            header (str): The header to normalize.

        Returns:
            str: The normalized header.
        """
        # Replace spaces with underscores and convert to lowercase
        normalized = header.strip().replace(' ', '_').lower()
        # Remove invalid characters (keep only letters, digits, and underscores)
        normalized = re.sub(r'[^a-zA-Z0-9_]', '', normalized)
        return normalized

    def _check_for_duplicates(self):
        """Check for duplicate normalized headers and handle them based on the specified method."""
        if len(set(self.normalized_headers)) != len(self.normalized_headers):
            if self.handle_duplicates == 'raise':
                duplicates = [header for header in set(self.normalized_headers)
                              if self.normalized_headers.count(header) > 1]
                raise DuplicateHeaderError(f"Duplicate normalized headers found: {duplicates}")
            elif self.handle_duplicates == 'suffix':
                self._resolve_duplicates()

    def _resolve_duplicates(self):
        """Add a numeric suffix to duplicate normalized headers."""
        header_count = {}
        for i, header in enumerate(self.normalized_headers):
            if header not in header_count:
                header_count[header] = 1
            else:
                header_count[header] += 1
                self.normalized_headers[i] = f"{header}_{header_count[header]}"

    def get_original_headers(self):
        """Return the original headers of the DataFrame.

        Returns:
            list: The original headers.
        """
        return self.original_headers

    def get_normalized_headers(self):
        """Return the normalized headers of the DataFrame.

        Returns:
            list: The normalized headers.
        """
        return self.normalized_headers
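# Usage sketch (illustrative, assumed column names):
#   >>> df = pd.DataFrame(columns=['First Name', 'first name', 'Age (years)'])
#   >>> HeadersNormalizer(df).get_normalized_headers()
#   ['first_name', 'first_name_2', 'age_years']
# With handle_duplicates='raise', the same input raises DuplicateHeaderError instead.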


class DataDownloader:
    """A class to download data from Google Drive using the Google Drive API."""

    def __init__(self, credentials_file, top_folder_id):
        """Initialize the DataDownloader class with credentials and top folder ID.

        Args:
            credentials_file (str): Path to the Google service account credentials file.
            top_folder_id (str): The ID of the top folder in Google Drive.
        """
        self.credentials = service_account.Credentials.from_service_account_file(
            credentials_file, scopes=['https://www.googleapis.com/auth/drive']
        )
        self.credentials_file = credentials_file
        self.top_folder_id = top_folder_id

    def _create_resource(self):
        """Create the resource used to get the file list from Google Drive.

        Returns:
            dict: The resource dictionary for the getfilelistpy library.
        """
        return {
            "service_account": self.credentials,
            "id": self.top_folder_id,
            "fields": "files(name, id, mimeType, parents, createdTime, modifiedTime)",
        }

    @staticmethod
    def _filter_google_sheets_files(folder_contents):
        """Filter the Google Sheets files from the folder contents.

        Args:
            folder_contents (dict): The contents of the folder from Google Drive.

        Returns:
            list: A list of Google Sheets files.
        """
        contents_list = []
        for item in folder_contents['fileList']:
            for file in item['files']:
                if file['mimeType'] == 'application/vnd.google-apps.spreadsheet':
                    contents_list.append(file)
        return contents_list

    def _scan_folder(self):
        """Scan the folder and get the Google Sheets files.

        Returns:
            list: A list of Google Sheets files in the folder.
        """
        resource = self._create_resource()
        folder_contents = getfilelist.GetFileList(resource)
        return self._filter_google_sheets_files(folder_contents)

    def scan_drive(self):
        """Scan the Google Drive and get the Google Sheets files.

        Returns:
            list: A list of Google Sheets files in the Google Drive.
        """
        return self._scan_folder()

    @staticmethod
    def _write_to_json_file(result, output_file):
        """Write the result to a JSON file.

        Args:
            result (list): The result to be written to the JSON file.
            output_file (str): The path to the output JSON file.
        """
        output_file = Path(output_file)
        output_file.parent.mkdir(parents=True, exist_ok=True)

        with open(output_file, 'w') as f:
            json.dump(result, f, indent=4)

    def scan_drive_to_json(self, output_file):
        """Scan the Google Drive and write the result to a JSON file.

        Args:
            output_file (str): The path to the output JSON file.
        """
        result = self.scan_drive()
        self._write_to_json_file(result, output_file)

    @staticmethod
    def create_dataframe(data, skip_empty_headers=True):
        """Create a pandas DataFrame from the data extracted from the Google Sheet.

        Args:
            data (list): The data from the Google Sheet.
            skip_empty_headers (bool): Whether to skip columns with empty headers.

        Returns:
            pd.DataFrame: The resulting pandas DataFrame.
        """
        # Guard against an empty sheet, which has no header row to index into
        if not data:
            return pd.DataFrame()

        if skip_empty_headers:
            # Filter out columns with empty string headers
            header_indices = [i for i, c in enumerate(data[0]) if c != '']
        else:
            header_indices = range(len(data[0]))

        # Extract data for the selected columns
        new_data = [[row[i] for i in header_indices] for row in data[1:]]

        # Convert to DataFrame
        return pd.DataFrame(new_data, columns=[data[0][i] for i in header_indices])
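    # Usage sketch (illustrative, assumed cell values): the empty-header column is dropped,
    # so the call below yields a DataFrame with columns ['Name', 'Age'] and one row ['Ada', '36']:
    #   >>> DataDownloader.create_dataframe([['Name', '', 'Age'], ['Ada', 'ignored', '36']])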

    @staticmethod
    def sanitize_filename(filename: str) -> str:
        """Sanitize the filename by replacing invalid characters with underscores.

        Args:
            filename (str): The original filename.

        Returns:
            str: The sanitized filename.
        """
        # Remove leading and trailing whitespace from the filename
        filename = filename.strip()

        # Replace invalid characters (?, :, /, \, *, ", <, >, |, &, . and whitespace) with underscores
        return re.sub(r'[?/:\\*"<>|&.\s]', '_', filename)
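    # Usage sketch (illustrative):
    #   >>> DataDownloader.sanitize_filename(' Q3 Report: Sales/Europe.v2 ')
    #   'Q3_Report__Sales_Europe_v2'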

    def download_sheet_data(self, file_info, output_dir, sheet_id=0):
        """Download the data from a Google Sheet to a CSV file.

        Args:
            file_info (dict): Information about the file to be downloaded.
            output_dir (str): The directory where the downloaded data will be stored.
            sheet_id (int): The zero-based index of the worksheet to download from the Google Sheet file.
        """
        # Use the credentials to authorize the Google Sheets API
        gc = gspread.service_account(filename=self.credentials_file)

        # Access the Google Sheet
        sheet = gc.open_by_key(file_info['id'])

        # Download all cell values from the worksheet (gspread's get_worksheet is zero-based)
        sheet_data = sheet.get_worksheet(sheet_id).get_all_values()

        # Sanitize the filename and create the output file
        sanitized_filename = self.sanitize_filename(file_info['name'])
        output_file = Path(output_dir) / (sanitized_filename + '_' + str(sheet_id) + '.csv')
        output_file.parent.mkdir(parents=True, exist_ok=True)

        # Convert the raw values to a DataFrame
        sheet_data_df = self.create_dataframe(sheet_data)

        # Normalize the headers
        headers_normalizer = HeadersNormalizer(sheet_data_df)
        sheet_data_df.columns = headers_normalizer.get_normalized_headers()

        # Write the data to a CSV file
        sheet_data_df.to_csv(output_file, index=False)
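# Usage sketch (illustrative; the credentials path and folder ID are assumptions):
#   downloader = DataDownloader('secrets/credentials.json', '<drive-folder-id>')
#   for sheet_file in downloader.scan_drive():
#       downloader.download_sheet_data(sheet_file, '/tmp/sheets', sheet_id=0)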


def load_exclusion_patterns(exclusion_patterns_file):
    """Load the exclusion patterns from a JSON file.

    Args:
        exclusion_patterns_file (str): Path to the exclusion patterns JSON file.

    Returns:
        list: The exclusion patterns, matched as substrings of file names.
    """
    with open(exclusion_patterns_file) as f:
        exclusion_patterns = json.load(f)
    return exclusion_patterns


def load_categories(categories_file):
    """Load the categories from a JSON file.

    Args:
        categories_file (str): Path to the categories JSON file.

    Returns:
        list: The category name prefixes used to organise the files.
    """
    with open(categories_file) as f:
        categories = json.load(f)
    return categories
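# Illustrative (assumed) shapes for the two JSON config files, matching how main() uses them:
#   exclusion_patterns.json -> ["draft", "archive"]     (substrings matched against file names)
#   categories.json         -> ["sales", "inventory"]   (name prefixes used to group files into folders)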


def main():
    """Download data from Google Drive using the Google Drive API."""
    # Create an argument parser to get the credentials file and the top folder ID
    parser = argparse.ArgumentParser(description="Download data from Google Drive")

    # Arguments for the parser
    parser.add_argument('--credentials_file', type=str, default='secrets/credentials.json',
                        help='The credentials file for the Google Drive API')
    parser.add_argument('--top_folder_id', type=str, default='1X8WNjmAzcPoe8xnCDXxrHmFoYWE1ToYz',
                        help='The ID of the top folder in the Google Drive')
    parser.add_argument('--output_dir', type=str, default='/tmp',
                        help='The directory where the downloaded data will be stored')
    parser.add_argument('--sheet_index', type=int, default=1,
                        help='The zero-based index of the worksheet to download from each Google Sheet file')
    parser.add_argument('--organise_by_prefix', type=str2bool, default=True,
                        help='Organise the downloaded files by their prefix into folders')
    parser.add_argument('--exclusion_patterns_file', type=str, default='data/conf/exclusion_patterns.json',
                        help='The file containing the patterns used to skip files (matched as substrings of file names)')
    parser.add_argument('--categories_file', type=str, default='data/conf/categories.json',
                        help='The file containing the categories used to organise the files')

    # Parse the arguments from the command line
    args = parser.parse_args()

    # Load the exclusion patterns from the JSON file
    exclusion_patterns = load_exclusion_patterns(args.exclusion_patterns_file)

    # Create a DataDownloader object and write a listing of the Google Sheets files to a JSON file
    scanner = DataDownloader(args.credentials_file, args.top_folder_id)
    scanner.scan_drive_to_json(args.output_dir + '/drive_contents.json')

    # Load the categories from the JSON file
    categories = load_categories(args.categories_file)

    # Iterate over the files and download the data from the Google Sheets files
    for file in tqdm(scanner.scan_drive()):

        if any(pattern in file['name'] for pattern in exclusion_patterns):
            print(f"Not downloading any data from {file['name']} based on the exclusion patterns "
                  f"in {args.exclusion_patterns_file}")
            continue

        try:
            if (args.organise_by_prefix and '_' in file['name']
                    and file['mimeType'] == 'application/vnd.google-apps.spreadsheet'):

                # Find the prefix for the file based on the categories in the JSON file
                prefix = None
                for c in categories:
                    if file['name'].lower().startswith(c.lower()):
                        prefix = c.lower()
                        # Exit the loop once a prefix is found for the file
                        break

                if prefix is None:
                    raise ValueError(f"Could not find a category for file {file['name']}")

                # Create a directory for the prefix if it does not exist
                storage_dir = Path(args.output_dir) / prefix
                storage_dir.mkdir(parents=True, exist_ok=True)

                scanner.download_sheet_data(file, storage_dir, args.sheet_index)
            else:
                scanner.download_sheet_data(file, args.output_dir, args.sheet_index)
        except Exception as e:
            print(f"Error downloading sheet {args.sheet_index} data from file {file['name']}: {e}")


    if __name__ == "__main__":
    main()
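# Example invocation (illustrative; paths and the folder ID are assumptions):
#   python gdrive_downloader.py --credentials_file secrets/credentials.json \
#       --top_folder_id <drive-folder-id> --output_dir data/downloads --sheet_index 0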