docker probleme
This commit is contained in:
183
scripts/orchestrator.py
Normal file
183
scripts/orchestrator.py
Normal file
@@ -0,0 +1,183 @@
|
||||
"""
|
||||
orchestrator.py
|
||||
===============
|
||||
Orchestrator — ruft die Pipeline-Schritte als Subprocess auf und gibt
|
||||
ein strukturiertes PipelineResult zurück.
|
||||
|
||||
Die einzelnen Schritt-Skripte liegen im Verzeichnis ``scripts/pipeline/``
|
||||
und werden per Subprocess über ihren Dateipfad aufgerufen (s. ``SCRIPTS``).
|
||||
|
||||
Schritte:
|
||||
1 ArUco-Detektion (scripts/pipeline/1_detect_aruco_observations.py)
|
||||
2 Kamera-Posen (scripts/pipeline/2_estimate_camera_from_observations.py)
|
||||
3 Multi-View Triangulation (scripts/pipeline/3_multiview_bundle_adjustment_v4.py)
|
||||
3b Eck-Marker-Posen (scripts/pipeline/3b_corner_marker_poses.py)
|
||||
4 Pose-Estimation (scripts/pipeline/pose_estimation.py)
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import glob
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
SCRIPTS = Path(__file__).parent / "pipeline"
|
||||
PY = sys.executable
|
||||
|
||||
|
||||
@dataclass
|
||||
class PipelineResult:
|
||||
"""Ergebnis der Pose-Schätzung."""
|
||||
joints: Dict[str, float] # x,y,z,a,b,c,e → Wert (mm oder °)
|
||||
confidence: Dict[str, str] # x,y,z,a,b,c,e → high|medium|low|none
|
||||
n_markers: int # Anzahl triangulierter Marker
|
||||
residual_rms: float # Residuum der Schätzung (mm)
|
||||
processing_ms: float # Laufzeit der Pipeline
|
||||
robot_state_path: Optional[Path] = None # Pfad zur erzeugten robot_state.json
|
||||
errors: List[str] = field(default_factory=list)
|
||||
|
||||
|
||||
def _run(cmd: list, step: str) -> None:
|
||||
"""Subprocess-Aufruf mit Fehlerbehandlung."""
|
||||
r = subprocess.run(cmd, capture_output=True, text=True)
|
||||
if r.returncode != 0:
|
||||
raise RuntimeError(f"[{step}] exit {r.returncode}:\n{r.stderr.strip()[-800:]}")
|
||||
|
||||
|
||||
def _cam_id(path: str) -> Optional[str]:
|
||||
m = re.match(r"render_([A-Za-z0-9]+)\.(png|jpg|jpeg)", os.path.basename(path), re.I)
|
||||
return m.group(1) if m else None
|
||||
|
||||
|
||||
def estimate_from_dir(
|
||||
image_dir: str | Path,
|
||||
robot_json: str | Path | None = None,
|
||||
eval_dir: str | Path | None = None,
|
||||
lambda_weight: float = 100.0,
|
||||
camera_filter: Optional[List[str]] = None,
|
||||
) -> PipelineResult:
|
||||
"""
|
||||
Pose-Schätzung aus einem Bildordner.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
image_dir Ordner mit render_*.png und render_*.npz
|
||||
robot_json Pfad zu robot.json (Default: ROBOT_JSON env oder Exception)
|
||||
eval_dir Ausgabeordner (Default: temporäres Verzeichnis)
|
||||
lambda_weight Constraint-Gewicht für Bundle Adjustment
|
||||
camera_filter Liste von Kamera-IDs; None = alle
|
||||
|
||||
Returns
|
||||
-------
|
||||
PipelineResult
|
||||
"""
|
||||
t0 = time.time()
|
||||
image_dir = Path(image_dir).resolve()
|
||||
|
||||
# robot.json bestimmen
|
||||
if robot_json is None:
|
||||
robot_json = os.environ.get("ROBOT_JSON")
|
||||
if not robot_json:
|
||||
raise ValueError("robot_json muss angegeben werden oder ROBOT_JSON env gesetzt sein")
|
||||
robot_json = Path(robot_json).resolve()
|
||||
|
||||
# Ausgabeordner
|
||||
_tmp = None
|
||||
if eval_dir is None:
|
||||
_tmp = tempfile.mkdtemp(prefix="approbot_")
|
||||
eval_dir = Path(_tmp)
|
||||
else:
|
||||
eval_dir = Path(eval_dir).resolve()
|
||||
eval_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
errors = []
|
||||
|
||||
try:
|
||||
# Bilder sammeln
|
||||
imgs = sorted(
|
||||
glob.glob(str(image_dir / "render_*.png")) +
|
||||
glob.glob(str(image_dir / "render_*.PNG")) +
|
||||
glob.glob(str(image_dir / "render_*.jpg")) +
|
||||
glob.glob(str(image_dir / "render_*.jpeg"))
|
||||
)
|
||||
if camera_filter:
|
||||
imgs = [i for i in imgs if _cam_id(i) in set(camera_filter)]
|
||||
if not imgs:
|
||||
raise FileNotFoundError(f"Keine render_*.png/jpg in {image_dir}")
|
||||
|
||||
# ── Schritt 1: ArUco-Detektion ──────────────────────────────
|
||||
for img in imgs:
|
||||
cid = _cam_id(img)
|
||||
if not cid:
|
||||
continue
|
||||
npz = image_dir / f"render_{cid}.npz"
|
||||
if not npz.exists():
|
||||
npz_candidates = list(image_dir.glob("*.npz"))
|
||||
if not npz_candidates:
|
||||
raise FileNotFoundError(f"Keine .npz-Intrinsik in {image_dir}")
|
||||
npz = npz_candidates[0]
|
||||
_run([PY, str(SCRIPTS / "1_detect_aruco_observations.py"),
|
||||
"-i", str(img), "-npz", str(npz),
|
||||
"-outDir", str(eval_dir), "-robot", str(robot_json), "-cameraId", cid],
|
||||
"Schritt 1")
|
||||
|
||||
# ── Schritt 2: Kamera-Posen ──────────────────────────────────
|
||||
dets = sorted(glob.glob(str(eval_dir / "*_aruco_detection.json")))
|
||||
if not dets:
|
||||
raise RuntimeError("Keine ArUco-Detektionen erzeugt (Schritt 1)")
|
||||
for d in dets:
|
||||
_run([PY, str(SCRIPTS / "2_estimate_camera_from_observations.py"),
|
||||
"-i", d, "-robot", str(robot_json), "-outDir", str(eval_dir)],
|
||||
"Schritt 2")
|
||||
|
||||
# ── Schritt 3: Multi-View Triangulation ──────────────────────
|
||||
poses = sorted(glob.glob(str(eval_dir / "*_camera_pose.json")))
|
||||
if not poses:
|
||||
raise RuntimeError("Keine Kamera-Posen erzeugt (Schritt 2)")
|
||||
det_args = sum([["-det", d] for d in dets], [])
|
||||
pose_args = sum([["-pose", p] for p in poses], [])
|
||||
_run([PY, str(SCRIPTS / "3_multiview_bundle_adjustment_v4.py"),
|
||||
"-robot", str(robot_json), "-lambdaWeight", str(lambda_weight)]
|
||||
+ det_args + pose_args, "Schritt 3")
|
||||
|
||||
# ── Schritt 3b: Eck-Marker-Posen ─────────────────────────────
|
||||
_run([PY, str(SCRIPTS / "3b_corner_marker_poses.py"),
|
||||
"--evalDir", str(eval_dir), "--robot", str(robot_json)], "Schritt 3b")
|
||||
|
||||
# ── Schritt 4: Pose-Estimation ────────────────────────────────
|
||||
marker_poses = eval_dir / "aruco_marker_poses.json"
|
||||
state_out = eval_dir / "robot_state.json"
|
||||
_run([PY, str(SCRIPTS / "pose_estimation.py"),
|
||||
str(marker_poses), "-robot", str(robot_json), "-out", str(state_out)],
|
||||
"Schritt 4")
|
||||
|
||||
# Ergebnis lesen
|
||||
state = json.load(open(state_out, "r", encoding="utf-8"))
|
||||
mv = state.get("movements", {})
|
||||
joints = {k: float(v.get("value", 0.0)) for k, v in mv.items() if isinstance(v, dict)}
|
||||
confidence = {k: str(v.get("confidence", "none")) for k, v in mv.items() if isinstance(v, dict)}
|
||||
|
||||
return PipelineResult(
|
||||
joints=joints,
|
||||
confidence=confidence,
|
||||
n_markers=int(state.get("num_markers", 0)),
|
||||
residual_rms=float(state.get("residual_rms", 0.0)),
|
||||
processing_ms=round((time.time() - t0) * 1000),
|
||||
robot_state_path=state_out,
|
||||
errors=errors,
|
||||
)
|
||||
|
||||
except Exception as exc:
|
||||
errors.append(str(exc))
|
||||
raise
|
||||
finally:
|
||||
# Temporäres Verzeichnis bleibt absichtlich erhalten für Debugging;
|
||||
# Aufrufer kann es über result.robot_state_path.parent aufräumen.
|
||||
pass
|
||||
Reference in New Issue
Block a user