#!/usr/bin/env python3 """ run_pipeline.py =============== Plattformunabhängiger Orchestrator der Pose-Pipeline (Linux/Container & Windows). Funktionales Äquivalent zu run/run_pipeline.bat: 1 ArUco-Detektion (1_detect_aruco_observations.py) 2 Kamera-Posen (2_estimate_camera_from_observations.py) 3 Multi-View (center) (3_multiview_bundle_adjustment_v4.py) 3b Eck-Marker-Posen (3b_corner_marker_poses.py) 4 Pose-Estimation (pose_estimation.py) Aufruf: python pipeline/run_pipeline.py data/simulation/Scene8 python pipeline/run_pipeline.py data/simulation/Scene8 --robot data/robot/robot.json python pipeline/run_pipeline.py data/simulation/Scene8 --cameras a,c,f Im Container (Pipeline-Image) ist das der Standard-Einstiegspunkt. """ from __future__ import annotations import argparse import glob import os import re import subprocess import sys from pathlib import Path PIPE = Path(__file__).resolve().parent # .../pipeline ROOT = PIPE.parent # Repo-Wurzel PY = sys.executable def run(cmd) -> None: print(" $", " ".join(str(c) for c in cmd)) r = subprocess.run(cmd) if r.returncode != 0: print(f"[ERROR] Schritt fehlgeschlagen (exit {r.returncode})") sys.exit(r.returncode) def cam_id_of(path: str): m = re.match(r"render_([A-Za-z0-9]+)\.png", os.path.basename(path)) return m.group(1) if m else None def main() -> None: ap = argparse.ArgumentParser(description="Roboter-Pose-Pipeline: Bilder -> robot_state.json") ap.add_argument("scene", help="Bildordner, z.B. data/simulation/Scene8") ap.add_argument("--robot", default=os.environ.get("ROBOT_JSON", str(ROOT / "data" / "robot" / "robot.json"))) ap.add_argument("--evalDir", default=None, help="Ausgabeordner (Default: data/evaluations/)") ap.add_argument("--lambdaWeight", default="100.0") ap.add_argument("--cameras", default=None, help="Kommagetrennte Kamera-IDs, z.B. a,b,d (Standard: alle)") args = ap.parse_args() scene = Path(args.scene).resolve() eval_dir = Path(args.evalDir).resolve() if args.evalDir else (ROOT / "data" / "evaluations" / scene.name) eval_dir.mkdir(parents=True, exist_ok=True) robot = args.robot imgs = sorted(glob.glob(str(scene / "render_*.png"))) if args.cameras: wanted = set(args.cameras.split(",")) imgs = [img for img in imgs if cam_id_of(img) in wanted] if not imgs: print(f"[ERROR] keine passenden render_*.png in {scene}") sys.exit(1) cam_ids = [cam_id_of(i) for i in imgs] print(f"[INFO] Scene={scene.name} Kameras={cam_ids} -> {eval_dir}") # ── Schritt 1: ArUco-Detektion (pro Bild seine EIGENE npz) ── print("\n[STEP 1] ArUco-Detektion") for img in imgs: cid = cam_id_of(img) if not cid: continue npz = scene / f"render_{cid}.npz" if not npz.exists(): npz = scene / "render_a.npz" # Fallback (gleiche Intrinsik in der Sim) run([PY, str(PIPE / "1_detect_aruco_observations.py"), "-i", img, "-npz", str(npz), "-outDir", str(eval_dir), "-robot", robot, "-cameraId", cid]) # ── Schritt 2: Kamera-Posen ── print("\n[STEP 2] Kamera-Posen") dets = sorted(glob.glob(str(eval_dir / "*_aruco_detection.json"))) for d in dets: run([PY, str(PIPE / "2_estimate_camera_from_observations.py"), "-i", d, "-robot", robot, "-outDir", str(eval_dir)]) # ── Schritt 3: Multi-View Triangulation (center, für Viewer/Vergleich) ── print("\n[STEP 3] Multi-View Bundle Adjustment") poses = sorted(glob.glob(str(eval_dir / "*_camera_pose.json"))) det_args, pose_args = [], [] for d in dets: det_args += ["-det", d] for p in poses: pose_args += ["-pose", p] run([PY, str(PIPE / "3_multiview_bundle_adjustment_v4.py"), "-robot", robot, "-lambdaWeight", args.lambdaWeight] + det_args + pose_args) # ── Schritt 3b: Eck-Marker-Posen (Position + gemessene Normale) ── print("\n[STEP 3b] Eck-Marker-Posen") run([PY, str(PIPE / "3b_corner_marker_poses.py"), "--evalDir", str(eval_dir), "--robot", robot]) # ── Schritt 4: Pose-Estimation (Gelenkwinkel) ── print("\n[STEP 4] Pose-Estimation") run([PY, str(PIPE / "pose_estimation.py"), str(eval_dir / "aruco_marker_poses.json"), "-robot", robot, "-out", str(eval_dir / "robot_state.json")]) print(f"\n[DONE] -> {eval_dir / 'robot_state.json'}") if __name__ == "__main__": main()