#!/usr/bin/env python import os import sys import json import argparse import numpy as np from skimage.metrics import structural_similarity as ssim from skimage.transform import resize from moviepy import VideoFileClip from tqdm import tqdm # --- Parameters You Can Tune --- # The minimum similarity score (from 0 to 1) for a frame to be considered a match. # Higher is stricter. 0.95 is a good starting point. DEFAULT_SIMILARITY_THRESHOLD = 0.95 # The minimum duration of a loop in seconds. Prevents finding tiny, useless loops. DEFAULT_MIN_LOOP_DURATION = 1.5 # To speed up the process, we can downscale frames before comparing them. # A value of 0.25 means we resize to 25% of the original size. DEFAULT_COMPARISON_RESIZE_FACTOR = 0.25 # The directory to save the output media. DEFAULT_OUTPUT_DIRECTORY = "output_loops" # Video quality settings for MP4 and WEBM codecs. # Lower CRF values mean higher quality and larger files. # Default (high quality): 23. Prototype mode: 28. DEFAULT_CRF = 23 PROTO_CRF = 28 # The number of frames to load into memory at once for processing. DEFAULT_FRAME_BUFFER_SIZE = 500 def find_all_suitable_loops(video_path: str, similarity_threshold: float, min_loop_duration: float, comparison_resize_factor: float, frame_buffer_size: int): """ Analyzes a video to find all suitable start and end timestamps for a seamless loop. This version uses a frame buffer to be memory-efficient while maintaining performance. Args: video_path: Path to the video file. similarity_threshold: The minimum similarity score for a frame to be considered a match. min_loop_duration: The minimum duration of a loop in seconds. comparison_resize_factor: The factor to downscale frames for comparison. frame_buffer_size: The number of frames to load into memory at once. Returns: A list of dictionaries containing all suitable loops, or an empty list if no suitable loops are found. """ print(f"Analyzing '{video_path}' for all suitable looping pairs...") try: clip = VideoFileClip(video_path) frame_rate = clip.fps num_frames = int(clip.duration * frame_rate) except Exception as e: print(f"Error loading video file: {e}") return [] min_loop_frames = int(min_loop_duration * frame_rate) all_found_loops = [] print("Step 1 of 2: Processing video with frame buffer...") resized_frames = [] # Process the entire video in chunks for i in tqdm(range(0, num_frames, frame_buffer_size), desc="Buffering frames"): end_index = min(i + frame_buffer_size, num_frames) frames_to_process = [clip.get_frame(t / frame_rate) for t in range(i, end_index)] resized_chunk = [resize(frame, (int(frame.shape[0] * comparison_resize_factor), int(frame.shape[1] * comparison_resize_factor)), anti_aliasing=True) for frame in frames_to_process] resized_frames.extend(resized_chunk) clip.close() print("Step 2 of 2: Comparing buffered frames to find suitable loops.") with tqdm(total=len(resized_frames) * (len(resized_frames) - min_loop_frames), desc="Comparing frames") as pbar: for i in range(len(resized_frames)): for j in range(i + min_loop_frames, len(resized_frames)): frame1 = resized_frames[i] frame2 = resized_frames[j] # Calculate SSIM score data_range = frame1.max() - frame1.min() ssim_score = ssim(frame1, frame2, channel_axis=-1, win_size=3, data_range=data_range) if ssim_score >= similarity_threshold: all_found_loops.append({ 'start_time': i / frame_rate, 'end_time': j / frame_rate, 'score': ssim_score }) pbar.update(1) print(f"Analysis complete. Found {len(all_found_loops)} suitable loops.") return sorted(all_found_loops, key=lambda x: x['score'], reverse=True) def create_looping_media(video_path: str, start_time: float, end_time: float, output_dir: str, output_format: str = 'mp4', mute_audio: bool = False, prototype_mode: bool = False, crf: int = None, preset: str = None, scale: int = None, fps: int = None): """ Creates a looping video or GIF from a portion of the original video. Args: video_path: Path to the input video file. start_time: The timestamp (in seconds) where the loop should start. end_time: The timestamp (in seconds) where the loop should end. output_dir: The directory to save the output media. output_format: The format of the output file ('mp4', 'webm', or 'gif'). mute_audio: If True, the output video will have no audio. prototype_mode: If True, uses a lower quality setting for smaller files. crf: The Constant Rate Factor for video quality. Overrides prototype_mode if set. scale: The desired output video width in pixels. fps: The desired output video frames per second. Returns: True if the file was created successfully, False otherwise. """ if not os.path.exists(output_dir): os.makedirs(output_dir) base_name = os.path.splitext(os.path.basename(video_path))[0] output_path = os.path.join(output_dir, f"{base_name}_loop.{output_format}") try: clip = VideoFileClip(video_path) # Apply scaling and FPS changes on a single pass to prevent quality loss. if scale: clip = clip.resized(width=scale) if fps: clip = clip.set_fps(fps) # Use .subclipped() to extract the specific loop segment loop_clip = clip.subclipped(start_time, end_time) if output_format in ['mp4', 'webm']: # Determine if audio should be included. include_audio = (clip.audio is not None) and not mute_audio codec = "libx264" if output_format == 'mp4' else "libvpx-vp9" # Set CRF value based on command-line arguments crf_value = crf if crf is not None else (PROTO_CRF if prototype_mode else DEFAULT_CRF) preset_value = preset if preset is not None else ('fast' if prototype_mode else 'veryslow') ffmpeg_params = ['-g', '128', '-crf', str(crf_value), '-preset', preset_value] if output_format == 'webm': ffmpeg_params = ['-crf', str(crf_value), '-b:v', '0', '-cpu-used', '0', '-g', '128'] loop_clip.write_videofile(output_path, codec=codec, audio=include_audio, logger='bar', ffmpeg_params=ffmpeg_params) elif output_format == 'gif': # Export as a high-quality GIF (GIFs do not have audio) loop_clip.write_gif(output_path, fps=clip.fps) print(f"Successfully saved to '{output_path}'") return True except Exception as e: print(f"Error creating media: {e}") return False finally: if 'clip' in locals() and clip.reader: clip.close() # --- Main Execution --- if __name__ == "__main__": parser = argparse.ArgumentParser(description=""" Finds a seamless loop in a video and creates a looping video or GIF. """) parser.add_argument("input_video", type=str, help="Path to the input video file.") parser.add_argument("--output_dir", type=str, default=DEFAULT_OUTPUT_DIRECTORY, help=f"Directory to save the output media. Defaults to '{DEFAULT_OUTPUT_DIRECTORY}'.") parser.add_argument("--similarity_threshold", type=float, default=DEFAULT_SIMILARITY_THRESHOLD, help=f"Minimum similarity score (0 to 1). Defaults to {DEFAULT_SIMILARITY_THRESHOLD}.") parser.add_argument("--min_loop_duration", type=float, default=DEFAULT_MIN_LOOP_DURATION, help=f"Minimum duration of the loop in seconds. Defaults to {DEFAULT_MIN_LOOP_DURATION}.") parser.add_argument("--resize_factor", type=float, default=DEFAULT_COMPARISON_RESIZE_FACTOR, help=f"Factor to downscale frames for faster comparison (e.g., 0.5 for 50%%). Defaults to {DEFAULT_COMPARISON_RESIZE_FACTOR}.") parser.add_argument("--format", type=str, choices=['mp4', 'webm', 'gif'], default='mp4', help="Output media format. Choices are 'mp4', 'webm', or 'gif'. Defaults to 'mp4'.") parser.add_argument("--mute", action="store_true", help="If set, the output video will not have audio. This option is ignored for GIF output.") parser.add_argument("--proto", action="store_true", help="If set, uses a lower quality setting for smaller files, suitable for prototypes.") parser.add_argument("--crf", type=int, help=f"Constant Rate Factor (CRF) for video quality. Lower values mean higher quality. If set, this value overrides the --proto flag.") parser.add_argument("--shortest", action="store_true", help="Creates the shortest valid loop instead of the highest-scoring one.") parser.add_argument("--longest", action="store_true", help="Creates the longest valid loop instead of the highest-scoring one.") parser.add_argument("--scale", type=int, help="Sets the output video width in pixels, maintaining aspect ratio. Useful for reducing file size.") parser.add_argument("--fps", type=int, help="Sets the output video frames per second. Useful for reducing file size.") parser.add_argument("--buffer_size", type=int, default=DEFAULT_FRAME_BUFFER_SIZE, help=f"Number of frames to buffer for in-memory processing. A larger number is faster but uses more RAM. Defaults to {DEFAULT_FRAME_BUFFER_SIZE}.") args = parser.parse_args() if args.shortest and args.longest: print("Error: Cannot use both --shortest and --longest flags together.") sys.exit(1) # Create the output directory if it doesn't exist os.makedirs(args.output_dir, exist_ok=True) CACHE_FILE = os.path.join(args.output_dir, f"{os.path.basename(args.input_video)}_cached_loops.json") all_loops = [] # Check for a cached list of loops if os.path.exists(CACHE_FILE): try: with open(CACHE_FILE, 'r') as f: all_loops = json.load(f) if isinstance(all_loops, list) and all_loops: print(f"Found {len(all_loops)} cached loops. Skipping analysis.") except (IOError, json.JSONDecodeError) as e: print(f"Error reading cache file, will re-analyze: {e}") # If no cached data, find all suitable loops and save them if not all_loops: all_loops = find_all_suitable_loops( args.input_video, args.similarity_threshold, args.min_loop_duration, args.resize_factor, args.buffer_size ) if all_loops: try: with open(CACHE_FILE, 'w') as f: json.dump(all_loops, f, indent=4) print(f"Saved {len(all_loops)} loops to cache file: {CACHE_FILE}") except IOError as e: print(f"Warning: Could not save cache file: {e}") # Select the loop to create based on user arguments loop_to_create = None if all_loops: if args.shortest: loop_to_create = min(all_loops, key=lambda x: x['end_time'] - x['start_time']) print("Selected shortest loop.") elif args.longest: loop_to_create = max(all_loops, key=lambda x: x['end_time'] - x['start_time']) print("Selected longest loop.") else: # Default to the highest-scoring loop loop_to_create = all_loops[0] print("Selected highest-scoring loop.") if loop_to_create: creation_success = create_looping_media( args.input_video, loop_to_create['start_time'], loop_to_create['end_time'], args.output_dir, output_format=args.format, mute_audio=args.mute, prototype_mode=args.proto, crf=args.crf, scale=args.scale, fps=args.fps ) if creation_success: sys.exit(0) else: sys.exit(1) else: print("Could not find a suitable loop point.") sys.exit(1)