Files
appRobotRender/test/jRun.py
2026-06-19 06:44:46 +02:00

425 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
jRun.py Test-Pipeline: detect → camera-pose → ArUco-CSV
Ablauf:
1. pipeline/1_detect_aruco_observations.py pro Bild → test/temp/
2. pipeline/2_estimate_camera_from_observations.py pro Bild → test/temp/
3. Positionen + Normalen per solvePnP aus Beobachtungen ableiten,
über alle Kameras mitteln, nach ID sortieren
4. test/temp/detections.csv schreiben: id, set, x, y, z, nx, ny, nz
Einheiten: x/y/z in mm (Weltframe des Roboters), nx/ny/nz dimensionslos.
"""
from __future__ import annotations
import csv
import glob
import json
import os
import re
import subprocess
import sys
from typing import Dict, List, Optional, Tuple
import cv2
import numpy as np
# ---------------------------------------------------------------------------
# Pfade
# ---------------------------------------------------------------------------
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.dirname(SCRIPT_DIR)
PIPELINE_DIR = os.path.join(PROJECT_ROOT, "pipeline")
DATA_DIR = os.path.join(PROJECT_ROOT, "data", "testPictures")
TEMP_DIR = os.path.join(SCRIPT_DIR, "temp")
# ---------------------------------------------------------------------------
# Hilfsfunktionen Dateien
# ---------------------------------------------------------------------------
def find_images() -> List[str]:
return sorted(glob.glob(os.path.join(DATA_DIR, "cam*_hires_*.jpg")))
def cam_id_from_path(img_path: str) -> str:
m = re.match(r"(cam\d+)_", os.path.basename(img_path))
if not m:
raise ValueError(f"Kein Camera-ID in Dateiname: {img_path}")
return m.group(1)
def npz_path(cam_id: str) -> str:
p = os.path.join(DATA_DIR, f"calibration_{cam_id}.npz")
if not os.path.exists(p):
raise FileNotFoundError(f"Kalibrierung nicht gefunden: {p}")
return p
def find_robot_json() -> str:
candidates = glob.glob(os.path.join(DATA_DIR, "robot*.json"))
if not candidates:
raise FileNotFoundError(f"Kein robot*.json in {DATA_DIR}")
return sorted(candidates)[0]
def detection_json_path(img_path: str) -> str:
base = os.path.splitext(os.path.basename(img_path))[0]
return os.path.join(TEMP_DIR, f"{base}_aruco_detection.json")
def camera_pose_json_path(img_path: str) -> str:
base = os.path.splitext(os.path.basename(img_path))[0]
return os.path.join(TEMP_DIR, f"{base}_camera_pose.json")
# ---------------------------------------------------------------------------
# Subprocess-Wrapper
# ---------------------------------------------------------------------------
def run_step(cmd: List[str]) -> None:
print(f"\n>>> {' '.join(cmd)}")
r = subprocess.run(cmd, text=True)
if r.returncode != 0:
raise RuntimeError(f"Befehl fehlgeschlagen (exit {r.returncode})")
# ---------------------------------------------------------------------------
# Marker-Klassifizierung aus robot.json
# ---------------------------------------------------------------------------
def load_marker_set_map(robot_path: str) -> Dict[int, str]:
"""
Gibt marker_id -> Set-Label zurück.
'set'-Marker → set_name (z.B. 'Brett', 'A0')
Arm/Loose → Link-Name
"""
if PIPELINE_DIR not in sys.path:
sys.path.insert(0, PIPELINE_DIR)
# Import erst nach sys.path-Erweiterung
from marker_sets import classify_markers # noqa: PLC0415
with open(robot_path, "r", encoding="utf-8") as f:
robot_data = json.load(f)
classification = classify_markers(robot_data)
result: Dict[int, str] = {}
for mid, info in classification.items():
if info.role == "set" and info.set_name:
result[mid] = info.set_name
else:
result[mid] = info.link
return result
def load_a0_reference(robot_path: str) -> Dict[int, np.ndarray]:
"""
Welt-Referenz (x, y) in mm für die A0-Marker aus robot.json.
Nur die A0-Marker definieren (momentan) die Welt-Koordinaten, daher wird
der dxy-Abgleich ausschließlich auf sie beschränkt. Position steht im
Board-Frame = Welt-Frame (Board ist Wurzel-Link bei [0,0,0]).
"""
if PIPELINE_DIR not in sys.path:
sys.path.insert(0, PIPELINE_DIR)
from marker_sets import classify_markers # noqa: PLC0415
with open(robot_path, "r", encoding="utf-8") as f:
robot_data = json.load(f)
ref: Dict[int, np.ndarray] = {}
for mid, info in classify_markers(robot_data).items():
if info.role == "set" and info.set_name == "A0":
ref[mid] = np.array(info.position[:2], dtype=float)
return ref
# ---------------------------------------------------------------------------
# Positionen + Normalen per solvePnP
# ---------------------------------------------------------------------------
def _marker_local_corners(marker_size_m: float) -> np.ndarray:
h = marker_size_m / 2.0
return np.array([
[-h, h, 0.0],
[ h, h, 0.0],
[ h, -h, 0.0],
[-h, -h, 0.0],
], dtype=np.float32)
def estimate_marker_poses(
det_json: dict,
pose_json: dict,
) -> Dict[int, Tuple[np.ndarray, np.ndarray]]:
"""
Gibt pro erkannter Marker-ID: (pos_mm [3], normal_world [3]) zurück.
pos_mm: Marker-Mittelpunkt im Weltframe, Einheit mm
normal_world: normierter Normalenvektor (Marker-Z-Achse im Weltframe)
"""
K = np.array(det_json["camera"]["camera_matrix"],
dtype=np.float64).reshape(3, 3)
D = np.array(det_json["camera"]["distortion_coefficients"],
dtype=np.float64).reshape(-1, 1)
marker_size_m = float(
det_json.get("vision_config", {}).get("MarkerSize", 0.025)
)
w2c = pose_json["camera_pose"]["world_to_camera"]
R_wc = np.array(w2c["rotation_matrix"], dtype=np.float64).reshape(3, 3)
t_wc = np.array(w2c["translation_m"], dtype=np.float64).reshape(3)
obj_corners = _marker_local_corners(marker_size_m)
result: Dict[int, Tuple[np.ndarray, np.ndarray]] = {}
for det in det_json.get("detections", []):
mid = int(det["marker_id"])
corners_px = np.array(
det["image_points_px"], dtype=np.float32
).reshape(4, 2)
ok, rvec, tvec = cv2.solvePnP(
obj_corners, corners_px,
K.astype(np.float32), D.astype(np.float32),
flags=cv2.SOLVEPNP_IPPE_SQUARE,
)
if not ok:
ok, rvec, tvec = cv2.solvePnP(
obj_corners, corners_px,
K.astype(np.float32), D.astype(np.float32),
flags=cv2.SOLVEPNP_ITERATIVE,
)
if not ok:
continue
tvec_f = tvec.reshape(3).astype(np.float64)
R_mc, _ = cv2.Rodrigues(rvec.reshape(3, 1))
R_mc = R_mc.astype(np.float64)
# Weltframe-Position: x_world = R_wc.T @ (x_cam - t_wc)
pos_world_m = R_wc.T @ (tvec_f - t_wc)
# Marker-Normale (Z-Achse) im Weltframe
normal_cam = R_mc[:, 2]
normal_world = R_wc.T @ normal_cam
nlen = np.linalg.norm(normal_world)
if nlen > 1e-9:
normal_world /= nlen
result[mid] = (pos_world_m * 1000.0, normal_world)
return result
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
os.makedirs(TEMP_DIR, exist_ok=True)
images = find_images()
if not images:
print(f"[FEHLER] Keine cam*_hires_*.jpg in {DATA_DIR}")
sys.exit(1)
robot_path = find_robot_json()
print(f"Robot: {os.path.basename(robot_path)}")
print(f"Bilder: {[os.path.basename(i) for i in images]}")
print(f"Ausgabe-Ordner: {TEMP_DIR}")
# ------------------------------------------------------------------
# Schritt 1: ArUco-Erkennung
# ------------------------------------------------------------------
print("\n" + "=" * 60)
print("Schritt 1: ArUco-Erkennung")
print("=" * 60)
for img in images:
cam_id = cam_id_from_path(img)
run_step([
sys.executable,
os.path.join(PIPELINE_DIR, "1_detect_aruco_observations.py"),
"-i", img,
"-npz", npz_path(cam_id),
"-robot", robot_path,
"-cameraId", cam_id,
"-outDir", TEMP_DIR,
"--saveDebugImage",
])
# ------------------------------------------------------------------
# Schritt 2: Kamera-Positionen
# ------------------------------------------------------------------
print("\n" + "=" * 60)
print("Schritt 2: Kamera-Poses")
print("=" * 60)
for img in images:
run_step([
sys.executable,
os.path.join(PIPELINE_DIR, "2_estimate_camera_from_observations.py"),
"-i", detection_json_path(img),
"-robot", robot_path,
"-outDir", TEMP_DIR,
])
# ------------------------------------------------------------------
# Schritt 3: Beobachtungen zusammenführen
# ------------------------------------------------------------------
print("\n" + "=" * 60)
print("Schritt 3: Positionen + Normalen")
print("=" * 60)
marker_set_map = load_marker_set_map(robot_path)
a0_ref = load_a0_reference(robot_path)
# mid -> Liste von (pos_mm, normal)
observations: Dict[int, List[Tuple[np.ndarray, np.ndarray]]] = {}
# cam_id -> Welt-Position (mm) / QA-Metadaten
camera_positions: Dict[str, np.ndarray] = {}
camera_meta: Dict[str, dict] = {}
for img in images:
det_path = detection_json_path(img)
pose_path = camera_pose_json_path(img)
if not os.path.exists(det_path):
print(f"[WARN] Fehlend: {det_path}")
continue
if not os.path.exists(pose_path):
print(f"[WARN] Fehlend: {pose_path}")
continue
with open(det_path, "r", encoding="utf-8") as f:
det_json = json.load(f)
with open(pose_path, "r", encoding="utf-8") as f:
pose_json = json.load(f)
cam_id = cam_id_from_path(img)
ciw = pose_json["camera_pose"]["camera_in_world"]["position_mm"]
camera_positions[cam_id] = np.array(ciw, dtype=float)
est = pose_json.get("estimation", {})
camera_meta[cam_id] = {
"rms_px": est.get("residual_rms_px"),
"num_markers": est.get("num_used_markers"),
}
cam_name = os.path.basename(img)
poses = estimate_marker_poses(det_json, pose_json)
print(f" {cam_name}: {len(poses)} Marker mit Pose")
for mid, pn in poses.items():
observations.setdefault(mid, []).append(pn)
# ------------------------------------------------------------------
# Mittelwert über alle Kameras + CSV schreiben
# ------------------------------------------------------------------
rows = []
for mid in sorted(observations.keys()):
obs_list = observations[mid]
positions = np.array([p for p, _ in obs_list])
normals = np.array([n for _, n in obs_list])
pos_mean = positions.mean(axis=0)
n_sum = normals.sum(axis=0)
n_norm = np.linalg.norm(n_sum)
normal_mean = n_sum / n_norm if n_norm > 1e-9 else np.array([0.0, 0.0, 1.0])
set_label = marker_set_map.get(mid, "?")
# dxy: planarer Abgleich gegen robot.json — nur für A0 definiert
if mid in a0_ref:
d = pos_mean[:2] - a0_ref[mid]
dxy = round(float(np.hypot(d[0], d[1])), 2)
else:
dxy = ""
rows.append({
"id": mid,
"set": set_label,
"SeenByCount": len(obs_list),
"x": round(float(pos_mean[0]), 2),
"y": round(float(pos_mean[1]), 2),
"z": round(float(pos_mean[2]), 2),
"nx": round(float(normal_mean[0]), 4),
"ny": round(float(normal_mean[1]), 4),
"nz": round(float(normal_mean[2]), 4),
"dxy": dxy,
})
# Kamera-Positionen als eigene Zeilen (oben in der CSV)
camera_rows = []
for cam_id in sorted(camera_positions.keys()):
pos = camera_positions[cam_id]
camera_rows.append({
"id": cam_id,
"set": "CAMERA",
"SeenByCount": "",
"x": round(float(pos[0]), 2),
"y": round(float(pos[1]), 2),
"z": round(float(pos[2]), 2),
"nx": "",
"ny": "",
"nz": "",
"dxy": "",
})
csv_path = os.path.join(TEMP_DIR, "detections.csv")
fieldnames = ["id", "set", "SeenByCount", "x", "y", "z", "nx", "ny", "nz", "dxy"]
with open(csv_path, "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(camera_rows + rows)
# ------------------------------------------------------------------
# Konsolenübersicht
# ------------------------------------------------------------------
print(f"\nGeschrieben: {csv_path} "
f"({len(rows)} Marker + {len(camera_rows)} Kameras)\n")
# Kamera-Positionen + Reprojektions-RMS (schlechte Kamera erkennen)
print("Kamera-Positionen (Welt, mm):")
chdr = f"{'cam':>5} {'x':>9} {'y':>9} {'z':>9} {'rms_px':>7} {'#mk':>4}"
print(chdr)
print("-" * len(chdr))
for cam_id in sorted(camera_positions.keys()):
pos = camera_positions[cam_id]
meta = camera_meta.get(cam_id, {})
rms = meta.get("rms_px")
nmk = meta.get("num_markers")
rms_s = f"{rms:7.2f}" if rms is not None else " n/a"
print(f"{cam_id:>5} {pos[0]:>9.1f} {pos[1]:>9.1f} {pos[2]:>9.1f} "
f"{rms_s} {str(nmk):>4}")
# Marker-Tabelle
print()
hdr = (f"{'id':>5} {'set':<12} {'cams':>4} {'x':>8} {'y':>8} {'z':>8} "
f"{'nx':>7} {'ny':>7} {'nz':>7} {'dxy':>7}")
print(hdr)
print("-" * len(hdr))
for row in rows:
dxy_s = f"{row['dxy']:7.2f}" if row['dxy'] != "" else " -"
print(
f"{row['id']:>5} {row['set']:<12} {row['SeenByCount']:>4} "
f"{row['x']:>8.1f} {row['y']:>8.1f} {row['z']:>8.1f} "
f"{row['nx']:>7.4f} {row['ny']:>7.4f} {row['nz']:>7.4f} {dxy_s}"
)
# A0-Abgleich-Statistik (planarer Fehler gegen robot.json)
dxys = np.array([row["dxy"] for row in rows if row["dxy"] != ""], dtype=float)
if dxys.size:
print(f"\nA0 dxy (Welt-Abgleich, mm): n={dxys.size} "
f"mean={dxys.mean():.2f} median={np.median(dxys):.2f} "
f"max={dxys.max():.2f}")
if __name__ == "__main__":
main()