Skip to content

Instantly share code, notes, and snippets.

@randy3k
Last active October 1, 2025 03:55
Show Gist options
  • Save randy3k/722e14d79ac4de61d61e7a0a8eb09a4c to your computer and use it in GitHub Desktop.
download icloud photos
import argparse
import os
import sys
import zoneinfo
from datetime import datetime
import shutil
import pandas as pd
from pyicloud import PyiCloudService
from pyicloud.exceptions import PyiCloudAPIResponseException
# Define the target timezone as requested
LA_TZ = zoneinfo.ZoneInfo("America/Los_Angeles")
def connect_to_icloud(
username: str, cookie_directory: str | None = None
) -> PyiCloudService:
"""Handles connection and authentication with iCloud."""
print("Connecting to iCloud...")
# To avoid password prompts, run `icloud [email protected]`
# in your terminal to save credentials to your system's keyring.
try:
api = PyiCloudService(username, cookie_directory=cookie_directory)
except PyiCloudAPIResponseException as e:
print(f"Error connecting to iCloud: {e}", file=sys.stderr)
sys.exit(1)
# Handle two-factor authentication
if api.requires_2fa:
print("Two-factor authentication required.")
code = input("Enter the code you received on your device: ")
result = api.validate_2fa_code(code)
print(f"Code validation result: {result}")
if not result:
print("Failed to verify 2FA code.", file=sys.stderr)
sys.exit(1)
if not api.is_trusted_session:
print("Session is not trusted. Requesting trust...")
result = api.trust_session()
print(f"Session trust result: {result}")
if not result:
print("Failed to request trust for session.", file=sys.stderr)
return api
def list_libraries(username: str, cookie_directory: str | None = None):
    """Connects to iCloud and lists available photo libraries."""
    api = connect_to_icloud(username, cookie_directory)
    print("\nAvailable photo libraries:")
    names = sorted(api.photos.libraries)
    if names:
        for name in names:
            print(f" - {name}")
    else:
        print(" No photo libraries found.")
class DatabaseManager:
"""Handles all interactions with the download records CSV database."""
def __init__(self, db_path: str, download_dir: str):
self._db_path = db_path
self._download_dir = download_dir
self._columns = [
"PhotoID",
"Library",
"Version",
"Filename",
"Filepath",
"Size",
"AssetDate",
"DownloadTimestamp",
"RemovedFromCloud",
]
self._records_df = pd.DataFrame(columns=self._columns)
self._new_records = []
self._existing_records_map = {}
self._seen_records = set()
self._load()
def _load(self):
if not self._db_path or not os.path.exists(self._db_path):
print("No existing database found or path not specified.")
return
print(f"\nLoading existing records from database: {self._db_path}")
try:
self._records_df = self._read_and_normalize_db(
self._db_path, dtype={"PhotoID": str, "Size": "Int64"}
)
self._existing_records_map = self._records_df.set_index(
["PhotoID", "Version"]
)["Filepath"].to_dict()
print(
f"Loaded {len(self._existing_records_map)} active records for skip-check."
)
except Exception as e:
print(
f"\nWarning: Could not load or parse existing database at {self._db_path}. "
f"Will not use DB for skipping. Error: {e}",
file=sys.stderr,
)
self._records_df = pd.DataFrame(columns=self._columns)
self._existing_records_map = {}
def _read_and_normalize_db(self, db_path: str, dtype=None) -> pd.DataFrame:
"""
Reads the database CSV and normalizes 'Filepath' to absolute paths.
Raises exceptions on failure.
"""
df = pd.read_csv(db_path, dtype=dtype)
def make_absolute(p):
if pd.isna(p):
return p
path_str = str(p)
if os.path.isabs(path_str):
return os.path.abspath(path_str) # Normalize
return os.path.abspath(os.path.join(self._download_dir, path_str))
if "Filepath" in df.columns:
df["Filepath"] = df["Filepath"].apply(make_absolute)
return df
def has_record(self, photo_id: str, version: str) -> bool:
return (photo_id, version) in self._existing_records_map
def get_filepath(self, photo_id: str, version: str) -> str | None:
return self._existing_records_map.get((photo_id, version))
def add_record(self, record_dict: dict):
filepath = record_dict.get("Filepath")
if filepath:
# Ensure we are working with absolute paths internally.
# They will be converted to relative on save.
abs_path = os.path.abspath(filepath)
record_dict["Filepath"] = abs_path
self._new_records.append(record_dict)
def save(self, download_complete: bool = False):
if not self._db_path:
print("No database path specified, not saving records.", file=sys.stderr)
return
if not self._new_records and self._records_df.empty:
print("No records to save.")
return
print(f"\nSaving database to: {self._db_path}")
# New records are in a separate list.
# The main dataframe (_records_df) holds the state from the last load.
if self._new_records:
print(f"Adding {len(self._new_records)} new records.")
new_df = pd.DataFrame(self._new_records)
else:
new_df = pd.DataFrame(columns=self._columns)
try:
# If a record was re-downloaded, it will appear in both _records_df (old state)
# and new_df (new state). We prioritize the one from new_df by using keep='last'.
if self._records_df.empty:
combined_df = new_df
elif new_df.empty:
combined_df = self._records_df
else:
combined_df = pd.concat([self._records_df, new_df], ignore_index=True)
combined_df = (
combined_df.drop_duplicates(subset=["PhotoID", "Version"], keep="last")
.sort_values(by="Filepath")
.reset_index(drop=True)
)
if download_complete:
print("Download complete, updating 'RemovedFromCloud' status.")
# Create a multi-index from the PhotoID and Version columns
# to check against the set of seen records.
record_ids = pd.MultiIndex.from_frame(
combined_df[["PhotoID", "Version"]].astype(str)
)
# Create a set of seen (photo_id, version) tuples, ensuring they are strings
seen_ids = {(str(pid), str(v)) for pid, v in self._seen_records}
# Check which of these IDs were seen during the run
is_seen_mask = record_ids.isin(seen_ids)
combined_df["RemovedFromCloud"] = ~is_seen_mask
else:
print(
"Download not complete. Only adding new/updated records. "
"'RemovedFromCloud' status for existing records is unchanged."
)
# No change needed. The concat/drop_duplicates logic correctly preserves
# existing statuses while adding/updating new records.
# Before saving, convert absolute filepaths to relative to the download directory.
df_to_save = combined_df.copy()
df_to_save["Filepath"] = df_to_save["Filepath"].apply(
lambda p: os.path.relpath(p, self._download_dir) if pd.notna(p) else p
)
df_to_save.to_csv(self._db_path, index=False)
print(f"Database saved. Total entries: {len(df_to_save)}")
# Reset state for the next run within the same session (if any)
# We keep absolute paths in memory.
self._records_df = combined_df
self._new_records = []
self._seen_records = set()
self._existing_records_map = self._records_df.set_index(
["PhotoID", "Version"]
)["Filepath"].to_dict()
except Exception as e:
print(
f"\nCould not update database at {self._db_path}: {e}", file=sys.stderr
)
if not new_df.empty:
fallback_path = os.path.join(
os.path.dirname(self._db_path),
f"download_records_{datetime.now().strftime('%Y%m%d%H%M%S')}.csv",
)
new_df.to_csv(fallback_path, index=False)
print(f"Saved new records to {fallback_path}", file=sys.stderr)
def record_seen(self, photo_id: str, version: str):
"""
Records that a (photo_id, version) tuple has been seen in the cloud sync.
"""
self._seen_records.add((photo_id, version))
def _get_files_on_disk(self, directory: str) -> set[str] | None:
"""Scans a directory and returns a set of absolute file paths."""
try:
files_on_disk = set()
abs_db_path = os.path.abspath(self._db_path)
for root, _, files in os.walk(directory):
for file_name in files:
full_path = os.path.abspath(os.path.join(root, file_name))
if full_path == abs_db_path:
continue
files_on_disk.add(full_path)
print(f"Found {len(files_on_disk)} files in '{directory}'.")
return files_on_disk
except Exception as e:
print(f"Error scanning directory '{directory}': {e}", file=sys.stderr)
return None
def _check_for_duplicates(self, df: pd.DataFrame) -> bool:
"""Checks for and reports duplicate filepaths in the dataframe."""
# keep=False marks all duplicates as True
duplicated_filepaths = df[df.duplicated(subset=["Filepath"], keep=False)]
if duplicated_filepaths.empty:
return False
print("\n🚨 Found multiple database records pointing to the same file:")
# Sort by filepath to group them together in the output
for filepath, group in duplicated_filepaths.sort_values("Filepath").groupby(
"Filepath"
):
print(f" - File: {filepath}")
for _, row in group.iterrows():
print(
f" - Record: PhotoID={row['PhotoID']}, Version={row['Version']}"
)
return True
def _get_removed_from_cloud_files(self, df: pd.DataFrame) -> list[str]:
"""Gets a list of filepaths for files marked as removed from iCloud."""
if "RemovedFromCloud" not in df.columns:
return []
# astype(bool) handles 'True'/'False' strings if they exist from older formats
removed_from_cloud_df = df[df["RemovedFromCloud"].astype(bool)]
if removed_from_cloud_df.empty:
return []
return removed_from_cloud_df["Filepath"].dropna().tolist()
def validate(self):
"""
Validates the downloaded files against the database.
"""
self.validate_and_repair(repair=False)
def repair(self):
"""
Repairs inconsistencies between the database and the file system.
"""
self.validate_and_repair(repair=True)
def validate_and_repair(self, repair: bool = False):
"""
Validates and optionally repairs inconsistencies between the database and the file system.
- If repair=False, only reports issues.
- If repair=True:
- Removes records for files that are missing on disk.
- Deletes files on disk that are not in the database.
- Deletes files and records for items marked as 'RemovedFromCloud'.
"""
action = "Repairing" if repair else "Validating"
print(f"\n{action} database and file system...")
if not self._db_path:
print(f"No database path specified, cannot perform {action.lower()}.", file=sys.stderr)
return
if not os.path.exists(self._db_path):
print(f"Database file not found at '{self._db_path}'.", file=sys.stderr)
return
try:
df = self._read_and_normalize_db(self._db_path)
except Exception as e:
print(f"Error reading database file '{self._db_path}': {e}", file=sys.stderr)
return
files_on_disk = self._get_files_on_disk(self._download_dir)
if files_on_disk is None:
return # Error already printed
# --- Identify inconsistencies ---
has_duplicates = self._check_for_duplicates(df)
files_in_db = set(df["Filepath"].dropna())
print(f"Found {len(files_in_db)} records in database '{self._db_path}'.")
only_in_db = files_in_db - files_on_disk
only_on_disk = files_on_disk - files_in_db
files_removed_from_cloud = self._get_removed_from_cloud_files(df)
if not any([only_on_disk, only_in_db, files_removed_from_cloud, has_duplicates]):
print("✅ Database and download directory are in sync.")
return
# --- Report or Perform repairs ---
final_df = df.copy()
if only_on_disk:
print(f"\n🚨 Found {len(only_on_disk)} files on disk that are NOT in the database:")
for f in sorted(list(only_on_disk)):
print(f" - {f}")
if repair:
try:
os.remove(f)
print(f" - Deleted.")
except OSError as e:
print(f" - Error deleting file: {e}", file=sys.stderr)
if only_in_db:
print(f"\n🚨 Found {len(only_in_db)} records in the database that do NOT exist on disk:")
for f in sorted(list(only_in_db)):
print(f" - {f}")
if repair:
final_df = final_df[~final_df["Filepath"].isin(only_in_db)]
print(" - Records removed from database.")
if files_removed_from_cloud:
print(f"\nℹ️ Found {len(files_removed_from_cloud)} records for files marked as removed from iCloud:")
for f in sorted(files_removed_from_cloud):
print(f" - {f}")
if repair:
if os.path.exists(f):
try:
os.remove(f)
print(f" - Deleted file.")
except OSError as e:
print(f" - Error deleting file: {e}", file=sys.stderr)
else:
print(f" - File already gone.")
if repair:
final_df = final_df[~final_df["Filepath"].isin(files_removed_from_cloud)]
print(" - Records for removed files pruned from database.")
if repair:
print("\nSaving repaired database...")
try:
final_df = final_df.reset_index(drop=True)
df_to_save = final_df.copy()
df_to_save["Filepath"] = df_to_save["Filepath"].apply(
lambda p: os.path.relpath(p, self._download_dir) if pd.notna(p) else p
)
df_to_save.to_csv(self._db_path, index=False)
print(f"Database saved. Total entries remaining: {len(df_to_save)}")
print("Reloading database into memory...")
self._load()
except Exception as e:
print(f"Could not save repaired database at {self._db_path}: {e}", file=sys.stderr)
else:
print(f"\n{action} finished with issues. Run with --repair-database to fix.")
class PhotoDownloader:
    """Encapsulates the photo download logic and session state."""

    def __init__(
        self,
        api: PyiCloudService,
        db: DatabaseManager,
        library_name: str,
        directory: str,
        skip_existing: bool,
    ):
        self._api = api
        self._db = db
        self._directory = directory
        self._skip_existing = skip_existing
        self._library_name = library_name
        # Session counters, surfaced in the progress line during run().
        self._total_processed = 0
        self._total_downloaded = 0
        self._total_skipped = 0

    def run(self):
        """Downloads all photos from a given library."""
        print("Accessing photo libraries...")
        if self._library_name not in self._api.photos.libraries:
            print(f"Library '{self._library_name}' not found.", file=sys.stderr)
            print("\nAvailable photo libraries:", file=sys.stderr)
            library_keys = sorted(list(self._api.photos.libraries.keys()))
            if not library_keys:
                print(" No photo libraries found.", file=sys.stderr)
            else:
                for key in library_keys:
                    print(f" - {key}", file=sys.stderr)
            sys.exit(1)
        lib = self._api.photos.libraries[self._library_name]
        print(f"\nProcessing library: {self._library_name}")
        try:
            photos_in_lib = lib.all.photos
            for photo in photos_in_lib:
                self._total_processed += 1
                try:
                    self._process_photo(photo)
                    progress = (
                        f"🔄 Library '{self._library_name}': Processed {self._total_processed} photos "
                        f"(Downloaded: {self._total_downloaded}, Skipped: {self._total_skipped})"
                    )
                    print(progress, end="\r", flush=True)
                except Exception as e:
                    # One bad asset must not abort the whole library.
                    photo_id = getattr(photo, "id", "N/A")
                    print(
                        f"\nError processing photo {photo_id}. Skipping. Error: {e}",
                        file=sys.stderr,
                    )
            print(
                f"\n✅ Finished library '{self._library_name}'. Processed {self._total_processed} photos (Downloaded: {self._total_downloaded}, Skipped: {self._total_skipped})."
            )
        except Exception as e:
            print(
                f"\nCould not process photos from library {self._library_name}: {e}",
                file=sys.stderr,
            )

    def _process_photo(self, photo):
        """Download the original (and live-photo video) versions of one photo
        into a YYYY/MM/DD folder derived from the asset's date in LA time."""
        asset_date = photo.asset_date or photo.created
        if not asset_date:
            print(
                f"\nSkipping photo {photo.id} because it has no date information.",
                file=sys.stderr,
            )
            return
        local_date = asset_date.astimezone(LA_TZ)
        date_path = local_date.strftime("%Y/%m/%d")
        download_dir = os.path.join(self._directory, date_path)
        filepath_base = self._determine_filepath_base(photo, download_dir)
        if not filepath_base:
            print(
                f"\nSkipping photo {photo.id} because it has no 'original' version.",
                file=sys.stderr,
            )
            return
        # We have a unique base path; append each version's own extension.
        original_filename = photo.versions["original"]["filename"]
        _, orig_ext = os.path.splitext(original_filename)
        image_filepath = f"{filepath_base}{orig_ext}"
        original_status = self._download_version(photo, "original", image_filepath)
        live_status = "no_version"
        if photo.is_live_photo:
            video_filename = photo.versions["original_video"]["filename"]
            _, video_ext = os.path.splitext(video_filename)
            video_filepath = f"{filepath_base}{video_ext}"
            live_status = self._download_version(
                photo, "original_video", video_filepath
            )
        # Count the photo once; a download of either version wins over a skip.
        if original_status == "downloaded" or live_status == "downloaded":
            self._total_downloaded += 1
        elif original_status == "skipped" or live_status == "skipped":
            self._total_skipped += 1

    def _determine_filepath_base(self, photo, download_dir: str) -> str | None:
        """
        Determines a unique base filepath (path without extension) for a photo.
        It reuses existing paths from the DB or creates a new unique one.
        Returns None when the photo has no 'original' version.
        """
        try:
            original_filename = photo.versions["original"]["filename"]
        except KeyError:
            return None
        # Try to find an existing record to maintain filename consistency.
        if self._db.has_record(photo.id, "original"):
            db_filepath = self._db.get_filepath(photo.id, "original")
            if db_filepath:
                filepath_base, _ = os.path.splitext(db_filepath)
                return filepath_base
        # If no existing record, determine a new unique path.
        base, orig_ext = os.path.splitext(original_filename)
        candidate_base = os.path.join(download_dir, base)
        # Gather every extension this photo may produce, so the image and its
        # live-photo video never collide with files of another asset.
        extensions = {orig_ext}
        if photo.is_live_photo:
            _, video_ext = os.path.splitext(
                photo.versions["original_video"]["filename"]
            )
            extensions.add(video_ext)
        # Find a unique base name by appending a counter if necessary.
        counter = 0
        final_base = candidate_base
        while True:
            collision = any(
                os.path.exists(f"{final_base}{ext}")
                for ext in extensions
            )
            if not collision:
                return final_base
            counter += 1
            final_base = f"{candidate_base}_{counter}"

    def _download_version(self, photo, version: str, filepath: str) -> str:
        """Download one version of a photo to filepath.

        Returns one of: 'downloaded', 'skipped', 'failed', 'no_version'.
        Downloads go through a '.part' temp file so an interrupted transfer
        never leaves a truncated file at the final path.
        """
        try:
            version_data = photo.versions[version]
        except KeyError:
            return "no_version"
        self._db.record_seen(photo.id, version)
        original_filename = version_data["filename"]
        filename = os.path.basename(filepath)
        # Skip when the DB already records this exact file at this exact
        # path, and the on-disk size matches iCloud's.
        if self._skip_existing and self._db.has_record(photo.id, version):
            db_filepath = self._db.get_filepath(photo.id, version)
            if db_filepath:
                if (
                    filepath == db_filepath
                    and os.path.exists(filepath)
                    and os.path.getsize(filepath) == version_data["size"]
                ):
                    return "skipped"
        temp_filepath = filepath + ".part"
        try:
            download = photo.download(version)
            if not download:
                print(
                    f"\nCould not download {version} for {filename}. It might not be ready on iCloud's side.",
                    file=sys.stderr,
                )
                return "failed"
            os.makedirs(os.path.dirname(filepath), exist_ok=True)
            with open(temp_filepath, "wb") as f:
                for chunk in download.iter_content(chunk_size=1024 * 1024):
                    if chunk:
                        f.write(chunk)
            # Atomically promote the completed temp file to its final name.
            shutil.move(temp_filepath, filepath)
            self._db.add_record(
                {
                    "PhotoID": photo.id,
                    "Library": self._library_name,
                    "Version": version,
                    "Filename": original_filename,
                    "Filepath": filepath,
                    "Size": version_data["size"],
                    "AssetDate": photo.asset_date or photo.created,
                    "DownloadTimestamp": datetime.now(LA_TZ).isoformat(),
                    "RemovedFromCloud": False,
                }
            )
            return "downloaded"
        except KeyboardInterrupt:
            print(
                f"\nInterrupted during download of {filename}. Deleting partial file.",
                file=sys.stderr,
            )
            if os.path.exists(temp_filepath):
                os.remove(temp_filepath)
            raise
        except (PyiCloudAPIResponseException, ConnectionError) as e:
            print(f"\nFailed to download {filename}: {e}", file=sys.stderr)
            if os.path.exists(temp_filepath):
                os.remove(temp_filepath)
            return "failed"
def main():
    """Parses command-line arguments. Lists libraries or starts the download process."""
    parser = argparse.ArgumentParser(
        description="Download photos from iCloud, organized by date."
    )
    # Declare the CLI surface as data, then register each flag.
    flag_specs = [
        (
            ("--username",),
            {"help": "Your iCloud username (email address)."},
        ),
        (
            ("--library",),
            {
                "help": "Specify a single photo library to download from. If not provided, lists available libraries and exits."
            },
        ),
        (
            ("--directory",),
            {
                "help": "The base directory to download photos into. Required if --library is specified."
            },
        ),
        (
            ("--skip-existing",),
            {
                "action": "store_true",
                "help": "Skip downloading files that already exist in the destination.",
            },
        ),
        (
            ("--cookie-directory",),
            {"help": "Directory to store iCloud session cookies."},
        ),
        (
            ("--validate",),
            {
                "action": "store_true",
                "help": "Validate the download directory against the database and exit.",
            },
        ),
        (
            ("--repair-database",),
            {
                "action": "store_true",
                "help": "Repair inconsistencies between the database and local files.",
            },
        ),
    ]
    for flags, kwargs in flag_specs:
        parser.add_argument(*flags, **kwargs)
    args = parser.parse_args()

    # Maintenance mode: validate/repair the local database and exit.
    if args.repair_database or args.validate:
        if not args.directory:
            parser.error(
                "--directory is required when using --validate or --repair-database."
            )
        maintenance_db = DatabaseManager(
            os.path.join(args.directory, "database.csv"), args.directory
        )
        if args.repair_database:
            maintenance_db.repair()
        else:
            maintenance_db.validate()
        return

    # Everything below talks to iCloud and needs credentials.
    if not args.username:
        parser.error(
            "--username is required for iCloud operations (e.g., download, list libraries)."
        )
    if not args.library:
        list_libraries(args.username, args.cookie_directory)
        return
    if not args.directory:
        parser.error("--directory is required when --library is specified.")

    db = DatabaseManager(os.path.join(args.directory, "database.csv"), args.directory)
    # Connect to iCloud and download photos
    api = connect_to_icloud(args.username, args.cookie_directory)
    downloader = PhotoDownloader(
        api, db, args.library, args.directory, args.skip_existing
    )
    completed = True
    try:
        downloader.run()
    except KeyboardInterrupt:
        print("\nInterrupted by user. Saving progress...")
        completed = False
    finally:
        # Always persist whatever was downloaded, even on interrupt.
        db.save(download_complete=completed)
    if completed:
        db.validate()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment