113 lines
4.2 KiB
Python
113 lines
4.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
run_pipeline.py
|
|
===============
|
|
Plattformunabhängiger Orchestrator der Pose-Pipeline (Linux/Container & Windows).
|
|
Funktionales Äquivalent zu run/run_pipeline.bat:
|
|
|
|
1 ArUco-Detektion (1_detect_aruco_observations.py)
|
|
2 Kamera-Posen (2_estimate_camera_from_observations.py)
|
|
3 Multi-View (center) (3_multiview_bundle_adjustment_v4.py)
|
|
3b Eck-Marker-Posen (3b_corner_marker_poses.py)
|
|
4 Pose-Estimation (pose_estimation.py)
|
|
|
|
Aufruf:
|
|
python pipeline/run_pipeline.py data/simulation/Scene8
|
|
python pipeline/run_pipeline.py data/simulation/Scene8 --robot data/robot/robot.json
|
|
|
|
Im Container (Pipeline-Image) ist das der Standard-Einstiegspunkt.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import glob
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
PIPE = Path(__file__).resolve().parent # .../pipeline
|
|
ROOT = PIPE.parent # Repo-Wurzel
|
|
PY = sys.executable
|
|
|
|
|
|
def run(cmd) -> None:
|
|
print(" $", " ".join(str(c) for c in cmd))
|
|
r = subprocess.run(cmd)
|
|
if r.returncode != 0:
|
|
print(f"[ERROR] Schritt fehlgeschlagen (exit {r.returncode})")
|
|
sys.exit(r.returncode)
|
|
|
|
|
|
def cam_id_of(path: str):
|
|
m = re.match(r"render_([A-Za-z0-9]+)\.png", os.path.basename(path))
|
|
return m.group(1) if m else None
|
|
|
|
|
|
def main() -> None:
|
|
ap = argparse.ArgumentParser(description="Roboter-Pose-Pipeline: Bilder -> robot_state.json")
|
|
ap.add_argument("scene", help="Bildordner, z.B. data/simulation/Scene8")
|
|
ap.add_argument("--robot", default=os.environ.get("ROBOT_JSON", str(ROOT / "data" / "robot" / "robot.json")))
|
|
ap.add_argument("--evalDir", default=None, help="Ausgabeordner (Default: data/evaluations/<SceneName>)")
|
|
ap.add_argument("--lambdaWeight", default="100.0")
|
|
args = ap.parse_args()
|
|
|
|
scene = Path(args.scene).resolve()
|
|
eval_dir = Path(args.evalDir).resolve() if args.evalDir else (ROOT / "data" / "evaluations" / scene.name)
|
|
eval_dir.mkdir(parents=True, exist_ok=True)
|
|
robot = args.robot
|
|
|
|
imgs = sorted(glob.glob(str(scene / "render_*.png")))
|
|
if not imgs:
|
|
print(f"[ERROR] keine render_*.png in {scene}")
|
|
sys.exit(1)
|
|
print(f"[INFO] Scene={scene.name} Bilder={len(imgs)} -> {eval_dir}")
|
|
|
|
# ── Schritt 1: ArUco-Detektion (pro Bild seine EIGENE npz) ──
|
|
print("\n[STEP 1] ArUco-Detektion")
|
|
for img in imgs:
|
|
cid = cam_id_of(img)
|
|
if not cid:
|
|
continue
|
|
npz = scene / f"render_{cid}.npz"
|
|
if not npz.exists():
|
|
npz = scene / "render_a.npz" # Fallback (gleiche Intrinsik in der Sim)
|
|
run([PY, str(PIPE / "1_detect_aruco_observations.py"),
|
|
"-i", img, "-npz", str(npz), "-outDir", str(eval_dir),
|
|
"-robot", robot, "-cameraId", cid])
|
|
|
|
# ── Schritt 2: Kamera-Posen ──
|
|
print("\n[STEP 2] Kamera-Posen")
|
|
dets = sorted(glob.glob(str(eval_dir / "*_aruco_detection.json")))
|
|
for d in dets:
|
|
run([PY, str(PIPE / "2_estimate_camera_from_observations.py"),
|
|
"-i", d, "-robot", robot, "-outDir", str(eval_dir)])
|
|
|
|
# ── Schritt 3: Multi-View Triangulation (center, für Viewer/Vergleich) ──
|
|
print("\n[STEP 3] Multi-View Bundle Adjustment")
|
|
poses = sorted(glob.glob(str(eval_dir / "*_camera_pose.json")))
|
|
det_args, pose_args = [], []
|
|
for d in dets:
|
|
det_args += ["-det", d]
|
|
for p in poses:
|
|
pose_args += ["-pose", p]
|
|
run([PY, str(PIPE / "3_multiview_bundle_adjustment_v4.py"),
|
|
"-robot", robot, "-lambdaWeight", args.lambdaWeight] + det_args + pose_args)
|
|
|
|
# ── Schritt 3b: Eck-Marker-Posen (Position + gemessene Normale) ──
|
|
print("\n[STEP 3b] Eck-Marker-Posen")
|
|
run([PY, str(PIPE / "3b_corner_marker_poses.py"), "--evalDir", str(eval_dir), "--robot", robot])
|
|
|
|
# ── Schritt 4: Pose-Estimation (Gelenkwinkel) ──
|
|
print("\n[STEP 4] Pose-Estimation")
|
|
run([PY, str(PIPE / "pose_estimation.py"),
|
|
str(eval_dir / "aruco_marker_poses.json"), "-robot", robot,
|
|
"-out", str(eval_dir / "robot_state.json")])
|
|
|
|
print(f"\n[DONE] -> {eval_dir / 'robot_state.json'}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|