184 lines
7.5 KiB
Python
184 lines
7.5 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
run_benchmark.py
|
|
================
|
|
Pose-estimation benchmark over many scenes x methods against ground truth.
|
|
|
|
For each scene it ensures corner marker poses exist (pipeline step 3b), then
|
|
runs each pose-estimation method and compares the estimated joint angles to
|
|
simulation/SceneX/pose.json. Produces a matrix (mean/max joint error) and a
|
|
per-method aggregate including the spread across scenes — the actual measure
|
|
of STABILITY.
|
|
|
|
Usage:
|
|
python benchmark/run_benchmark.py
|
|
python benchmark/run_benchmark.py --scenes 4 5 8 --methods hybrid global_ba
|
|
python benchmark/run_benchmark.py --observation center_point # aruco_positions_initial.json
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from statistics import mean, pstdev
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
# Repository root: the parent of the directory that contains this script.
ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Interpreter used to launch the pipeline / benchmark sub-scripts, so they run
# under the same Python as this process.
PY = sys.executable

# Pose-estimation methods benchmarked when --methods is not given.
ALL_METHODS = [
    "sequential_vector",
    "sequential_fk",
    "global_ba",
    "hybrid",
]
|
|
|
|
|
|
def run(cmd: List[str]) -> bool:
    """Execute *cmd* from the repository root and report whether it succeeded.

    The child's stdout/stderr are captured (as text) instead of being streamed.
    When the exit status is non-zero, a warning is printed that contains the
    command line and the first 300 characters of the stripped stderr.

    Args:
        cmd: Argument vector passed to ``subprocess.run``.

    Returns:
        True when the command exited with status 0, False otherwise.
    """
    proc = subprocess.run(cmd, cwd=ROOT, capture_output=True, text=True)
    succeeded = proc.returncode == 0
    if not succeeded:
        stderr_head = proc.stderr.strip()[:300]
        print(f"[WARN] command failed: {' '.join(cmd)}\n{stderr_head}")
    return succeeded
|
|
|
|
|
|
def discover_scenes() -> List[str]:
    """Return the names of all scenes that are ready to be benchmarked.

    Only entries of ``data/simulation`` whose name starts with ``Scene`` are
    considered. A scene is ready when all of the following hold:

    * ``data/simulation/<scene>/pose.json`` exists,
    * ``data/evaluations/<scene>`` is a directory, and
    * that directory contains at least one ``*_aruco_detection.json`` file.

    Scenes named ``Scene<number>`` are returned in numeric order (``Scene2``
    before ``Scene10``); any other ``Scene*`` names follow in lexicographic
    order.

    Returns:
        The ready scene names, or an empty list (after printing a warning)
        when the simulation directory does not exist.
    """
    # Function-scope imports: neither module is imported at file level.
    import glob
    import re

    sim = os.path.join(ROOT, "data", "simulation")
    if not os.path.isdir(sim):
        print(f"[WARN] simulation directory not found: {sim}")
        return []

    def scene_order(name: str):
        # Numbered scenes first, by numeric value; the name breaks ties such
        # as "Scene1" vs "Scene01" and orders everything else.
        numbered = re.fullmatch(r"Scene(\d+)", name)
        if numbered:
            return (0, int(numbered.group(1)), name)
        return (1, 0, name)

    candidates = [name for name in os.listdir(sim) if name.startswith("Scene")]

    ready: List[str] = []
    for scene in sorted(candidates, key=scene_order):
        if not os.path.exists(os.path.join(sim, scene, "pose.json")):
            continue
        ev = os.path.join(ROOT, "data", "evaluations", scene)
        if not os.path.isdir(ev):
            continue
        # Escape the directory part so glob metacharacters in the repository
        # path are matched literally; only the file-name part is a pattern.
        if glob.glob(os.path.join(glob.escape(ev), "*_aruco_detection.json")):
            ready.append(scene)
    return ready
|
|
|
|
|
|
def ensure_marker_poses(scene: str, robot_path: str) -> Optional[str]:
    """Return the path of the scene's ``aruco_marker_poses.json``, creating it if absent.

    If the file is missing from ``data/evaluations/<scene>``, the pipeline
    script ``pipeline/3b_corner_marker_poses.py`` is invoked with the scene's
    evaluation directory and the robot description.

    Args:
        scene: Scene directory name.
        robot_path: Path forwarded to the script's ``--robot`` option.

    Returns:
        The marker-poses path when the file already existed or the script
        exited successfully; None when the script failed.
    """
    eval_dir = os.path.join(ROOT, "data", "evaluations", scene)
    poses_path = os.path.join(eval_dir, "aruco_marker_poses.json")

    if os.path.exists(poses_path):
        return poses_path

    generate_cmd = [
        PY,
        "pipeline/3b_corner_marker_poses.py",
        "--evalDir",
        eval_dir,
        "--robot",
        robot_path,
    ]
    return poses_path if run(generate_cmd) else None
|
|
|
|
|
|
def center_input(scene: str) -> Optional[str]:
    """Return the scene's ``aruco_positions_initial.json`` path, or None if it is missing.

    The file is looked up in ``data/evaluations/<scene>``.
    """
    candidate = os.path.join(
        ROOT, "data", "evaluations", scene, "aruco_positions_initial.json"
    )
    if not os.path.exists(candidate):
        return None
    return candidate
|
|
|
|
|
|
def eval_one(scene: str, method: str, markers_path: str, robot_path: str) -> Optional[Dict[str, Any]]:
    """Run one pose-estimation method on one scene and evaluate it against ground truth.

    Two sub-processes are launched from the repository root:

    1. ``pipeline/pose_estimation.py`` writes its result to
       ``data/evaluations/<scene>/bench_rs_<method>.json``.
    2. ``benchmark/eval_pose.py`` compares that result with
       ``data/simulation/<scene>/pose.json`` and writes
       ``data/evaluations/<scene>/bench_ev_<method>.json``.

    Output files left over from a previous run are deleted first, so a step
    that fails to write its output cannot be masked by stale data.

    Args:
        scene: Scene directory name.
        method: Value passed to the estimator's ``--method`` option.
        markers_path: Observation file passed as the estimator's positional input.
        robot_path: Path passed to the estimator's ``-robot`` option.

    Returns:
        The parsed evaluation JSON, or None when the estimator failed, the
        evaluation file was not produced, or it could not be read/parsed.
    """
    eval_dir = os.path.join(ROOT, "data", "evaluations", scene)
    rs = os.path.join(eval_dir, f"bench_rs_{method}.json")
    ev = os.path.join(eval_dir, f"bench_ev_{method}.json")
    gt = os.path.join(ROOT, "data", "simulation", scene, "pose.json")

    # Drop leftovers from earlier runs; otherwise a step that silently writes
    # nothing would make us report an old result as the current one.
    for stale in (rs, ev):
        if os.path.exists(stale):
            try:
                os.remove(stale)
            except OSError as exc:
                print(f"[WARN] cannot remove stale file {stale}: {exc}")
                return None

    if not run([PY, "pipeline/pose_estimation.py", markers_path, "-robot", robot_path,
                "--method", method, "-out", rs]):
        return None

    # eval_pose returns nonzero when over tolerance; that's fine, we read the JSON
    run([PY, "benchmark/eval_pose.py", rs, gt, "--out", ev, "--tolDeg", "999", "--tolMm", "999"])

    try:
        with open(ev, "r", encoding="utf-8") as fh:
            return json.load(fh)
    except FileNotFoundError:
        # The evaluator produced no output file.
        return None
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses; a broken file fails this cell rather than the whole run.
        print(f"[WARN] cannot read evaluation result {ev}: {exc}")
        return None
|
|
|
|
|
|
def _metric_values(matrix: Dict[str, Dict[str, Any]], method: str, key: str) -> List[Any]:
    """Collect the non-missing values of *key* for *method* across all scenes."""
    values = []
    for per_method in matrix.values():
        summary = per_method.get(method)
        if summary and summary.get(key) is not None:
            values.append(summary[key])
    return values


def _aggregate_method(matrix: Dict[str, Dict[str, Any]], method: str) -> Dict[str, Any]:
    """Reduce one method's per-scene summaries to mean / spread / worst-case figures."""
    degs = _metric_values(matrix, method, "mean_abs_deg")
    max_degs = _metric_values(matrix, method, "max_abs_deg")
    mms = _metric_values(matrix, method, "mean_abs_mm")
    max_mms = _metric_values(matrix, method, "max_abs_mm")
    return {
        "mean_deg": mean(degs) if degs else None,
        # Population standard deviation across scenes; reported as 0.0 when
        # fewer than two scenes produced a value.
        "std_deg": pstdev(degs) if len(degs) > 1 else 0.0,
        "worst_deg": max(max_degs) if max_degs else None,
        "mean_mm": mean(mms) if mms else None,
        "worst_mm": max(max_mms) if max_mms else None,
        "n_scenes": len(degs),
    }


def _print_matrix(matrix: Dict[str, Dict[str, Any]], methods: List[str]) -> None:
    """Print the scene x method error table to stdout."""

    def fmt(summary, key):
        return f"{summary[key]:5.2f}" if (summary and summary.get(key) is not None) else " - "

    print("\n" + "=" * 78)
    print("POSE ERROR — mean angle [deg] (max) / mean linear [mm] (max)")
    print("=" * 78)
    header = f"{'scene':>8} | " + " | ".join(f"{m[:12]:>16}" for m in methods)
    print(header)
    print("-" * len(header))
    for scene, per_method in matrix.items():
        cells = []
        for m in methods:
            s = per_method[m]
            if s:
                cells.append(
                    f"{fmt(s, 'mean_abs_deg')}({fmt(s, 'max_abs_deg').strip()})"
                    f"/{fmt(s, 'mean_abs_mm').strip()}"
                )
            else:
                cells.append(" FAIL ")
        print(f"{scene:>8} | " + " | ".join(f"{c:>16}" for c in cells))


def _print_aggregate(agg: Dict[str, Any], methods: List[str]) -> None:
    """Print the per-method aggregate table to stdout."""

    def num(x):
        return f"{x:9.3f}" if x is not None else " - "

    print("\n" + "=" * 78)
    print("PER-METHOD AGGREGATE across scenes (lower = better, std = instability)")
    print("=" * 78)
    print(f"{'method':>16} | {'mean deg':>9} | {'std deg':>8} | {'worst deg':>9} | {'mean mm':>8} | {'worst mm':>8}")
    print("-" * 78)
    for m in methods:
        a = agg[m]
        print(f"{m:>16} | {num(a['mean_deg'])} | {a['std_deg']:8.3f} | {num(a['worst_deg'])} | "
              f"{num(a['mean_mm'])} | {num(a['worst_mm'])}")


def _write_results(out_path: str, csv_path: str, observation: str,
                   matrix: Dict[str, Dict[str, Any]], agg: Dict[str, Any],
                   methods: List[str]) -> None:
    """Write the JSON report and the flat CSV (one row per successful scene/method)."""
    # Create missing output directories so a long run is not lost at the end.
    for path in (out_path, csv_path):
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)

    with open(out_path, "w", encoding="utf-8") as fh:
        json.dump({"observation": observation, "matrix": matrix, "aggregate": agg},
                  fh, indent=2)

    keys = ("mean_abs_deg", "max_abs_deg", "mean_abs_mm", "max_abs_mm")
    with open(csv_path, "w", encoding="utf-8") as fh:
        fh.write("scene,method,mean_abs_deg,max_abs_deg,mean_abs_mm,max_abs_mm\n")
        for scene, per_method in matrix.items():
            for m in methods:
                s = per_method.get(m)
                if not s:
                    continue
                # Missing or null metrics become empty cells, not the text "None".
                values = ["" if s.get(k) is None else str(s[k]) for k in keys]
                fh.write(",".join([scene, m] + values) + "\n")


def main() -> None:
    """Command-line entry point: benchmark every selected scene x method pair.

    Steps:
      1. Parse the options (``--scenes``, ``--methods``, ``--robot``,
         ``--observation``, ``--out``, ``--csv``).
      2. Discover the ready scenes and optionally restrict them to the
         requested ones (bare numbers are expanded to ``Scene<number>``).
      3. For each scene obtain the observation file for the chosen
         ``--observation`` mode, then run and evaluate each method. A method
         whose evaluation is unavailable is recorded as None.
      4. Print the error matrix and the per-method aggregate, then write the
         JSON and CSV reports.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--scenes", nargs="*", default=None, help="scene names or numbers (default: all ready)")
    ap.add_argument("--methods", nargs="*", default=ALL_METHODS)
    ap.add_argument("--robot", default=os.path.join(ROOT, "data", "robot", "robot.json"))
    ap.add_argument("--observation", choices=["corner_pose", "center_point"], default="corner_pose")
    ap.add_argument("--out", default=os.path.join(ROOT, "benchmark", "benchmark_results.json"))
    ap.add_argument("--csv", default=os.path.join(ROOT, "benchmark", "benchmark_results.csv"))
    args = ap.parse_args()

    scenes = discover_scenes()
    if args.scenes:
        want = {s if s.startswith("Scene") else f"Scene{s}" for s in args.scenes}
        # Tell the user about requested scenes that will not be benchmarked.
        missing = sorted(want.difference(scenes))
        if missing:
            print(f"[WARN] requested scenes not found or not ready: {missing}")
        scenes = [s for s in scenes if s in want]
    print(f"[INFO] scenes: {scenes}")
    print(f"[INFO] methods: {args.methods} | observation: {args.observation}\n")

    matrix: Dict[str, Dict[str, Any]] = {}
    for scene in scenes:
        if args.observation == "corner_pose":
            mpath = ensure_marker_poses(scene, args.robot)
        else:
            mpath = center_input(scene)
        if not mpath:
            print(f"[WARN] {scene}: no {args.observation} input, skipping")
            continue
        matrix[scene] = {}
        for m in args.methods:
            ev = eval_one(scene, m, mpath, args.robot)
            # An evaluation without a "summary" object counts as a failure
            # for this cell rather than raising KeyError.
            matrix[scene][m] = ev.get("summary") if isinstance(ev, dict) else None

    # ---- print matrix ----
    _print_matrix(matrix, args.methods)

    # ---- per-method aggregate (stability = spread across scenes) ----
    agg: Dict[str, Any] = {m: _aggregate_method(matrix, m) for m in args.methods}
    _print_aggregate(agg, args.methods)

    _write_results(args.out, args.csv, args.observation, matrix, agg, args.methods)
    print(f"\n[INFO] wrote {args.out} and {args.csv}")
|
|
|
|
|
|
# Run the benchmark only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
|