425 lines
15 KiB
Python
425 lines
15 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
jRun.py – Test-Pipeline: detect → camera-pose → ArUco-CSV
|
||
|
||
Ablauf:
|
||
1. pipeline/1_detect_aruco_observations.py pro Bild → test/temp/
|
||
2. pipeline/2_estimate_camera_from_observations.py pro Bild → test/temp/
|
||
3. Positionen + Normalen per solvePnP aus Beobachtungen ableiten,
|
||
über alle Kameras mitteln, nach ID sortieren
|
||
4. test/temp/detections.csv schreiben: id, set, x, y, z, nx, ny, nz
|
||
|
||
Einheiten: x/y/z in mm (Weltframe des Roboters), nx/ny/nz dimensionslos.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import csv
|
||
import glob
|
||
import json
|
||
import os
|
||
import re
|
||
import subprocess
|
||
import sys
|
||
from typing import Dict, List, Optional, Tuple
|
||
|
||
import cv2
|
||
import numpy as np
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Pfade
|
||
# ---------------------------------------------------------------------------
|
||
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
|
||
PROJECT_ROOT = os.path.dirname(SCRIPT_DIR)
|
||
PIPELINE_DIR = os.path.join(PROJECT_ROOT, "pipeline")
|
||
DATA_DIR = os.path.join(PROJECT_ROOT, "data", "testPictures")
|
||
TEMP_DIR = os.path.join(SCRIPT_DIR, "temp")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Hilfsfunktionen – Dateien
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def find_images() -> List[str]:
|
||
return sorted(glob.glob(os.path.join(DATA_DIR, "cam*_hires_*.jpg")))
|
||
|
||
|
||
def cam_id_from_path(img_path: str) -> str:
|
||
m = re.match(r"(cam\d+)_", os.path.basename(img_path))
|
||
if not m:
|
||
raise ValueError(f"Kein Camera-ID in Dateiname: {img_path}")
|
||
return m.group(1)
|
||
|
||
|
||
def npz_path(cam_id: str) -> str:
|
||
p = os.path.join(DATA_DIR, f"calibration_{cam_id}.npz")
|
||
if not os.path.exists(p):
|
||
raise FileNotFoundError(f"Kalibrierung nicht gefunden: {p}")
|
||
return p
|
||
|
||
|
||
def find_robot_json() -> str:
|
||
candidates = glob.glob(os.path.join(DATA_DIR, "robot*.json"))
|
||
if not candidates:
|
||
raise FileNotFoundError(f"Kein robot*.json in {DATA_DIR}")
|
||
return sorted(candidates)[0]
|
||
|
||
|
||
def detection_json_path(img_path: str) -> str:
|
||
base = os.path.splitext(os.path.basename(img_path))[0]
|
||
return os.path.join(TEMP_DIR, f"{base}_aruco_detection.json")
|
||
|
||
|
||
def camera_pose_json_path(img_path: str) -> str:
|
||
base = os.path.splitext(os.path.basename(img_path))[0]
|
||
return os.path.join(TEMP_DIR, f"{base}_camera_pose.json")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Subprocess-Wrapper
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def run_step(cmd: List[str]) -> None:
|
||
print(f"\n>>> {' '.join(cmd)}")
|
||
r = subprocess.run(cmd, text=True)
|
||
if r.returncode != 0:
|
||
raise RuntimeError(f"Befehl fehlgeschlagen (exit {r.returncode})")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Marker-Klassifizierung aus robot.json
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def load_marker_set_map(robot_path: str) -> Dict[int, str]:
|
||
"""
|
||
Gibt marker_id -> Set-Label zurück.
|
||
'set'-Marker → set_name (z.B. 'Brett', 'A0')
|
||
Arm/Loose → Link-Name
|
||
"""
|
||
if PIPELINE_DIR not in sys.path:
|
||
sys.path.insert(0, PIPELINE_DIR)
|
||
|
||
# Import erst nach sys.path-Erweiterung
|
||
from marker_sets import classify_markers # noqa: PLC0415
|
||
|
||
with open(robot_path, "r", encoding="utf-8") as f:
|
||
robot_data = json.load(f)
|
||
|
||
classification = classify_markers(robot_data)
|
||
result: Dict[int, str] = {}
|
||
for mid, info in classification.items():
|
||
if info.role == "set" and info.set_name:
|
||
result[mid] = info.set_name
|
||
else:
|
||
result[mid] = info.link
|
||
return result
|
||
|
||
|
||
def load_a0_reference(robot_path: str) -> Dict[int, np.ndarray]:
|
||
"""
|
||
Welt-Referenz (x, y) in mm für die A0-Marker aus robot.json.
|
||
|
||
Nur die A0-Marker definieren (momentan) die Welt-Koordinaten, daher wird
|
||
der dxy-Abgleich ausschließlich auf sie beschränkt. Position steht im
|
||
Board-Frame = Welt-Frame (Board ist Wurzel-Link bei [0,0,0]).
|
||
"""
|
||
if PIPELINE_DIR not in sys.path:
|
||
sys.path.insert(0, PIPELINE_DIR)
|
||
|
||
from marker_sets import classify_markers # noqa: PLC0415
|
||
|
||
with open(robot_path, "r", encoding="utf-8") as f:
|
||
robot_data = json.load(f)
|
||
|
||
ref: Dict[int, np.ndarray] = {}
|
||
for mid, info in classify_markers(robot_data).items():
|
||
if info.role == "set" and info.set_name == "A0":
|
||
ref[mid] = np.array(info.position[:2], dtype=float)
|
||
return ref
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Positionen + Normalen per solvePnP
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def _marker_local_corners(marker_size_m: float) -> np.ndarray:
|
||
h = marker_size_m / 2.0
|
||
return np.array([
|
||
[-h, h, 0.0],
|
||
[ h, h, 0.0],
|
||
[ h, -h, 0.0],
|
||
[-h, -h, 0.0],
|
||
], dtype=np.float32)
|
||
|
||
|
||
def estimate_marker_poses(
|
||
det_json: dict,
|
||
pose_json: dict,
|
||
) -> Dict[int, Tuple[np.ndarray, np.ndarray]]:
|
||
"""
|
||
Gibt pro erkannter Marker-ID: (pos_mm [3], normal_world [3]) zurück.
|
||
|
||
pos_mm: Marker-Mittelpunkt im Weltframe, Einheit mm
|
||
normal_world: normierter Normalenvektor (Marker-Z-Achse im Weltframe)
|
||
"""
|
||
K = np.array(det_json["camera"]["camera_matrix"],
|
||
dtype=np.float64).reshape(3, 3)
|
||
D = np.array(det_json["camera"]["distortion_coefficients"],
|
||
dtype=np.float64).reshape(-1, 1)
|
||
marker_size_m = float(
|
||
det_json.get("vision_config", {}).get("MarkerSize", 0.025)
|
||
)
|
||
|
||
w2c = pose_json["camera_pose"]["world_to_camera"]
|
||
R_wc = np.array(w2c["rotation_matrix"], dtype=np.float64).reshape(3, 3)
|
||
t_wc = np.array(w2c["translation_m"], dtype=np.float64).reshape(3)
|
||
|
||
obj_corners = _marker_local_corners(marker_size_m)
|
||
result: Dict[int, Tuple[np.ndarray, np.ndarray]] = {}
|
||
|
||
for det in det_json.get("detections", []):
|
||
mid = int(det["marker_id"])
|
||
corners_px = np.array(
|
||
det["image_points_px"], dtype=np.float32
|
||
).reshape(4, 2)
|
||
|
||
ok, rvec, tvec = cv2.solvePnP(
|
||
obj_corners, corners_px,
|
||
K.astype(np.float32), D.astype(np.float32),
|
||
flags=cv2.SOLVEPNP_IPPE_SQUARE,
|
||
)
|
||
if not ok:
|
||
ok, rvec, tvec = cv2.solvePnP(
|
||
obj_corners, corners_px,
|
||
K.astype(np.float32), D.astype(np.float32),
|
||
flags=cv2.SOLVEPNP_ITERATIVE,
|
||
)
|
||
if not ok:
|
||
continue
|
||
|
||
tvec_f = tvec.reshape(3).astype(np.float64)
|
||
R_mc, _ = cv2.Rodrigues(rvec.reshape(3, 1))
|
||
R_mc = R_mc.astype(np.float64)
|
||
|
||
# Weltframe-Position: x_world = R_wc.T @ (x_cam - t_wc)
|
||
pos_world_m = R_wc.T @ (tvec_f - t_wc)
|
||
|
||
# Marker-Normale (Z-Achse) im Weltframe
|
||
normal_cam = R_mc[:, 2]
|
||
normal_world = R_wc.T @ normal_cam
|
||
nlen = np.linalg.norm(normal_world)
|
||
if nlen > 1e-9:
|
||
normal_world /= nlen
|
||
|
||
result[mid] = (pos_world_m * 1000.0, normal_world)
|
||
|
||
return result
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Main
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def main() -> None:
|
||
os.makedirs(TEMP_DIR, exist_ok=True)
|
||
|
||
images = find_images()
|
||
if not images:
|
||
print(f"[FEHLER] Keine cam*_hires_*.jpg in {DATA_DIR}")
|
||
sys.exit(1)
|
||
|
||
robot_path = find_robot_json()
|
||
print(f"Robot: {os.path.basename(robot_path)}")
|
||
print(f"Bilder: {[os.path.basename(i) for i in images]}")
|
||
print(f"Ausgabe-Ordner: {TEMP_DIR}")
|
||
|
||
# ------------------------------------------------------------------
|
||
# Schritt 1: ArUco-Erkennung
|
||
# ------------------------------------------------------------------
|
||
print("\n" + "=" * 60)
|
||
print("Schritt 1: ArUco-Erkennung")
|
||
print("=" * 60)
|
||
for img in images:
|
||
cam_id = cam_id_from_path(img)
|
||
run_step([
|
||
sys.executable,
|
||
os.path.join(PIPELINE_DIR, "1_detect_aruco_observations.py"),
|
||
"-i", img,
|
||
"-npz", npz_path(cam_id),
|
||
"-robot", robot_path,
|
||
"-cameraId", cam_id,
|
||
"-outDir", TEMP_DIR,
|
||
"--saveDebugImage",
|
||
])
|
||
|
||
# ------------------------------------------------------------------
|
||
# Schritt 2: Kamera-Positionen
|
||
# ------------------------------------------------------------------
|
||
print("\n" + "=" * 60)
|
||
print("Schritt 2: Kamera-Poses")
|
||
print("=" * 60)
|
||
for img in images:
|
||
run_step([
|
||
sys.executable,
|
||
os.path.join(PIPELINE_DIR, "2_estimate_camera_from_observations.py"),
|
||
"-i", detection_json_path(img),
|
||
"-robot", robot_path,
|
||
"-outDir", TEMP_DIR,
|
||
])
|
||
|
||
# ------------------------------------------------------------------
|
||
# Schritt 3: Beobachtungen zusammenführen
|
||
# ------------------------------------------------------------------
|
||
print("\n" + "=" * 60)
|
||
print("Schritt 3: Positionen + Normalen")
|
||
print("=" * 60)
|
||
|
||
marker_set_map = load_marker_set_map(robot_path)
|
||
a0_ref = load_a0_reference(robot_path)
|
||
|
||
# mid -> Liste von (pos_mm, normal)
|
||
observations: Dict[int, List[Tuple[np.ndarray, np.ndarray]]] = {}
|
||
|
||
# cam_id -> Welt-Position (mm) / QA-Metadaten
|
||
camera_positions: Dict[str, np.ndarray] = {}
|
||
camera_meta: Dict[str, dict] = {}
|
||
|
||
for img in images:
|
||
det_path = detection_json_path(img)
|
||
pose_path = camera_pose_json_path(img)
|
||
|
||
if not os.path.exists(det_path):
|
||
print(f"[WARN] Fehlend: {det_path}")
|
||
continue
|
||
if not os.path.exists(pose_path):
|
||
print(f"[WARN] Fehlend: {pose_path}")
|
||
continue
|
||
|
||
with open(det_path, "r", encoding="utf-8") as f:
|
||
det_json = json.load(f)
|
||
with open(pose_path, "r", encoding="utf-8") as f:
|
||
pose_json = json.load(f)
|
||
|
||
cam_id = cam_id_from_path(img)
|
||
ciw = pose_json["camera_pose"]["camera_in_world"]["position_mm"]
|
||
camera_positions[cam_id] = np.array(ciw, dtype=float)
|
||
est = pose_json.get("estimation", {})
|
||
camera_meta[cam_id] = {
|
||
"rms_px": est.get("residual_rms_px"),
|
||
"num_markers": est.get("num_used_markers"),
|
||
}
|
||
|
||
cam_name = os.path.basename(img)
|
||
poses = estimate_marker_poses(det_json, pose_json)
|
||
print(f" {cam_name}: {len(poses)} Marker mit Pose")
|
||
|
||
for mid, pn in poses.items():
|
||
observations.setdefault(mid, []).append(pn)
|
||
|
||
# ------------------------------------------------------------------
|
||
# Mittelwert über alle Kameras + CSV schreiben
|
||
# ------------------------------------------------------------------
|
||
rows = []
|
||
for mid in sorted(observations.keys()):
|
||
obs_list = observations[mid]
|
||
positions = np.array([p for p, _ in obs_list])
|
||
normals = np.array([n for _, n in obs_list])
|
||
|
||
pos_mean = positions.mean(axis=0)
|
||
|
||
n_sum = normals.sum(axis=0)
|
||
n_norm = np.linalg.norm(n_sum)
|
||
normal_mean = n_sum / n_norm if n_norm > 1e-9 else np.array([0.0, 0.0, 1.0])
|
||
|
||
set_label = marker_set_map.get(mid, "?")
|
||
|
||
# dxy: planarer Abgleich gegen robot.json — nur für A0 definiert
|
||
if mid in a0_ref:
|
||
d = pos_mean[:2] - a0_ref[mid]
|
||
dxy = round(float(np.hypot(d[0], d[1])), 2)
|
||
else:
|
||
dxy = ""
|
||
|
||
rows.append({
|
||
"id": mid,
|
||
"set": set_label,
|
||
"SeenByCount": len(obs_list),
|
||
"x": round(float(pos_mean[0]), 2),
|
||
"y": round(float(pos_mean[1]), 2),
|
||
"z": round(float(pos_mean[2]), 2),
|
||
"nx": round(float(normal_mean[0]), 4),
|
||
"ny": round(float(normal_mean[1]), 4),
|
||
"nz": round(float(normal_mean[2]), 4),
|
||
"dxy": dxy,
|
||
})
|
||
|
||
# Kamera-Positionen als eigene Zeilen (oben in der CSV)
|
||
camera_rows = []
|
||
for cam_id in sorted(camera_positions.keys()):
|
||
pos = camera_positions[cam_id]
|
||
camera_rows.append({
|
||
"id": cam_id,
|
||
"set": "CAMERA",
|
||
"SeenByCount": "",
|
||
"x": round(float(pos[0]), 2),
|
||
"y": round(float(pos[1]), 2),
|
||
"z": round(float(pos[2]), 2),
|
||
"nx": "",
|
||
"ny": "",
|
||
"nz": "",
|
||
"dxy": "",
|
||
})
|
||
|
||
csv_path = os.path.join(TEMP_DIR, "detections.csv")
|
||
fieldnames = ["id", "set", "SeenByCount", "x", "y", "z", "nx", "ny", "nz", "dxy"]
|
||
|
||
with open(csv_path, "w", newline="", encoding="utf-8") as f:
|
||
writer = csv.DictWriter(f, fieldnames=fieldnames)
|
||
writer.writeheader()
|
||
writer.writerows(camera_rows + rows)
|
||
|
||
# ------------------------------------------------------------------
|
||
# Konsolenübersicht
|
||
# ------------------------------------------------------------------
|
||
print(f"\nGeschrieben: {csv_path} "
|
||
f"({len(rows)} Marker + {len(camera_rows)} Kameras)\n")
|
||
|
||
# Kamera-Positionen + Reprojektions-RMS (schlechte Kamera erkennen)
|
||
print("Kamera-Positionen (Welt, mm):")
|
||
chdr = f"{'cam':>5} {'x':>9} {'y':>9} {'z':>9} {'rms_px':>7} {'#mk':>4}"
|
||
print(chdr)
|
||
print("-" * len(chdr))
|
||
for cam_id in sorted(camera_positions.keys()):
|
||
pos = camera_positions[cam_id]
|
||
meta = camera_meta.get(cam_id, {})
|
||
rms = meta.get("rms_px")
|
||
nmk = meta.get("num_markers")
|
||
rms_s = f"{rms:7.2f}" if rms is not None else " n/a"
|
||
print(f"{cam_id:>5} {pos[0]:>9.1f} {pos[1]:>9.1f} {pos[2]:>9.1f} "
|
||
f"{rms_s} {str(nmk):>4}")
|
||
|
||
# Marker-Tabelle
|
||
print()
|
||
hdr = (f"{'id':>5} {'set':<12} {'cams':>4} {'x':>8} {'y':>8} {'z':>8} "
|
||
f"{'nx':>7} {'ny':>7} {'nz':>7} {'dxy':>7}")
|
||
print(hdr)
|
||
print("-" * len(hdr))
|
||
for row in rows:
|
||
dxy_s = f"{row['dxy']:7.2f}" if row['dxy'] != "" else " -"
|
||
print(
|
||
f"{row['id']:>5} {row['set']:<12} {row['SeenByCount']:>4} "
|
||
f"{row['x']:>8.1f} {row['y']:>8.1f} {row['z']:>8.1f} "
|
||
f"{row['nx']:>7.4f} {row['ny']:>7.4f} {row['nz']:>7.4f} {dxy_s}"
|
||
)
|
||
|
||
# A0-Abgleich-Statistik (planarer Fehler gegen robot.json)
|
||
dxys = np.array([row["dxy"] for row in rows if row["dxy"] != ""], dtype=float)
|
||
if dxys.size:
|
||
print(f"\nA0 dxy (Welt-Abgleich, mm): n={dxys.size} "
|
||
f"mean={dxys.mean():.2f} median={np.median(dxys):.2f} "
|
||
f"max={dxys.max():.2f}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|