import argparse import os import sys import zoneinfo from datetime import datetime import shutil import pandas as pd from pyicloud import PyiCloudService from pyicloud.exceptions import PyiCloudAPIResponseException # Define the target timezone as requested LA_TZ = zoneinfo.ZoneInfo("America/Los_Angeles") def connect_to_icloud( username: str, cookie_directory: str | None = None ) -> PyiCloudService: """Handles connection and authentication with iCloud.""" print("Connecting to iCloud...") # To avoid password prompts, run `icloud --username=your_email@example.com` # in your terminal to save credentials to your system's keyring. try: api = PyiCloudService(username, cookie_directory=cookie_directory) except PyiCloudAPIResponseException as e: print(f"Error connecting to iCloud: {e}", file=sys.stderr) sys.exit(1) # Handle two-factor authentication if api.requires_2fa: print("Two-factor authentication required.") code = input("Enter the code you received on your device: ") result = api.validate_2fa_code(code) print(f"Code validation result: {result}") if not result: print("Failed to verify 2FA code.", file=sys.stderr) sys.exit(1) if not api.is_trusted_session: print("Session is not trusted. Requesting trust...") result = api.trust_session() print(f"Session trust result: {result}") if not result: print("Failed to request trust for session.", file=sys.stderr) return api def list_libraries(username: str, cookie_directory: str | None = None): """Connects to iCloud and lists available photo libraries.""" api = connect_to_icloud(username, cookie_directory) print("\nAvailable photo libraries:") library_keys = sorted(list(api.photos.libraries.keys())) if not library_keys: print(" No photo libraries found.") else: for library_name in library_keys: print(f" - {library_name}") class DatabaseManager: """Handles all interactions with the download records CSV database.""" def __init__(self, db_path: str, download_dir: str): self._db_path = db_path self._download_dir = download_dir self._columns = [ "PhotoID", "Library", "Version", "Filename", "Filepath", "Size", "AssetDate", "DownloadTimestamp", "RemovedFromCloud", ] self._records_df = pd.DataFrame(columns=self._columns) self._new_records = [] self._existing_records_map = {} self._seen_records = set() self._load() def _load(self): if not self._db_path or not os.path.exists(self._db_path): print("No existing database found or path not specified.") return print(f"\nLoading existing records from database: {self._db_path}") try: self._records_df = self._read_and_normalize_db( self._db_path, dtype={"PhotoID": str, "Size": "Int64"} ) self._existing_records_map = self._records_df.set_index( ["PhotoID", "Version"] )["Filepath"].to_dict() print( f"Loaded {len(self._existing_records_map)} active records for skip-check." ) except Exception as e: print( f"\nWarning: Could not load or parse existing database at {self._db_path}. " f"Will not use DB for skipping. Error: {e}", file=sys.stderr, ) self._records_df = pd.DataFrame(columns=self._columns) self._existing_records_map = {} def _read_and_normalize_db(self, db_path: str, dtype=None) -> pd.DataFrame: """ Reads the database CSV and normalizes 'Filepath' to absolute paths. Raises exceptions on failure. """ df = pd.read_csv(db_path, dtype=dtype) def make_absolute(p): if pd.isna(p): return p path_str = str(p) if os.path.isabs(path_str): return os.path.abspath(path_str) # Normalize return os.path.abspath(os.path.join(self._download_dir, path_str)) if "Filepath" in df.columns: df["Filepath"] = df["Filepath"].apply(make_absolute) return df def has_record(self, photo_id: str, version: str) -> bool: return (photo_id, version) in self._existing_records_map def get_filepath(self, photo_id: str, version: str) -> str | None: return self._existing_records_map.get((photo_id, version)) def add_record(self, record_dict: dict): filepath = record_dict.get("Filepath") if filepath: # Ensure we are working with absolute paths internally. # They will be converted to relative on save. abs_path = os.path.abspath(filepath) record_dict["Filepath"] = abs_path self._new_records.append(record_dict) def save(self, download_complete: bool = False): if not self._db_path: print("No database path specified, not saving records.", file=sys.stderr) return if not self._new_records and self._records_df.empty: print("No records to save.") return print(f"\nSaving database to: {self._db_path}") # New records are in a separate list. # The main dataframe (_records_df) holds the state from the last load. if self._new_records: print(f"Adding {len(self._new_records)} new records.") new_df = pd.DataFrame(self._new_records) else: new_df = pd.DataFrame(columns=self._columns) try: # If a record was re-downloaded, it will appear in both _records_df (old state) # and new_df (new state). We prioritize the one from new_df by using keep='last'. if self._records_df.empty: combined_df = new_df elif new_df.empty: combined_df = self._records_df else: combined_df = pd.concat([self._records_df, new_df], ignore_index=True) combined_df = ( combined_df.drop_duplicates(subset=["PhotoID", "Version"], keep="last") .sort_values(by="Filepath") .reset_index(drop=True) ) if download_complete: print("Download complete, updating 'RemovedFromCloud' status.") # Create a multi-index from the PhotoID and Version columns # to check against the set of seen records. record_ids = pd.MultiIndex.from_frame( combined_df[["PhotoID", "Version"]].astype(str) ) # Create a set of seen (photo_id, version) tuples, ensuring they are strings seen_ids = {(str(pid), str(v)) for pid, v in self._seen_records} # Check which of these IDs were seen during the run is_seen_mask = record_ids.isin(seen_ids) combined_df["RemovedFromCloud"] = ~is_seen_mask else: print( "Download not complete. Only adding new/updated records. " "'RemovedFromCloud' status for existing records is unchanged." ) # No change needed. The concat/drop_duplicates logic correctly preserves # existing statuses while adding/updating new records. # Before saving, convert absolute filepaths to relative to the download directory. df_to_save = combined_df.copy() df_to_save["Filepath"] = df_to_save["Filepath"].apply( lambda p: os.path.relpath(p, self._download_dir) if pd.notna(p) else p ) df_to_save.to_csv(self._db_path, index=False) print(f"Database saved. Total entries: {len(df_to_save)}") # Reset state for the next run within the same session (if any) # We keep absolute paths in memory. self._records_df = combined_df self._new_records = [] self._seen_records = set() self._existing_records_map = self._records_df.set_index( ["PhotoID", "Version"] )["Filepath"].to_dict() except Exception as e: print( f"\nCould not update database at {self._db_path}: {e}", file=sys.stderr ) if not new_df.empty: fallback_path = os.path.join( os.path.dirname(self._db_path), f"download_records_{datetime.now().strftime('%Y%m%d%H%M%S')}.csv", ) new_df.to_csv(fallback_path, index=False) print(f"Saved new records to {fallback_path}", file=sys.stderr) def record_seen(self, photo_id: str, version: str): """ Records that a (photo_id, version) tuple has been seen in the cloud sync. """ self._seen_records.add((photo_id, version)) def _get_files_on_disk(self, directory: str) -> set[str] | None: """Scans a directory and returns a set of absolute file paths.""" try: files_on_disk = set() abs_db_path = os.path.abspath(self._db_path) for root, _, files in os.walk(directory): for file_name in files: full_path = os.path.abspath(os.path.join(root, file_name)) if full_path == abs_db_path: continue files_on_disk.add(full_path) print(f"Found {len(files_on_disk)} files in '{directory}'.") return files_on_disk except Exception as e: print(f"Error scanning directory '{directory}': {e}", file=sys.stderr) return None def _check_for_duplicates(self, df: pd.DataFrame) -> bool: """Checks for and reports duplicate filepaths in the dataframe.""" # keep=False marks all duplicates as True duplicated_filepaths = df[df.duplicated(subset=["Filepath"], keep=False)] if duplicated_filepaths.empty: return False print("\n🚨 Found multiple database records pointing to the same file:") # Sort by filepath to group them together in the output for filepath, group in duplicated_filepaths.sort_values("Filepath").groupby( "Filepath" ): print(f" - File: {filepath}") for _, row in group.iterrows(): print( f" - Record: PhotoID={row['PhotoID']}, Version={row['Version']}" ) return True def _get_removed_from_cloud_files(self, df: pd.DataFrame) -> list[str]: """Gets a list of filepaths for files marked as removed from iCloud.""" if "RemovedFromCloud" not in df.columns: return [] # astype(bool) handles 'True'/'False' strings if they exist from older formats removed_from_cloud_df = df[df["RemovedFromCloud"].astype(bool)] if removed_from_cloud_df.empty: return [] return removed_from_cloud_df["Filepath"].dropna().tolist() def validate(self): """ Validates the downloaded files against the database. """ self.validate_and_repair(repair=False) def repair(self): """ Repairs inconsistencies between the database and the file system. """ self.validate_and_repair(repair=True) def validate_and_repair(self, repair: bool = False): """ Validates and optionally repairs inconsistencies between the database and the file system. - If repair=False, only reports issues. - If repair=True: - Removes records for files that are missing on disk. - Deletes files on disk that are not in the database. - Deletes files and records for items marked as 'RemovedFromCloud'. """ action = "Repairing" if repair else "Validating" print(f"\n{action} database and file system...") if not self._db_path: print(f"No database path specified, cannot perform {action.lower()}.", file=sys.stderr) return if not os.path.exists(self._db_path): print(f"Database file not found at '{self._db_path}'.", file=sys.stderr) return try: df = self._read_and_normalize_db(self._db_path) except Exception as e: print(f"Error reading database file '{self._db_path}': {e}", file=sys.stderr) return files_on_disk = self._get_files_on_disk(self._download_dir) if files_on_disk is None: return # Error already printed # --- Identify inconsistencies --- has_duplicates = self._check_for_duplicates(df) files_in_db = set(df["Filepath"].dropna()) print(f"Found {len(files_in_db)} records in database '{self._db_path}'.") only_in_db = files_in_db - files_on_disk only_on_disk = files_on_disk - files_in_db files_removed_from_cloud = self._get_removed_from_cloud_files(df) if not any([only_on_disk, only_in_db, files_removed_from_cloud, has_duplicates]): print("✅ Database and download directory are in sync.") return # --- Report or Perform repairs --- final_df = df.copy() if only_on_disk: print(f"\n🚨 Found {len(only_on_disk)} files on disk that are NOT in the database:") for f in sorted(list(only_on_disk)): print(f" - {f}") if repair: try: os.remove(f) print(f" - Deleted.") except OSError as e: print(f" - Error deleting file: {e}", file=sys.stderr) if only_in_db: print(f"\n🚨 Found {len(only_in_db)} records in the database that do NOT exist on disk:") for f in sorted(list(only_in_db)): print(f" - {f}") if repair: final_df = final_df[~final_df["Filepath"].isin(only_in_db)] print(" - Records removed from database.") if files_removed_from_cloud: print(f"\nâ„šī¸ Found {len(files_removed_from_cloud)} records for files marked as removed from iCloud:") for f in sorted(files_removed_from_cloud): print(f" - {f}") if repair: if os.path.exists(f): try: os.remove(f) print(f" - Deleted file.") except OSError as e: print(f" - Error deleting file: {e}", file=sys.stderr) else: print(f" - File already gone.") if repair: final_df = final_df[~final_df["Filepath"].isin(files_removed_from_cloud)] print(" - Records for removed files pruned from database.") if repair: print("\nSaving repaired database...") try: final_df = final_df.reset_index(drop=True) df_to_save = final_df.copy() df_to_save["Filepath"] = df_to_save["Filepath"].apply( lambda p: os.path.relpath(p, self._download_dir) if pd.notna(p) else p ) df_to_save.to_csv(self._db_path, index=False) print(f"Database saved. Total entries remaining: {len(df_to_save)}") print("Reloading database into memory...") self._load() except Exception as e: print(f"Could not save repaired database at {self._db_path}: {e}", file=sys.stderr) else: print(f"\n{action} finished with issues. Run with --repair-database to fix.") class PhotoDownloader: """Encapsulates the photo download logic and session state.""" def __init__( self, api: PyiCloudService, db: DatabaseManager, library_name: str, directory: str, skip_existing: bool, ): self._api = api self._db = db self._directory = directory self._skip_existing = skip_existing self._library_name = library_name self._total_processed = 0 self._total_downloaded = 0 self._total_skipped = 0 def run(self): """Downloads all photos from a given library.""" print("Accessing photo libraries...") if self._library_name not in self._api.photos.libraries: print(f"Library '{self._library_name}' not found.", file=sys.stderr) print("\nAvailable photo libraries:", file=sys.stderr) library_keys = sorted(list(self._api.photos.libraries.keys())) if not library_keys: print(" No photo libraries found.", file=sys.stderr) else: for key in library_keys: print(f" - {key}", file=sys.stderr) sys.exit(1) lib = self._api.photos.libraries[self._library_name] print(f"\nProcessing library: {self._library_name}") try: photos_in_lib = lib.all.photos for photo in photos_in_lib: self._total_processed += 1 try: self._process_photo(photo) progress = ( f"🔄 Library '{self._library_name}': Processed {self._total_processed} photos " f"(Downloaded: {self._total_downloaded}, Skipped: {self._total_skipped})" ) print(progress, end="\r", flush=True) except Exception as e: photo_id = getattr(photo, "id", "N/A") print( f"\nError processing photo {photo_id}. Skipping. Error: {e}", file=sys.stderr, ) print( f"\n✅ Finished library '{self._library_name}'. Processed {self._total_processed} photos (Downloaded: {self._total_downloaded}, Skipped: {self._total_skipped})." ) except Exception as e: print( f"\nCould not process photos from library {self._library_name}: {e}", file=sys.stderr, ) def _process_photo(self, photo): asset_date = photo.asset_date or photo.created if not asset_date: print( f"\nSkipping photo {photo.id} because it has no date information.", file=sys.stderr, ) return local_date = asset_date.astimezone(LA_TZ) date_path = local_date.strftime("%Y/%m/%d") download_dir = os.path.join(self._directory, date_path) filepath_base = self._determine_filepath_base(photo, download_dir) if not filepath_base: print( f"\nSkipping photo {photo.id} because it has no 'original' version.", file=sys.stderr, ) return # Now we have a filepath_base. Let's download versions. original_filename = photo.versions["original"]["filename"] _, orig_ext = os.path.splitext(original_filename) image_filepath = f"{filepath_base}{orig_ext}" original_status = self._download_version(photo, "original", image_filepath) live_status = "no_version" if photo.is_live_photo: video_filename = photo.versions["original_video"]["filename"] _, video_ext = os.path.splitext(video_filename) video_filepath = f"{filepath_base}{video_ext}" live_status = self._download_version( photo, "original_video", video_filepath ) if original_status == "downloaded" or live_status == "downloaded": self._total_downloaded += 1 elif original_status == "skipped" or live_status == "skipped": self._total_skipped += 1 def _determine_filepath_base(self, photo, download_dir: str) -> str | None: """ Determines a unique base filepath (path without extension) for a photo. It reuses existing paths from the DB or creates a new unique one. """ try: original_filename = photo.versions["original"]["filename"] except KeyError: return None # Try to find an existing record to maintain filename consistency. if self._db.has_record(photo.id, "original"): db_filepath = self._db.get_filepath(photo.id, "original") if db_filepath: filepath_base, _ = os.path.splitext(db_filepath) return filepath_base # If no existing record, determine a new unique path. base, orig_ext = os.path.splitext(original_filename) candidate_base = os.path.join(download_dir, base) # Get all potential extensions for this photo to check for collisions. extensions = {orig_ext} if photo.is_live_photo: _, video_ext = os.path.splitext( photo.versions["original_video"]["filename"] ) extensions.add(video_ext) # Find a unique base name by appending a counter if necessary. counter = 0 final_base = candidate_base while True: collision = any( os.path.exists(f"{final_base}{ext}") for ext in extensions ) if not collision: return final_base counter += 1 final_base = f"{candidate_base}_{counter}" def _download_version(self, photo, version: str, filepath: str) -> str: try: version_data = photo.versions[version] except KeyError: return "no_version" self._db.record_seen(photo.id, version) original_filename = version_data["filename"] filename = os.path.basename(filepath) # Check if we should skip this download. if self._skip_existing and self._db.has_record(photo.id, version): db_filepath = self._db.get_filepath(photo.id, version) if db_filepath: if ( filepath == db_filepath and os.path.exists(filepath) and os.path.getsize(filepath) == version_data["size"] ): return "skipped" temp_filepath = filepath + ".part" try: download = photo.download(version) if not download: print( f"\nCould not download {version} for {filename}. It might not be ready on iCloud's side.", file=sys.stderr, ) return "failed" os.makedirs(os.path.dirname(filepath), exist_ok=True) with open(temp_filepath, "wb") as f: for chunk in download.iter_content(chunk_size=1024 * 1024): if chunk: f.write(chunk) shutil.move(temp_filepath, filepath) self._db.add_record( { "PhotoID": photo.id, "Library": self._library_name, "Version": version, "Filename": original_filename, "Filepath": filepath, "Size": version_data["size"], "AssetDate": photo.asset_date or photo.created, "DownloadTimestamp": datetime.now(LA_TZ).isoformat(), "RemovedFromCloud": False, } ) return "downloaded" except KeyboardInterrupt: print( f"\nInterrupted during download of {filename}. Deleting partial file.", file=sys.stderr, ) if os.path.exists(temp_filepath): os.remove(temp_filepath) raise except (PyiCloudAPIResponseException, ConnectionError) as e: print(f"\nFailed to download {filename}: {e}", file=sys.stderr) if os.path.exists(temp_filepath): os.remove(temp_filepath) return "failed" def main(): """Parses command-line arguments. Lists libraries or starts the download process.""" parser = argparse.ArgumentParser( description="Download photos from iCloud, organized by date." ) parser.add_argument( "--username", help="Your iCloud username (email address)." ) parser.add_argument( "--library", help="Specify a single photo library to download from. If not provided, lists available libraries and exits.", ) parser.add_argument( "--directory", help="The base directory to download photos into. Required if --library is specified.", ) parser.add_argument( "--skip-existing", action="store_true", help="Skip downloading files that already exist in the destination.", ) parser.add_argument( "--cookie-directory", help="Directory to store iCloud session cookies." ) parser.add_argument( "--validate", action="store_true", help="Validate the download directory against the database and exit.", ) parser.add_argument( "--repair-database", action="store_true", help="Repair inconsistencies between the database and local files.", ) args = parser.parse_args() if args.repair_database or args.validate: if not args.directory: parser.error( "--directory is required when using --validate or --repair-database." ) db_path = os.path.join(args.directory, "database.csv") db = DatabaseManager(db_path, args.directory) if args.repair_database: db.repair() else: db.validate() return if not args.username: parser.error( "--username is required for iCloud operations (e.g., download, list libraries)." ) if not args.library: list_libraries(args.username, args.cookie_directory) return if not args.directory: parser.error("--directory is required when --library is specified.") db_path = os.path.join(args.directory, "database.csv") db = DatabaseManager(db_path, args.directory) # Connect to iCloud and download photos api = connect_to_icloud(args.username, args.cookie_directory) downloader = PhotoDownloader( api, db, args.library, args.directory, args.skip_existing ) download_completed = True try: downloader.run() except KeyboardInterrupt: print("\nInterrupted by user. Saving progress...") download_completed = False finally: db.save(download_complete=download_completed) if download_completed: db.validate() if __name__ == "__main__": main()