#!/usr/bin/env python3 """ run_benchmark.py ================ Pose-estimation benchmark over many scenes x methods against ground truth. For each scene it ensures corner marker poses exist (pipeline step 3b), then runs each pose-estimation method and compares the estimated joint angles to simulation/SceneX/pose.json. Produces a matrix (mean/max joint error) and a per-method aggregate including the spread across scenes — the actual measure of STABILITY. Usage: python benchmark/run_benchmark.py python benchmark/run_benchmark.py --scenes 4 5 8 --methods hybrid global_ba python benchmark/run_benchmark.py --observation center_point # aruco_positions_initial.json """ from __future__ import annotations import argparse import json import os import subprocess import sys from statistics import mean, pstdev from typing import Any, Dict, List, Optional ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) PY = sys.executable ALL_METHODS = ["sequential_vector", "sequential_fk", "global_ba", "hybrid"] def run(cmd: List[str]) -> bool: r = subprocess.run(cmd, cwd=ROOT, capture_output=True, text=True) if r.returncode != 0: print(f"[WARN] command failed: {' '.join(cmd)}\n{r.stderr.strip()[:300]}") return False return True def discover_scenes() -> List[str]: out = [] sim = os.path.join(ROOT, "data", "simulation") for s in sorted(os.listdir(sim)): if not s.startswith("Scene"): continue ev = os.path.join(ROOT, "data", "evaluations", s) if os.path.exists(os.path.join(sim, s, "pose.json")) and os.path.isdir(ev): import glob if glob.glob(os.path.join(ev, "*_aruco_detection.json")): out.append(s) return out def ensure_marker_poses(scene: str, robot_path: str) -> Optional[str]: eval_dir = os.path.join(ROOT, "data", "evaluations", scene) mp = os.path.join(eval_dir, "aruco_marker_poses.json") if not os.path.exists(mp): ok = run([PY, "pipeline/3b_corner_marker_poses.py", "--evalDir", eval_dir, "--robot", robot_path]) if not ok: return None return mp def center_input(scene: str) -> Optional[str]: p = os.path.join(ROOT, "data", "evaluations", scene, "aruco_positions_initial.json") return p if os.path.exists(p) else None def eval_one(scene: str, method: str, markers_path: str, robot_path: str) -> Optional[Dict[str, Any]]: eval_dir = os.path.join(ROOT, "data", "evaluations", scene) rs = os.path.join(eval_dir, f"bench_rs_{method}.json") if not run([PY, "pipeline/pose_estimation.py", markers_path, "-robot", robot_path, "--method", method, "-out", rs]): return None ev = os.path.join(eval_dir, f"bench_ev_{method}.json") gt = os.path.join(ROOT, "data", "simulation", scene, "pose.json") # eval_pose returns nonzero when over tolerance; that's fine, we read the JSON run([PY, "benchmark/eval_pose.py", rs, gt, "--out", ev, "--tolDeg", "999", "--tolMm", "999"]) if not os.path.exists(ev): return None return json.load(open(ev, "r", encoding="utf-8")) def main() -> None: ap = argparse.ArgumentParser() ap.add_argument("--scenes", nargs="*", default=None, help="scene names or numbers (default: all ready)") ap.add_argument("--methods", nargs="*", default=ALL_METHODS) ap.add_argument("--robot", default=os.path.join(ROOT, "data", "robot", "robot.json")) ap.add_argument("--observation", choices=["corner_pose", "center_point"], default="corner_pose") ap.add_argument("--out", default=os.path.join(ROOT, "benchmark", "benchmark_results.json")) ap.add_argument("--csv", default=os.path.join(ROOT, "benchmark", "benchmark_results.csv")) args = ap.parse_args() scenes = discover_scenes() if args.scenes: want = {s if s.startswith("Scene") else f"Scene{s}" for s in args.scenes} scenes = [s for s in scenes if s in want] print(f"[INFO] scenes: {scenes}") print(f"[INFO] methods: {args.methods} | observation: {args.observation}\n") matrix: Dict[str, Dict[str, Any]] = {} for scene in scenes: if args.observation == "corner_pose": mpath = ensure_marker_poses(scene, args.robot) else: mpath = center_input(scene) if not mpath: print(f"[WARN] {scene}: no {args.observation} input, skipping") continue matrix[scene] = {} for m in args.methods: ev = eval_one(scene, m, mpath, args.robot) matrix[scene][m] = ev["summary"] if ev else None # ---- print matrix ---- def fmt(s, key): return f"{s[key]:5.2f}" if (s and s.get(key) is not None) else " - " print("\n" + "=" * 78) print("POSE ERROR — mean angle [deg] (max) / mean linear [mm] (max)") print("=" * 78) header = f"{'scene':>8} | " + " | ".join(f"{m[:12]:>16}" for m in args.methods) print(header) print("-" * len(header)) for scene in scenes: if scene not in matrix: continue cells = [] for m in args.methods: s = matrix[scene][m] if s: cells.append(f"{fmt(s,'mean_abs_deg')}({fmt(s,'max_abs_deg').strip()})/{fmt(s,'mean_abs_mm').strip()}") else: cells.append(" FAIL ") print(f"{scene:>8} | " + " | ".join(f"{c:>16}" for c in cells)) # ---- per-method aggregate (stability = spread across scenes) ---- print("\n" + "=" * 78) print("PER-METHOD AGGREGATE across scenes (lower = better, std = instability)") print("=" * 78) print(f"{'method':>16} | {'mean deg':>9} | {'std deg':>8} | {'worst deg':>9} | {'mean mm':>8} | {'worst mm':>8}") print("-" * 78) agg: Dict[str, Any] = {} for m in args.methods: degs = [matrix[s][m]["mean_abs_deg"] for s in matrix if matrix[s].get(m) and matrix[s][m].get("mean_abs_deg") is not None] maxdegs = [matrix[s][m]["max_abs_deg"] for s in matrix if matrix[s].get(m) and matrix[s][m].get("max_abs_deg") is not None] mms = [matrix[s][m]["mean_abs_mm"] for s in matrix if matrix[s].get(m) and matrix[s][m].get("mean_abs_mm") is not None] maxmms = [matrix[s][m]["max_abs_mm"] for s in matrix if matrix[s].get(m) and matrix[s][m].get("max_abs_mm") is not None] a = { "mean_deg": mean(degs) if degs else None, "std_deg": pstdev(degs) if len(degs) > 1 else 0.0, "worst_deg": max(maxdegs) if maxdegs else None, "mean_mm": mean(mms) if mms else None, "worst_mm": max(maxmms) if maxmms else None, "n_scenes": len(degs), } agg[m] = a def f(x): return f"{x:9.3f}" if x is not None else " - " print(f"{m:>16} | {f(a['mean_deg'])} | {a['std_deg']:8.3f} | {f(a['worst_deg'])} | " f"{f(a['mean_mm'])} | {f(a['worst_mm'])}") json.dump({"observation": args.observation, "matrix": matrix, "aggregate": agg}, open(args.out, "w", encoding="utf-8"), indent=2) # CSV with open(args.csv, "w", encoding="utf-8") as f: f.write("scene,method,mean_abs_deg,max_abs_deg,mean_abs_mm,max_abs_mm\n") for scene in matrix: for m in args.methods: s = matrix[scene].get(m) if s: f.write(f"{scene},{m},{s.get('mean_abs_deg','')},{s.get('max_abs_deg','')}," f"{s.get('mean_abs_mm','')},{s.get('max_abs_mm','')}\n") print(f"\n[INFO] wrote {args.out} and {args.csv}") if __name__ == "__main__": main()