video_tracker_core.py
# ===== Video Tracker Core Library =====
# Complex implementation part - file to upload as a Gist
import matplotlib
matplotlib.use('Agg')
import os
import sys
import re
import shutil
import time
import warnings
import numpy as np
import matplotlib.pyplot as plt
import torch
from PIL import Image
import cv2
from transformers import Owlv2Processor, Owlv2ForObjectDetection
from IPython.display import display, clear_output
from google.colab import files
from filterpy.kalman import KalmanFilter
# ===== SORT tracker implementation =====
def linear_assignment(cost_matrix):
    """Linear assignment (Hungarian algorithm) - uses lap if available, else scipy."""
    try:
        import lap  # optional, faster LAPJV solver
        _, x, y = lap.lapjv(cost_matrix, extend_cost=True)
        return np.array([[y[i], i] for i in x if i >= 0])
    except ImportError:
        from scipy.optimize import linear_sum_assignment
        x, y = linear_sum_assignment(cost_matrix)
        return np.array(list(zip(x, y)))
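
# Usage sketch (illustrative only, shown for the scipy fallback): pass a cost
# matrix where lower is better. SORT calls this with a negated IoU matrix, so
# maximizing IoU becomes minimizing cost, e.g.:
#
#   cost = -np.array([[0.9, 0.1],
#                     [0.2, 0.8]])
#   linear_assignment(cost)  # -> [[0, 0], [1, 1]]  (detection i, tracker j) pairs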
def iou_batch(bb_test, bb_gt):
    """Batch IoU computation between two sets of boxes in [x1, y1, x2, y2] form."""
    bb_gt = np.expand_dims(bb_gt, 0)
    bb_test = np.expand_dims(bb_test, 1)
    xx1 = np.maximum(bb_test[..., 0], bb_gt[..., 0])
    yy1 = np.maximum(bb_test[..., 1], bb_gt[..., 1])
    xx2 = np.minimum(bb_test[..., 2], bb_gt[..., 2])
    yy2 = np.minimum(bb_test[..., 3], bb_gt[..., 3])
    w = np.maximum(0., xx2 - xx1)
    h = np.maximum(0., yy2 - yy1)
    wh = w * h
    o = wh / ((bb_test[..., 2] - bb_test[..., 0]) * (bb_test[..., 3] - bb_test[..., 1])
              + (bb_gt[..., 2] - bb_gt[..., 0]) * (bb_gt[..., 3] - bb_gt[..., 1]) - wh)
    return o
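
# Illustrative check (not part of the library): identical boxes give IoU 1.0,
# disjoint boxes give 0.0, e.g.:
#
#   a = np.array([[0, 0, 10, 10]])
#   b = np.array([[0, 0, 10, 10], [20, 20, 30, 30]])
#   iou_batch(a, b)  # -> [[1.0, 0.0]]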
def convert_bbox_to_z(bbox):
    """Convert a bounding box [x1, y1, x2, y2] to center form [x, y, s, r],
    where s is the area and r the aspect ratio."""
    w = bbox[2] - bbox[0]
    h = bbox[3] - bbox[1]
    x = bbox[0] + w / 2.
    y = bbox[1] + h / 2.
    s = w * h
    r = w / float(h)
    return np.array([x, y, s, r]).reshape((4, 1))

def convert_x_to_bbox(x, score=None):
    """Convert center form [x, y, s, r] back to a bounding box [x1, y1, x2, y2]."""
    w = np.sqrt(x[2] * x[3])
    h = x[2] / w
    if score is None:
        return np.array([x[0] - w / 2., x[1] - h / 2., x[0] + w / 2., x[1] + h / 2.]).reshape((1, 4))
    else:
        return np.array([x[0] - w / 2., x[1] - h / 2., x[0] + w / 2., x[1] + h / 2., score]).reshape((1, 5))
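
# Round-trip sketch (illustrative): the two conversions are inverses, e.g.
#
#   z = convert_bbox_to_z([10, 20, 30, 60])  # -> [[20], [40], [800], [0.5]]
#   convert_x_to_bbox(z.flatten())           # -> [[10., 20., 30., 60.]]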
class KalmanBoxTracker(object):
    """Single-object tracker based on a Kalman filter.

    State vector: [x, y, s, r, dx, dy, ds] - box center, area and aspect
    ratio, plus velocities for center and area (constant-velocity model).
    """
    count = 0

    def __init__(self, bbox):
        self.kf = KalmanFilter(dim_x=7, dim_z=4)
        # Constant-velocity transition matrix and measurement matrix
        self.kf.F = np.array([[1, 0, 0, 0, 1, 0, 0],
                              [0, 1, 0, 0, 0, 1, 0],
                              [0, 0, 1, 0, 0, 0, 1],
                              [0, 0, 0, 1, 0, 0, 0],
                              [0, 0, 0, 0, 1, 0, 0],
                              [0, 0, 0, 0, 0, 1, 0],
                              [0, 0, 0, 0, 0, 0, 1]])
        self.kf.H = np.array([[1, 0, 0, 0, 0, 0, 0],
                              [0, 1, 0, 0, 0, 0, 0],
                              [0, 0, 1, 0, 0, 0, 0],
                              [0, 0, 0, 1, 0, 0, 0]])
        self.kf.R[2:, 2:] *= 10.
        self.kf.P[4:, 4:] *= 1000.  # high uncertainty for the unobserved velocities
        self.kf.P *= 10.
        self.kf.Q[-1, -1] *= 0.01
        self.kf.Q[4:, 4:] *= 0.01
        self.kf.x[:4] = convert_bbox_to_z(bbox)
        self.time_since_update = 0
        self.id = KalmanBoxTracker.count
        KalmanBoxTracker.count += 1
        self.history = []
        self.hits = 0
        self.hit_streak = 0
        self.age = 0

    def update(self, bbox):
        self.time_since_update = 0
        self.history = []
        self.hits += 1
        self.hit_streak += 1
        self.kf.update(convert_bbox_to_z(bbox))

    def predict(self):
        # Zero the area velocity if the predicted area would go non-positive
        if (self.kf.x[6] + self.kf.x[2]) <= 0:
            self.kf.x[6] *= 0.0
        self.kf.predict()
        self.age += 1
        if self.time_since_update > 0:
            self.hit_streak = 0
        self.time_since_update += 1
        self.history.append(convert_x_to_bbox(self.kf.x))
        return self.history[-1]

    def get_state(self):
        return convert_x_to_bbox(self.kf.x)
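
# Lifecycle sketch (illustrative): a tracker is seeded with one box, predicted
# once per frame, and corrected whenever a new detection matches, e.g.:
#
#   trk = KalmanBoxTracker([10, 20, 30, 60])
#   trk.predict()                  # propagate the state one frame
#   trk.update([12, 22, 32, 62])   # correct with a matched detection
#   trk.get_state()                # current box estimate, shape (1, 4)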
def associate_detections_to_trackers(detections, trackers, iou_threshold=0.3):
    """Associate detections with trackers via IoU and linear assignment.

    Returns (matches, unmatched_detections, unmatched_trackers).
    """
    if len(trackers) == 0:
        return np.empty((0, 2), dtype=int), np.arange(len(detections)), np.empty((0, 5), dtype=int)
    iou_matrix = iou_batch(detections, trackers)
    if min(iou_matrix.shape) > 0:
        a = (iou_matrix > iou_threshold).astype(np.int32)
        if a.sum(1).max() == 1 and a.sum(0).max() == 1:
            # Unambiguous one-to-one matches: skip the assignment solver
            matched_indices = np.stack(np.where(a), axis=1)
        else:
            matched_indices = linear_assignment(-iou_matrix)
    else:
        matched_indices = np.empty(shape=(0, 2))
    unmatched_detections = []
    for d, det in enumerate(detections):
        if d not in matched_indices[:, 0]:
            unmatched_detections.append(d)
    unmatched_trackers = []
    for t, trk in enumerate(trackers):
        if t not in matched_indices[:, 1]:
            unmatched_trackers.append(t)
    # Filter out matched pairs with IoU below the threshold
    matches = []
    for m in matched_indices:
        if iou_matrix[m[0], m[1]] < iou_threshold:
            unmatched_detections.append(m[0])
            unmatched_trackers.append(m[1])
        else:
            matches.append(m.reshape(1, 2))
    if len(matches) == 0:
        matches = np.empty((0, 2), dtype=int)
    else:
        matches = np.concatenate(matches, axis=0)
    return matches, np.array(unmatched_detections), np.array(unmatched_trackers)
class Sort(object):
    """Main SORT tracker class."""

    def __init__(self, max_age=1, min_hits=3, iou_threshold=0.3):
        self.max_age = max_age
        self.min_hits = min_hits
        self.iou_threshold = iou_threshold
        self.trackers = []
        self.frame_count = 0

    def update(self, dets=np.empty((0, 5))):
        self.frame_count += 1
        # Predict new locations for the existing trackers
        trks = np.zeros((len(self.trackers), 5))
        to_del = []
        ret = []
        for t, trk in enumerate(trks):
            pos = self.trackers[t].predict()[0]
            trk[:] = [pos[0], pos[1], pos[2], pos[3], 0]
            if np.any(np.isnan(pos)):
                to_del.append(t)
        trks = np.ma.compress_rows(np.ma.masked_invalid(trks))
        for t in reversed(to_del):
            self.trackers.pop(t)
        matched, unmatched_dets, unmatched_trks = associate_detections_to_trackers(dets, trks, self.iou_threshold)
        # Update matched trackers, create new trackers for unmatched detections
        for m in matched:
            self.trackers[m[1]].update(dets[m[0], :])
        for i in unmatched_dets:
            trk = KalmanBoxTracker(dets[i, :])
            self.trackers.append(trk)
        i = len(self.trackers)
        for trk in reversed(self.trackers):
            d = trk.get_state()[0]
            if (trk.time_since_update < 1) and (trk.hit_streak >= self.min_hits or self.frame_count <= self.min_hits):
                ret.append(np.concatenate((d, [trk.id + 1])).reshape(1, -1))
            i -= 1
            if trk.time_since_update > self.max_age:
                self.trackers.pop(i)
        if len(ret) > 0:
            return np.concatenate(ret)
        return np.empty((0, 5))
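
# Usage sketch (illustrative): feed [x1, y1, x2, y2, score] detections once per
# frame; rows of the result are [x1, y1, x2, y2, track_id], e.g.:
#
#   sort_demo = Sort(max_age=5, min_hits=1, iou_threshold=0.3)
#   dets = np.array([[10, 20, 30, 60, 0.9]])
#   tracks = sort_demo.update(dets)              # -> [[~10, ~20, ~30, ~60, 1]]
#   tracks = sort_demo.update(np.empty((0, 5)))  # frames without detections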
# ===== Helper functions =====
def show_frame_colab(frame, title="Frame", figsize=(12, 8)):
    """Display a frame in Colab."""
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    plt.figure(figsize=figsize)
    plt.imshow(frame_rgb)
    plt.title(title, fontsize=14)
    plt.axis('off')
    plt.show()
    plt.close()

def create_video_preview(frames, max_frames=4):
    """Create a preview grid of video frames."""
    if not frames:
        print("No frames available to display.")
        return
    step = max(1, len(frames) // max_frames)
    preview_frames = frames[::step][:max_frames]
    cols = min(2, len(preview_frames))
    rows = (len(preview_frames) + cols - 1) // cols
    fig, axes = plt.subplots(rows, cols, figsize=(15, 5 * rows))
    # Normalize axes to a flat list regardless of the grid shape
    if len(preview_frames) == 1:
        axes = [axes]
    elif rows == 1:
        axes = list(axes) if cols > 1 else [axes]
    else:
        axes = axes.flatten()
    for i, frame in enumerate(preview_frames):
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        axes[i].imshow(frame_rgb)
        axes[i].set_title(f"Frame {i * step + 1}", fontsize=12)
        axes[i].axis('off')
    for i in range(len(preview_frames), len(axes)):
        axes[i].axis('off')
    plt.tight_layout()
    plt.show()
    plt.close()

def rotate_frame(frame, rotation):
    """Rotate a frame by 0/90/180/270 degrees."""
    if rotation == 90:
        return cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
    elif rotation == 180:
        return cv2.rotate(frame, cv2.ROTATE_180)
    elif rotation == 270:
        return cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE)
    else:
        return frame

def safe_filename(filename):
    """Sanitize a filename so it can be downloaded safely."""
    safe_name = filename.replace(' ', '_')
    safe_name = re.sub(r'[()[\]{}]', '', safe_name)
    safe_name = re.sub(r'[^a-zA-Z0-9._-]', '', safe_name)
    safe_name = re.sub(r'_+', '_', safe_name)
    safe_name = safe_name.strip('_')
    return safe_name
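
# Illustrative example: spaces become underscores, brackets and other special
# characters are stripped, e.g.:
#
#   safe_filename("processed_video (1) [final].mp4")
#   # -> "processed_video_1_final.mp4"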
def safe_download(filename):
    """Download a file from Colab, falling back to sanitized or latest names."""
    try:
        if os.path.exists(filename):
            print(f"📁 File found: {filename}")
            files.download(filename)
            return True
        # Fall back to an already-sanitized copy, if one exists
        safe_name = safe_filename(filename)
        if safe_name != filename and os.path.exists(safe_name):
            print(f"🔄 Using sanitized filename: {safe_name}")
            files.download(safe_name)
            return True
        # Otherwise try the most recently created processed video
        current_files = os.listdir('.')
        processed_videos = [f for f in current_files if f.startswith('processed_') and f.lower().endswith('.mp4')]
        if processed_videos:
            latest_file = max(processed_videos, key=lambda x: os.path.getctime(x))
            print(f"🔄 Trying to download the latest file: {latest_file}")
            safe_latest = safe_filename(latest_file)
            if safe_latest != latest_file:
                shutil.copy2(latest_file, safe_latest)
                files.download(safe_latest)
            else:
                files.download(latest_file)
            return True
        return False
    except Exception as e:
        print(f"❌ Download error: {e}")
        return False
# ===== Image overlay functions =====
def prepare_overlay_image(image_path, target_size=(100, 100)):
    """Prepare an overlay image (including transparency)."""
    try:
        if not os.path.exists(image_path):
            print(f"❌ Image not found: {image_path}")
            return None
        img = cv2.imread(image_path, cv2.IMREAD_UNCHANGED)
        if img is None:
            print(f"❌ Image could not be loaded: {image_path}")
            return None
        print(f"✅ Image loaded: {image_path} - size: {img.shape}")
        # Add an alpha channel to plain BGR images so blending is uniform
        if len(img.shape) == 3 and img.shape[2] == 3:
            alpha_channel = np.ones((img.shape[0], img.shape[1], 1), dtype=img.dtype) * 255
            img = np.concatenate([img, alpha_channel], axis=2)
            print("🔄 Alpha channel added")
        return img
    except Exception as e:
        print(f"❌ Error preparing image: {e}")
        return None

def overlay_single_image_on_bbox(background, overlay_img, bbox, scale_factor=1.0, alpha=0.8):
    """Overlay a single image exactly onto a bounding box."""
    try:
        if overlay_img is None:
            return background
        x1, y1, x2, y2 = map(int, bbox[:4])
        box_width = max(1, x2 - x1)
        box_height = max(1, y2 - y1)
        new_width = max(1, int(box_width * scale_factor))
        new_height = max(1, int(box_height * scale_factor))
        overlay_resized = cv2.resize(overlay_img, (new_width, new_height))
        # Center the overlay on the box and clip it to the frame bounds
        center_x = (x1 + x2) // 2
        center_y = (y1 + y2) // 2
        start_x = max(0, center_x - new_width // 2)
        start_y = max(0, center_y - new_height // 2)
        end_x = min(background.shape[1], start_x + new_width)
        end_y = min(background.shape[0], start_y + new_height)
        actual_width = end_x - start_x
        actual_height = end_y - start_y
        if actual_width <= 0 or actual_height <= 0:
            return background
        overlay_crop = overlay_resized[:actual_height, :actual_width]
        if len(overlay_crop.shape) == 3 and overlay_crop.shape[2] == 4:
            # Per-channel alpha blending
            overlay_bgr = overlay_crop[:, :, :3]
            overlay_alpha = overlay_crop[:, :, 3] / 255.0 * alpha
            for c in range(3):
                background[start_y:end_y, start_x:end_x, c] = (
                    overlay_alpha * overlay_bgr[:, :, c] +
                    (1 - overlay_alpha) * background[start_y:end_y, start_x:end_x, c]
                )
        else:
            roi = background[start_y:end_y, start_x:end_x]
            blended = cv2.addWeighted(roi, 1 - alpha, overlay_crop, alpha, 0)
            background[start_y:end_y, start_x:end_x] = blended
        return background
    except Exception as e:
        print(f"⚠️ Error applying overlay: {e}")
        return background

def create_sample_overlay():
    """Create a sample overlay image for testing."""
    print("🎨 Creating sample overlay image...")
    img = np.zeros((200, 200, 4), dtype=np.uint8)
    # Hue gradient with partial transparency
    for i in range(200):
        hue = int(i * 180 / 200)
        color = cv2.cvtColor(np.uint8([[[hue, 255, 255]]]), cv2.COLOR_HSV2BGR)[0][0]
        img[i, :, :3] = color
        img[i, :, 3] = 200
    cv2.putText(img, "OVERLAY", (30, 100), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (255, 255, 255), 3)
    cv2.putText(img, "IMAGE", (50, 140), cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 0), 2)
    cv2.rectangle(img, (5, 5), (195, 195), (255, 255, 255), 3)
    cv2.imwrite("sample_overlay.png", img)
    print("✅ Sample image saved: sample_overlay.png")
    return "sample_overlay.png"
# ===== Main video processing function =====
def process_video_with_continuous_overlay(input_path, output_path=None,
                                          overlay_image_path=None,
                                          detection_interval=2, manual_rotation=90,
                                          overlay_scale=1.0, overlay_alpha=0.8,
                                          enable_preview=True,
                                          custom_queries=None):
    """Video processing with a continuous image overlay (prevents frame dropouts)."""
    print("🚀 Starting video processing with continuous image overlay!")
    print("🔧 Continuous overlay guaranteed for all frames!")
    print("=" * 60)
    # Load the AI model
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"🔥 Device in use: {device}")
    try:
        print("📥 Downloading OWLv2 model...")
        model = Owlv2ForObjectDetection.from_pretrained("google/owlv2-base-patch16-ensemble")
        processor = Owlv2Processor.from_pretrained("google/owlv2-base-patch16-ensemble")
        model = model.to(device)
        print("✅ Ensemble model loaded successfully")
    except Exception as e:
        print(f"⚠️ Ensemble failed, trying the base model: {e}")
        model = Owlv2ForObjectDetection.from_pretrained("google/owlv2-base-patch16")
        processor = Owlv2Processor.from_pretrained("google/owlv2-base-patch16")
        model = model.to(device)
        print("✅ Base model loaded successfully")
    # Object types to detect
    if custom_queries and isinstance(custom_queries, list) and len(custom_queries) > 0:
        text_queries = custom_queries
        print(f"🔍 Using custom queries: {text_queries}")
    else:
        # Default query list
        text_queries = [
            "signboard", "sign", "shop sign", "store sign", "billboard",
            "advertisement", "neon sign", "restaurant sign", "cafe sign"
        ]
        print(f"🔍 Using default queries (signs/advertising): {text_queries}")
    # Color definitions (BGR)
    colors = [
        (0, 0, 255),    # red
        (255, 0, 0),    # blue
        (0, 255, 0),    # green
        (0, 255, 255),  # yellow
        (255, 0, 255),  # magenta
        (255, 128, 0),  # light blue
        (0, 0, 128),    # dark red
        (0, 128, 0),    # dark green
        (128, 0, 0),    # dark blue
        (0, 128, 128)   # dark yellow
    ]
    # Prepare the overlay image
    overlay_img = None
    if overlay_image_path:
        overlay_img = prepare_overlay_image(overlay_image_path)
        if overlay_img is None:
            print("⚠️ Loading the overlay image failed, using the default effect...")
    # Initialize the SORT tracker with forgiving settings
    tracker = Sort(
        max_age=100,        # keep trackers alive longer (100 frames)
        min_hits=1,         # display immediately
        iou_threshold=0.05  # more permissive matching
    )
    print("🎯 SORT settings: max_age=100, min_hits=1, iou_threshold=0.05")
    # Open the video
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        print("❌ Video could not be opened")
        return False
    # Video information
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print(f"📹 Video: {width}x{height}, {fps:.1f}fps, {total_frames} frames")
    print(f"🖼️ Overlay: scale={overlay_scale}, opacity={overlay_alpha}")
    # Output size (accounting for rotation)
    if manual_rotation in [90, 270]:
        out_width, out_height = height, width
    else:
        out_width, out_height = width, height
    # Output video settings
    out = None
    if output_path:
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (out_width, out_height))
        print(f"💾 Output: {output_path}")
    # Initialize processing variables
    frame_count = 0
    tracked_objects = {}
    sample_frames = []
    detection_count = 0
    # 🔥 Core: store the persistent tracking state
    persistent_tracked_boxes = []
    last_detection_frame = 0
    print("🔄 Processing started - continuous overlay guaranteed...")
    print("-" * 50)
    try:
        while cap.isOpened() and frame_count < total_frames:
            ret, frame = cap.read()
            if not ret:
                break
            # Apply rotation
            if manual_rotation != 0:
                frame = rotate_frame(frame, manual_rotation)
            frame_count += 1
            # Show progress
            if frame_count % 30 == 0:
                progress = (frame_count / total_frames) * 100
                active_trackers = len(tracker.trackers)
                persistent_count = len(persistent_tracked_boxes)
                print(f"⏳ {progress:.1f}% | Frame {frame_count}/{total_frames} | Active:{active_trackers} | Persistent:{persistent_count}")
            # 🎯 Object detection (periodic)
            detect_this_frame = (frame_count % detection_interval == 0)
            if detect_this_frame:
                try:
                    # Run the AI model
                    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    pil_image = Image.fromarray(rgb_frame)
                    inputs = processor(text=text_queries, images=pil_image, return_tensors="pt")
                    inputs = {k: v.to(device) for k, v in inputs.items()}
                    with torch.no_grad():
                        outputs = model(**inputs)
                    target_sizes = torch.tensor([pil_image.size[::-1]])
                    with warnings.catch_warnings():
                        warnings.simplefilter("ignore")
                        results = processor.post_process_object_detection(
                            outputs, target_sizes=target_sizes, threshold=0.15
                        )
                    # Convert the detection results
                    current_detections = []
                    detection_labels = []
                    for result in results:
                        boxes = result["boxes"]
                        scores = result["scores"]
                        labels = result["labels"]
                        for box, score, label in zip(boxes, scores, labels):
                            if score >= 0.15:
                                box = box.tolist()
                                current_detections.append([*box, score.item()])
                                detection_labels.append(label.item())
                    # Update SORT tracking
                    if current_detections:
                        current_detections = np.array(current_detections)
                        tracked_boxes = tracker.update(current_detections)
                        # 🔥 Refresh the persistent tracking state
                        persistent_tracked_boxes = tracked_boxes.copy()
                        last_detection_frame = frame_count
                        if len(tracked_boxes) > 0:
                            print(f"🎯 Frame {frame_count}: {len(current_detections)} detected → {len(tracked_boxes)} tracked")
                        # Map track IDs to detection labels
                        if len(tracked_boxes) > 0 and len(detection_labels) > 0:
                            for i, box in enumerate(tracked_boxes):
                                obj_id = int(box[4])
                                if obj_id not in tracked_objects and i < len(detection_labels):
                                    tracked_objects[obj_id] = detection_labels[i]
                        detection_count += len(tracked_boxes)
                    else:
                        # No detections - empty update
                        tracked_boxes = tracker.update(np.empty((0, 5)))
                except Exception as e:
                    print(f"⚠️ Frame {frame_count} detection error: {str(e)[:50]}...")
                    # On error, keep the persistent state
                    tracked_boxes = persistent_tracked_boxes.copy()
            else:
                # 🔥 Frame without detection: use the persistent tracking state
                try:
                    # Run the SORT prediction only
                    predicted_boxes = tracker.update(np.empty((0, 5)))
                    # Use predicted boxes if available, otherwise the persistent state
                    if len(predicted_boxes) > 0:
                        tracked_boxes = predicted_boxes
                        # Refresh the persistent state as well (position prediction)
                        persistent_tracked_boxes = predicted_boxes.copy()
                    else:
                        # No predictions: fall back to the persistent state
                        tracked_boxes = persistent_tracked_boxes.copy()
                    # 🔧 Reset the state if no detections arrive for a while
                    frame_diff = frame_count - last_detection_frame
                    if frame_diff > 50:  # no detection for more than 50 frames
                        persistent_tracked_boxes = []
                        tracked_boxes = []
                except Exception:
                    # Even on exceptions, use the persistent state
                    tracked_boxes = persistent_tracked_boxes.copy()
            # 🎨 Apply the continuous image overlay
            overlay_applied_count = 0
            if len(tracked_boxes) > 0:
                for box in tracked_boxes:
                    try:
                        x1, y1, x2, y2, obj_id = box
                        obj_id = int(obj_id)
                        if x2 > x1 and y2 > y1:
                            if overlay_img is not None:
                                # 🖼️ Custom image overlay
                                frame = overlay_single_image_on_bbox(
                                    frame, overlay_img, box,
                                    scale_factor=overlay_scale,
                                    alpha=overlay_alpha
                                )
                                overlay_applied_count += 1
                            else:
                                # Default blur effect
                                roi = frame[int(y1):int(y2), int(x1):int(x2)]
                                if roi.size > 0:
                                    kernel_size = max(3, min(51, int((x2 - x1) // 5)))
                                    if kernel_size % 2 == 0:
                                        kernel_size += 1
                                    blurred = cv2.GaussianBlur(roi, (kernel_size, kernel_size), 0)
                                    frame[int(y1):int(y2), int(x1):int(x2)] = blurred
                                    overlay_applied_count += 1
                            # Draw the ID label
                            color = colors[obj_id % len(colors)]
                            cv2.putText(frame, f"ID: {obj_id}", (int(x1), int(y1) - 10),
                                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
                    except Exception:
                        continue
            # 🔢 Draw status information
            info_y = 30
            cv2.putText(frame, f"Frame: {frame_count}/{total_frames}", (10, info_y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
            info_y += 30
            cv2.putText(frame, f"Tracked: {len(tracked_boxes)} | Overlays: {overlay_applied_count}", (10, info_y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
            info_y += 30
            detect_status = "DETECT" if detect_this_frame else "PREDICT"
            cv2.putText(frame, f"Mode: {detect_status}", (10, info_y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
            info_y += 30
            cv2.putText(frame, f"Persistent: {len(persistent_tracked_boxes)}", (10, info_y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 255), 2)
            # Save the frame
            if out:
                out.write(frame)
            # Keep sample frames for the preview
            if enable_preview and len(sample_frames) < 4:
                frame_interval = max(1, total_frames // 4)
                if frame_count % frame_interval == 0:
                    sample_frames.append(frame.copy())
    except KeyboardInterrupt:
        print("Interrupted by user")
    except Exception as e:
        print(f"Error during processing: {e}")
    finally:
        # Clean up
        cap.release()
        if out:
            out.release()
    print("-" * 50)
    print("✅ Processing with continuous overlay complete!")
    print(f"🎯 Total number of detections: {detection_count}")
    print(f"🔢 Final number of trackers: {len(tracker.trackers)}")
    print(f"📊 Unique objects tracked: {len(tracked_objects)}")
    # Show the preview
    if enable_preview and sample_frames:
        print("\n🖼️ Sample of the processing results:")
        create_video_preview(sample_frames, 4)
    return True

print("📦 Video Tracker Core Library loaded!")
  