#!/usr/bin/env python3 """ jRun.py – Test-Pipeline: detect → camera-pose → ArUco-CSV Ablauf: 1. pipeline/1_detect_aruco_observations.py pro Bild → test/temp/ 2. pipeline/2_estimate_camera_from_observations.py pro Bild → test/temp/ 3. Positionen + Normalen per solvePnP aus Beobachtungen ableiten, über alle Kameras mitteln, nach ID sortieren 4. test/temp/detections.csv schreiben: id, set, x, y, z, nx, ny, nz Einheiten: x/y/z in mm (Weltframe des Roboters), nx/ny/nz dimensionslos. """ from __future__ import annotations import csv import glob import json import os import re import subprocess import sys from typing import Dict, List, Optional, Tuple import cv2 import numpy as np # --------------------------------------------------------------------------- # Pfade # --------------------------------------------------------------------------- SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) PROJECT_ROOT = os.path.dirname(SCRIPT_DIR) PIPELINE_DIR = os.path.join(PROJECT_ROOT, "pipeline") DATA_DIR = os.path.join(PROJECT_ROOT, "data", "testPictures") TEMP_DIR = os.path.join(SCRIPT_DIR, "temp") # --------------------------------------------------------------------------- # Hilfsfunktionen – Dateien # --------------------------------------------------------------------------- def find_images() -> List[str]: return sorted(glob.glob(os.path.join(DATA_DIR, "cam*_hires_*.jpg"))) def cam_id_from_path(img_path: str) -> str: m = re.match(r"(cam\d+)_", os.path.basename(img_path)) if not m: raise ValueError(f"Kein Camera-ID in Dateiname: {img_path}") return m.group(1) def npz_path(cam_id: str) -> str: p = os.path.join(DATA_DIR, f"calibration_{cam_id}.npz") if not os.path.exists(p): raise FileNotFoundError(f"Kalibrierung nicht gefunden: {p}") return p def find_robot_json() -> str: candidates = glob.glob(os.path.join(DATA_DIR, "robot*.json")) if not candidates: raise FileNotFoundError(f"Kein robot*.json in {DATA_DIR}") return sorted(candidates)[0] def detection_json_path(img_path: str) -> str: base = os.path.splitext(os.path.basename(img_path))[0] return os.path.join(TEMP_DIR, f"{base}_aruco_detection.json") def camera_pose_json_path(img_path: str) -> str: base = os.path.splitext(os.path.basename(img_path))[0] return os.path.join(TEMP_DIR, f"{base}_camera_pose.json") # --------------------------------------------------------------------------- # Subprocess-Wrapper # --------------------------------------------------------------------------- def run_step(cmd: List[str]) -> None: print(f"\n>>> {' '.join(cmd)}") r = subprocess.run(cmd, text=True) if r.returncode != 0: raise RuntimeError(f"Befehl fehlgeschlagen (exit {r.returncode})") # --------------------------------------------------------------------------- # Marker-Klassifizierung aus robot.json # --------------------------------------------------------------------------- def load_marker_set_map(robot_path: str) -> Dict[int, str]: """ Gibt marker_id -> Set-Label zurück. 'set'-Marker → set_name (z.B. 'Brett', 'A0') Arm/Loose → Link-Name """ if PIPELINE_DIR not in sys.path: sys.path.insert(0, PIPELINE_DIR) # Import erst nach sys.path-Erweiterung from marker_sets import classify_markers # noqa: PLC0415 with open(robot_path, "r", encoding="utf-8") as f: robot_data = json.load(f) classification = classify_markers(robot_data) result: Dict[int, str] = {} for mid, info in classification.items(): if info.role == "set" and info.set_name: result[mid] = info.set_name else: result[mid] = info.link return result def load_a0_reference(robot_path: str) -> Dict[int, np.ndarray]: """ Welt-Referenz (x, y) in mm für die A0-Marker aus robot.json. Nur die A0-Marker definieren (momentan) die Welt-Koordinaten, daher wird der dxy-Abgleich ausschließlich auf sie beschränkt. Position steht im Board-Frame = Welt-Frame (Board ist Wurzel-Link bei [0,0,0]). """ if PIPELINE_DIR not in sys.path: sys.path.insert(0, PIPELINE_DIR) from marker_sets import classify_markers # noqa: PLC0415 with open(robot_path, "r", encoding="utf-8") as f: robot_data = json.load(f) ref: Dict[int, np.ndarray] = {} for mid, info in classify_markers(robot_data).items(): if info.role == "set" and info.set_name == "A0": ref[mid] = np.array(info.position[:2], dtype=float) return ref # --------------------------------------------------------------------------- # Positionen + Normalen per solvePnP # --------------------------------------------------------------------------- def _marker_local_corners(marker_size_m: float) -> np.ndarray: h = marker_size_m / 2.0 return np.array([ [-h, h, 0.0], [ h, h, 0.0], [ h, -h, 0.0], [-h, -h, 0.0], ], dtype=np.float32) def estimate_marker_poses( det_json: dict, pose_json: dict, ) -> Dict[int, Tuple[np.ndarray, np.ndarray]]: """ Gibt pro erkannter Marker-ID: (pos_mm [3], normal_world [3]) zurück. pos_mm: Marker-Mittelpunkt im Weltframe, Einheit mm normal_world: normierter Normalenvektor (Marker-Z-Achse im Weltframe) """ K = np.array(det_json["camera"]["camera_matrix"], dtype=np.float64).reshape(3, 3) D = np.array(det_json["camera"]["distortion_coefficients"], dtype=np.float64).reshape(-1, 1) marker_size_m = float( det_json.get("vision_config", {}).get("MarkerSize", 0.025) ) w2c = pose_json["camera_pose"]["world_to_camera"] R_wc = np.array(w2c["rotation_matrix"], dtype=np.float64).reshape(3, 3) t_wc = np.array(w2c["translation_m"], dtype=np.float64).reshape(3) obj_corners = _marker_local_corners(marker_size_m) result: Dict[int, Tuple[np.ndarray, np.ndarray]] = {} for det in det_json.get("detections", []): mid = int(det["marker_id"]) corners_px = np.array( det["image_points_px"], dtype=np.float32 ).reshape(4, 2) ok, rvec, tvec = cv2.solvePnP( obj_corners, corners_px, K.astype(np.float32), D.astype(np.float32), flags=cv2.SOLVEPNP_IPPE_SQUARE, ) if not ok: ok, rvec, tvec = cv2.solvePnP( obj_corners, corners_px, K.astype(np.float32), D.astype(np.float32), flags=cv2.SOLVEPNP_ITERATIVE, ) if not ok: continue tvec_f = tvec.reshape(3).astype(np.float64) R_mc, _ = cv2.Rodrigues(rvec.reshape(3, 1)) R_mc = R_mc.astype(np.float64) # Weltframe-Position: x_world = R_wc.T @ (x_cam - t_wc) pos_world_m = R_wc.T @ (tvec_f - t_wc) # Marker-Normale (Z-Achse) im Weltframe normal_cam = R_mc[:, 2] normal_world = R_wc.T @ normal_cam nlen = np.linalg.norm(normal_world) if nlen > 1e-9: normal_world /= nlen result[mid] = (pos_world_m * 1000.0, normal_world) return result # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def main() -> None: os.makedirs(TEMP_DIR, exist_ok=True) images = find_images() if not images: print(f"[FEHLER] Keine cam*_hires_*.jpg in {DATA_DIR}") sys.exit(1) robot_path = find_robot_json() print(f"Robot: {os.path.basename(robot_path)}") print(f"Bilder: {[os.path.basename(i) for i in images]}") print(f"Ausgabe-Ordner: {TEMP_DIR}") # ------------------------------------------------------------------ # Schritt 1: ArUco-Erkennung # ------------------------------------------------------------------ print("\n" + "=" * 60) print("Schritt 1: ArUco-Erkennung") print("=" * 60) for img in images: cam_id = cam_id_from_path(img) run_step([ sys.executable, os.path.join(PIPELINE_DIR, "1_detect_aruco_observations.py"), "-i", img, "-npz", npz_path(cam_id), "-robot", robot_path, "-cameraId", cam_id, "-outDir", TEMP_DIR, "--saveDebugImage", ]) # ------------------------------------------------------------------ # Schritt 2: Kamera-Positionen # ------------------------------------------------------------------ print("\n" + "=" * 60) print("Schritt 2: Kamera-Poses") print("=" * 60) for img in images: run_step([ sys.executable, os.path.join(PIPELINE_DIR, "2_estimate_camera_from_observations.py"), "-i", detection_json_path(img), "-robot", robot_path, "-outDir", TEMP_DIR, ]) # ------------------------------------------------------------------ # Schritt 3: Beobachtungen zusammenführen # ------------------------------------------------------------------ print("\n" + "=" * 60) print("Schritt 3: Positionen + Normalen") print("=" * 60) marker_set_map = load_marker_set_map(robot_path) a0_ref = load_a0_reference(robot_path) # mid -> Liste von (pos_mm, normal) observations: Dict[int, List[Tuple[np.ndarray, np.ndarray]]] = {} # cam_id -> Welt-Position (mm) / QA-Metadaten camera_positions: Dict[str, np.ndarray] = {} camera_meta: Dict[str, dict] = {} for img in images: det_path = detection_json_path(img) pose_path = camera_pose_json_path(img) if not os.path.exists(det_path): print(f"[WARN] Fehlend: {det_path}") continue if not os.path.exists(pose_path): print(f"[WARN] Fehlend: {pose_path}") continue with open(det_path, "r", encoding="utf-8") as f: det_json = json.load(f) with open(pose_path, "r", encoding="utf-8") as f: pose_json = json.load(f) cam_id = cam_id_from_path(img) ciw = pose_json["camera_pose"]["camera_in_world"]["position_mm"] camera_positions[cam_id] = np.array(ciw, dtype=float) est = pose_json.get("estimation", {}) camera_meta[cam_id] = { "rms_px": est.get("residual_rms_px"), "num_markers": est.get("num_used_markers"), } cam_name = os.path.basename(img) poses = estimate_marker_poses(det_json, pose_json) print(f" {cam_name}: {len(poses)} Marker mit Pose") for mid, pn in poses.items(): observations.setdefault(mid, []).append(pn) # ------------------------------------------------------------------ # Mittelwert über alle Kameras + CSV schreiben # ------------------------------------------------------------------ rows = [] for mid in sorted(observations.keys()): obs_list = observations[mid] positions = np.array([p for p, _ in obs_list]) normals = np.array([n for _, n in obs_list]) pos_mean = positions.mean(axis=0) n_sum = normals.sum(axis=0) n_norm = np.linalg.norm(n_sum) normal_mean = n_sum / n_norm if n_norm > 1e-9 else np.array([0.0, 0.0, 1.0]) set_label = marker_set_map.get(mid, "?") # dxy: planarer Abgleich gegen robot.json — nur für A0 definiert if mid in a0_ref: d = pos_mean[:2] - a0_ref[mid] dxy = round(float(np.hypot(d[0], d[1])), 2) else: dxy = "" rows.append({ "id": mid, "set": set_label, "SeenByCount": len(obs_list), "x": round(float(pos_mean[0]), 2), "y": round(float(pos_mean[1]), 2), "z": round(float(pos_mean[2]), 2), "nx": round(float(normal_mean[0]), 4), "ny": round(float(normal_mean[1]), 4), "nz": round(float(normal_mean[2]), 4), "dxy": dxy, }) # Kamera-Positionen als eigene Zeilen (oben in der CSV) camera_rows = [] for cam_id in sorted(camera_positions.keys()): pos = camera_positions[cam_id] camera_rows.append({ "id": cam_id, "set": "CAMERA", "SeenByCount": "", "x": round(float(pos[0]), 2), "y": round(float(pos[1]), 2), "z": round(float(pos[2]), 2), "nx": "", "ny": "", "nz": "", "dxy": "", }) csv_path = os.path.join(TEMP_DIR, "detections.csv") fieldnames = ["id", "set", "SeenByCount", "x", "y", "z", "nx", "ny", "nz", "dxy"] with open(csv_path, "w", newline="", encoding="utf-8") as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() writer.writerows(camera_rows + rows) # ------------------------------------------------------------------ # Konsolenübersicht # ------------------------------------------------------------------ print(f"\nGeschrieben: {csv_path} " f"({len(rows)} Marker + {len(camera_rows)} Kameras)\n") # Kamera-Positionen + Reprojektions-RMS (schlechte Kamera erkennen) print("Kamera-Positionen (Welt, mm):") chdr = f"{'cam':>5} {'x':>9} {'y':>9} {'z':>9} {'rms_px':>7} {'#mk':>4}" print(chdr) print("-" * len(chdr)) for cam_id in sorted(camera_positions.keys()): pos = camera_positions[cam_id] meta = camera_meta.get(cam_id, {}) rms = meta.get("rms_px") nmk = meta.get("num_markers") rms_s = f"{rms:7.2f}" if rms is not None else " n/a" print(f"{cam_id:>5} {pos[0]:>9.1f} {pos[1]:>9.1f} {pos[2]:>9.1f} " f"{rms_s} {str(nmk):>4}") # Marker-Tabelle print() hdr = (f"{'id':>5} {'set':<12} {'cams':>4} {'x':>8} {'y':>8} {'z':>8} " f"{'nx':>7} {'ny':>7} {'nz':>7} {'dxy':>7}") print(hdr) print("-" * len(hdr)) for row in rows: dxy_s = f"{row['dxy']:7.2f}" if row['dxy'] != "" else " -" print( f"{row['id']:>5} {row['set']:<12} {row['SeenByCount']:>4} " f"{row['x']:>8.1f} {row['y']:>8.1f} {row['z']:>8.1f} " f"{row['nx']:>7.4f} {row['ny']:>7.4f} {row['nz']:>7.4f} {dxy_s}" ) # A0-Abgleich-Statistik (planarer Fehler gegen robot.json) dxys = np.array([row["dxy"] for row in rows if row["dxy"] != ""], dtype=float) if dxys.size: print(f"\nA0 dxy (Welt-Abgleich, mm): n={dxys.size} " f"mean={dxys.mean():.2f} median={np.median(dxys):.2f} " f"max={dxys.max():.2f}") if __name__ == "__main__": main()