video_tracker_core.py
# ===== Video Tracker Core Library =====
# Complex implementation part - file to upload to Gist
import matplotlib
matplotlib.use('Agg')
import os
import sys
import re
import numpy as np
import matplotlib.pyplot as plt
import torch
from PIL import Image
import cv2
import time
from transformers import Owlv2Processor, Owlv2ForObjectDetection
from IPython.display import display, clear_output
from google.colab import files
from filterpy.kalman import KalmanFilter
# ===== SORT tracker implementation =====
def linear_assignment(cost_matrix):
    """Linear assignment via the Hungarian algorithm (lap if available, else scipy)."""
    try:
        import lap
        _, x, y = lap.lapjv(cost_matrix, extend_cost=True)
        return np.array([[y[i], i] for i in x if i >= 0])
    except ImportError:
        from scipy.optimize import linear_sum_assignment
        x, y = linear_sum_assignment(cost_matrix)
        return np.array(list(zip(x, y)))
def iou_batch(bb_test, bb_gt):
    """Compute the IoU matrix between two batches of boxes in [x1, y1, x2, y2] form."""
    bb_gt = np.expand_dims(bb_gt, 0)
    bb_test = np.expand_dims(bb_test, 1)
    xx1 = np.maximum(bb_test[..., 0], bb_gt[..., 0])
    yy1 = np.maximum(bb_test[..., 1], bb_gt[..., 1])
    xx2 = np.minimum(bb_test[..., 2], bb_gt[..., 2])
    yy2 = np.minimum(bb_test[..., 3], bb_gt[..., 3])
    w = np.maximum(0., xx2 - xx1)
    h = np.maximum(0., yy2 - yy1)
    wh = w * h
    o = wh / ((bb_test[..., 2] - bb_test[..., 0]) * (bb_test[..., 3] - bb_test[..., 1])
              + (bb_gt[..., 2] - bb_gt[..., 0]) * (bb_gt[..., 3] - bb_gt[..., 1]) - wh)
    return o
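
# A minimal sanity check for iou_batch (illustrative, not part of the original
# gist): an identical box pair yields IoU 1.0, a disjoint pair yields 0.0.
def _demo_iou_batch():
    a = np.array([[0., 0., 10., 10.]])
    b = np.array([[0., 0., 10., 10.], [20., 20., 30., 30.]])
    return iou_batch(a, b)  # -> array([[1., 0.]])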
def convert_bbox_to_z(bbox):
    """Convert a box [x1, y1, x2, y2] to center form [x, y, s, r] (s = area, r = aspect ratio)."""
    w = bbox[2] - bbox[0]
    h = bbox[3] - bbox[1]
    x = bbox[0] + w / 2.
    y = bbox[1] + h / 2.
    s = w * h
    r = w / float(h)
    return np.array([x, y, s, r]).reshape((4, 1))

def convert_x_to_bbox(x, score=None):
    """Convert center form [x, y, s, r] back to a box [x1, y1, x2, y2]."""
    w = np.sqrt(x[2] * x[3])
    h = x[2] / w
    if score is None:
        return np.array([x[0] - w / 2., x[1] - h / 2., x[0] + w / 2., x[1] + h / 2.]).reshape((1, 4))
    else:
        return np.array([x[0] - w / 2., x[1] - h / 2., x[0] + w / 2., x[1] + h / 2., score]).reshape((1, 5))
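
# Round-trip check for the two conversions above (illustrative only, not part
# of the original gist): a box should come back unchanged after bbox -> z -> bbox.
def _demo_bbox_roundtrip():
    box = np.array([10., 20., 50., 100.])   # x1, y1, x2, y2
    z = convert_bbox_to_z(box)              # center x/y, area, aspect ratio
    back = convert_x_to_bbox(z.flatten())
    assert np.allclose(back, box.reshape(1, 4))
    return back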
class KalmanBoxTracker(object):
    """Single-object tracker based on a constant-velocity Kalman filter."""
    count = 0

    def __init__(self, bbox):
        # State vector: [x, y, s, r, dx, dy, ds] - box center, area, and aspect
        # ratio, plus velocities for everything except the aspect ratio.
        self.kf = KalmanFilter(dim_x=7, dim_z=4)
        self.kf.F = np.array([[1, 0, 0, 0, 1, 0, 0],
                              [0, 1, 0, 0, 0, 1, 0],
                              [0, 0, 1, 0, 0, 0, 1],
                              [0, 0, 0, 1, 0, 0, 0],
                              [0, 0, 0, 0, 1, 0, 0],
                              [0, 0, 0, 0, 0, 1, 0],
                              [0, 0, 0, 0, 0, 0, 1]])
        self.kf.H = np.array([[1, 0, 0, 0, 0, 0, 0],
                              [0, 1, 0, 0, 0, 0, 0],
                              [0, 0, 1, 0, 0, 0, 0],
                              [0, 0, 0, 1, 0, 0, 0]])
        self.kf.R[2:, 2:] *= 10.
        self.kf.P[4:, 4:] *= 1000.  # high uncertainty for the unobserved velocities
        self.kf.P *= 10.
        self.kf.Q[-1, -1] *= 0.01
        self.kf.Q[4:, 4:] *= 0.01
        self.kf.x[:4] = convert_bbox_to_z(bbox)
        self.time_since_update = 0
        self.id = KalmanBoxTracker.count
        KalmanBoxTracker.count += 1
        self.history = []
        self.hits = 0
        self.hit_streak = 0
        self.age = 0

    def update(self, bbox):
        """Correct the filter with an observed bounding box."""
        self.time_since_update = 0
        self.history = []
        self.hits += 1
        self.hit_streak += 1
        self.kf.update(convert_bbox_to_z(bbox))

    def predict(self):
        """Advance the state estimate and return the predicted bounding box."""
        if (self.kf.x[6] + self.kf.x[2]) <= 0:
            self.kf.x[6] *= 0.0
        self.kf.predict()
        self.age += 1
        if self.time_since_update > 0:
            self.hit_streak = 0
        self.time_since_update += 1
        self.history.append(convert_x_to_bbox(self.kf.x))
        return self.history[-1]

    def get_state(self):
        """Return the current bounding box estimate."""
        return convert_x_to_bbox(self.kf.x)
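
# Quick illustrative predict/update cycle for a single tracker (made-up box,
# not from the original gist):
def _demo_kalman_tracker():
    trk = KalmanBoxTracker([10, 10, 50, 50])
    pred = trk.predict()           # predicted box for the next frame
    trk.update([12, 11, 52, 51])   # correct with a fresh observation
    return trk.get_state()         # filtered estimate, shape (1, 4)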
def associate_detections_to_trackers(detections, trackers, iou_threshold=0.3):
    """Match detections to existing trackers by IoU."""
    if len(trackers) == 0:
        return np.empty((0, 2), dtype=int), np.arange(len(detections)), np.empty((0, 5), dtype=int)
    iou_matrix = iou_batch(detections, trackers)
    if min(iou_matrix.shape) > 0:
        a = (iou_matrix > iou_threshold).astype(np.int32)
        if a.sum(1).max() == 1 and a.sum(0).max() == 1:
            # Unambiguous one-to-one matches - no assignment solver needed.
            matched_indices = np.stack(np.where(a), axis=1)
        else:
            matched_indices = linear_assignment(-iou_matrix)
    else:
        matched_indices = np.empty(shape=(0, 2))
    unmatched_detections = []
    for d, det in enumerate(detections):
        if d not in matched_indices[:, 0]:
            unmatched_detections.append(d)
    unmatched_trackers = []
    for t, trk in enumerate(trackers):
        if t not in matched_indices[:, 1]:
            unmatched_trackers.append(t)
    # Filter out assigned pairs whose IoU still falls below the threshold.
    matches = []
    for m in matched_indices:
        if iou_matrix[m[0], m[1]] < iou_threshold:
            unmatched_detections.append(m[0])
            unmatched_trackers.append(m[1])
        else:
            matches.append(m.reshape(1, 2))
    if len(matches) == 0:
        matches = np.empty((0, 2), dtype=int)
    else:
        matches = np.concatenate(matches, axis=0)
    return matches, np.array(unmatched_detections), np.array(unmatched_trackers)
class Sort(object):
    """Main SORT tracker class."""

    def __init__(self, max_age=1, min_hits=3, iou_threshold=0.3):
        self.max_age = max_age
        self.min_hits = min_hits
        self.iou_threshold = iou_threshold
        self.trackers = []
        self.frame_count = 0

    def update(self, dets=np.empty((0, 5))):
        """Advance all trackers one frame and fold in the new detections."""
        self.frame_count += 1
        # Predict new locations for existing trackers; drop any that go NaN.
        trks = np.zeros((len(self.trackers), 5))
        to_del = []
        ret = []
        for t, trk in enumerate(trks):
            pos = self.trackers[t].predict()[0]
            trk[:] = [pos[0], pos[1], pos[2], pos[3], 0]
            if np.any(np.isnan(pos)):
                to_del.append(t)
        trks = np.ma.compress_rows(np.ma.masked_invalid(trks))
        for t in reversed(to_del):
            self.trackers.pop(t)
        matched, unmatched_dets, unmatched_trks = associate_detections_to_trackers(dets, trks, self.iou_threshold)
        # Update matched trackers and spawn new ones for unmatched detections.
        for m in matched:
            self.trackers[m[1]].update(dets[m[0], :])
        for i in unmatched_dets:
            trk = KalmanBoxTracker(dets[i, :])
            self.trackers.append(trk)
        i = len(self.trackers)
        for trk in reversed(self.trackers):
            d = trk.get_state()[0]
            if (trk.time_since_update < 1) and (trk.hit_streak >= self.min_hits or self.frame_count <= self.min_hits):
                ret.append(np.concatenate((d, [trk.id + 1])).reshape(1, -1))
            i -= 1
            # Remove trackers that have gone too long without an update.
            if trk.time_since_update > self.max_age:
                self.trackers.pop(i)
        if len(ret) > 0:
            return np.concatenate(ret)
        return np.empty((0, 5))
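
# Minimal usage sketch for Sort (illustrative numbers, not from the original
# gist): feed one [x1, y1, x2, y2, score] row per detection each frame and
# read back [x1, y1, x2, y2, track_id] rows with stable IDs.
def _demo_sort():
    tracker = Sort(max_age=5, min_hits=1, iou_threshold=0.3)
    dets = np.array([[100., 100., 200., 200., 0.9]])
    tracks = tracker.update(dets)
    for _ in range(2):
        tracks = tracker.update(dets)  # same box again -> same ID
    return tracks                      # e.g. [[100. 100. 200. 200.   1.]]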
# ===== Helper functions =====
def show_frame_colab(frame, title="Frame", figsize=(12, 8)):
    """Display a single frame inline in Colab."""
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    plt.figure(figsize=figsize)
    plt.imshow(frame_rgb)
    plt.title(title, fontsize=14)
    plt.axis('off')
    plt.show()
    plt.close()
def create_video_preview(frames, max_frames=4):
    """Render a small grid preview of video frames."""
    if not frames:
        print("No frames to display.")
        return
    step = max(1, len(frames) // max_frames)
    preview_frames = frames[::step][:max_frames]
    cols = min(2, len(preview_frames))
    rows = (len(preview_frames) + cols - 1) // cols
    fig, axes = plt.subplots(rows, cols, figsize=(15, 5 * rows))
    # Normalize axes to a flat list regardless of the subplot grid shape.
    if len(preview_frames) == 1:
        axes = [axes]
    elif rows == 1:
        axes = list(axes) if cols > 1 else [axes]
    else:
        axes = axes.flatten()
    for i, frame in enumerate(preview_frames):
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        axes[i].imshow(frame_rgb)
        axes[i].set_title(f"Frame {i * step + 1}", fontsize=12)
        axes[i].axis('off')
    for i in range(len(preview_frames), len(axes)):
        axes[i].axis('off')
    plt.tight_layout()
    plt.show()
    plt.close()
def rotate_frame(frame, rotation):
    """Rotate a frame by 90, 180, or 270 degrees."""
    if rotation == 90:
        return cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
    elif rotation == 180:
        return cv2.rotate(frame, cv2.ROTATE_180)
    elif rotation == 270:
        return cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE)
    else:
        return frame
def safe_filename(filename):
    """Sanitize a filename for safe downloading."""
    safe_name = filename.replace(' ', '_')
    safe_name = re.sub(r'[()[\]{}]', '', safe_name)
    safe_name = re.sub(r'[^a-zA-Z0-9._-]', '', safe_name)
    safe_name = re.sub(r'_+', '_', safe_name)
    safe_name = safe_name.strip('_')
    return safe_name
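
# Illustration of the sanitization above on a hypothetical filename: spaces
# become underscores, brackets are dropped, and runs of "_" collapse.
assert safe_filename("my video (final) [v2].mp4") == "my_video_final_v2.mp4"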
def safe_download(filename):
    """Download a file from Colab, falling back to a sanitized copy if needed."""
    try:
        if os.path.exists(filename):
            print(f"📁 File found: {filename}")
            files.download(filename)
            return True
        safe_name = safe_filename(filename)
        if safe_name != filename:
            import shutil
            if os.path.exists(filename):
                shutil.copy2(filename, safe_name)
                print(f"🔄 Copied to a safe filename: {safe_name}")
                files.download(safe_name)
                return True
        # Last resort: grab the most recently created processed video.
        current_files = os.listdir('.')
        processed_videos = [f for f in current_files if f.startswith('processed_') and f.lower().endswith('.mp4')]
        if processed_videos:
            latest_file = max(processed_videos, key=lambda x: os.path.getctime(x))
            print(f"🔄 Trying to download the most recent file: {latest_file}")
            safe_latest = safe_filename(latest_file)
            if safe_latest != latest_file:
                import shutil
                shutil.copy2(latest_file, safe_latest)
                files.download(safe_latest)
            else:
                files.download(latest_file)
            return True
        return False
    except Exception as e:
        print(f"❌ Download error: {e}")
        return False
# ===== Image overlay functions =====
def prepare_overlay_image(image_path, target_size=(100, 100)):
    """Load and prepare an overlay image (including transparency)."""
    try:
        if not os.path.exists(image_path):
            print(f"❌ Image not found: {image_path}")
            return None
        img = cv2.imread(image_path, cv2.IMREAD_UNCHANGED)
        if img is None:
            print(f"❌ Image could not be loaded: {image_path}")
            return None
        print(f"✅ Image loaded: {image_path} - size: {img.shape}")
        # Add an opaque alpha channel only if the image is plain 3-channel BGR
        # (checking just len(img.shape) == 3 would wrongly re-stack a channel
        # onto images that already carry an alpha channel).
        if len(img.shape) == 3 and img.shape[2] == 3:
            alpha_channel = np.ones((img.shape[0], img.shape[1], 1), dtype=img.dtype) * 255
            img = np.concatenate([img, alpha_channel], axis=2)
            print("🔄 Alpha channel added")
        return img
    except Exception as e:
        print(f"❌ Error while preparing the image: {e}")
        return None
def overlay_single_image_on_bbox(background, overlay_img, bbox, scale_factor=1.0, alpha=0.8):
    """Overlay a single image exactly onto a bounding box."""
    try:
        if overlay_img is None:
            return background
        x1, y1, x2, y2 = map(int, bbox[:4])
        box_width = max(1, x2 - x1)
        box_height = max(1, y2 - y1)
        new_width = max(1, int(box_width * scale_factor))
        new_height = max(1, int(box_height * scale_factor))
        overlay_resized = cv2.resize(overlay_img, (new_width, new_height))
        # Center the overlay on the box and clip it to the frame borders.
        center_x = (x1 + x2) // 2
        center_y = (y1 + y2) // 2
        start_x = max(0, center_x - new_width // 2)
        start_y = max(0, center_y - new_height // 2)
        end_x = min(background.shape[1], start_x + new_width)
        end_y = min(background.shape[0], start_y + new_height)
        actual_width = end_x - start_x
        actual_height = end_y - start_y
        if actual_width <= 0 or actual_height <= 0:
            return background
        overlay_crop = overlay_resized[:actual_height, :actual_width]
        if len(overlay_crop.shape) == 3 and overlay_crop.shape[2] == 4:
            # Per-pixel alpha blending: out = a * fg + (1 - a) * bg.
            overlay_bgr = overlay_crop[:, :, :3]
            overlay_alpha = overlay_crop[:, :, 3] / 255.0 * alpha
            for c in range(3):
                background[start_y:end_y, start_x:end_x, c] = (
                    overlay_alpha * overlay_bgr[:, :, c] +
                    (1 - overlay_alpha) * background[start_y:end_y, start_x:end_x, c]
                )
        else:
            # No alpha channel: fall back to a uniform blend.
            roi = background[start_y:end_y, start_x:end_x]
            blended = cv2.addWeighted(roi, 1 - alpha, overlay_crop, alpha, 0)
            background[start_y:end_y, start_x:end_x] = blended
        return background
    except Exception as e:
        print(f"⚠️ Error while applying the overlay: {e}")
        return background
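
# Tiny self-contained check of the blending above (synthetic arrays, not from
# the original gist): an opaque red square pasted onto a white background.
def _demo_overlay():
    bg = np.full((50, 50, 3), 255, dtype=np.uint8)   # white background
    fg = np.zeros((20, 20, 4), dtype=np.uint8)
    fg[..., 2] = 255                                 # red in BGR
    fg[..., 3] = 255                                 # fully opaque alpha
    return overlay_single_image_on_bbox(bg, fg, [10, 10, 30, 30], alpha=1.0)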
def create_sample_overlay():
    """Create a sample overlay image for testing."""
    print("🎨 Creating a sample overlay image...")
    img = np.zeros((200, 200, 4), dtype=np.uint8)
    # Vertical rainbow gradient with a semi-transparent alpha channel.
    for i in range(200):
        hue = int(i * 180 / 200)
        color = cv2.cvtColor(np.uint8([[[hue, 255, 255]]]), cv2.COLOR_HSV2BGR)[0][0]
        img[i, :, :3] = color
        img[i, :, 3] = 200
    # Pass 4-component colors: on a BGRA image, a 3-tuple would leave the
    # drawn pixels with alpha 0, making the text and border fully transparent.
    cv2.putText(img, "OVERLAY", (30, 100), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (255, 255, 255, 255), 3)
    cv2.putText(img, "IMAGE", (50, 140), cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 0, 255), 2)
    cv2.rectangle(img, (5, 5), (195, 195), (255, 255, 255, 255), 3)
    cv2.imwrite("sample_overlay.png", img)
    print("✅ Sample image saved: sample_overlay.png")
    return "sample_overlay.png"
# ===== Main video processing function =====
def process_video_with_continuous_overlay(input_path, output_path=None,
                                          overlay_image_path=None,
                                          detection_interval=2, manual_rotation=90,
                                          overlay_scale=1.0, overlay_alpha=0.8,
                                          enable_preview=True,
                                          custom_queries=None):
    """Video processing with a continuous image overlay (prevents frames without overlay)."""
    print("🚀 Starting video processing with continuous image overlay!")
    print("🔧 Continuous overlay guaranteed for every frame!")
    print("=" * 60)
    # Load the AI model
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"🔥 Device in use: {device}")
    try:
        print("📥 Downloading the OWLv2 model...")
        model = Owlv2ForObjectDetection.from_pretrained("google/owlv2-base-patch16-ensemble")
        processor = Owlv2Processor.from_pretrained("google/owlv2-base-patch16-ensemble")
        model = model.to(device)
        print("✅ Ensemble model loaded successfully")
    except Exception as e:
        print(f"⚠️ Ensemble failed, trying the base model: {e}")
        model = Owlv2ForObjectDetection.from_pretrained("google/owlv2-base-patch16")
        processor = Owlv2Processor.from_pretrained("google/owlv2-base-patch16")
        model = model.to(device)
        print("✅ Base model loaded successfully")
    # Object types to detect
    if custom_queries and isinstance(custom_queries, list) and len(custom_queries) > 0:
        text_queries = custom_queries
        print(f"🔍 Using custom queries: {text_queries}")
    else:
        # Default query list
        text_queries = [
            "signboard", "sign", "shop sign", "store sign", "billboard",
            "advertisement", "neon sign", "restaurant sign", "cafe sign"
        ]
        print(f"🔍 Using default queries (signs/ads): {text_queries}")
    # Color definitions (BGR)
    colors = [
        (0, 0, 255),     # Red
        (255, 0, 0),     # Blue
        (0, 255, 0),     # Green
        (0, 255, 255),   # Yellow
        (255, 0, 255),   # Magenta
        (255, 128, 0),   # Azure
        (0, 0, 128),     # Dark red
        (0, 128, 0),     # Dark green
        (128, 0, 0),     # Dark blue
        (0, 128, 128)    # Dark yellow
    ]
    # Prepare the overlay image
    overlay_img = None
    if overlay_image_path:
        overlay_img = prepare_overlay_image(overlay_image_path)
        if overlay_img is None:
            print("⚠️ Loading the overlay image failed, using the default effect...")
    # Initialize the SORT tracker with lenient settings
    tracker = Sort(
        max_age=100,         # keep trackers alive longer (100 frames)
        min_hits=1,          # show tracks immediately
        iou_threshold=0.05   # more tolerant matching
    )
    print("🎯 SORT settings: max_age=100, min_hits=1, iou_threshold=0.05")
    # Open the video
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        print("❌ Video could not be opened")
        return False
    # Video information
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print(f"📹 Video: {width}x{height}, {fps:.1f}fps, {total_frames} frames")
    print(f"🖼️ Overlay: scale={overlay_scale}, transparency={overlay_alpha}")
    # Output size (accounting for rotation)
    if manual_rotation in [90, 270]:
        out_width, out_height = height, width
    else:
        out_width, out_height = width, height
    # Output video settings
    out = None
    if output_path:
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (out_width, out_height))
        print(f"💾 Output: {output_path}")
    # Initialize processing variables
    frame_count = 0
    tracked_objects = {}
    sample_frames = []
    detection_count = 0
    # 🔥 Core: persistent tracking state carried across frames
    persistent_tracked_boxes = []
    last_detection_frame = 0
    print("🔄 Processing started - continuous overlay guaranteed...")
    print("-" * 50)
    try:
        while cap.isOpened() and frame_count < total_frames:
            ret, frame = cap.read()
            if not ret:
                break
            # Apply rotation
            if manual_rotation != 0:
                frame = rotate_frame(frame, manual_rotation)
            frame_count += 1
            # Show progress
            if frame_count % 30 == 0:
                progress = (frame_count / total_frames) * 100
                active_trackers = len(tracker.trackers)
                persistent_count = len(persistent_tracked_boxes)
                print(f"⏳ {progress:.1f}% | Frame {frame_count}/{total_frames} | Active:{active_trackers} | Persistent:{persistent_count}")
            # 🎯 Object detection (periodic)
            detect_this_frame = (frame_count % detection_interval == 0)
            if detect_this_frame:
                try:
                    # Run the AI model
                    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    pil_image = Image.fromarray(rgb_frame)
                    inputs = processor(text=text_queries, images=pil_image, return_tensors="pt")
                    inputs = {k: v.to(device) for k, v in inputs.items()}
                    with torch.no_grad():
                        outputs = model(**inputs)
                    target_sizes = torch.tensor([pil_image.size[::-1]])
                    import warnings
                    with warnings.catch_warnings():
                        warnings.simplefilter("ignore")
                        results = processor.post_process_object_detection(
                            outputs, target_sizes=target_sizes, threshold=0.15
                        )
                    # Convert the detection results
                    current_detections = []
                    detection_labels = []
                    for result in results:
                        boxes = result["boxes"]
                        scores = result["scores"]
                        labels = result["labels"]
                        for box, score, label in zip(boxes, scores, labels):
                            if score >= 0.15:
                                box = box.tolist()
                                current_detections.append([*box, score.item()])
                                detection_labels.append(label.item())
                    # Update SORT tracking
                    if current_detections:
                        current_detections = np.array(current_detections)
                        tracked_boxes = tracker.update(current_detections)
                        # 🔥 Refresh the persistent tracking state
                        persistent_tracked_boxes = tracked_boxes.copy()
                        last_detection_frame = frame_count
                        if len(tracked_boxes) > 0:
                            print(f"🎯 Frame {frame_count}: {len(current_detections)} detected → {len(tracked_boxes)} tracked")
                        # ID-to-label mapping
                        if len(tracked_boxes) > 0 and len(detection_labels) > 0:
                            for i, box in enumerate(tracked_boxes):
                                obj_id = int(box[4])
                                if obj_id not in tracked_objects and i < len(detection_labels):
                                    tracked_objects[obj_id] = detection_labels[i]
                        detection_count += len(tracked_boxes)
                    else:
                        # No detections - empty update
                        tracked_boxes = tracker.update(np.empty((0, 5)))
                except Exception as e:
                    print(f"⚠️ Frame {frame_count} detection error: {str(e)[:50]}...")
                    # On error, keep the persistent state
                    tracked_boxes = persistent_tracked_boxes.copy()
            else:
                # 🔥 Frame without detection: reuse the persistent tracking state
                try:
                    # Run only the SORT prediction step
                    predicted_boxes = tracker.update(np.empty((0, 5)))
                    # Prefer predicted boxes; otherwise fall back to the persistent state
                    if len(predicted_boxes) > 0:
                        tracked_boxes = predicted_boxes
                        # Also refresh the persistent state with the position forecast
                        persistent_tracked_boxes = predicted_boxes.copy()
                    else:
                        # Prediction came up empty - use the persistent state
                        tracked_boxes = persistent_tracked_boxes.copy()
                        # 🔧 Reset the state if nothing has been detected for too long
                        frame_diff = frame_count - last_detection_frame
                        if frame_diff > 50:  # no detection for more than 50 frames
                            persistent_tracked_boxes = []
                            tracked_boxes = []
                except Exception:
                    # Even on exceptions, keep using the persistent state
                    tracked_boxes = persistent_tracked_boxes.copy()
            # 🎨 Apply the continuous image overlay
            overlay_applied_count = 0
            if len(tracked_boxes) > 0:
                for box in tracked_boxes:
                    try:
                        x1, y1, x2, y2, obj_id = box
                        obj_id = int(obj_id)
                        if x2 > x1 and y2 > y1:
                            if overlay_img is not None:
                                # 🖼️ Custom image overlay
                                frame = overlay_single_image_on_bbox(
                                    frame, overlay_img, box,
                                    scale_factor=overlay_scale,
                                    alpha=overlay_alpha
                                )
                                overlay_applied_count += 1
                            else:
                                # Default blur effect
                                roi = frame[int(y1):int(y2), int(x1):int(x2)]
                                if roi.size > 0:
                                    kernel_size = max(3, min(51, int((x2 - x1) // 5)))
                                    if kernel_size % 2 == 0:
                                        kernel_size += 1
                                    blurred = cv2.GaussianBlur(roi, (kernel_size, kernel_size), 0)
                                    frame[int(y1):int(y2), int(x1):int(x2)] = blurred
                                    overlay_applied_count += 1
                            # Draw the ID label
                            color = colors[obj_id % len(colors)]
                            cv2.putText(frame, f"ID: {obj_id}", (int(x1), int(y1) - 10),
                                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
                    except Exception:
                        continue
            # 🔢 Draw status information
            info_y = 30
            cv2.putText(frame, f"Frame: {frame_count}/{total_frames}", (10, info_y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
            info_y += 30
            cv2.putText(frame, f"Tracked: {len(tracked_boxes)} | Overlays: {overlay_applied_count}", (10, info_y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)
            info_y += 30
            detect_status = "DETECT" if detect_this_frame else "PREDICT"
            cv2.putText(frame, f"Mode: {detect_status}", (10, info_y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
            info_y += 30
            cv2.putText(frame, f"Persistent: {len(persistent_tracked_boxes)}", (10, info_y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 255), 2)
            # Write the frame
            if out:
                out.write(frame)
            # Collect sample frames for the preview
            if enable_preview and len(sample_frames) < 4:
                frame_interval = max(1, total_frames // 4)
                if frame_count % frame_interval == 0:
                    sample_frames.append(frame.copy())
    except KeyboardInterrupt:
        print("Interrupted by the user")
    except Exception as e:
        print(f"Error during processing: {e}")
    finally:
        # Clean up
        cap.release()
        if out:
            out.release()
    print("-" * 50)
    print("✅ Processing with continuous overlay finished!")
    print(f"🎯 Total number of detections: {detection_count}")
    print(f"🔢 Final number of trackers: {len(tracker.trackers)}")
    print(f"📊 Unique objects tracked: {len(tracked_objects)}")
    # Show the preview
    if enable_preview and sample_frames:
        print("\n🖼️ Sample of the processing results:")
        create_video_preview(sample_frames, 4)
    return True
print("📦 Video Tracker Core Library geladen!")