#!/usr/bin/env python3 """ 3_fuse_markers_world.py PHASE 1B --------- Fusioniert Marker-Weltkoordinaten aus mehreren Kameras. EINGABE: --json *.camera_pose.json (mehrfach möglich) --robots robot.json Das Script findet automatisch: *.aruco_detection.json Beispiel: snapshot_video0_1779690911822_aruco_detection.camera_pose.json -> snapshot_video0_1779690911822_aruco_detection.json FEATURES: - mehrere Kameras (2..5) - automatische Detection-Datei-Erkennung - bekannte Marker aus robot.json - unbekannte Marker triangulieren - gewichtete Marker-Fusion - Qualitätsmetriken - CSV Export - JSON Export - Kamera Export - robuste Fehlerbehandlung - vorbereitet für spätere Rigid-Body Constraints OUTPUT: fused_markers.csv fused_markers.json Benötigt: pip install opencv-python numpy """ import argparse import csv import json import math from collections import defaultdict from pathlib import Path import cv2 import numpy as np # ============================================================ # JSON HELPERS # ============================================================ def load_json(path): with open(path, "r", encoding="utf-8") as f: return json.load(f) def save_json(path, data): with open(path, "w", encoding="utf-8") as f: json.dump(data, f, indent=2) # ============================================================ # FILE MATCHING # ============================================================ def find_detection_json(camera_pose_json_path): """ Findet automatisch die passende *_aruco_detection.json Datei. Beispiel: INPUT: snapshot_video0_123_aruco_detection.camera_pose.json OUTPUT: snapshot_video0_123_aruco_detection.json """ pose_path = Path(camera_pose_json_path) name = pose_path.name if not name.endswith(".camera_pose.json"): raise ValueError( f"Expected .camera_pose.json: {pose_path}" ) detection_name = name.replace( ".camera_pose.json", ".json" ) detection_path = pose_path.with_name( detection_name ) if not detection_path.exists(): raise FileNotFoundError( "Matching detection JSON not found:\n" f"{detection_path}" ) return detection_path # ============================================================ # CAMERA POSE # ============================================================ def extract_camera_pose(camera_pose_data): if "camera_pose" not in camera_pose_data: raise ValueError("camera_pose missing") pose = camera_pose_data["camera_pose"] pos = pose["position_m"] cam_pos = np.array([ pos["x"], pos["y"], pos["z"] ], dtype=np.float32) if "rotation_matrix_world_from_camera" in pose: R_wc = np.array( pose["rotation_matrix_world_from_camera"], dtype=np.float32 ) else: ori = pose["orientation_deg"] R_wc = euler_to_rotation_matrix( ori["roll"], ori["pitch"], ori["yaw"] ) return R_wc, cam_pos # ============================================================ # INTRINSICS # ============================================================ def extract_intrinsics(detection_data): if "camera" not in detection_data: raise ValueError("camera section missing") camera = detection_data["camera"] if "camera_matrix" not in camera: raise ValueError("camera_matrix missing") if "distortion_coefficients" not in camera: raise ValueError("distortion_coefficients missing") K = np.array( camera["camera_matrix"], dtype=np.float32 ) D = np.array( camera["distortion_coefficients"], dtype=np.float32 ).reshape(-1, 1) return K, D # ============================================================ # ROTATION HELPERS # ============================================================ def euler_to_rotation_matrix( roll_deg, pitch_deg, yaw_deg ): r = math.radians(roll_deg) p = math.radians(pitch_deg) y = math.radians(yaw_deg) Rx = np.array([ [1, 0, 0], [0, math.cos(r), -math.sin(r)], [0, math.sin(r), math.cos(r)] ]) Ry = np.array([ [math.cos(p), 0, math.sin(p)], [0, 1, 0], [-math.sin(p), 0, math.cos(p)] ]) Rz = np.array([ [math.cos(y), -math.sin(y), 0], [math.sin(y), math.cos(y), 0], [0, 0, 1] ]) return Rz @ Ry @ Rx # ============================================================ # ROBOT MARKERS # ============================================================ def build_known_marker_lookup(robot_data): """ Nur Marker mit ABSOLUTER Weltposition. relPos wird in Phase 2 verwendet. """ lookup = {} for marker in robot_data.get("Marker", []): marker_id = int(marker.get("id", -1)) if marker_id < 0: continue if "position" not in marker: continue pos = marker["position"] if pos is None: continue if len(pos) != 3: continue lookup[marker_id] = np.array( pos, dtype=np.float32 ) return lookup # ============================================================ # MARKER POSE REL CAMERA # ============================================================ def estimate_marker_pose_camera( image_points, marker_size, K, D ): half = marker_size / 2.0 object_points = np.array([ [-half, half, 0], [half, half, 0], [half, -half, 0], [-half, -half, 0] ], dtype=np.float32) image_points = np.array( image_points, dtype=np.float32 ) success, rvec, tvec = cv2.solvePnP( object_points, image_points, K, D, flags=cv2.SOLVEPNP_IPPE_SQUARE ) if not success: return None R_mc, _ = cv2.Rodrigues(rvec) return { "rvec": rvec, "tvec": tvec.reshape(3), "R_mc": R_mc } # ============================================================ # MARKER WORLD TRANSFORM # ============================================================ def marker_world_position( cam_world_pos, R_wc, t_mc ): """ Marker Mittelpunkt in Weltkoordinaten. X_world = R_wc * X_cam + C """ return ( R_wc @ t_mc.reshape(3) ) + cam_world_pos # ============================================================ # WEIGHTING # ============================================================ def compute_marker_weight( detection, camera_pose_data ): """ Qualitätsgewicht. Verwendet: - confidence - reprojection RMS - Bildzentrum - Markerfläche - Sharpness """ confidence = float( detection.get("confidence", 0.5) ) quality = detection.get("quality", {}) area_px = float( quality.get("area_px", 1000) ) sharpness = quality.get( "sharpness", {} ) lap_var = float( sharpness.get( "laplacian_var", 500 ) ) geometry = quality.get( "geometry", {} ) dist_center = float( geometry.get( "distance_to_center_norm", 0.5 ) ) pose_quality = camera_pose_data.get( "quality", {} ) reproj = float( pose_quality.get( "reprojection_rms_px", 10.0 ) ) reproj_weight = 1.0 / (1.0 + reproj) area_weight = min( area_px / 2000.0, 1.0 ) sharpness_weight = min( lap_var / 5000.0, 1.0 ) center_weight = 1.0 - dist_center weight = ( confidence * reproj_weight * area_weight * sharpness_weight * center_weight ) return max(weight, 1e-6) # ============================================================ # FUSION # ============================================================ def weighted_average(points, weights): points = np.array(points) weights = np.array(weights) if len(points) == 1: return points[0] total_weight = np.sum(weights) if total_weight < 1e-9: return np.mean(points, axis=0) return np.sum( points * weights[:, None], axis=0 ) / total_weight # ============================================================ # MAIN # ============================================================ def main(): parser = argparse.ArgumentParser() parser.add_argument( "--json", action="append", required=True, help="*.camera_pose.json" ) parser.add_argument( "--robots", required=True, help="robot.json" ) parser.add_argument( "--outdir", default="." ) args = parser.parse_args() outdir = Path(args.outdir) outdir.mkdir( parents=True, exist_ok=True ) # ======================================================== # robot.json # ======================================================== robot_data = load_json( args.robots ) known_markers = build_known_marker_lookup( robot_data ) # ======================================================== # globale Observationen # ======================================================== observations = defaultdict(list) camera_exports = [] # ======================================================== # alle Kameras # ======================================================== for json_file in args.json: print() print("================================================") print("LOAD CAMERA") print("================================================") print(json_file) # ---------------------------------------------------- # camera pose json # ---------------------------------------------------- camera_pose_data = load_json( json_file ) # ---------------------------------------------------- # detection json automatisch finden # ---------------------------------------------------- detection_json = find_detection_json( json_file ) print( f"Detection JSON:\n{detection_json}" ) detection_data = load_json( detection_json ) # ---------------------------------------------------- # intrinsics # ---------------------------------------------------- K, D = extract_intrinsics( detection_data ) # ---------------------------------------------------- # kamerapose # ---------------------------------------------------- R_wc, cam_world_pos = extract_camera_pose( camera_pose_data ) camera_name = Path( json_file ).stem # ---------------------------------------------------- # camera export # ---------------------------------------------------- camera_exports.append({ "camera": camera_name, "x": float(cam_world_pos[0]), "y": float(cam_world_pos[1]), "z": float(cam_world_pos[2]) }) # ---------------------------------------------------- # detections # ---------------------------------------------------- detections = detection_data.get( "detections", [] ) print( f"Detections: {len(detections)}" ) # ---------------------------------------------------- # marker durchlaufen # ---------------------------------------------------- for det in detections: marker_id = int( det["marker_id"] ) marker_size = float( det["marker_size_m"] ) pose = estimate_marker_pose_camera( det["image_points_px"], marker_size, K, D ) if pose is None: continue world_pos = marker_world_position( cam_world_pos, R_wc, pose["tvec"] ) weight = compute_marker_weight( det, camera_pose_data ) observations[marker_id].append({ "world_pos": world_pos, "weight": weight, "camera": camera_name, "confidence": float( det.get("confidence", 0.5) ), "known_marker": marker_id in known_markers }) # ======================================================== # fusion # ======================================================== fused_markers = [] print() print("================================================") print("FUSE MARKERS") print("================================================") for marker_id, obs_list in observations.items(): points = [ o["world_pos"] for o in obs_list ] weights = [ o["weight"] for o in obs_list ] fused = weighted_average( points, weights ) spread = 0.0 if len(points) > 1: dists = [ np.linalg.norm(p - fused) for p in points ] spread = float( np.mean(dists) ) known = marker_id in known_markers mean_conf = float(np.mean([ o["confidence"] for o in obs_list ])) mean_weight = float(np.mean(weights)) print( f"Marker {marker_id:3d} | " f"cams={len(obs_list)} | " f"spread={spread:.4f}m | " f"known={known}" ) fused_markers.append({ "marker_id": marker_id, "x": float(fused[0]), "y": float(fused[1]), "z": float(fused[2]), "num_cameras": len(obs_list), "spread_m": spread, "known_marker": known, "mean_confidence": mean_conf, "mean_weight": mean_weight }) # ======================================================== # CSV EXPORT # ======================================================== csv_file = outdir / "fused_markers.csv" with open( csv_file, "w", newline="", encoding="utf-8" ) as f: writer = csv.DictWriter( f, fieldnames=[ "marker_id", "x", "y", "z", "num_cameras", "spread_m", "known_marker", "mean_confidence", "mean_weight" ] ) writer.writeheader() for row in fused_markers: writer.writerow(row) # ======================================================== # JSON EXPORT # ======================================================== export_json = { "fused_markers": fused_markers, "cameras": camera_exports } json_file = outdir / "fused_markers.json" save_json( json_file, export_json ) # ======================================================== # DONE # ======================================================== print() print("================================================") print("EXPORT") print("================================================") print(csv_file) print(json_file) print() # ============================================================ # ENTRY # ============================================================ if __name__ == "__main__": main()