Files
appRobotVideoControls/programs/3_fuse_markers_world____.py
chk 5a7176920a Phase 1 abgeschlossen: Positionen werden erkannt.
Positionen aus den Merkern heraus erkennbar. Viele Bilder gleichzeitig verarbeitbar.
2026-05-25 22:16:11 +02:00

1136 lines
38 KiB
Python

#!/usr/bin/env python3
"""
3_fuse_markers_world.py
Phase 1:
- Liest 2 bis 5 JSON-Dateien ein, die jeweils eine Kamera-Pose und ArUco-Detections enthalten.
- Rekonstruiert für jede Marker-Beobachtung eine Marker-Pose in Weltkoordinaten.
- Fusiert alle Beobachtungen pro Marker gewichtet.
- Gibt eine CSV-Liste mit Kameras und Markern aus.
- Optional wird robots.json nur für Metadaten verwendet (nicht für die Schätzung).
Typische Eingabe-Dateien:
snapshot_video1_1779690911822_aruco_detection.camera_pose.json
snapshot_video2_1779690911822_aruco_detection.camera_pose.json
...
python 3_fuse_markers_world.py \
--json snapshot_video1_1779690911822_aruco_detection.camera_pose.json \
--json snapshot_video2_1779690911822_aruco_detection.camera_pose.json \
--json snapshot_video3_1779690911822_aruco_detection.camera_pose.json \
--robots robots.json
Benötigt:
pip install numpy opencv-python
Hinweis:
- Das Skript nutzt die Kameraposen als fest.
- Die Markerkoordinaten werden pro Detektion aus SolvePnP bestimmt.
- Danach werden alle Beobachtungen eines Markers robust gemittelt.
"""
from __future__ import annotations
import argparse
import csv
import json
import math
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple
import cv2
import numpy as np
# ============================================================
# Datenklassen
# ============================================================
@dataclass
class CameraPose:
camera_id: str
source_file: str
position_w: np.ndarray # (3,)
rotation_w_from_c: np.ndarray # (3,3)
orientation_rpy_deg: Tuple[float, float, float]
quality: Dict[str, Any] = field(default_factory=dict)
@dataclass
class MarkerObservation:
marker_id: int
camera_id: str
source_file: str
image_points_px: np.ndarray # (4,2)
marker_size_m: float
confidence: float
quality_score: float
camera_weight: float
observation_weight: float
rvec_cm: np.ndarray # (3,)
tvec_cm: np.ndarray # (3,)
position_w: np.ndarray # (3,)
rotation_w_from_m: np.ndarray # (3,3)
reprojection_rms_px: float
reprojection_max_px: float
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class FusedMarkerEstimate:
marker_id: int
position_w: np.ndarray
rotation_w_from_m: np.ndarray
orientation_rpy_deg: Tuple[float, float, float]
observations: List[MarkerObservation]
num_cameras: int
camera_ids: List[str]
weight_sum: float
position_std_m: float
angular_std_deg: float
confidence_mean: float
confidence_min: float
confidence_max: float
quality_mean: float
quality_min: float
quality_max: float
reproj_rms_mean_px: float
reproj_rms_max_px: float
reliability_0_1: float
metadata: Dict[str, Any] = field(default_factory=dict)
# ============================================================
# Basisfunktionen: Rotationen / Euler / Quaternions
# ============================================================
def clamp(value: float, lo: float = 0.0, hi: float = 1.0) -> float:
return float(max(lo, min(hi, value)))
def normalize_vec(v: np.ndarray, eps: float = 1e-12) -> np.ndarray:
n = float(np.linalg.norm(v))
if n < eps:
return v.astype(np.float64, copy=True)
return (v / n).astype(np.float64)
def rotation_matrix_to_euler_zyx(R: np.ndarray) -> Tuple[float, float, float]:
"""Returns roll, pitch, yaw in degrees.
Convention:
R = Rz(yaw) @ Ry(pitch) @ Rx(roll)
"""
yaw = math.degrees(math.atan2(R[1, 0], R[0, 0]))
sp = math.sqrt(R[2, 1] ** 2 + R[2, 2] ** 2)
pitch = math.degrees(math.atan2(-R[2, 0], sp))
roll = math.degrees(math.atan2(R[2, 1], R[2, 2]))
return float(roll), float(pitch), float(yaw)
def euler_zyx_to_rotation_matrix(roll_deg: float, pitch_deg: float, yaw_deg: float) -> np.ndarray:
roll = math.radians(roll_deg)
pitch = math.radians(pitch_deg)
yaw = math.radians(yaw_deg)
cr, sr = math.cos(roll), math.sin(roll)
cp, sp = math.cos(pitch), math.sin(pitch)
cy, sy = math.cos(yaw), math.sin(yaw)
Rx = np.array([[1, 0, 0], [0, cr, -sr], [0, sr, cr]], dtype=np.float64)
Ry = np.array([[cp, 0, sp], [0, 1, 0], [-sp, 0, cp]], dtype=np.float64)
Rz = np.array([[cy, -sy, 0], [sy, cy, 0], [0, 0, 1]], dtype=np.float64)
return (Rz @ Ry @ Rx).astype(np.float64)
def rotation_matrix_to_quaternion(R: np.ndarray) -> np.ndarray:
"""Returns quaternion as [w, x, y, z]."""
m = R.astype(np.float64)
t = np.trace(m)
if t > 0.0:
s = math.sqrt(t + 1.0) * 2.0
w = 0.25 * s
x = (m[2, 1] - m[1, 2]) / s
y = (m[0, 2] - m[2, 0]) / s
z = (m[1, 0] - m[0, 1]) / s
elif m[0, 0] > m[1, 1] and m[0, 0] > m[2, 2]:
s = math.sqrt(1.0 + m[0, 0] - m[1, 1] - m[2, 2]) * 2.0
w = (m[2, 1] - m[1, 2]) / s
x = 0.25 * s
y = (m[0, 1] + m[1, 0]) / s
z = (m[0, 2] + m[2, 0]) / s
elif m[1, 1] > m[2, 2]:
s = math.sqrt(1.0 + m[1, 1] - m[0, 0] - m[2, 2]) * 2.0
w = (m[0, 2] - m[2, 0]) / s
x = (m[0, 1] + m[1, 0]) / s
y = 0.25 * s
z = (m[1, 2] + m[2, 1]) / s
else:
s = math.sqrt(1.0 + m[2, 2] - m[0, 0] - m[1, 1]) * 2.0
w = (m[1, 0] - m[0, 1]) / s
x = (m[0, 2] + m[2, 0]) / s
y = (m[1, 2] + m[2, 1]) / s
z = 0.25 * s
q = np.array([w, x, y, z], dtype=np.float64)
return normalize_vec(q)
def quaternion_to_rotation_matrix(q: np.ndarray) -> np.ndarray:
q = normalize_vec(q)
w, x, y, z = q
R = np.array([
[1 - 2 * (y * y + z * z), 2 * (x * y - z * w), 2 * (x * z + y * w)],
[2 * (x * y + z * w), 1 - 2 * (x * x + z * z), 2 * (y * z - x * w)],
[2 * (x * z - y * w), 2 * (y * z + x * w), 1 - 2 * (x * x + y * y)],
], dtype=np.float64)
return R
def average_rotations_weighted(rotations: Sequence[np.ndarray], weights: Sequence[float]) -> np.ndarray:
"""Averages rotations by weighted matrix sum + projection to SO(3)."""
if not rotations:
return np.eye(3, dtype=np.float64)
M = np.zeros((3, 3), dtype=np.float64)
for R, w in zip(rotations, weights):
M += float(w) * R.astype(np.float64)
U, _, Vt = np.linalg.svd(M)
R = U @ Vt
if np.linalg.det(R) < 0:
U[:, -1] *= -1
R = U @ Vt
return R.astype(np.float64)
def angular_distance_deg(R_a: np.ndarray, R_b: np.ndarray) -> float:
"""Smallest angle between two rotations."""
R = R_a.T @ R_b
tr = float(np.trace(R))
cos_theta = clamp((tr - 1.0) / 2.0, -1.0, 1.0)
return float(math.degrees(math.acos(cos_theta)))
# ============================================================
# JSON / Metadaten
# ============================================================
def load_json(path: str) -> Dict[str, Any]:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
def detect_camera_id(data: Dict[str, Any], source_file: str) -> str:
camera = data.get("camera", {})
camera_pose = data.get("camera_pose", {})
for key in ("camera_id", "id", "name"):
if isinstance(camera, dict) and camera.get(key):
return str(camera[key])
if isinstance(camera_pose, dict) and camera_pose.get(key):
return str(camera_pose[key])
return Path(source_file).stem
def extract_camera_pose(data: Dict[str, Any], source_file: str) -> CameraPose:
camera_id = detect_camera_id(data, source_file)
# Camera pose may appear in different layouts.
pose_block = data.get("camera_pose")
if pose_block is None:
pose_block = data.get("camera", {})
if not isinstance(pose_block, dict):
raise ValueError(f"Camera pose block missing or invalid in: {source_file}")
quality = pose_block.get("quality", {}) if isinstance(pose_block.get("quality", {}), dict) else {}
# Position
pos = None
if isinstance(pose_block.get("position_m"), dict):
pm = pose_block["position_m"]
pos = np.array([pm.get("x", 0.0), pm.get("y", 0.0), pm.get("z", 0.0)], dtype=np.float64)
elif isinstance(pose_block.get("position_mm"), (list, tuple)) and len(pose_block["position_mm"]) == 3:
pos = np.array(pose_block["position_mm"], dtype=np.float64) / 1000.0
elif isinstance(pose_block.get("position"), (list, tuple)) and len(pose_block["position"]) == 3:
pos = np.array(pose_block["position"], dtype=np.float64)
if pos is None:
raise ValueError(f"Camera position missing in: {source_file}")
# Rotation
if isinstance(pose_block.get("rotation_matrix_world_from_camera"), (list, tuple)):
R_wc = np.array(pose_block["rotation_matrix_world_from_camera"], dtype=np.float64)
elif isinstance(pose_block.get("rotation_matrix"), (list, tuple)):
R_wc = np.array(pose_block["rotation_matrix"], dtype=np.float64)
elif isinstance(pose_block.get("orientation_deg"), dict):
od = pose_block["orientation_deg"]
roll = float(od.get("roll", 0.0))
pitch = float(od.get("pitch", 0.0))
yaw = float(od.get("yaw", 0.0))
R_wc = euler_zyx_to_rotation_matrix(roll, pitch, yaw)
else:
R_wc = np.eye(3, dtype=np.float64)
if R_wc.shape != (3, 3):
raise ValueError(f"Invalid camera rotation matrix in: {source_file}")
rpy = rotation_matrix_to_euler_zyx(R_wc)
return CameraPose(
camera_id=camera_id,
source_file=source_file,
position_w=pos,
rotation_w_from_c=R_wc,
orientation_rpy_deg=rpy,
quality=quality,
)
def load_robot_metadata(robot_path: Optional[str]) -> Dict[int, List[Dict[str, Any]]]:
"""Optional metadata only. Duplicate IDs are kept as a list."""
if not robot_path:
return {}
data = load_json(robot_path)
marker_meta: Dict[int, List[Dict[str, Any]]] = {}
for entry in data.get("Marker", []):
try:
mid = int(entry.get("id"))
except Exception:
continue
marker_meta.setdefault(mid, []).append({
"on": entry.get("on"),
"position": entry.get("position"),
"relPos": entry.get("relPos"),
"name": entry.get("name"),
"relPosSource": entry.get("relPosSource"),
})
return marker_meta
def summarize_robot_metadata(marker_id: int, robot_meta: Dict[int, List[Dict[str, Any]]]) -> Dict[str, Any]:
entries = robot_meta.get(marker_id, [])
if not entries:
return {
"robot_known": False,
"robot_on": None,
"robot_positions": None,
"robot_relpos_count": 0,
}
on_list = []
abs_pos_list = []
rel_pos_list = []
for e in entries:
if e.get("on") is not None:
on_list.append(str(e.get("on")))
if e.get("position") is not None:
abs_pos_list.append(e.get("position"))
if e.get("relPos") is not None:
rel_pos_list.append(e.get("relPos"))
return {
"robot_known": True,
"robot_on": "|".join(dict.fromkeys(on_list)) if on_list else None,
"robot_positions": abs_pos_list if abs_pos_list else None,
"robot_relpos_count": len(rel_pos_list),
}
# ============================================================
# Detection / Gewichtung
# ============================================================
def get_camera_quality_score(camera_pose: CameraPose) -> float:
q = camera_pose.quality or {}
if "pose_quality_score" in q:
try:
return clamp(float(q["pose_quality_score"]) / 100.0)
except Exception:
pass
if "reprojection_rms_px" in q:
try:
rms = float(q["reprojection_rms_px"])
# Heuristik: 0 px -> 1.0, 5 px -> 0.5, 10 px -> ~0.33
return clamp(1.0 / (1.0 + 0.2 * rms))
except Exception:
pass
return 1.0
def get_marker_size_from_detection(det: Dict[str, Any], default_size_m: float) -> float:
try:
return float(det.get("marker_size_m", default_size_m))
except Exception:
return float(default_size_m)
def score_from_range(value: Optional[float], low: float, high: float, invert: bool = False) -> float:
if value is None:
return 1.0
if high <= low:
return 1.0
t = (float(value) - low) / (high - low)
t = clamp(t)
return 1.0 - t if invert else t
def detection_quality_score(det: Dict[str, Any]) -> float:
"""Heuristischer Score in [0,1]. Falls Felder fehlen, wird 1.0 angenommen."""
confidence = float(det.get("confidence", 1.0))
quality = det.get("quality", {}) if isinstance(det.get("quality", {}), dict) else {}
# Subscores
area_px = quality.get("area_px")
sharpness = (((quality.get("sharpness") or {}).get("laplacian_var")) if isinstance(quality.get("sharpness"), dict) else None)
contrast = (((quality.get("contrast") or {}).get("dynamic_range")) if isinstance(quality.get("contrast"), dict) else None)
dist_border = (((quality.get("geometry") or {}).get("distance_to_border_px")) if isinstance(quality.get("geometry"), dict) else None)
dist_center = (((quality.get("geometry") or {}).get("distance_to_center_norm")) if isinstance(quality.get("geometry"), dict) else None)
edge_ratio = quality.get("edge_ratio")
area_score = score_from_range(area_px, 150.0, 3000.0)
sharp_score = score_from_range(sharpness, 500.0, 5000.0)
contrast_score = score_from_range(contrast, 40.0, 180.0)
border_score = score_from_range(dist_border, 20.0, 250.0)
center_score = score_from_range(dist_center, 0.75, 0.0, invert=True) # closer to center -> higher
edge_ratio_score = 1.0
if edge_ratio is not None:
try:
edge_ratio_score = clamp(1.0 - min(abs(float(edge_ratio) - 1.0) / 1.5, 1.0))
except Exception:
edge_ratio_score = 1.0
# Weighted sum, confidence strongest
score = (
0.35 * clamp(confidence)
+ 0.15 * area_score
+ 0.15 * sharp_score
+ 0.10 * contrast_score
+ 0.10 * border_score
+ 0.10 * center_score
+ 0.05 * edge_ratio_score
)
return clamp(score)
def marker_frontality_score(rotation_m_from_cam: np.ndarray) -> float:
"""Marker normal relative to camera optical axis (+Z camera). 1.0 is frontal."""
normal_cam = rotation_m_from_cam[:, 2]
# absolute because the sign depends on marker axis convention; we only want frontality
return clamp(abs(float(normal_cam[2])))
def compute_observation_weight(camera_pose: CameraPose, det: Dict[str, Any]) -> Tuple[float, float, float]:
det_score = detection_quality_score(det)
cam_score = get_camera_quality_score(camera_pose)
# frontality will be added after pose estimation; here only base score.
base = clamp(det_score * cam_score)
return base, det_score, cam_score
# ============================================================
# Marker Pose Schätzung pro Observation
# ============================================================
def build_marker_object_points(marker_size_m: float) -> np.ndarray:
half = marker_size_m / 2.0
# OpenCV ArUco corner order: TL, TR, BR, BL
return np.array([
[-half, half, 0.0],
[ half, half, 0.0],
[ half, -half, 0.0],
[-half, -half, 0.0],
], dtype=np.float32)
def solve_marker_pose_in_camera(img_pts: np.ndarray, marker_size_m: float, K: np.ndarray, D: np.ndarray) -> Tuple[np.ndarray, np.ndarray, float, float]:
obj_pts = build_marker_object_points(marker_size_m)
success, rvec, tvec = cv2.solvePnP(
obj_pts,
img_pts.astype(np.float32),
K,
D,
flags=cv2.SOLVEPNP_IPPE_SQUARE,
)
if not success:
success, rvec, tvec = cv2.solvePnP(
obj_pts,
img_pts.astype(np.float32),
K,
D,
flags=cv2.SOLVEPNP_ITERATIVE,
)
if not success:
raise RuntimeError("solvePnP failed for one marker observation")
projected, _ = cv2.projectPoints(obj_pts, rvec, tvec, K, D)
projected = projected.reshape(-1, 2)
err = np.linalg.norm(projected - img_pts, axis=1)
rms = float(np.sqrt(np.mean(err ** 2)))
mx = float(np.max(err))
return rvec.reshape(3), tvec.reshape(3), rms, mx
def observation_to_world(
camera_pose: CameraPose,
marker_id: int,
det: Dict[str, Any],
K: np.ndarray,
D: np.ndarray,
robot_meta: Dict[int, List[Dict[str, Any]]],
default_marker_size_m: float,
) -> Optional[MarkerObservation]:
if det.get("image_points_px") is None:
return None
img_pts = np.array(det["image_points_px"], dtype=np.float32)
if img_pts.shape != (4, 2):
return None
marker_size_m = get_marker_size_from_detection(det, default_marker_size_m)
try:
rvec_cm, tvec_cm, reproj_rms_px, reproj_max_px = solve_marker_pose_in_camera(img_pts, marker_size_m, K, D)
except Exception:
return None
R_cm, _ = cv2.Rodrigues(rvec_cm)
cam_weight_base, det_score, cam_score = compute_observation_weight(camera_pose, det)
frontality = marker_frontality_score(R_cm)
# Final observation weight: base quality * frontality
obs_weight = clamp(cam_weight_base * (0.25 + 0.75 * frontality))
if obs_weight <= 1e-9:
obs_weight = 1e-9
# World transform
R_wc = camera_pose.rotation_w_from_c
t_wc = camera_pose.position_w
position_w = R_wc @ tvec_cm + t_wc
rotation_w_from_m = R_wc @ R_cm
metadata = summarize_robot_metadata(marker_id, robot_meta)
return MarkerObservation(
marker_id=marker_id,
camera_id=camera_pose.camera_id,
source_file=camera_pose.source_file,
image_points_px=img_pts,
marker_size_m=float(marker_size_m),
confidence=float(det.get("confidence", 1.0)),
quality_score=float(det_score),
camera_weight=float(cam_score),
observation_weight=float(obs_weight),
rvec_cm=rvec_cm.astype(np.float64),
tvec_cm=tvec_cm.astype(np.float64),
position_w=position_w.astype(np.float64),
rotation_w_from_m=rotation_w_from_m.astype(np.float64),
reprojection_rms_px=reproj_rms_px,
reprojection_max_px=reproj_max_px,
metadata=metadata,
)
# ============================================================
# Fusion pro Marker
# ============================================================
def fuse_marker_observations(marker_id: int, observations: List[MarkerObservation], robot_meta: Dict[int, List[Dict[str, Any]]]) -> FusedMarkerEstimate:
if not observations:
raise ValueError("No observations to fuse")
# Optional outlier suppression: one soft reweighting pass based on position and angle residuals.
pos_stack = np.stack([o.position_w for o in observations], axis=0)
weights = np.array([o.observation_weight for o in observations], dtype=np.float64)
weights = np.maximum(weights, 1e-9)
# Initial estimate
pos0 = np.average(pos_stack, axis=0, weights=weights)
rot0 = average_rotations_weighted([o.rotation_w_from_m for o in observations], weights)
# Residual-based robustification
pos_err = np.linalg.norm(pos_stack - pos0.reshape(1, 3), axis=1)
ang_err = np.array([angular_distance_deg(rot0, o.rotation_w_from_m) for o in observations], dtype=np.float64)
# Huber-like soft downweighting
pos_scale = max(0.01, float(np.median(pos_err) + 1e-9))
ang_scale = max(2.0, float(np.median(ang_err) + 1e-9))
robust = 1.0 / (1.0 + (pos_err / pos_scale) ** 2 + (ang_err / ang_scale) ** 2)
final_w = weights * robust
final_w = np.maximum(final_w, 1e-9)
pos = np.average(pos_stack, axis=0, weights=final_w)
rot = average_rotations_weighted([o.rotation_w_from_m for o in observations], final_w)
rpy = rotation_matrix_to_euler_zyx(rot)
camera_ids = sorted({o.camera_id for o in observations})
source_files = sorted({Path(o.source_file).name for o in observations})
weight_sum = float(np.sum(final_w))
weight_mean = float(np.mean(final_w))
confidence_vals = np.array([o.confidence for o in observations], dtype=np.float64)
quality_vals = np.array([o.quality_score for o in observations], dtype=np.float64)
reproj_rms_vals = np.array([o.reprojection_rms_px for o in observations], dtype=np.float64)
pos_std_m = float(np.sqrt(np.average(np.sum((pos_stack - pos.reshape(1, 3)) ** 2, axis=1), weights=final_w)))
angular_std_deg = float(np.sqrt(np.average(np.array([angular_distance_deg(rot, o.rotation_w_from_m) ** 2 for o in observations]), weights=final_w)))
# Reliability heuristic in [0,1]
num_obs = len(observations)
num_cams = len(camera_ids)
obs_factor = clamp(num_obs / 5.0)
cam_factor = clamp(num_cams / 3.0)
conf_factor = clamp(float(np.mean(confidence_vals)))
qual_factor = clamp(float(np.mean(quality_vals)))
spread_factor = clamp(1.0 - min(pos_std_m / 0.08, 1.0))
reproj_factor = clamp(1.0 - min(float(np.mean(reproj_rms_vals)) / 5.0, 1.0))
reliability = clamp(
0.18 * obs_factor +
0.15 * cam_factor +
0.22 * conf_factor +
0.15 * qual_factor +
0.15 * spread_factor +
0.15 * reproj_factor
)
metadata = summarize_robot_metadata(marker_id, robot_meta)
return FusedMarkerEstimate(
marker_id=marker_id,
position_w=pos.astype(np.float64),
rotation_w_from_m=rot.astype(np.float64),
orientation_rpy_deg=rpy,
observations=observations,
num_cameras=num_cams,
camera_ids=camera_ids,
weight_sum=weight_sum,
position_std_m=pos_std_m,
angular_std_deg=angular_std_deg,
confidence_mean=float(np.mean(confidence_vals)),
confidence_min=float(np.min(confidence_vals)),
confidence_max=float(np.max(confidence_vals)),
quality_mean=float(np.mean(quality_vals)),
quality_min=float(np.min(quality_vals)),
quality_max=float(np.max(quality_vals)),
reproj_rms_mean_px=float(np.mean(reproj_rms_vals)),
reproj_rms_max_px=float(np.max([o.reprojection_max_px for o in observations])),
reliability_0_1=reliability,
metadata=metadata,
)
# ============================================================
# Input Handling
# ============================================================
def collect_input_files(args: argparse.Namespace) -> List[str]:
files: List[str] = []
if args.json:
files.extend(args.json)
if args.input_dir and args.timestamp:
base = Path(args.input_dir)
ts = str(args.timestamp)
patterns = [
f"*_{ts}_aruco_detection.camera_pose.json",
f"*_{ts}_aruco_detection.json",
f"*_{ts}.camera_pose.json",
f"*_{ts}.json",
]
for pattern in patterns:
files.extend(str(p) for p in sorted(base.glob(pattern)))
# de-duplicate, keep order
seen = set()
uniq = []
for f in files:
ff = str(Path(f))
if ff not in seen:
seen.add(ff)
uniq.append(ff)
return uniq
def find_detection_json(camera_pose_json_path):
"""
Findet automatisch die passende Detection-JSON.
Beispiel:
xxx_aruco_detection.camera_pose.json
->
xxx_aruco_detection.json
"""
pose_path = Path(camera_pose_json_path)
name = pose_path.name
if not name.endswith(".camera_pose.json"):
raise ValueError(
f"Not a .camera_pose.json file: {pose_path}"
)
detection_name = name.replace(
".camera_pose.json",
".json"
)
detection_path = pose_path.with_name(detection_name)
if not detection_path.exists():
raise FileNotFoundError(
f"Detection JSON not found: {detection_path}"
)
return detection_path
def load_detection_json(camera_pose_json_path):
"""
Lädt automatisch die passende Detection-Datei.
"""
detection_path = find_detection_json(
camera_pose_json_path
)
with open(detection_path, "r", encoding="utf-8") as f:
return json.load(f)
def extract_intrinsics_from_detection(detection_data):
if "camera" not in detection_data:
raise ValueError("camera section missing")
camera = detection_data["camera"]
if "camera_matrix" not in camera:
raise ValueError("camera_matrix missing")
if "distortion_coefficients" not in camera:
raise ValueError("distortion_coefficients missing")
K = np.array(
camera["camera_matrix"],
dtype=np.float32
)
D = np.array(
camera["distortion_coefficients"],
dtype=np.float32
).reshape(-1, 1)
return K, D
def extract_default_marker_size(data: Dict[str, Any], robot_data: Dict[str, Any]) -> float:
# Priority: detection JSON -> robots.json -> fallback 25 mm
try:
vc = data.get("vision_config", {})
if isinstance(vc, dict) and vc.get("MarkerSize") is not None:
return float(vc["MarkerSize"])
except Exception:
pass
try:
vc = robot_data.get("vision_config", {})
if isinstance(vc, dict) and vc.get("MarkerSize") is not None:
return float(vc["MarkerSize"])
except Exception:
pass
return 0.025
# ============================================================
# CSV / JSON Export
# ============================================================
def fmt_float(x: Optional[float], nd: int = 6) -> str:
if x is None:
return ""
try:
if isinstance(x, float) and (math.isnan(x) or math.isinf(x)):
return ""
return f"{float(x):.{nd}f}"
except Exception:
return ""
def marker_row_from_estimate(est: FusedMarkerEstimate) -> Dict[str, Any]:
robot_on = est.metadata.get("robot_on") if est.metadata else None
robot_known = est.metadata.get("robot_known") if est.metadata else None
robot_relpos_count = est.metadata.get("robot_relpos_count") if est.metadata else 0
return {
"kind": "marker",
"id": str(est.marker_id),
"source_file": "",
"camera_id": "",
"robot_on": robot_on or "",
"robot_known": int(bool(robot_known)),
"robot_relpos_count": int(robot_relpos_count or 0),
"x_m": est.position_w[0],
"y_m": est.position_w[1],
"z_m": est.position_w[2],
"roll_deg": est.orientation_rpy_deg[0],
"pitch_deg": est.orientation_rpy_deg[1],
"yaw_deg": est.orientation_rpy_deg[2],
"num_observations": len(est.observations),
"num_cameras": est.num_cameras,
"cameras_seen_by": "|".join(est.camera_ids),
"weight_sum": est.weight_sum,
"weight_mean": est.weight_sum / max(1, len(est.observations)),
"confidence_mean": est.confidence_mean,
"confidence_min": est.confidence_min,
"confidence_max": est.confidence_max,
"quality_mean": est.quality_mean,
"quality_min": est.quality_min,
"quality_max": est.quality_max,
"reproj_rms_mean_px": est.reproj_rms_mean_px,
"reproj_rms_max_px": est.reproj_rms_max_px,
"position_std_m": est.position_std_m,
"angular_std_deg": est.angular_std_deg,
"reliability_0_1": est.reliability_0_1,
"source_files": "|".join(est.metadata.get("source_files", []) if est.metadata else []),
"note": "",
}
def camera_row_from_pose(cam: CameraPose) -> Dict[str, Any]:
roll, pitch, yaw = cam.orientation_rpy_deg
return {
"kind": "camera",
"id": cam.camera_id,
"source_file": Path(cam.source_file).name,
"camera_id": cam.camera_id,
"robot_on": "",
"robot_known": 0,
"robot_relpos_count": 0,
"x_m": cam.position_w[0],
"y_m": cam.position_w[1],
"z_m": cam.position_w[2],
"roll_deg": roll,
"pitch_deg": pitch,
"yaw_deg": yaw,
"num_observations": 0,
"num_cameras": 0,
"cameras_seen_by": cam.camera_id,
"weight_sum": "",
"weight_mean": "",
"confidence_mean": "",
"confidence_min": "",
"confidence_max": "",
"quality_mean": cam.quality.get("pose_quality_score", ""),
"quality_min": "",
"quality_max": "",
"reproj_rms_mean_px": cam.quality.get("reprojection_rms_px", ""),
"reproj_rms_max_px": cam.quality.get("reprojection_max_px", ""),
"position_std_m": "",
"angular_std_deg": "",
"reliability_0_1": get_camera_quality_score(cam),
"source_files": Path(cam.source_file).name,
"note": "",
}
def write_csv(rows: List[Dict[str, Any]], out_csv: str) -> None:
if not rows:
return
fieldnames = [
"kind", "id", "source_file", "camera_id",
"robot_on", "robot_known", "robot_relpos_count",
"x_m", "y_m", "z_m",
"roll_deg", "pitch_deg", "yaw_deg",
"num_observations", "num_cameras", "cameras_seen_by",
"weight_sum", "weight_mean",
"confidence_mean", "confidence_min", "confidence_max",
"quality_mean", "quality_min", "quality_max",
"reproj_rms_mean_px", "reproj_rms_max_px",
"position_std_m", "angular_std_deg",
"reliability_0_1", "source_files", "note",
]
with open(out_csv, "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for row in rows:
cleaned = {}
for key in fieldnames:
value = row.get(key, "")
if isinstance(value, np.generic):
value = value.item()
cleaned[key] = value
writer.writerow(cleaned)
def write_summary_json(summary: Dict[str, Any], out_json: str) -> None:
with open(out_json, "w", encoding="utf-8") as f:
json.dump(summary, f, indent=2, ensure_ascii=False)
# ============================================================
# Main Pipeline
# ============================================================
def main() -> None:
parser = argparse.ArgumentParser(description="Phase 1: multi-camera world marker fusion from camera pose JSONs")
parser.add_argument("--json", action="append", help="Input JSON file (repeatable). Expected 2 to 5 files.")
parser.add_argument("--input-dir", type=str, default=None, help="Optional directory to scan")
parser.add_argument("--timestamp", type=str, default=None, help="Optional timestamp used with --input-dir")
parser.add_argument("--robots", type=str, default=None, help="Optional robots.json for metadata only")
parser.add_argument("--out-csv", type=str, default=None, help="Output CSV path")
parser.add_argument("--out-json", type=str, default=None, help="Optional summary JSON path")
parser.add_argument("--min-confidence", type=float, default=0.0, help="Filter observations below this confidence")
parser.add_argument("--verbose", action="store_true", help="Verbose output")
args = parser.parse_args()
files = collect_input_files(args)
if len(files) < 1:
raise SystemExit("No input JSON files provided. Use --json and/or --input-dir + --timestamp.")
if len(files) > 5:
print(f"[WARN] More than 5 input JSONs found ({len(files)}). All will be processed.")
elif len(files) < 2:
print(f"[WARN] Only {len(files)} input JSON file(s) found. The pipeline still runs.")
robot_meta = load_robot_metadata(args.robots)
# Parse all files first
cameras: List[CameraPose] = []
all_observations: List[MarkerObservation] = []
all_detections_count = 0
total_rejected = 0
for file_path in files:
data = load_json(file_path)
cam = extract_camera_pose(data, file_path)
cameras.append(cam)
detection_data = load_detection_json(json_file)
K, D = extract_intrinsics_from_detection(detection_data)
default_marker_size_m = extract_default_marker_size(data, load_json(args.robots) if args.robots else {})
detections = data.get("detections", []) if isinstance(data.get("detections", []), list) else []
all_detections_count += len(detections)
if args.verbose:
print(f"[INFO] {Path(file_path).name}: camera={cam.camera_id}, detections={len(detections)}")
for det in detections:
marker_id = det.get("marker_id")
if marker_id is None:
total_rejected += 1
continue
try:
marker_id_int = int(marker_id)
except Exception:
total_rejected += 1
continue
confidence = float(det.get("confidence", 1.0))
if confidence < args.min_confidence:
total_rejected += 1
continue
obs = observation_to_world(
camera_pose=cam,
marker_id=marker_id_int,
det=det,
K=K,
D=D,
robot_meta=robot_meta,
default_marker_size_m=default_marker_size_m,
)
if obs is None:
total_rejected += 1
continue
all_observations.append(obs)
# Group observations by marker id
obs_by_marker: Dict[int, List[MarkerObservation]] = {}
for obs in all_observations:
obs_by_marker.setdefault(obs.marker_id, []).append(obs)
# Build fused estimates
fused_markers: List[FusedMarkerEstimate] = []
for mid in sorted(obs_by_marker.keys()):
fused = fuse_marker_observations(mid, obs_by_marker[mid], robot_meta)
# attach source files to metadata for CSV convenience
fused.metadata = dict(fused.metadata)
fused.metadata["source_files"] = sorted({Path(o.source_file).name for o in fused.observations})
fused_markers.append(fused)
# CSV rows: cameras + markers
rows: List[Dict[str, Any]] = []
for cam in cameras:
rows.append(camera_row_from_pose(cam))
for est in fused_markers:
rows.append(marker_row_from_estimate(est))
# Sort: cameras first, then markers by numeric id if possible
def sort_key(row: Dict[str, Any]):
kind = row.get("kind", "")
if kind == "camera":
return (0, str(row.get("id", "")))
try:
mid = int(row.get("id", 10**9))
except Exception:
mid = 10**9
return (1, mid)
rows.sort(key=sort_key)
# Output paths
if args.out_csv:
out_csv = args.out_csv
else:
stem = Path(files[0]).stem
if "camera_pose" in stem:
stem = stem.replace(".camera_pose", "")
out_csv = str(Path(files[0]).with_name(f"{stem}_world_markers.csv"))
if args.out_json:
out_json = args.out_json
else:
out_json = str(Path(out_csv).with_suffix(".json"))
write_csv(rows, out_csv)
# Summary JSON: contains the fused marker estimates plus cameras and stats.
summary = {
"summary": {
"input_files": [Path(f).name for f in files],
"num_files": len(files),
"num_cameras": len(cameras),
"num_detections_total": all_detections_count,
"num_valid_observations": len(all_observations),
"num_markers_fused": len(fused_markers),
"num_rejected_detections": total_rejected,
},
"cameras": [
{
"camera_id": cam.camera_id,
"source_file": cam.source_file,
"position_m": cam.position_w.tolist(),
"rotation_matrix_world_from_camera": cam.rotation_w_from_c.tolist(),
"orientation_deg": {
"roll": cam.orientation_rpy_deg[0],
"pitch": cam.orientation_rpy_deg[1],
"yaw": cam.orientation_rpy_deg[2],
},
"quality": cam.quality,
}
for cam in cameras
],
"markers": [
{
"marker_id": est.marker_id,
"position_m": est.position_w.tolist(),
"rotation_matrix_world_from_marker": est.rotation_w_from_m.tolist(),
"orientation_deg": {
"roll": est.orientation_rpy_deg[0],
"pitch": est.orientation_rpy_deg[1],
"yaw": est.orientation_rpy_deg[2],
},
"num_observations": len(est.observations),
"num_cameras": est.num_cameras,
"camera_ids": est.camera_ids,
"weight_sum": est.weight_sum,
"position_std_m": est.position_std_m,
"angular_std_deg": est.angular_std_deg,
"confidence_mean": est.confidence_mean,
"quality_mean": est.quality_mean,
"reprojection_rms_mean_px": est.reproj_rms_mean_px,
"reliability_0_1": est.reliability_0_1,
"metadata": est.metadata,
"observations": [
{
"camera_id": o.camera_id,
"source_file": o.source_file,
"confidence": o.confidence,
"quality_score": o.quality_score,
"camera_weight": o.camera_weight,
"observation_weight": o.observation_weight,
"position_m": o.position_w.tolist(),
"rotation_matrix_world_from_marker": o.rotation_w_from_m.tolist(),
"reprojection_rms_px": o.reprojection_rms_px,
"reprojection_max_px": o.reprojection_max_px,
"metadata": o.metadata,
}
for o in est.observations
],
}
for est in fused_markers
],
}
write_summary_json(summary, out_json)
print(f"[OK] CSV written: {out_csv}")
print(f"[OK] JSON written: {out_json}")
print(f"[OK] Cameras: {len(cameras)}, markers fused: {len(fused_markers)}, observations: {len(all_observations)}")
if __name__ == "__main__":
main()