#!/usr/bin/env python3 """ 3_fuse_markers_world.py Phase 1: - Liest 2 bis 5 JSON-Dateien ein, die jeweils eine Kamera-Pose und ArUco-Detections enthalten. - Rekonstruiert für jede Marker-Beobachtung eine Marker-Pose in Weltkoordinaten. - Fusiert alle Beobachtungen pro Marker gewichtet. - Gibt eine CSV-Liste mit Kameras und Markern aus. - Optional wird robots.json nur für Metadaten verwendet (nicht für die Schätzung). Typische Eingabe-Dateien: snapshot_video1_1779690911822_aruco_detection.camera_pose.json snapshot_video2_1779690911822_aruco_detection.camera_pose.json ... python 3_fuse_markers_world.py \ --json snapshot_video1_1779690911822_aruco_detection.camera_pose.json \ --json snapshot_video2_1779690911822_aruco_detection.camera_pose.json \ --json snapshot_video3_1779690911822_aruco_detection.camera_pose.json \ --robots robots.json Benötigt: pip install numpy opencv-python Hinweis: - Das Skript nutzt die Kameraposen als fest. - Die Markerkoordinaten werden pro Detektion aus SolvePnP bestimmt. - Danach werden alle Beobachtungen eines Markers robust gemittelt. """ from __future__ import annotations import argparse import csv import json import math from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional, Sequence, Tuple import cv2 import numpy as np # ============================================================ # Datenklassen # ============================================================ @dataclass class CameraPose: camera_id: str source_file: str position_w: np.ndarray # (3,) rotation_w_from_c: np.ndarray # (3,3) orientation_rpy_deg: Tuple[float, float, float] quality: Dict[str, Any] = field(default_factory=dict) @dataclass class MarkerObservation: marker_id: int camera_id: str source_file: str image_points_px: np.ndarray # (4,2) marker_size_m: float confidence: float quality_score: float camera_weight: float observation_weight: float rvec_cm: np.ndarray # (3,) tvec_cm: np.ndarray # (3,) position_w: np.ndarray # (3,) rotation_w_from_m: np.ndarray # (3,3) reprojection_rms_px: float reprojection_max_px: float metadata: Dict[str, Any] = field(default_factory=dict) @dataclass class FusedMarkerEstimate: marker_id: int position_w: np.ndarray rotation_w_from_m: np.ndarray orientation_rpy_deg: Tuple[float, float, float] observations: List[MarkerObservation] num_cameras: int camera_ids: List[str] weight_sum: float position_std_m: float angular_std_deg: float confidence_mean: float confidence_min: float confidence_max: float quality_mean: float quality_min: float quality_max: float reproj_rms_mean_px: float reproj_rms_max_px: float reliability_0_1: float metadata: Dict[str, Any] = field(default_factory=dict) # ============================================================ # Basisfunktionen: Rotationen / Euler / Quaternions # ============================================================ def clamp(value: float, lo: float = 0.0, hi: float = 1.0) -> float: return float(max(lo, min(hi, value))) def normalize_vec(v: np.ndarray, eps: float = 1e-12) -> np.ndarray: n = float(np.linalg.norm(v)) if n < eps: return v.astype(np.float64, copy=True) return (v / n).astype(np.float64) def rotation_matrix_to_euler_zyx(R: np.ndarray) -> Tuple[float, float, float]: """Returns roll, pitch, yaw in degrees. Convention: R = Rz(yaw) @ Ry(pitch) @ Rx(roll) """ yaw = math.degrees(math.atan2(R[1, 0], R[0, 0])) sp = math.sqrt(R[2, 1] ** 2 + R[2, 2] ** 2) pitch = math.degrees(math.atan2(-R[2, 0], sp)) roll = math.degrees(math.atan2(R[2, 1], R[2, 2])) return float(roll), float(pitch), float(yaw) def euler_zyx_to_rotation_matrix(roll_deg: float, pitch_deg: float, yaw_deg: float) -> np.ndarray: roll = math.radians(roll_deg) pitch = math.radians(pitch_deg) yaw = math.radians(yaw_deg) cr, sr = math.cos(roll), math.sin(roll) cp, sp = math.cos(pitch), math.sin(pitch) cy, sy = math.cos(yaw), math.sin(yaw) Rx = np.array([[1, 0, 0], [0, cr, -sr], [0, sr, cr]], dtype=np.float64) Ry = np.array([[cp, 0, sp], [0, 1, 0], [-sp, 0, cp]], dtype=np.float64) Rz = np.array([[cy, -sy, 0], [sy, cy, 0], [0, 0, 1]], dtype=np.float64) return (Rz @ Ry @ Rx).astype(np.float64) def rotation_matrix_to_quaternion(R: np.ndarray) -> np.ndarray: """Returns quaternion as [w, x, y, z].""" m = R.astype(np.float64) t = np.trace(m) if t > 0.0: s = math.sqrt(t + 1.0) * 2.0 w = 0.25 * s x = (m[2, 1] - m[1, 2]) / s y = (m[0, 2] - m[2, 0]) / s z = (m[1, 0] - m[0, 1]) / s elif m[0, 0] > m[1, 1] and m[0, 0] > m[2, 2]: s = math.sqrt(1.0 + m[0, 0] - m[1, 1] - m[2, 2]) * 2.0 w = (m[2, 1] - m[1, 2]) / s x = 0.25 * s y = (m[0, 1] + m[1, 0]) / s z = (m[0, 2] + m[2, 0]) / s elif m[1, 1] > m[2, 2]: s = math.sqrt(1.0 + m[1, 1] - m[0, 0] - m[2, 2]) * 2.0 w = (m[0, 2] - m[2, 0]) / s x = (m[0, 1] + m[1, 0]) / s y = 0.25 * s z = (m[1, 2] + m[2, 1]) / s else: s = math.sqrt(1.0 + m[2, 2] - m[0, 0] - m[1, 1]) * 2.0 w = (m[1, 0] - m[0, 1]) / s x = (m[0, 2] + m[2, 0]) / s y = (m[1, 2] + m[2, 1]) / s z = 0.25 * s q = np.array([w, x, y, z], dtype=np.float64) return normalize_vec(q) def quaternion_to_rotation_matrix(q: np.ndarray) -> np.ndarray: q = normalize_vec(q) w, x, y, z = q R = np.array([ [1 - 2 * (y * y + z * z), 2 * (x * y - z * w), 2 * (x * z + y * w)], [2 * (x * y + z * w), 1 - 2 * (x * x + z * z), 2 * (y * z - x * w)], [2 * (x * z - y * w), 2 * (y * z + x * w), 1 - 2 * (x * x + y * y)], ], dtype=np.float64) return R def average_rotations_weighted(rotations: Sequence[np.ndarray], weights: Sequence[float]) -> np.ndarray: """Averages rotations by weighted matrix sum + projection to SO(3).""" if not rotations: return np.eye(3, dtype=np.float64) M = np.zeros((3, 3), dtype=np.float64) for R, w in zip(rotations, weights): M += float(w) * R.astype(np.float64) U, _, Vt = np.linalg.svd(M) R = U @ Vt if np.linalg.det(R) < 0: U[:, -1] *= -1 R = U @ Vt return R.astype(np.float64) def angular_distance_deg(R_a: np.ndarray, R_b: np.ndarray) -> float: """Smallest angle between two rotations.""" R = R_a.T @ R_b tr = float(np.trace(R)) cos_theta = clamp((tr - 1.0) / 2.0, -1.0, 1.0) return float(math.degrees(math.acos(cos_theta))) # ============================================================ # JSON / Metadaten # ============================================================ def load_json(path: str) -> Dict[str, Any]: with open(path, "r", encoding="utf-8") as f: return json.load(f) def detect_camera_id(data: Dict[str, Any], source_file: str) -> str: camera = data.get("camera", {}) camera_pose = data.get("camera_pose", {}) for key in ("camera_id", "id", "name"): if isinstance(camera, dict) and camera.get(key): return str(camera[key]) if isinstance(camera_pose, dict) and camera_pose.get(key): return str(camera_pose[key]) return Path(source_file).stem def extract_camera_pose(data: Dict[str, Any], source_file: str) -> CameraPose: camera_id = detect_camera_id(data, source_file) # Camera pose may appear in different layouts. pose_block = data.get("camera_pose") if pose_block is None: pose_block = data.get("camera", {}) if not isinstance(pose_block, dict): raise ValueError(f"Camera pose block missing or invalid in: {source_file}") quality = pose_block.get("quality", {}) if isinstance(pose_block.get("quality", {}), dict) else {} # Position pos = None if isinstance(pose_block.get("position_m"), dict): pm = pose_block["position_m"] pos = np.array([pm.get("x", 0.0), pm.get("y", 0.0), pm.get("z", 0.0)], dtype=np.float64) elif isinstance(pose_block.get("position_mm"), (list, tuple)) and len(pose_block["position_mm"]) == 3: pos = np.array(pose_block["position_mm"], dtype=np.float64) / 1000.0 elif isinstance(pose_block.get("position"), (list, tuple)) and len(pose_block["position"]) == 3: pos = np.array(pose_block["position"], dtype=np.float64) if pos is None: raise ValueError(f"Camera position missing in: {source_file}") # Rotation if isinstance(pose_block.get("rotation_matrix_world_from_camera"), (list, tuple)): R_wc = np.array(pose_block["rotation_matrix_world_from_camera"], dtype=np.float64) elif isinstance(pose_block.get("rotation_matrix"), (list, tuple)): R_wc = np.array(pose_block["rotation_matrix"], dtype=np.float64) elif isinstance(pose_block.get("orientation_deg"), dict): od = pose_block["orientation_deg"] roll = float(od.get("roll", 0.0)) pitch = float(od.get("pitch", 0.0)) yaw = float(od.get("yaw", 0.0)) R_wc = euler_zyx_to_rotation_matrix(roll, pitch, yaw) else: R_wc = np.eye(3, dtype=np.float64) if R_wc.shape != (3, 3): raise ValueError(f"Invalid camera rotation matrix in: {source_file}") rpy = rotation_matrix_to_euler_zyx(R_wc) return CameraPose( camera_id=camera_id, source_file=source_file, position_w=pos, rotation_w_from_c=R_wc, orientation_rpy_deg=rpy, quality=quality, ) def load_robot_metadata(robot_path: Optional[str]) -> Dict[int, List[Dict[str, Any]]]: """Optional metadata only. Duplicate IDs are kept as a list.""" if not robot_path: return {} data = load_json(robot_path) marker_meta: Dict[int, List[Dict[str, Any]]] = {} for entry in data.get("Marker", []): try: mid = int(entry.get("id")) except Exception: continue marker_meta.setdefault(mid, []).append({ "on": entry.get("on"), "position": entry.get("position"), "relPos": entry.get("relPos"), "name": entry.get("name"), "relPosSource": entry.get("relPosSource"), }) return marker_meta def summarize_robot_metadata(marker_id: int, robot_meta: Dict[int, List[Dict[str, Any]]]) -> Dict[str, Any]: entries = robot_meta.get(marker_id, []) if not entries: return { "robot_known": False, "robot_on": None, "robot_positions": None, "robot_relpos_count": 0, } on_list = [] abs_pos_list = [] rel_pos_list = [] for e in entries: if e.get("on") is not None: on_list.append(str(e.get("on"))) if e.get("position") is not None: abs_pos_list.append(e.get("position")) if e.get("relPos") is not None: rel_pos_list.append(e.get("relPos")) return { "robot_known": True, "robot_on": "|".join(dict.fromkeys(on_list)) if on_list else None, "robot_positions": abs_pos_list if abs_pos_list else None, "robot_relpos_count": len(rel_pos_list), } # ============================================================ # Detection / Gewichtung # ============================================================ def get_camera_quality_score(camera_pose: CameraPose) -> float: q = camera_pose.quality or {} if "pose_quality_score" in q: try: return clamp(float(q["pose_quality_score"]) / 100.0) except Exception: pass if "reprojection_rms_px" in q: try: rms = float(q["reprojection_rms_px"]) # Heuristik: 0 px -> 1.0, 5 px -> 0.5, 10 px -> ~0.33 return clamp(1.0 / (1.0 + 0.2 * rms)) except Exception: pass return 1.0 def get_marker_size_from_detection(det: Dict[str, Any], default_size_m: float) -> float: try: return float(det.get("marker_size_m", default_size_m)) except Exception: return float(default_size_m) def score_from_range(value: Optional[float], low: float, high: float, invert: bool = False) -> float: if value is None: return 1.0 if high <= low: return 1.0 t = (float(value) - low) / (high - low) t = clamp(t) return 1.0 - t if invert else t def detection_quality_score(det: Dict[str, Any]) -> float: """Heuristischer Score in [0,1]. Falls Felder fehlen, wird 1.0 angenommen.""" confidence = float(det.get("confidence", 1.0)) quality = det.get("quality", {}) if isinstance(det.get("quality", {}), dict) else {} # Subscores area_px = quality.get("area_px") sharpness = (((quality.get("sharpness") or {}).get("laplacian_var")) if isinstance(quality.get("sharpness"), dict) else None) contrast = (((quality.get("contrast") or {}).get("dynamic_range")) if isinstance(quality.get("contrast"), dict) else None) dist_border = (((quality.get("geometry") or {}).get("distance_to_border_px")) if isinstance(quality.get("geometry"), dict) else None) dist_center = (((quality.get("geometry") or {}).get("distance_to_center_norm")) if isinstance(quality.get("geometry"), dict) else None) edge_ratio = quality.get("edge_ratio") area_score = score_from_range(area_px, 150.0, 3000.0) sharp_score = score_from_range(sharpness, 500.0, 5000.0) contrast_score = score_from_range(contrast, 40.0, 180.0) border_score = score_from_range(dist_border, 20.0, 250.0) center_score = score_from_range(dist_center, 0.75, 0.0, invert=True) # closer to center -> higher edge_ratio_score = 1.0 if edge_ratio is not None: try: edge_ratio_score = clamp(1.0 - min(abs(float(edge_ratio) - 1.0) / 1.5, 1.0)) except Exception: edge_ratio_score = 1.0 # Weighted sum, confidence strongest score = ( 0.35 * clamp(confidence) + 0.15 * area_score + 0.15 * sharp_score + 0.10 * contrast_score + 0.10 * border_score + 0.10 * center_score + 0.05 * edge_ratio_score ) return clamp(score) def marker_frontality_score(rotation_m_from_cam: np.ndarray) -> float: """Marker normal relative to camera optical axis (+Z camera). 1.0 is frontal.""" normal_cam = rotation_m_from_cam[:, 2] # absolute because the sign depends on marker axis convention; we only want frontality return clamp(abs(float(normal_cam[2]))) def compute_observation_weight(camera_pose: CameraPose, det: Dict[str, Any]) -> Tuple[float, float, float]: det_score = detection_quality_score(det) cam_score = get_camera_quality_score(camera_pose) # frontality will be added after pose estimation; here only base score. base = clamp(det_score * cam_score) return base, det_score, cam_score # ============================================================ # Marker Pose Schätzung pro Observation # ============================================================ def build_marker_object_points(marker_size_m: float) -> np.ndarray: half = marker_size_m / 2.0 # OpenCV ArUco corner order: TL, TR, BR, BL return np.array([ [-half, half, 0.0], [ half, half, 0.0], [ half, -half, 0.0], [-half, -half, 0.0], ], dtype=np.float32) def solve_marker_pose_in_camera(img_pts: np.ndarray, marker_size_m: float, K: np.ndarray, D: np.ndarray) -> Tuple[np.ndarray, np.ndarray, float, float]: obj_pts = build_marker_object_points(marker_size_m) success, rvec, tvec = cv2.solvePnP( obj_pts, img_pts.astype(np.float32), K, D, flags=cv2.SOLVEPNP_IPPE_SQUARE, ) if not success: success, rvec, tvec = cv2.solvePnP( obj_pts, img_pts.astype(np.float32), K, D, flags=cv2.SOLVEPNP_ITERATIVE, ) if not success: raise RuntimeError("solvePnP failed for one marker observation") projected, _ = cv2.projectPoints(obj_pts, rvec, tvec, K, D) projected = projected.reshape(-1, 2) err = np.linalg.norm(projected - img_pts, axis=1) rms = float(np.sqrt(np.mean(err ** 2))) mx = float(np.max(err)) return rvec.reshape(3), tvec.reshape(3), rms, mx def observation_to_world( camera_pose: CameraPose, marker_id: int, det: Dict[str, Any], K: np.ndarray, D: np.ndarray, robot_meta: Dict[int, List[Dict[str, Any]]], default_marker_size_m: float, ) -> Optional[MarkerObservation]: if det.get("image_points_px") is None: return None img_pts = np.array(det["image_points_px"], dtype=np.float32) if img_pts.shape != (4, 2): return None marker_size_m = get_marker_size_from_detection(det, default_marker_size_m) try: rvec_cm, tvec_cm, reproj_rms_px, reproj_max_px = solve_marker_pose_in_camera(img_pts, marker_size_m, K, D) except Exception: return None R_cm, _ = cv2.Rodrigues(rvec_cm) cam_weight_base, det_score, cam_score = compute_observation_weight(camera_pose, det) frontality = marker_frontality_score(R_cm) # Final observation weight: base quality * frontality obs_weight = clamp(cam_weight_base * (0.25 + 0.75 * frontality)) if obs_weight <= 1e-9: obs_weight = 1e-9 # World transform R_wc = camera_pose.rotation_w_from_c t_wc = camera_pose.position_w position_w = R_wc @ tvec_cm + t_wc rotation_w_from_m = R_wc @ R_cm metadata = summarize_robot_metadata(marker_id, robot_meta) return MarkerObservation( marker_id=marker_id, camera_id=camera_pose.camera_id, source_file=camera_pose.source_file, image_points_px=img_pts, marker_size_m=float(marker_size_m), confidence=float(det.get("confidence", 1.0)), quality_score=float(det_score), camera_weight=float(cam_score), observation_weight=float(obs_weight), rvec_cm=rvec_cm.astype(np.float64), tvec_cm=tvec_cm.astype(np.float64), position_w=position_w.astype(np.float64), rotation_w_from_m=rotation_w_from_m.astype(np.float64), reprojection_rms_px=reproj_rms_px, reprojection_max_px=reproj_max_px, metadata=metadata, ) # ============================================================ # Fusion pro Marker # ============================================================ def fuse_marker_observations(marker_id: int, observations: List[MarkerObservation], robot_meta: Dict[int, List[Dict[str, Any]]]) -> FusedMarkerEstimate: if not observations: raise ValueError("No observations to fuse") # Optional outlier suppression: one soft reweighting pass based on position and angle residuals. pos_stack = np.stack([o.position_w for o in observations], axis=0) weights = np.array([o.observation_weight for o in observations], dtype=np.float64) weights = np.maximum(weights, 1e-9) # Initial estimate pos0 = np.average(pos_stack, axis=0, weights=weights) rot0 = average_rotations_weighted([o.rotation_w_from_m for o in observations], weights) # Residual-based robustification pos_err = np.linalg.norm(pos_stack - pos0.reshape(1, 3), axis=1) ang_err = np.array([angular_distance_deg(rot0, o.rotation_w_from_m) for o in observations], dtype=np.float64) # Huber-like soft downweighting pos_scale = max(0.01, float(np.median(pos_err) + 1e-9)) ang_scale = max(2.0, float(np.median(ang_err) + 1e-9)) robust = 1.0 / (1.0 + (pos_err / pos_scale) ** 2 + (ang_err / ang_scale) ** 2) final_w = weights * robust final_w = np.maximum(final_w, 1e-9) pos = np.average(pos_stack, axis=0, weights=final_w) rot = average_rotations_weighted([o.rotation_w_from_m for o in observations], final_w) rpy = rotation_matrix_to_euler_zyx(rot) camera_ids = sorted({o.camera_id for o in observations}) source_files = sorted({Path(o.source_file).name for o in observations}) weight_sum = float(np.sum(final_w)) weight_mean = float(np.mean(final_w)) confidence_vals = np.array([o.confidence for o in observations], dtype=np.float64) quality_vals = np.array([o.quality_score for o in observations], dtype=np.float64) reproj_rms_vals = np.array([o.reprojection_rms_px for o in observations], dtype=np.float64) pos_std_m = float(np.sqrt(np.average(np.sum((pos_stack - pos.reshape(1, 3)) ** 2, axis=1), weights=final_w))) angular_std_deg = float(np.sqrt(np.average(np.array([angular_distance_deg(rot, o.rotation_w_from_m) ** 2 for o in observations]), weights=final_w))) # Reliability heuristic in [0,1] num_obs = len(observations) num_cams = len(camera_ids) obs_factor = clamp(num_obs / 5.0) cam_factor = clamp(num_cams / 3.0) conf_factor = clamp(float(np.mean(confidence_vals))) qual_factor = clamp(float(np.mean(quality_vals))) spread_factor = clamp(1.0 - min(pos_std_m / 0.08, 1.0)) reproj_factor = clamp(1.0 - min(float(np.mean(reproj_rms_vals)) / 5.0, 1.0)) reliability = clamp( 0.18 * obs_factor + 0.15 * cam_factor + 0.22 * conf_factor + 0.15 * qual_factor + 0.15 * spread_factor + 0.15 * reproj_factor ) metadata = summarize_robot_metadata(marker_id, robot_meta) return FusedMarkerEstimate( marker_id=marker_id, position_w=pos.astype(np.float64), rotation_w_from_m=rot.astype(np.float64), orientation_rpy_deg=rpy, observations=observations, num_cameras=num_cams, camera_ids=camera_ids, weight_sum=weight_sum, position_std_m=pos_std_m, angular_std_deg=angular_std_deg, confidence_mean=float(np.mean(confidence_vals)), confidence_min=float(np.min(confidence_vals)), confidence_max=float(np.max(confidence_vals)), quality_mean=float(np.mean(quality_vals)), quality_min=float(np.min(quality_vals)), quality_max=float(np.max(quality_vals)), reproj_rms_mean_px=float(np.mean(reproj_rms_vals)), reproj_rms_max_px=float(np.max([o.reprojection_max_px for o in observations])), reliability_0_1=reliability, metadata=metadata, ) # ============================================================ # Input Handling # ============================================================ def collect_input_files(args: argparse.Namespace) -> List[str]: files: List[str] = [] if args.json: files.extend(args.json) if args.input_dir and args.timestamp: base = Path(args.input_dir) ts = str(args.timestamp) patterns = [ f"*_{ts}_aruco_detection.camera_pose.json", f"*_{ts}_aruco_detection.json", f"*_{ts}.camera_pose.json", f"*_{ts}.json", ] for pattern in patterns: files.extend(str(p) for p in sorted(base.glob(pattern))) # de-duplicate, keep order seen = set() uniq = [] for f in files: ff = str(Path(f)) if ff not in seen: seen.add(ff) uniq.append(ff) return uniq def find_detection_json(camera_pose_json_path): """ Findet automatisch die passende Detection-JSON. Beispiel: xxx_aruco_detection.camera_pose.json -> xxx_aruco_detection.json """ pose_path = Path(camera_pose_json_path) name = pose_path.name if not name.endswith(".camera_pose.json"): raise ValueError( f"Not a .camera_pose.json file: {pose_path}" ) detection_name = name.replace( ".camera_pose.json", ".json" ) detection_path = pose_path.with_name(detection_name) if not detection_path.exists(): raise FileNotFoundError( f"Detection JSON not found: {detection_path}" ) return detection_path def load_detection_json(camera_pose_json_path): """ Lädt automatisch die passende Detection-Datei. """ detection_path = find_detection_json( camera_pose_json_path ) with open(detection_path, "r", encoding="utf-8") as f: return json.load(f) def extract_intrinsics_from_detection(detection_data): if "camera" not in detection_data: raise ValueError("camera section missing") camera = detection_data["camera"] if "camera_matrix" not in camera: raise ValueError("camera_matrix missing") if "distortion_coefficients" not in camera: raise ValueError("distortion_coefficients missing") K = np.array( camera["camera_matrix"], dtype=np.float32 ) D = np.array( camera["distortion_coefficients"], dtype=np.float32 ).reshape(-1, 1) return K, D def extract_default_marker_size(data: Dict[str, Any], robot_data: Dict[str, Any]) -> float: # Priority: detection JSON -> robots.json -> fallback 25 mm try: vc = data.get("vision_config", {}) if isinstance(vc, dict) and vc.get("MarkerSize") is not None: return float(vc["MarkerSize"]) except Exception: pass try: vc = robot_data.get("vision_config", {}) if isinstance(vc, dict) and vc.get("MarkerSize") is not None: return float(vc["MarkerSize"]) except Exception: pass return 0.025 # ============================================================ # CSV / JSON Export # ============================================================ def fmt_float(x: Optional[float], nd: int = 6) -> str: if x is None: return "" try: if isinstance(x, float) and (math.isnan(x) or math.isinf(x)): return "" return f"{float(x):.{nd}f}" except Exception: return "" def marker_row_from_estimate(est: FusedMarkerEstimate) -> Dict[str, Any]: robot_on = est.metadata.get("robot_on") if est.metadata else None robot_known = est.metadata.get("robot_known") if est.metadata else None robot_relpos_count = est.metadata.get("robot_relpos_count") if est.metadata else 0 return { "kind": "marker", "id": str(est.marker_id), "source_file": "", "camera_id": "", "robot_on": robot_on or "", "robot_known": int(bool(robot_known)), "robot_relpos_count": int(robot_relpos_count or 0), "x_m": est.position_w[0], "y_m": est.position_w[1], "z_m": est.position_w[2], "roll_deg": est.orientation_rpy_deg[0], "pitch_deg": est.orientation_rpy_deg[1], "yaw_deg": est.orientation_rpy_deg[2], "num_observations": len(est.observations), "num_cameras": est.num_cameras, "cameras_seen_by": "|".join(est.camera_ids), "weight_sum": est.weight_sum, "weight_mean": est.weight_sum / max(1, len(est.observations)), "confidence_mean": est.confidence_mean, "confidence_min": est.confidence_min, "confidence_max": est.confidence_max, "quality_mean": est.quality_mean, "quality_min": est.quality_min, "quality_max": est.quality_max, "reproj_rms_mean_px": est.reproj_rms_mean_px, "reproj_rms_max_px": est.reproj_rms_max_px, "position_std_m": est.position_std_m, "angular_std_deg": est.angular_std_deg, "reliability_0_1": est.reliability_0_1, "source_files": "|".join(est.metadata.get("source_files", []) if est.metadata else []), "note": "", } def camera_row_from_pose(cam: CameraPose) -> Dict[str, Any]: roll, pitch, yaw = cam.orientation_rpy_deg return { "kind": "camera", "id": cam.camera_id, "source_file": Path(cam.source_file).name, "camera_id": cam.camera_id, "robot_on": "", "robot_known": 0, "robot_relpos_count": 0, "x_m": cam.position_w[0], "y_m": cam.position_w[1], "z_m": cam.position_w[2], "roll_deg": roll, "pitch_deg": pitch, "yaw_deg": yaw, "num_observations": 0, "num_cameras": 0, "cameras_seen_by": cam.camera_id, "weight_sum": "", "weight_mean": "", "confidence_mean": "", "confidence_min": "", "confidence_max": "", "quality_mean": cam.quality.get("pose_quality_score", ""), "quality_min": "", "quality_max": "", "reproj_rms_mean_px": cam.quality.get("reprojection_rms_px", ""), "reproj_rms_max_px": cam.quality.get("reprojection_max_px", ""), "position_std_m": "", "angular_std_deg": "", "reliability_0_1": get_camera_quality_score(cam), "source_files": Path(cam.source_file).name, "note": "", } def write_csv(rows: List[Dict[str, Any]], out_csv: str) -> None: if not rows: return fieldnames = [ "kind", "id", "source_file", "camera_id", "robot_on", "robot_known", "robot_relpos_count", "x_m", "y_m", "z_m", "roll_deg", "pitch_deg", "yaw_deg", "num_observations", "num_cameras", "cameras_seen_by", "weight_sum", "weight_mean", "confidence_mean", "confidence_min", "confidence_max", "quality_mean", "quality_min", "quality_max", "reproj_rms_mean_px", "reproj_rms_max_px", "position_std_m", "angular_std_deg", "reliability_0_1", "source_files", "note", ] with open(out_csv, "w", newline="", encoding="utf-8") as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() for row in rows: cleaned = {} for key in fieldnames: value = row.get(key, "") if isinstance(value, np.generic): value = value.item() cleaned[key] = value writer.writerow(cleaned) def write_summary_json(summary: Dict[str, Any], out_json: str) -> None: with open(out_json, "w", encoding="utf-8") as f: json.dump(summary, f, indent=2, ensure_ascii=False) # ============================================================ # Main Pipeline # ============================================================ def main() -> None: parser = argparse.ArgumentParser(description="Phase 1: multi-camera world marker fusion from camera pose JSONs") parser.add_argument("--json", action="append", help="Input JSON file (repeatable). Expected 2 to 5 files.") parser.add_argument("--input-dir", type=str, default=None, help="Optional directory to scan") parser.add_argument("--timestamp", type=str, default=None, help="Optional timestamp used with --input-dir") parser.add_argument("--robots", type=str, default=None, help="Optional robots.json for metadata only") parser.add_argument("--out-csv", type=str, default=None, help="Output CSV path") parser.add_argument("--out-json", type=str, default=None, help="Optional summary JSON path") parser.add_argument("--min-confidence", type=float, default=0.0, help="Filter observations below this confidence") parser.add_argument("--verbose", action="store_true", help="Verbose output") args = parser.parse_args() files = collect_input_files(args) if len(files) < 1: raise SystemExit("No input JSON files provided. Use --json and/or --input-dir + --timestamp.") if len(files) > 5: print(f"[WARN] More than 5 input JSONs found ({len(files)}). All will be processed.") elif len(files) < 2: print(f"[WARN] Only {len(files)} input JSON file(s) found. The pipeline still runs.") robot_meta = load_robot_metadata(args.robots) # Parse all files first cameras: List[CameraPose] = [] all_observations: List[MarkerObservation] = [] all_detections_count = 0 total_rejected = 0 for file_path in files: data = load_json(file_path) cam = extract_camera_pose(data, file_path) cameras.append(cam) detection_data = load_detection_json(json_file) K, D = extract_intrinsics_from_detection(detection_data) default_marker_size_m = extract_default_marker_size(data, load_json(args.robots) if args.robots else {}) detections = data.get("detections", []) if isinstance(data.get("detections", []), list) else [] all_detections_count += len(detections) if args.verbose: print(f"[INFO] {Path(file_path).name}: camera={cam.camera_id}, detections={len(detections)}") for det in detections: marker_id = det.get("marker_id") if marker_id is None: total_rejected += 1 continue try: marker_id_int = int(marker_id) except Exception: total_rejected += 1 continue confidence = float(det.get("confidence", 1.0)) if confidence < args.min_confidence: total_rejected += 1 continue obs = observation_to_world( camera_pose=cam, marker_id=marker_id_int, det=det, K=K, D=D, robot_meta=robot_meta, default_marker_size_m=default_marker_size_m, ) if obs is None: total_rejected += 1 continue all_observations.append(obs) # Group observations by marker id obs_by_marker: Dict[int, List[MarkerObservation]] = {} for obs in all_observations: obs_by_marker.setdefault(obs.marker_id, []).append(obs) # Build fused estimates fused_markers: List[FusedMarkerEstimate] = [] for mid in sorted(obs_by_marker.keys()): fused = fuse_marker_observations(mid, obs_by_marker[mid], robot_meta) # attach source files to metadata for CSV convenience fused.metadata = dict(fused.metadata) fused.metadata["source_files"] = sorted({Path(o.source_file).name for o in fused.observations}) fused_markers.append(fused) # CSV rows: cameras + markers rows: List[Dict[str, Any]] = [] for cam in cameras: rows.append(camera_row_from_pose(cam)) for est in fused_markers: rows.append(marker_row_from_estimate(est)) # Sort: cameras first, then markers by numeric id if possible def sort_key(row: Dict[str, Any]): kind = row.get("kind", "") if kind == "camera": return (0, str(row.get("id", ""))) try: mid = int(row.get("id", 10**9)) except Exception: mid = 10**9 return (1, mid) rows.sort(key=sort_key) # Output paths if args.out_csv: out_csv = args.out_csv else: stem = Path(files[0]).stem if "camera_pose" in stem: stem = stem.replace(".camera_pose", "") out_csv = str(Path(files[0]).with_name(f"{stem}_world_markers.csv")) if args.out_json: out_json = args.out_json else: out_json = str(Path(out_csv).with_suffix(".json")) write_csv(rows, out_csv) # Summary JSON: contains the fused marker estimates plus cameras and stats. summary = { "summary": { "input_files": [Path(f).name for f in files], "num_files": len(files), "num_cameras": len(cameras), "num_detections_total": all_detections_count, "num_valid_observations": len(all_observations), "num_markers_fused": len(fused_markers), "num_rejected_detections": total_rejected, }, "cameras": [ { "camera_id": cam.camera_id, "source_file": cam.source_file, "position_m": cam.position_w.tolist(), "rotation_matrix_world_from_camera": cam.rotation_w_from_c.tolist(), "orientation_deg": { "roll": cam.orientation_rpy_deg[0], "pitch": cam.orientation_rpy_deg[1], "yaw": cam.orientation_rpy_deg[2], }, "quality": cam.quality, } for cam in cameras ], "markers": [ { "marker_id": est.marker_id, "position_m": est.position_w.tolist(), "rotation_matrix_world_from_marker": est.rotation_w_from_m.tolist(), "orientation_deg": { "roll": est.orientation_rpy_deg[0], "pitch": est.orientation_rpy_deg[1], "yaw": est.orientation_rpy_deg[2], }, "num_observations": len(est.observations), "num_cameras": est.num_cameras, "camera_ids": est.camera_ids, "weight_sum": est.weight_sum, "position_std_m": est.position_std_m, "angular_std_deg": est.angular_std_deg, "confidence_mean": est.confidence_mean, "quality_mean": est.quality_mean, "reprojection_rms_mean_px": est.reproj_rms_mean_px, "reliability_0_1": est.reliability_0_1, "metadata": est.metadata, "observations": [ { "camera_id": o.camera_id, "source_file": o.source_file, "confidence": o.confidence, "quality_score": o.quality_score, "camera_weight": o.camera_weight, "observation_weight": o.observation_weight, "position_m": o.position_w.tolist(), "rotation_matrix_world_from_marker": o.rotation_w_from_m.tolist(), "reprojection_rms_px": o.reprojection_rms_px, "reprojection_max_px": o.reprojection_max_px, "metadata": o.metadata, } for o in est.observations ], } for est in fused_markers ], } write_summary_json(summary, out_json) print(f"[OK] CSV written: {out_csv}") print(f"[OK] JSON written: {out_json}") print(f"[OK] Cameras: {len(cameras)}, markers fused: {len(fused_markers)}, observations: {len(all_observations)}") if __name__ == "__main__": main()