""" orchestrator.py =============== Orchestrator — ruft die Pipeline-Schritte als Subprocess auf und gibt ein strukturiertes PipelineResult zurück. Die einzelnen Schritt-Skripte liegen im Verzeichnis ``scripts/pipeline/`` und werden per Subprocess über ihren Dateipfad aufgerufen (s. ``SCRIPTS``). Schritte: 1 ArUco-Detektion (scripts/pipeline/1_detect_aruco_observations.py) 2 Kamera-Posen (scripts/pipeline/2_estimate_camera_from_observations.py) 3 Multi-View Triangulation (scripts/pipeline/3_multiview_bundle_adjustment_v4.py) 3b Eck-Marker-Posen (scripts/pipeline/3b_corner_marker_poses.py) 4 Pose-Estimation (scripts/pipeline/pose_estimation.py) """ from __future__ import annotations import glob import json import os import re import subprocess import sys import tempfile import time from dataclasses import dataclass, field from pathlib import Path from typing import Dict, List, Optional SCRIPTS = Path(__file__).parent / "pipeline" PY = sys.executable @dataclass class PipelineResult: """Ergebnis der Pose-Schätzung.""" joints: Dict[str, float] # x,y,z,a,b,c,e → Wert (mm oder °) confidence: Dict[str, str] # x,y,z,a,b,c,e → high|medium|low|none n_markers: int # Anzahl triangulierter Marker residual_rms: float # Residuum der Schätzung (mm) processing_ms: float # Laufzeit der Pipeline robot_state_path: Optional[Path] = None # Pfad zur erzeugten robot_state.json errors: List[str] = field(default_factory=list) def _run(cmd: list, step: str) -> None: """Subprocess-Aufruf mit Fehlerbehandlung.""" r = subprocess.run(cmd, capture_output=True, text=True) if r.returncode != 0: raise RuntimeError(f"[{step}] exit {r.returncode}:\n{r.stderr.strip()[-800:]}") def _cam_id(path: str) -> Optional[str]: m = re.match(r"render_([A-Za-z0-9]+)\.(png|jpg|jpeg)", os.path.basename(path), re.I) return m.group(1) if m else None def estimate_from_dir( image_dir: str | Path, robot_json: str | Path | None = None, eval_dir: str | Path | None = None, lambda_weight: float = 100.0, camera_filter: Optional[List[str]] = None, ) -> PipelineResult: """ Pose-Schätzung aus einem Bildordner. Parameters ---------- image_dir Ordner mit render_*.png und render_*.npz robot_json Pfad zu robot.json (Default: ROBOT_JSON env oder Exception) eval_dir Ausgabeordner (Default: temporäres Verzeichnis) lambda_weight Constraint-Gewicht für Bundle Adjustment camera_filter Liste von Kamera-IDs; None = alle Returns ------- PipelineResult """ t0 = time.time() image_dir = Path(image_dir).resolve() # robot.json bestimmen if robot_json is None: robot_json = os.environ.get("ROBOT_JSON") if not robot_json: raise ValueError("robot_json muss angegeben werden oder ROBOT_JSON env gesetzt sein") robot_json = Path(robot_json).resolve() # Ausgabeordner _tmp = None if eval_dir is None: _tmp = tempfile.mkdtemp(prefix="approbot_") eval_dir = Path(_tmp) else: eval_dir = Path(eval_dir).resolve() eval_dir.mkdir(parents=True, exist_ok=True) errors = [] try: # Bilder sammeln imgs = sorted( glob.glob(str(image_dir / "render_*.png")) + glob.glob(str(image_dir / "render_*.PNG")) + glob.glob(str(image_dir / "render_*.jpg")) + glob.glob(str(image_dir / "render_*.jpeg")) ) if camera_filter: imgs = [i for i in imgs if _cam_id(i) in set(camera_filter)] if not imgs: raise FileNotFoundError(f"Keine render_*.png/jpg in {image_dir}") # ── Schritt 1: ArUco-Detektion ────────────────────────────── for img in imgs: cid = _cam_id(img) if not cid: continue npz = image_dir / f"render_{cid}.npz" if not npz.exists(): npz_candidates = list(image_dir.glob("*.npz")) if not npz_candidates: raise FileNotFoundError(f"Keine .npz-Intrinsik in {image_dir}") npz = npz_candidates[0] _run([PY, str(SCRIPTS / "1_detect_aruco_observations.py"), "-i", str(img), "-npz", str(npz), "-outDir", str(eval_dir), "-robot", str(robot_json), "-cameraId", cid], "Schritt 1") # ── Schritt 2: Kamera-Posen ────────────────────────────────── dets = sorted(glob.glob(str(eval_dir / "*_aruco_detection.json"))) if not dets: raise RuntimeError("Keine ArUco-Detektionen erzeugt (Schritt 1)") for d in dets: _run([PY, str(SCRIPTS / "2_estimate_camera_from_observations.py"), "-i", d, "-robot", str(robot_json), "-outDir", str(eval_dir)], "Schritt 2") # ── Schritt 3: Multi-View Triangulation ────────────────────── poses = sorted(glob.glob(str(eval_dir / "*_camera_pose.json"))) if not poses: raise RuntimeError("Keine Kamera-Posen erzeugt (Schritt 2)") det_args = sum([["-det", d] for d in dets], []) pose_args = sum([["-pose", p] for p in poses], []) _run([PY, str(SCRIPTS / "3_multiview_bundle_adjustment_v4.py"), "-robot", str(robot_json), "-lambdaWeight", str(lambda_weight)] + det_args + pose_args, "Schritt 3") # ── Schritt 3b: Eck-Marker-Posen ───────────────────────────── _run([PY, str(SCRIPTS / "3b_corner_marker_poses.py"), "--evalDir", str(eval_dir), "--robot", str(robot_json)], "Schritt 3b") # ── Schritt 4: Pose-Estimation ──────────────────────────────── marker_poses = eval_dir / "aruco_marker_poses.json" state_out = eval_dir / "robot_state.json" _run([PY, str(SCRIPTS / "pose_estimation.py"), str(marker_poses), "-robot", str(robot_json), "-out", str(state_out)], "Schritt 4") # Ergebnis lesen state = json.load(open(state_out, "r", encoding="utf-8")) mv = state.get("movements", {}) joints = {k: float(v.get("value", 0.0)) for k, v in mv.items() if isinstance(v, dict)} confidence = {k: str(v.get("confidence", "none")) for k, v in mv.items() if isinstance(v, dict)} return PipelineResult( joints=joints, confidence=confidence, n_markers=int(state.get("num_markers", 0)), residual_rms=float(state.get("residual_rms", 0.0)), processing_ms=round((time.time() - t0) * 1000), robot_state_path=state_out, errors=errors, ) except Exception as exc: errors.append(str(exc)) raise finally: # Temporäres Verzeichnis bleibt absichtlich erhalten für Debugging; # Aufrufer kann es über result.robot_state_path.parent aufräumen. pass