#!/usr/bin/env -S uv --quiet run --script
# /// script
# requires-python = ">=3.13"
# dependencies = [
#     "markdown-it-py",
#     "requests",
#     "tqdm",
# ]
# ///

"""Embed images referenced in Markdown files as base64 data URIs.

Finds every *.md file under the current directory, replaces local and remote
image links with data URIs, and writes the modified copies under ./output,
preserving the original directory layout.
"""

import os
import base64
import mimetypes
import concurrent.futures
from pathlib import Path
from urllib.parse import urlparse
import re  # Make sure re is imported

import requests
from markdown_it import MarkdownIt
# from markdown_it.renderer import RendererHTML # Not used in final regex approach
# from markdown_it.utils import OptionsDict # Not used
from tqdm import tqdm

# Configuration
MAX_WORKERS = 3
OUTPUT_DIR = Path("output")
REQUEST_TIMEOUT = 10  # seconds for network requests
VERBOSE = False  # Set to True for detailed per-image logging, False for cleaner output

# Ensure mimetypes are initialized
mimetypes.init()


def get_image_mimetype(image_path_or_url, content=None):
    """Determines the MIME type of an image."""
    parsed_url = urlparse(image_path_or_url)
    if parsed_url.scheme in ['http', 'https']:
        if content:
            pass  # Placeholder for more advanced content-based MIME detection
        mime_type, _ = mimetypes.guess_type(parsed_url.path)
        if mime_type:
            return mime_type
    else:  # Local file
        mime_type, _ = mimetypes.guess_type(image_path_or_url)
        if mime_type:
            return mime_type
    # Default or if unable to determine
    ext = Path(image_path_or_url).suffix.lower()
    if ext == ".png":
        return "image/png"
    if ext in [".jpg", ".jpeg"]:
        return "image/jpeg"
    if ext == ".gif":
        return "image/gif"
    if ext == ".svg":
        return "image/svg+xml"
    if ext == ".webp":
        return "image/webp"
    return "application/octet-stream"


def image_to_base64(image_src, md_file_path: Path):
    """
    Converts an image (local or remote) to a base64 data URI.
    Returns (data_uri_string_or_None, status_message_or_None)
    """
    log_messages = []
    try:
        parsed_url = urlparse(image_src)
        image_data = None
        content_type_header = None

        if parsed_url.scheme in ['http', 'https']:
            if VERBOSE:
                log_messages.append(f" Fetching remote: {image_src[:70]}...")
            response = requests.get(image_src, timeout=REQUEST_TIMEOUT, stream=True)
            response.raise_for_status()
            image_data = response.content
            content_type_header = response.headers.get('Content-Type')
            if content_type_header:
                # Strip parameters such as "; charset=utf-8" so only the MIME type
                # ends up in the data URI
                content_type_header = content_type_header.split(';')[0].strip()
        elif not parsed_url.scheme and not parsed_url.netloc:
            base_dir = md_file_path.parent
            local_image_path = (base_dir / image_src).resolve()
            if local_image_path.is_file():
                if VERBOSE:
                    log_messages.append(f" Reading local: {local_image_path.name}")
                with open(local_image_path, "rb") as f:
                    image_data = f.read()
            else:
                return None, f" ERROR: Local image not found: {local_image_path} (referenced in {md_file_path.name})"
        else:
            return None, f" WARNING: Unsupported image scheme: {image_src} (in {md_file_path.name})"

        if image_data:
            base64_encoded_data = base64.b64encode(image_data).decode('utf-8')
            mime_type = content_type_header or get_image_mimetype(image_src, image_data)
            if VERBOSE:
                log_messages.append(f" Encoded {image_src[:50]}... as {mime_type}")
            return f"data:{mime_type};base64,{base64_encoded_data}", "\n".join(log_messages) if log_messages else None
    except requests.exceptions.RequestException as e:
        return None, f" ERROR fetching {image_src[:70]}...: {e} (in {md_file_path.name})"
    except IOError as e:
        return None, f" ERROR reading {image_src}: {e} (in {md_file_path.name})"
    except Exception as e:
        return None, f" ERROR processing {image_src}: {e} (in {md_file_path.name})"

    # Fallback if something unexpected happened before returning
    final_message = "\n".join(log_messages) if log_messages else None
    if not image_data and not final_message:
        # Ensure there's a message if we fall through without success
        final_message = f" ERROR: Unknown issue processing image {image_src} (in {md_file_path.name})"
    return None, final_message


def process_markdown_file(md_file_path: Path, output_base_dir: Path):
    """
    Reads a Markdown file, embeds images, and saves it to the output directory.
    Returns (success_boolean, list_of_human_readable_messages)
    """
    file_operation_messages = []
    if VERBOSE:
        file_operation_messages.append(f"Starting processing: {md_file_path}")
    try:
        relative_path = md_file_path.relative_to(Path.cwd())
        output_file_path = output_base_dir / relative_path
        output_file_path.parent.mkdir(parents=True, exist_ok=True)

        with open(md_file_path, "r", encoding="utf-8") as f:
            content = f.read()

        image_pattern = re.compile(r"!\[(.*?)\]\((.*?)(?: \"(.*?)\")?\)")
        processed_image_links_count = 0
        successfully_embedded_count = 0

        def replacer(match):
            nonlocal processed_image_links_count, successfully_embedded_count  # Allow modification
            alt_text = match.group(1)
            original_src = match.group(2)
            title_text = match.group(3) if match.group(3) else ""

            if original_src.startswith("data:"):  # Already embedded
                return match.group(0)

            processed_image_links_count += 1
            if VERBOSE:
                file_operation_messages.append(f" Found image link: {original_src[:70]}... in {md_file_path.name}")

            base64_uri, image_status_msg = image_to_base64(original_src, md_file_path)
            if image_status_msg:  # Add any messages from image_to_base64
                # Only add detailed image status if VERBOSE, or if it's an error/warning
                if VERBOSE or "ERROR" in image_status_msg.upper() or "WARNING" in image_status_msg.upper():
                    file_operation_messages.append(image_status_msg)

            if base64_uri:
                successfully_embedded_count += 1
                new_tag = f"![{alt_text}]({base64_uri}"
                if title_text:
                    new_tag += f' "{title_text}"'
                new_tag += ")"
                if VERBOSE:
                    file_operation_messages.append(f" Embedded: {original_src[:50]}...")
                return new_tag
            else:
                # If embedding failed and no specific error was added from image_to_base64, add a generic one
                if not any(msg for msg in file_operation_messages if original_src in msg and ("ERROR" in msg.upper() or "WARNING" in msg.upper())):
                    file_operation_messages.append(f" WARNING: Failed to embed {original_src[:70]}... (in {md_file_path.name}). Kept original.")
                return match.group(0)  # Return original match if embedding fails

        modified_content = image_pattern.sub(replacer, content)

        with open(output_file_path, "w", encoding="utf-8") as f:
            f.write(modified_content)

        # Summary for this file
        summary_msg = f"Finished: {md_file_path.name}."
        if processed_image_links_count > 0:
            summary_msg += f" (Found: {processed_image_links_count}, Embedded: {successfully_embedded_count}"
            failures = processed_image_links_count - successfully_embedded_count
            if failures > 0:
                summary_msg += f", Failed/Skipped: {failures}"
            summary_msg += ")"
        elif VERBOSE:  # Only if verbose and no images were found
            summary_msg += " (No image links applicable for embedding)."
        file_operation_messages.append(summary_msg)

        return True, file_operation_messages

    except Exception as e:
        err_msg = f" Major ERROR processing file {md_file_path.name}: {e}"
        if VERBOSE:
            import traceback
            err_msg += "\n" + traceback.format_exc()
        file_operation_messages.append(err_msg)
        return False, file_operation_messages


def main():
    """
    Main function to find Markdown files and process them.
    """
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    current_dir = Path.cwd()

    # is_relative_to avoids falsely excluding sibling directories whose names
    # merely start with "output"
    md_files = [
        p for p in current_dir.rglob("*.md")
        if not p.resolve().is_relative_to(OUTPUT_DIR.resolve())
    ]

    if not md_files:
        print("No Markdown files found in the current directory (excluding 'output' directory).")
        return

    print(f"Found {len(md_files)} Markdown files to process. VERBOSE output is {'ON' if VERBOSE else 'OFF'}.")

    successful_count = 0
    failed_count = 0

    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_file = {
            executor.submit(process_markdown_file, md_file, OUTPUT_DIR): md_file
            for md_file in md_files
        }

        for future in tqdm(concurrent.futures.as_completed(future_to_file), total=len(md_files), desc="Embedding Images"):
            md_file = future_to_file[future]
            try:
                success, messages = future.result()
                if messages:
                    for msg in messages:
                        # Use tqdm.write to print messages without breaking the progress bar
                        # Add a file context to messages if not already clear
                        if md_file.name not in msg and VERBOSE:  # Check helps avoid redundant file name prints
                            tqdm.write(f"[{md_file.name}] {msg}")
                        else:
                            tqdm.write(msg)
                if success:
                    successful_count += 1
                else:
                    failed_count += 1
                    # The error message should already be in 'messages' and printed by tqdm.write
            except Exception as exc:
                # Catches exceptions from the worker task logic itself, if not handled by process_markdown_file
                tqdm.write(f" Critical unhandled exception for {md_file.name}: {exc}")
                failed_count += 1
                if VERBOSE:
                    import traceback
                    tqdm.write(traceback.format_exc())

    print("\n--- Summary ---")
    print(f"Successfully processed files: {successful_count}")
    print(f"Files with errors/failures: {failed_count}")
    print(f"Output files are in: {OUTPUT_DIR.resolve()}")


if __name__ == "__main__":
    main()