Scene Roadmap

This commit is contained in:
chk
2026-06-10 07:13:19 +02:00
parent f7358cea8d
commit 773e32c51c
10 changed files with 4959 additions and 68 deletions

261
pipeline/marker_sets.py Normal file
View File

@@ -0,0 +1,261 @@
#!/usr/bin/env python3
"""
marker_sets.py
==============
Klassifiziert die Marker aus robot.json in drei Rollen für die Szenen-Kalibrierung
(siehe doc/callibrate_scene_roadmap.md, Phase P0):
arm : Marker an BEWEGLICHEN Roboter-Links (Arm1, Ellbow, Arm2, Finger, ...).
Ihre Lage je Link ist bekannt -> Welt-Referenz, wird NICHT kalibriert.
set : Marker an einem STATISCHEN Link (Welt-Wurzel) MIT "set"-Feld. Gleiches
"set" = ein starres Objekt mit fixen relativen Bezügen -> es wird nur die
6-DoF-Platzierung des ganzen Sets kalibriert.
loose : Marker an einem statischen Link OHNE "set"-Feld. Lose, einzeln zu vermessen.
Die Set-Zugehörigkeit steht ausschließlich in robot.json (optionales Feld "set" am
Marker). Hier wird nichts hartkodiert — die Sets ergeben sich per Auto-Discovery aus
den vorhandenen "set"-Werten.
"Beweglich" = es liegt irgendwo auf der Kette bis zur Wurzel ein revolute/linear-Joint.
Damit zählt die Wurzel (Board) und jeder rein über "fixed"-Joints angebundene Link als
statisch (Welt), alles ab dem ersten Gelenk als Arm.
Public API
----------
data = load_robot("data/robot/robot.json")
cls = classify_markers(data) # id -> MarkerInfo
sets = get_sets(data) # set_name -> [MarkerInfo, ...]
loose = get_loose_markers(data) # [MarkerInfo, ...]
arm = get_arm_markers(data) # id -> MarkerInfo
rep = set_summary(data) # serialisierbarer Report (für QA / --json)
CLI
---
python pipeline/marker_sets.py -robot data/robot/robot.json
python pipeline/marker_sets.py -robot data/robot/robot.json --json
"""
from __future__ import annotations
import argparse
import json
import sys
from dataclasses import dataclass, asdict
from typing import Any, Dict, List, Optional
# ──────────────────────────────────────────────────────────────
# Datentyp
# ──────────────────────────────────────────────────────────────
@dataclass
class MarkerInfo:
id: int
link: str
role: str # "arm" | "set" | "loose"
set_name: Optional[str] # nur bei role == "set"
position: List[float] # im Link-Frame (mm), wie in robot.json
normal: Optional[List[float]]
spin: Optional[float]
# ──────────────────────────────────────────────────────────────
# Laden
# ──────────────────────────────────────────────────────────────
def load_robot(path: str) -> Dict[str, Any]:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
# ──────────────────────────────────────────────────────────────
# Link-Topologie: statisch (Welt) vs. beweglich (Arm)
# ──────────────────────────────────────────────────────────────
_MOVABLE_JOINTS = ("revolute", "linear")
def link_is_movable(links: Dict[str, Any], name: str) -> bool:
"""
True, wenn auf der Kette von `name` bis zur Wurzel mindestens ein
revolute/linear-Joint liegt (Marker dort gehören zum beweglichen Roboter).
"""
seen = set()
cur: Optional[str] = name
while cur and cur in links and cur not in seen:
seen.add(cur)
joint = links[cur].get("jointToParent") or {}
if str(joint.get("type", "")).lower() in _MOVABLE_JOINTS:
return True
cur = links[cur].get("parent")
return False
def root_links(links: Dict[str, Any]) -> List[str]:
return [n for n, l in links.items()
if not l.get("parent") or l.get("parent") not in links]
# ──────────────────────────────────────────────────────────────
# Klassifizierung
# ──────────────────────────────────────────────────────────────
def _set_name_of(marker: Dict[str, Any]) -> Optional[str]:
val = marker.get("set")
if val is None:
return None
s = str(val).strip()
return s or None
def classify_markers(robot_data: Dict[str, Any]) -> Dict[int, MarkerInfo]:
"""
Liefert id -> MarkerInfo über alle Links. Bei doppelten IDs gewinnt das erste
Vorkommen (zusätzlich als Warnung in set_summary sichtbar).
"""
links = robot_data.get("links", {}) or {}
out: Dict[int, MarkerInfo] = {}
for link_name, link in links.items():
movable = link_is_movable(links, link_name)
for mk in link.get("markers", []) or []:
if "id" not in mk or "position" not in mk:
continue
mid = int(mk["id"])
if mid in out:
continue # erstes Vorkommen behalten
set_name = _set_name_of(mk)
if movable:
role = "arm"
set_name = None
elif set_name is not None:
role = "set"
else:
role = "loose"
normal = mk.get("normal")
spin = mk.get("spin")
out[mid] = MarkerInfo(
id=mid,
link=link_name,
role=role,
set_name=set_name,
position=[float(v) for v in mk["position"]],
normal=[float(v) for v in normal] if normal is not None else None,
spin=float(spin) if spin is not None else None,
)
return out
def get_arm_markers(robot_data: Dict[str, Any]) -> Dict[int, MarkerInfo]:
return {m.id: m for m in classify_markers(robot_data).values() if m.role == "arm"}
def get_sets(robot_data: Dict[str, Any]) -> Dict[str, List[MarkerInfo]]:
"""set_name -> Liste der Marker (role == 'set'), gruppiert nach 'set'-Wert."""
sets: Dict[str, List[MarkerInfo]] = {}
for m in classify_markers(robot_data).values():
if m.role == "set" and m.set_name is not None:
sets.setdefault(m.set_name, []).append(m)
return sets
def get_loose_markers(robot_data: Dict[str, Any]) -> List[MarkerInfo]:
return [m for m in classify_markers(robot_data).values() if m.role == "loose"]
# ──────────────────────────────────────────────────────────────
# QA / Report
# ──────────────────────────────────────────────────────────────
def _duplicate_ids(robot_data: Dict[str, Any]) -> List[int]:
links = robot_data.get("links", {}) or {}
seen, dups = set(), set()
for link in links.values():
for mk in link.get("markers", []) or []:
if "id" not in mk:
continue
mid = int(mk["id"])
(dups if mid in seen else seen).add(mid)
return sorted(dups)
def set_summary(robot_data: Dict[str, Any]) -> Dict[str, Any]:
cls = classify_markers(robot_data)
sets = get_sets(robot_data)
loose = get_loose_markers(robot_data)
arm = [m for m in cls.values() if m.role == "arm"]
warnings: List[str] = []
for mid in _duplicate_ids(robot_data):
warnings.append(f"Marker-ID {mid} kommt mehrfach vor (erstes Vorkommen gewertet)")
for name, members in sets.items():
links_used = sorted({m.link for m in members})
if len(members) < 3:
warnings.append(
f"Set '{name}' hat nur {len(members)} Marker — 6-DoF-Platzierung "
f"nicht voll bestimmbar (>=3 nicht-kollineare nötig)")
if len(links_used) > 1:
warnings.append(f"Set '{name}' verteilt sich über mehrere Links {links_used} "
f"— ein Set sollte ein physisches Objekt sein")
return {
"counts": {
"total": len(cls),
"arm": len(arm),
"set": sum(len(v) for v in sets.values()),
"loose": len(loose),
"num_sets": len(sets),
},
"root_links": root_links(robot_data.get("links", {}) or {}),
"sets": {name: sorted(m.id for m in members) for name, members in sorted(sets.items())},
"loose_ids": sorted(m.id for m in loose),
"arm_ids": sorted(m.id for m in arm),
"warnings": warnings,
}
# ──────────────────────────────────────────────────────────────
# CLI
# ──────────────────────────────────────────────────────────────
def main() -> None:
ap = argparse.ArgumentParser(description="Marker aus robot.json in arm/set/loose klassifizieren")
ap.add_argument("-robot", "--robot", required=True, help="Pfad zu robot.json")
ap.add_argument("--json", action="store_true", help="Report als JSON ausgeben")
args = ap.parse_args()
data = load_robot(args.robot)
rep = set_summary(data)
if args.json:
print(json.dumps(rep, indent=2, ensure_ascii=False))
return
c = rep["counts"]
print(f"robot.json: {args.robot}")
print(f"Wurzel-Link(s): {rep['root_links']}")
print(f"\nMarker gesamt: {c['total']} | arm: {c['arm']} set: {c['set']} "
f"loose: {c['loose']} (Sets: {c['num_sets']})")
print("\nSets (starr, fixe interne Lage -> 6-DoF kalibrieren):")
if rep["sets"]:
for name, ids in rep["sets"].items():
print(f" {name:10s} ({len(ids):3d}): {ids}")
else:
print(" (keine)")
print(f"\nLose Marker (einzeln zu vermessen): {rep['loose_ids'] or '(keine)'}")
print(f"Arm-Marker (Welt-Referenz, nicht kalibriert): {rep['arm_ids']}")
if rep["warnings"]:
print("\n[WARN]")
for w in rep["warnings"]:
print(f" - {w}")
if __name__ == "__main__":
try:
main()
except Exception as exc: # pragma: no cover
print(f"[ERROR] {exc}", file=sys.stderr)
sys.exit(1)

View File

@@ -0,0 +1,424 @@
#!/usr/bin/env python3
"""
scene_reconstruct.py — Phase A der Szenen-Kalibrierung
========================================================
Ankerlose, metrische Rekonstruktion ALLER Kamera- und Marker-Posen aus den
ArUco-Eckbeobachtungen mehrerer Kameras — OHNE bekannten Welt-Anker, allein aus
der bekannten Marker-Kantenlänge.
Vollständig generisch: kein robot.json nötig, keine festen Marker-IDs/Namen.
Warum nicht einfach Einzel-Marker-PnP?
--------------------------------------
Ein planares Quadrat hat bei PnP eine 2-fache Flip-Ambiguität. Verkettet man solche
Einzelposen naiv, sind ~1/3 der Knoten gespiegelt -> unbrauchbar. Stattdessen:
1) je (Kamera, Marker): PnP (IPPE_SQUARE) als TENTATIVE Startpose.
2) Referenzkamera c0 (meiste Beobachtungen) definiert den Szenen-Frame S (E_{c0}=I).
3) Iteration (flip-robust):
a) Kameras per cv2.solvePnPRansac gegen die bereits platzierten 3D-Ecken
neu bestimmen -> geflippte Marker fallen als RANSAC-Ausreißer raus.
b) Marker-Ecken über alle platzierten Kameras TRIANGULIEREN (DLT, flip-frei)
und die Marker-Pose per Kabsch aus den triangulierten Ecken neu setzen.
4) globale Bündelausgleichung (least_squares, Huber, dünnbesetzte Jacobi) über
die Reprojektion ALLER Ecken. Gauge: c0 = Identität; Maßstab via Markergröße.
Geometrie-Konventionen
----------------------
- E_c = world(S) -> camera c (X_cam = E_c X_S)
- G_m = marker m local -> world(S) (X_S = G_m X_local)
- M_{c<-m} = E_c @ G_m (local -> camera)
- Ecken-Reihenfolge wie ArUco/Pipeline: TL, TR, BR, BL.
Ein-/Ausgabe
------------
--evalDir : Ordner mit render_*_aruco_detection.json (Eckpunkte + Intrinsik)
--out : scene_reconstruction.json (Default: <evalDir>/scene_reconstruction.json)
Das Ergebnis (Posen in S) ist Eingang für Phase B (robot_register.py).
"""
from __future__ import annotations
import argparse
import glob
import json
import os
import re
import sys
import time
from typing import Dict, List, Optional, Tuple
import numpy as np
import cv2
try:
from scipy.optimize import least_squares
from scipy.sparse import lil_matrix
HAVE_SCIPY = True
except ImportError: # pragma: no cover
HAVE_SCIPY = False
# ──────────────────────────────────────────────────────────────
# SE(3) / Geometrie-Helfer
# ──────────────────────────────────────────────────────────────
def rt_to_T(rvec, tvec) -> np.ndarray:
R, _ = cv2.Rodrigues(np.asarray(rvec, dtype=float).reshape(3, 1))
T = np.eye(4)
T[:3, :3] = R
T[:3, 3] = np.asarray(tvec, dtype=float).reshape(3)
return T
def T_to_rt(T: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
rvec, _ = cv2.Rodrigues(np.ascontiguousarray(T[:3, :3]))
return rvec.reshape(3), T[:3, 3].copy()
def inv_T(T: np.ndarray) -> np.ndarray:
R, t = T[:3, :3], T[:3, 3]
Ti = np.eye(4)
Ti[:3, :3] = R.T
Ti[:3, 3] = -R.T @ t
return Ti
def local_corners(size_m: float) -> np.ndarray:
"""Marker-Ecken im lokalen Frame (TL, TR, BR, BL), z=0."""
h = size_m / 2.0
return np.array([[-h, h, 0.0], [h, h, 0.0], [h, -h, 0.0], [-h, -h, 0.0]], dtype=float)
def kabsch(P: np.ndarray, Q: np.ndarray) -> np.ndarray:
"""R,t mit Q ≈ R P + t (ohne Skalierung); P,Q: Nx3. Liefert 4x4."""
pc, qc = P.mean(0), Q.mean(0)
H = (P - pc).T @ (Q - qc)
U, _, Vt = np.linalg.svd(H)
d = np.sign(np.linalg.det(Vt.T @ U.T))
R = Vt.T @ np.diag([1.0, 1.0, d]) @ U.T
T = np.eye(4)
T[:3, :3] = R
T[:3, 3] = qc - R @ pc
return T
def triangulate_point(obs: List[Tuple]) -> Optional[np.ndarray]:
"""Mehrbild-DLT eines 3D-Punkts. obs: [(K,D,R,t,uv), ...] mit X_cam=R X+t."""
A = []
for K, D, R, t, uv in obs:
und = cv2.undistortPoints(np.array([[uv]], dtype=np.float64), K, D).reshape(2)
x, y = float(und[0]), float(und[1])
P = np.hstack([R, t.reshape(3, 1)])
A.append(x * P[2] - P[0])
A.append(y * P[2] - P[1])
_, _, Vt = np.linalg.svd(np.asarray(A, dtype=float))
X = Vt[-1]
return X[:3] / X[3] if abs(X[3]) > 1e-12 else None
# ──────────────────────────────────────────────────────────────
# Laden der Detektionen
# ──────────────────────────────────────────────────────────────
class Cam:
__slots__ = ("id", "K", "D", "obs")
def __init__(self, cid, K, D):
self.id = cid
self.K = K
self.D = D
self.obs: Dict[int, np.ndarray] = {} # marker_id -> (4,2) px
def load_detections(eval_dir: str, cameras: Optional[List[str]] = None,
default_size: Optional[float] = None
) -> Tuple[Dict[str, Cam], Dict[int, float]]:
cams: Dict[str, Cam] = {}
sizes: Dict[int, float] = {}
for det_path in sorted(glob.glob(os.path.join(eval_dir, "*_aruco_detection.json"))):
m = re.match(r"render_([A-Za-z0-9]+)_aruco_detection\.json", os.path.basename(det_path))
if not m:
continue
cid = m.group(1)
if cameras and cid not in cameras:
continue
det = json.load(open(det_path, "r", encoding="utf-8"))
K = np.array(det["camera"]["camera_matrix"], dtype=float).reshape(3, 3)
D = np.array(det["camera"]["distortion_coefficients"], dtype=float).reshape(-1, 1)
cam = Cam(cid, K, D)
det_size = (det.get("vision_config", {}) or {}).get("MarkerSize", None)
for d in det.get("detections", []):
if d.get("type", "aruco") != "aruco":
continue
pts = d.get("image_points_px")
if pts is None:
continue
mid = int(d["marker_id"])
cam.obs[mid] = np.array(pts, dtype=float).reshape(4, 2)
s = d.get("marker_size_m", det_size if det_size is not None else default_size)
if s is not None:
sizes[mid] = float(s)
if cam.obs:
cams[cid] = cam
return cams, sizes
# ──────────────────────────────────────────────────────────────
# Schritt 1: Per-Marker-PnP (tentative Startposen)
# ──────────────────────────────────────────────────────────────
def pnp_all(cams: Dict[str, Cam], sizes: Dict[int, float], default_size: float
) -> Dict[Tuple[str, int], Dict]:
out: Dict[Tuple[str, int], Dict] = {}
for cid, cam in cams.items():
for mid, corners_px in cam.obs.items():
size = sizes.get(mid, default_size)
ok, rvec, tvec = cv2.solvePnP(local_corners(size), corners_px, cam.K, cam.D,
flags=cv2.SOLVEPNP_IPPE_SQUARE)
if not ok:
ok, rvec, tvec = cv2.solvePnP(local_corners(size), corners_px, cam.K, cam.D,
flags=cv2.SOLVEPNP_ITERATIVE)
if ok:
out[(cid, mid)] = {"M": rt_to_T(rvec, tvec)}
return out
# ──────────────────────────────────────────────────────────────
# Schritt 2+3: flip-robuste Initialisierung
# ──────────────────────────────────────────────────────────────
def initialize_poses(cams, sizes, default_size, pnp, n_iter=6, min_pts_pnp=8,
ransac_px=3.0) -> Tuple[Dict[str, np.ndarray], Dict[int, np.ndarray], str]:
cam_ids = sorted(cams)
all_mk = sorted({m for _, m in pnp})
# Referenzkamera = meiste Beobachtungen
ref = max(cam_ids, key=lambda c: len(cams[c].obs))
E: Dict[str, np.ndarray] = {ref: np.eye(4)}
G: Dict[int, np.ndarray] = {}
for m in cams[ref].obs: # tentative Startmarker aus c0
if (ref, m) in pnp:
G[m] = pnp[(ref, m)]["M"].copy()
def corners3D(m):
return (G[m][:3, :3] @ local_corners(sizes.get(m, default_size)).T).T + G[m][:3, 3]
for _ in range(n_iter):
# (a) Kameras gegen platzierte 3D-Ecken (RANSAC -> Flip-Ausreißer raus)
for c in cam_ids:
if c == ref:
continue
obj, img = [], []
for m in cams[c].obs:
if m in G:
obj.append(corners3D(m))
img.append(cams[c].obs[m])
if sum(len(o) for o in obj) < min_pts_pnp:
continue
O = np.vstack(obj).astype(np.float64)
I = np.vstack(img).astype(np.float64)
ok, rvec, tvec, inl = cv2.solvePnPRansac(
O, I, cams[c].K, cams[c].D, reprojectionError=ransac_px,
iterationsCount=200, flags=cv2.SOLVEPNP_ITERATIVE)
if ok and inl is not None and len(inl) >= 6:
E[c] = rt_to_T(rvec, tvec)
# (b) Marker triangulieren (flip-frei) bzw. einfügen
for m in all_mk:
placed = [c for c in cam_ids if c in E and (c, m) in pnp]
if len(placed) >= 2:
tri, ok = [], True
for ci in range(4):
o = [(cams[c].K, cams[c].D, E[c][:3, :3], E[c][:3, 3], cams[c].obs[m][ci])
for c in placed]
X = triangulate_point(o)
if X is None:
ok = False
break
tri.append(X)
if ok:
G[m] = kabsch(local_corners(sizes.get(m, default_size)), np.array(tri))
elif placed and m not in G:
c = placed[0]
G[m] = inv_T(E[c]) @ pnp[(c, m)]["M"]
return E, G, ref
# ──────────────────────────────────────────────────────────────
# Schritt 4: Globale Bündelausgleichung (Referenzkamera fix)
# ──────────────────────────────────────────────────────────────
def reproj_stats(obs_pairs, cams, sizes, default_size, E, G) -> Tuple[float, float]:
"""(RMS, Median) der Per-Beobachtungs-Reprojektionsfehler (px)."""
per = []
for (c, m) in obs_pairs:
rvec, tvec = T_to_rt(E[c] @ G[m])
proj, _ = cv2.projectPoints(local_corners(sizes.get(m, default_size)),
rvec, tvec, cams[c].K, cams[c].D)
d = proj.reshape(4, 2) - cams[c].obs[m]
per.append(float(np.sqrt(np.mean(d * d))))
if not per:
return 0.0, 0.0
a = np.asarray(per)
return float(np.sqrt(np.mean(a * a))), float(np.median(a))
def bundle_adjust(pnp, cams, sizes, default_size, E, G, ref,
huber_px=2.0, max_nfev=120, verbose=0):
if not HAVE_SCIPY:
print("[WARN] scipy fehlt -> BA uebersprungen")
return E, G
cam_opt = [c for c in sorted(E) if c != ref]
mk_ids = sorted(G)
ci = {c: i for i, c in enumerate(cam_opt)}
mi = {m: j for j, m in enumerate(mk_ids)}
ncam, nmk = len(cam_opt), len(mk_ids)
obs = [(c, m) for (c, m) in pnp if c in E and m in G]
def pack():
x = np.zeros(6 * ncam + 6 * nmk)
for c in cam_opt:
r, t = T_to_rt(E[c]); x[6*ci[c]:6*ci[c]+3] = r; x[6*ci[c]+3:6*ci[c]+6] = t
for m in mk_ids:
r, t = T_to_rt(G[m]); o = 6*ncam + 6*mi[m]; x[o:o+3] = r; x[o+3:o+6] = t
return x
def unpack(x):
Ee = {ref: np.eye(4)}
for c in cam_opt:
Ee[c] = rt_to_T(x[6*ci[c]:6*ci[c]+3], x[6*ci[c]+3:6*ci[c]+6])
Gg = {}
for m in mk_ids:
o = 6*ncam + 6*mi[m]
Gg[m] = rt_to_T(x[o:o+3], x[o+3:o+6])
return Ee, Gg
def residuals(x):
Ee, Gg = unpack(x)
res = np.empty(8 * len(obs))
for k, (c, m) in enumerate(obs):
rvec, tvec = T_to_rt(Ee[c] @ Gg[m])
proj, _ = cv2.projectPoints(local_corners(sizes.get(m, default_size)),
rvec, tvec, cams[c].K, cams[c].D)
res[8*k:8*k+8] = (proj.reshape(4, 2) - cams[c].obs[m]).ravel()
return res
Sp = lil_matrix((8 * len(obs), 6 * ncam + 6 * nmk), dtype=int)
for k, (c, m) in enumerate(obs):
rows = slice(8*k, 8*k+8)
if c in ci:
Sp[rows, 6*ci[c]:6*ci[c]+6] = 1
o = 6*ncam + 6*mi[m]
Sp[rows, o:o+6] = 1
sol = least_squares(residuals, pack(), jac_sparsity=Sp, method="trf",
loss="huber", f_scale=huber_px, max_nfev=max_nfev, verbose=verbose)
return unpack(sol.x)
# ──────────────────────────────────────────────────────────────
# Hauptlauf
# ──────────────────────────────────────────────────────────────
def reconstruct(eval_dir, cameras=None, default_size=0.025,
huber_px=2.0, max_nfev=120, verbose=0) -> Dict:
cams, sizes = load_detections(eval_dir, cameras, default_size)
if len(cams) < 2:
raise RuntimeError(f"brauche >=2 Kameras, gefunden: {sorted(cams)}")
pnp = pnp_all(cams, sizes, default_size)
E0, G0, ref = initialize_poses(cams, sizes, default_size, pnp)
# Marker je #platzierter Kameras; nur >=2 sind triangulierbar/zuverlässig.
ncams_of = {}
for (c, m) in pnp:
if c in E0 and m in G0:
ncams_of[m] = ncams_of.get(m, 0) + 1
weak = sorted(m for m in G0 if ncams_of.get(m, 0) < 2) # Einzelbild -> unbeobachtbar
G0 = {m: T for m, T in G0.items() if ncams_of.get(m, 0) >= 2}
pairs0 = [(c, m) for (c, m) in pnp if c in E0 and m in G0]
rms0, med0 = reproj_stats(pairs0, cams, sizes, default_size, E0, G0)
E, G = bundle_adjust(pnp, cams, sizes, default_size, E0, G0, ref, huber_px, max_nfev, verbose)
pairs = [(c, m) for (c, m) in pnp if c in E and m in G]
rms1, med1 = reproj_stats(pairs, cams, sizes, default_size, E, G)
dropped = [c for c in cams if c not in E]
cameras_out = []
for c in sorted(E):
Ec = E[c]
rvec, _ = T_to_rt(Ec)
cameras_out.append({
"camera_id": c,
"world_to_camera": {"rotation_matrix": Ec[:3, :3].tolist(),
"translation_m": Ec[:3, 3].tolist(), "rvec_rad": rvec.tolist()},
"center_m": (-Ec[:3, :3].T @ Ec[:3, 3]).tolist(),
"num_markers": len(cams[c].obs),
"is_reference": bool(c == ref),
})
markers_out = []
for m in sorted(G):
Gm = G[m]
size = sizes.get(m, default_size)
corners_S = (Gm[:3, :3] @ local_corners(size).T).T + Gm[:3, 3]
rvec, _ = T_to_rt(Gm)
markers_out.append({
"marker_id": int(m),
"pose_in_S": {"rotation_matrix": Gm[:3, :3].tolist(),
"translation_m": Gm[:3, 3].tolist(), "rvec_rad": rvec.tolist()},
"center_m": Gm[:3, 3].tolist(),
"normal": (Gm[:3, :3] @ np.array([0.0, 0.0, 1.0])).tolist(),
"corners_m": corners_S.tolist(),
"num_cameras": int(ncams_of.get(m, 0)),
})
return {
"schema_version": "1.0",
"stage": "scene_reconstruct_A",
"created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"frame": "arbitrary S (reference camera pose = identity)",
"reference_camera": ref,
"summary": {"num_cameras": len(E), "num_markers": len(G), "num_observations": len(pairs),
"reproj_rms_px_init": rms0, "reproj_median_px_init": med0,
"reproj_rms_px_final": rms1, "reproj_median_px_final": med1,
"dropped_cameras": dropped,
"insufficient_view_markers": weak},
"cameras": cameras_out,
"markers": markers_out,
}
def main() -> None:
ap = argparse.ArgumentParser(description="Phase A: ankerlose metrische Szenen-Rekonstruktion")
ap.add_argument("--evalDir", required=True, help="Ordner mit render_*_aruco_detection.json")
ap.add_argument("--cameras", default=None, help="Kommagetrennte Kamera-IDs (Default: alle)")
ap.add_argument("--markerSize", type=float, default=0.025, help="Fallback-Kantenlänge (m)")
ap.add_argument("--huberPx", type=float, default=2.0)
ap.add_argument("--maxNfev", type=int, default=120)
ap.add_argument("--out", default=None)
ap.add_argument("-v", "--verbose", action="count", default=0)
args = ap.parse_args()
cams = args.cameras.split(",") if args.cameras else None
res = reconstruct(args.evalDir, cams, args.markerSize, args.huberPx, args.maxNfev, args.verbose)
out = args.out or os.path.join(args.evalDir, "scene_reconstruction.json")
json.dump(res, open(out, "w", encoding="utf-8"), indent=2)
s = res["summary"]
print(f"[INFO] Kameras={s['num_cameras']} Marker(>=2 Views)={s['num_markers']} "
f"Obs={s['num_observations']} | RMS {s['reproj_rms_px_init']:.3f}->{s['reproj_rms_px_final']:.3f}px "
f"(median {s['reproj_median_px_final']:.3f}px) | ref-Kamera={res['reference_camera']}")
if s["dropped_cameras"]:
print(f"[WARN] nicht verbundene Kameras verworfen: {s['dropped_cameras']}")
if s["insufficient_view_markers"]:
print(f"[INFO] {len(s['insufficient_view_markers'])} Marker mit <2 Views (unbeobachtbar, "
f"nicht rekonstruiert): {s['insufficient_view_markers']}")
print(f"[INFO] -> {out}")
if __name__ == "__main__":
try:
main()
except Exception as exc:
print(f"[ERROR] {exc}", file=sys.stderr)
sys.exit(1)