#!/usr/bin/env python3 """ benchmark/camera_count/run_camera_study.py ========================================== Untersucht, wie sich die Pose-Genauigkeit (Gelenkwinkelfehler, Handgelenk- und Finger-Positionsfehler in mm) mit der Kameraanzahl verändert. Für jede Szene mit bekannter Ground-Truth (pose.json) und jede Kameraanzahl k wird eine zufällige Stichprobe von k-Kamera-Subsets durch die volle Pipeline geschickt und ausgewertet. Ergebnisse landen in benchmark/camera_count/results/. Erweiterbar: funktioniert mit beliebig vielen Szenen und Kameras — neue Szenen werden automatisch erkannt, sobald sie pose.json haben. Aufruf: python benchmark/camera_count/run_camera_study.py python benchmark/camera_count/run_camera_study.py --scenes Scene7 Scene9 python benchmark/camera_count/run_camera_study.py --k-min 3 --k-max 5 --samples 15 python benchmark/camera_count/run_camera_study.py --force # Pipeline neu rechnen python benchmark/camera_count/run_camera_study.py --clean # alles löschen + neu python benchmark/camera_count/run_camera_study.py --clean --scenes Scene10 # nur diese Szene """ from __future__ import annotations import argparse import glob import itertools import json import random import re import shutil import subprocess import sys from pathlib import Path from statistics import mean ROOT = Path(__file__).resolve().parent.parent.parent RESULTS_DIR = Path(__file__).resolve().parent / "results" PY = sys.executable def discover_scenes(sim_dir: Path) -> list[str]: """Alle Szenen mit render_*.png und pose.json.""" scenes = [] for d in sorted(sim_dir.iterdir()): if not d.name.startswith("Scene"): continue if not (d / "pose.json").exists(): continue if not list(d.glob("render_*.png")): continue scenes.append(d.name) return scenes def camera_ids(scene_dir: Path) -> list[str]: """Sortierte Kamera-IDs (a, b, c ...) einer Szene.""" ids = [] for f in sorted(scene_dir.glob("render_*.png")): m = re.match(r"render_([A-Za-z0-9]+)\.png", f.name) if m: ids.append(m.group(1)) return ids def run_pipeline(scene_dir: Path, cams: list[str], eval_dir: Path, robot: str) -> bool: cmd = [ PY, str(ROOT / "pipeline" / "run_pipeline.py"), str(scene_dir), "--robot", robot, "--evalDir", str(eval_dir), "--cameras", ",".join(cams), ] r = subprocess.run(cmd, cwd=str(ROOT), capture_output=True, text=True) if r.returncode != 0: snippet = (r.stderr or r.stdout).strip()[-300:] print(f" [WARN] Pipeline fehlgeschlagen: {snippet}") return False return True def eval_pose(robot_state: Path, gt: Path, robot: str) -> dict | None: out = robot_state.with_suffix(".eval.json") cmd = [ PY, str(ROOT / "benchmark" / "eval_pose.py"), str(robot_state), str(gt), "--out", str(out), "--robot", robot, "--tolDeg", "999", "--tolMm", "999", ] subprocess.run(cmd, cwd=str(ROOT), capture_output=True, text=True) return json.loads(out.read_text(encoding="utf-8")) if out.exists() else None def main() -> None: ap = argparse.ArgumentParser(description="Kamera-Anzahl vs. Pose-Genauigkeit") ap.add_argument("--scenes", nargs="*", default=None, help="Szenen-Namen oder Nummern (Standard: alle mit pose.json)") ap.add_argument("--k-min", type=int, default=3, help="Minimale Kameraanzahl (Standard: 3)") ap.add_argument("--k-max", type=int, default=None, help="Maximale Kameraanzahl (Standard: alle verfügbaren)") ap.add_argument("--samples", type=int, default=10, help="Zufällige Subsets pro (Szene, k) — Standard: 10") ap.add_argument("--seed", type=int, default=42) ap.add_argument("--robot", default=str(ROOT / "data" / "robot" / "robot.json")) ap.add_argument("--force", action="store_true", help="Vorhandene robot_state.json neu rechnen (Pipeline erneut laufen)") ap.add_argument("--clean", action="store_true", help="Zwischenergebnisse vorher löschen und komplett neu rechnen. " "Mit --scenes nur diese Szenen, ohne --scenes alles.") ap.add_argument("--out", default=str(RESULTS_DIR / "camera_study.json")) ap.add_argument("--csv", default=str(RESULTS_DIR / "camera_study.csv")) args = ap.parse_args() random.seed(args.seed) RESULTS_DIR.mkdir(parents=True, exist_ok=True) sim_dir = ROOT / "data" / "simulation" study_root = ROOT / "data" / "camera_study" scenes = discover_scenes(sim_dir) if args.scenes: want = {s if s.startswith("Scene") else f"Scene{s}" for s in args.scenes} scenes = [s for s in scenes if s in want] if not scenes: print("[ERROR] Keine Szenen mit pose.json und render_*.png gefunden.") sys.exit(1) # --clean: Zwischenergebnisse entfernen. if args.clean: if args.scenes: for s in scenes: d = study_root / s if d.exists(): shutil.rmtree(d) print(f"[CLEAN] entfernt {d}") else: if study_root.exists(): shutil.rmtree(study_root) print(f"[CLEAN] entfernt {study_root}") for f in (Path(args.out), Path(args.csv)): if f.exists(): f.unlink() print(f"[CLEAN] entfernt {f}") args.force = True print(f"[INFO] Szenen: {scenes}") print(f"[INFO] Seed={args.seed} Samples/k={args.samples}\n") all_results: list[dict] = [] for scene in scenes: scene_dir = sim_dir / scene cams = camera_ids(scene_dir) k_max = min(args.k_max or len(cams), len(cams)) k_range = range(args.k_min, k_max + 1) gt = scene_dir / "pose.json" print(f"[SZENE] {scene} Kameras={cams} k={list(k_range)}") for k in k_range: all_combos = list(itertools.combinations(cams, k)) n_sample = min(args.samples, len(all_combos)) sample = random.sample(all_combos, n_sample) errs: list[float] = [] print(f" k={k}: {n_sample}/{len(all_combos)} Subsets") for combo in sample: label = "".join(combo) eval_dir = ROOT / "data" / "camera_study" / scene / f"k{k}_{label}" rs = eval_dir / "robot_state.json" if rs.exists() and not args.force: print(f" {label}: übersprungen (robot_state.json existiert)") else: eval_dir.mkdir(parents=True, exist_ok=True) ok = run_pipeline(scene_dir, list(combo), eval_dir, args.robot) if not ok or not rs.exists(): print(f" {label}: FEHLER — übersprungen") continue ev = eval_pose(rs, gt, args.robot) if not ev: print(f" {label}: Auswertung fehlgeschlagen") continue s = ev["summary"] row = { "scene": scene, "k": k, "subset": label, "mean_abs_deg": s.get("mean_abs_deg"), "max_abs_deg": s.get("max_abs_deg"), "mean_abs_mm": s.get("mean_abs_mm"), "max_abs_mm": s.get("max_abs_mm"), "wrist_error_mm": s.get("wrist_error_mm"), "finger_error_mm": s.get("finger_error_mm"), "n_unobservable": s.get("n_unobservable"), } all_results.append(row) errs.append(row["mean_abs_deg"] or 0.0) we, fe = row["wrist_error_mm"], row["finger_error_mm"] we_str = f"{we:.2f}mm" if we is not None else "n/a" fe_str = f"{fe:.2f}mm" if fe is not None else "n/a" print(f" {label}: mean={row['mean_abs_deg']:.3f}° " f"wrist={we_str} finger={fe_str} unobs={row['n_unobservable']}") if errs: print(f" k={k} Zusammenfassung: mean={mean(errs):.3f}° " f"min={min(errs):.3f}° max={max(errs):.3f}°") # Ergebnisse mergen — andere Szenen erhalten out_path = Path(args.out) processed = set(scenes) merged: list[dict] = [] if out_path.exists(): try: existing = json.loads(out_path.read_text(encoding="utf-8")) merged = [r for r in existing if r.get("scene") not in processed] except (json.JSONDecodeError, OSError): merged = [] merged.extend(all_results) merged.sort(key=lambda r: (r["scene"], r["k"], r["subset"])) out_path.write_text(json.dumps(merged, indent=2), encoding="utf-8") with open(args.csv, "w", encoding="utf-8") as f: f.write("scene,k,subset,mean_abs_deg,max_abs_deg,mean_abs_mm,max_abs_mm," "wrist_error_mm,finger_error_mm,n_unobservable\n") for r in merged: f.write(f"{r['scene']},{r['k']},{r['subset']}," f"{r['mean_abs_deg'] or ''}," f"{r['max_abs_deg'] or ''}," f"{r['mean_abs_mm'] or ''}," f"{r['max_abs_mm'] or ''}," f"{r['wrist_error_mm'] if r['wrist_error_mm'] is not None else ''}," f"{r['finger_error_mm'] if r['finger_error_mm'] is not None else ''}," f"{r['n_unobservable'] if r['n_unobservable'] is not None else ''}\n") print(f"\n[DONE] {len(all_results)} neu, {len(merged)} gesamt -> {args.out}") if __name__ == "__main__": main()