#!/usr/bin/env python3 """ 3_multiview_bundle_adjustment.py Multi-view ArUco marker position optimization with geometric constraints. Strategy: 1. Triangulate markers from multi-view observations (rough initial estimate) 2. Optimize marker positions with constraints: - Same-link distance constraints: markers on same link keep fixed relative distance - Joint-axis constraints: markers on linked arms via joint keep distance along joint axis 3. Use bundle adjustment: minimize reprojection error + constraint violations Key insight: We do NOT use robot.json marker positions (they're in different joint config). We only use robot.json hierarchy to DEFINE constraints. Dependencies: numpy, opencv-python, scipy """ from __future__ import annotations import argparse import json import os import sys import time from typing import Any, Dict, List, Optional, Tuple import cv2 import numpy as np # =================================================================== # Path / JSON helpers # =================================================================== def resolve_path(path: str) -> str: path = os.path.expanduser(path) if os.path.isabs(path): return path return os.path.abspath(path) def load_json(path: str) -> Dict[str, Any]: with open(resolve_path(path), "r", encoding="utf-8") as f: return json.load(f) def save_json(path: str, data: Dict[str, Any]) -> None: with open(resolve_path(path), "w", encoding="utf-8") as f: json.dump(data, f, indent=2) # =================================================================== # Units # =================================================================== def get_length_scale(robot_data: Dict[str, Any]) -> float: units = robot_data.get("units", {}) length_unit = str(units.get("length", "")).strip().lower() if length_unit in ("mm", "millimeter", "millimeters"): return 1.0 / 1000.0 elif length_unit in ("cm", "centimeter", "centimeters"): return 1.0 / 100.0 return 1.0 # =================================================================== # Constraint definitions (from robot.json structure ONLY) # =================================================================== class MarkerDistanceConstraint: """ Two markers on the same link must maintain a fixed distance. We compute this distance from their relative positions in robot.json, but scaled to meters. The ACTUAL 3D positions will be optimized, but their distance must stay constant. """ def __init__(self, marker_id_a: int, marker_id_b: int, link_name: str, target_distance: float, weight: float = 1.0): self.marker_id_a = marker_id_a self.marker_id_b = marker_id_b self.link_name = link_name self.target_distance = target_distance self.weight = weight class JointAxisConstraint: """ Two markers on linked arms (connected by joint around axis). If joint axis is [1,0,0], then delta_x component must stay constant. """ def __init__(self, marker_id_parent: int, marker_id_child: int, parent_link: str, child_link: str, joint_axis: np.ndarray, # unit vector [x,y,z] target_delta_along_axis: float, weight: float = 1.0): self.marker_id_parent = marker_id_parent self.marker_id_child = marker_id_child self.parent_link = parent_link self.child_link = child_link self.joint_axis = joint_axis / (np.linalg.norm(joint_axis) + 1e-9) self.target_delta_along_axis = target_delta_along_axis self.weight = weight def extract_constraints_from_hierarchy(robot_data: Dict[str, Any], length_scale: float) -> Tuple[Dict, List]: """ Extract constraint definitions from robot.json. NOTE: We only extract CONSTRAINT TYPES, not actual marker positions. Marker positions will come from triangulation. Returns: marker_to_link: {marker_id -> link_name} constraints: List of constraint objects """ constraints = [] marker_to_link = {} links = robot_data.get("links", {}) # Build map: link_name -> markers on that link link_markers = {} for link_name, link_data in links.items(): markers = link_data.get("markers", []) marker_ids = [] marker_positions = {} for marker in markers: marker_id = int(marker.get("id", -1)) pos = marker.get("position", None) if marker_id >= 0 and pos is not None and len(pos) == 3: marker_ids.append(marker_id) marker_positions[marker_id] = np.array(pos, dtype=np.float32) * np.float32(length_scale) marker_to_link[marker_id] = link_name link_markers[link_name] = (marker_ids, marker_positions) # Extract same-link distance constraints for link_name, (marker_ids, positions) in link_markers.items(): if len(marker_ids) >= 2: # For each pair, compute their distance for i in range(len(marker_ids)): for j in range(i + 1, len(marker_ids)): mid_a = marker_ids[i] mid_b = marker_ids[j] pos_a = positions[mid_a] pos_b = positions[mid_b] target_dist = float(np.linalg.norm(pos_b - pos_a)) constraint = MarkerDistanceConstraint( mid_a, mid_b, link_name, target_dist, weight=1.0 ) constraints.append(constraint) # Extract joint-axis constraints (if we have markers on parent and child links) for link_name, link_data in links.items(): parent_link = link_data.get("parent", None) if parent_link is None: continue joint_info = link_data.get("jointToParent", {}) if not joint_info: continue joint_axis = joint_info.get("axis", [1, 0, 0]) joint_axis = np.array(joint_axis, dtype=np.float32) # Get markers on this link and parent link child_marker_ids, child_positions = link_markers.get(link_name, ([], {})) parent_marker_ids, parent_positions = link_markers.get(parent_link, ([], {})) if len(child_marker_ids) > 0 and len(parent_marker_ids) > 0: # Create constraint between first marker of parent and first marker of child for mid_parent in parent_marker_ids: for mid_child in child_marker_ids: if mid_parent == mid_child: continue pos_parent = parent_positions[mid_parent] pos_child = child_positions[mid_child] delta = pos_child - pos_parent target_delta_along_axis = float( np.dot(delta, joint_axis) ) constraint = JointAxisConstraint( mid_parent, mid_child, parent_link, link_name, joint_axis, target_delta_along_axis, weight=0.5 ) constraints.append(constraint) return marker_to_link, constraints # === # Helper Function # def print_constraints_with_errors( title: str, constraints: List, positions: Dict[int, np.ndarray] ) -> None: print(f"\n[INFO] {title}") for idx, constraint in enumerate(constraints): # ---------------------------------------------------------- # Distance constraint # ---------------------------------------------------------- if isinstance(constraint, MarkerDistanceConstraint): if ( constraint.marker_id_a not in positions or constraint.marker_id_b not in positions ): continue pos_a = positions[constraint.marker_id_a] pos_b = positions[constraint.marker_id_b] actual_dist = np.linalg.norm(pos_b - pos_a) error_m = actual_dist - constraint.target_distance print( f" [{idx:03d}] DISTANCE | " f"Link='{constraint.link_name}' | " f"M{constraint.marker_id_a} <-> M{constraint.marker_id_b} | " f"target={constraint.target_distance*1000:.2f} mm | " f"actual={actual_dist*1000:.2f} mm | " f"error={error_m*1000:+.2f} mm" ) # ---------------------------------------------------------- # Joint-axis constraint # ---------------------------------------------------------- elif isinstance(constraint, JointAxisConstraint): if ( constraint.marker_id_parent not in positions or constraint.marker_id_child not in positions ): continue pos_parent = positions[constraint.marker_id_parent] pos_child = positions[constraint.marker_id_child] delta = pos_child - pos_parent actual = np.dot(delta, constraint.joint_axis) error_m = actual - constraint.target_delta_along_axis axis_str = np.array2string( constraint.joint_axis, precision=2, suppress_small=True ) print( f" [{idx:03d}] JOINT_AXIS | " f"{constraint.parent_link}(M{constraint.marker_id_parent}) -> " f"{constraint.child_link}(M{constraint.marker_id_child}) | " f"axis={axis_str} | " f"target={constraint.target_delta_along_axis*1000:.2f} mm | " f"actual={actual*1000:.2f} mm | " f"error={error_m*1000:+.2f} mm" ) # =================================================================== # Multi-view loading # =================================================================== def load_observations_and_poses( detection_files: List[str], pose_files: List[str] ) -> Tuple[Dict, List, List]: """Load observations and camera poses.""" if len(detection_files) != len(pose_files): raise ValueError(f"Mismatch: {len(detection_files)} detections vs {len(pose_files)} poses") marker_observations = {} cameras = [] obs_metadata = [] for cam_idx, (det_file, pose_file) in enumerate(zip(detection_files, pose_files)): det = load_json(det_file) pose_data = load_json(pose_file) # Camera intrinsics cam_section = det.get("camera", {}) K = np.array(cam_section.get("camera_matrix", []), dtype=np.float32).reshape(3, 3) D = np.array(cam_section.get("distortion_coefficients", []), dtype=np.float32).reshape(-1, 1) # Camera pose pose_section = pose_data.get("camera_pose", {}) world_to_cam = pose_section.get("world_to_camera", {}) R_wc = np.array(world_to_cam.get("rotation_matrix", []), dtype=np.float32).reshape(3, 3) t_wc = np.array(world_to_cam.get("translation_m", []), dtype=np.float32).reshape(3) cameras.append((K, D, R_wc, t_wc)) # Extract observations detections = det.get("detections", []) for det_obj in detections: marker_id = int(det_obj.get("marker_id", -1)) if marker_id < 0: continue center_px = np.array(det_obj.get("center_px", []), dtype=np.float32) # Undistort to normalized coordinates pts = center_px.reshape(1, 1, 2).astype(np.float32) und = cv2.undistortPoints(pts, K, D, P=None) norm_coords = und.reshape(2).astype(np.float32) if marker_id not in marker_observations: marker_observations[marker_id] = [] marker_observations[marker_id].append((cam_idx, norm_coords)) return marker_observations, cameras, obs_metadata # =================================================================== # Initial triangulation # =================================================================== def triangulate_marker_initial( marker_id: int, observations: List[Tuple[int, np.ndarray]], cameras: List[Tuple] ) -> Optional[np.ndarray]: """Rough triangulation using first two views.""" if len(observations) < 2: return None K1, D1, R_wc_1, t_wc_1 = cameras[observations[0][0]] K2, D2, R_wc_2, t_wc_2 = cameras[observations[1][0]] norm_coords_1 = observations[0][1] norm_coords_2 = observations[1][1] # Convert to pixel coordinates x1_px = K1[0, 0] * norm_coords_1[0] + K1[0, 2] y1_px = K1[1, 1] * norm_coords_1[1] + K1[1, 2] x2_px = K2[0, 0] * norm_coords_2[0] + K2[0, 2] y2_px = K2[1, 1] * norm_coords_2[1] + K2[1, 2] P1 = K1 @ np.hstack([R_wc_1, t_wc_1.reshape(3, 1)]) P2 = K2 @ np.hstack([R_wc_2, t_wc_2.reshape(3, 1)]) try: X_h = cv2.triangulatePoints(P1, P2, np.array([[x1_px], [y1_px]]).astype(np.float32), np.array([[x2_px], [y2_px]]).astype(np.float32)) X = (X_h[:3] / X_h[3]).reshape(3).astype(np.float32) return X except Exception: return None def initial_triangulation( marker_observations: Dict[int, List[Tuple[int, np.ndarray]]], cameras: List[Tuple] ) -> Dict[int, np.ndarray]: """Compute initial marker positions via triangulation.""" triangulated = {} for marker_id, observations in marker_observations.items(): X = triangulate_marker_initial(marker_id, observations, cameras) if X is not None: triangulated[marker_id] = X return triangulated # =================================================================== # Bundle adjustment with constraints # =================================================================== def bundle_adjustment_residuals( marker_positions_flat: np.ndarray, marker_ids: List[int], marker_observations: Dict[int, List[Tuple[int, np.ndarray]]], cameras: List[Tuple], constraints: List, lambda_constraint: float = 100.0 ) -> np.ndarray: """ Compute residuals for bundle adjustment. marker_positions_flat: [x0, y0, z0, x1, y1, z1, ...] Returns: [reprojection_residuals, constraint_residuals] """ marker_dict = {} for i, marker_id in enumerate(marker_ids): marker_dict[marker_id] = marker_positions_flat[i*3:(i+1)*3] residuals = [] # Reprojection residuals for marker_id, observations in marker_observations.items(): if marker_id not in marker_dict: continue X_world = marker_dict[marker_id] for cam_idx, norm_coords_obs in observations: K, D, R_wc, t_wc = cameras[cam_idx] # Project X_cam = R_wc @ X_world + t_wc if X_cam[2] > 0.001: # in front of camera proj_norm = X_cam[:2] / X_cam[2] residual = proj_norm - norm_coords_obs residuals.append(residual[0]) residuals.append(residual[1]) # Constraint residuals for constraint in constraints: if isinstance(constraint, MarkerDistanceConstraint): if constraint.marker_id_a in marker_dict and constraint.marker_id_b in marker_dict: pos_a = marker_dict[constraint.marker_id_a] pos_b = marker_dict[constraint.marker_id_b] actual_dist = np.linalg.norm(pos_b - pos_a) residual = (actual_dist - constraint.target_distance) * constraint.weight * lambda_constraint residuals.append(residual) elif isinstance(constraint, JointAxisConstraint): if constraint.marker_id_parent in marker_dict and constraint.marker_id_child in marker_dict: pos_parent = marker_dict[constraint.marker_id_parent] pos_child = marker_dict[constraint.marker_id_child] delta = pos_child - pos_parent actual_delta_along_axis = np.dot(delta, constraint.joint_axis) residual = (actual_delta_along_axis - constraint.target_delta_along_axis) * constraint.weight * lambda_constraint residuals.append(residual) return np.array(residuals, dtype=np.float64) def optimize_with_constraints( initial_positions: Dict[int, np.ndarray], marker_observations: Dict[int, List[Tuple[int, np.ndarray]]], cameras: List[Tuple], constraints: List, lambda_constraint: float = 100.0, max_iterations: int = 50 ) -> Dict[int, np.ndarray]: """ Optimize marker positions using scipy least squares. """ try: from scipy.optimize import least_squares except ImportError: print("[WARN] scipy not available, skipping optimization. Using initial triangulation.") return initial_positions marker_ids = sorted(initial_positions.keys()) x0 = np.concatenate([initial_positions[mid] for mid in marker_ids]) def residuals_fn(x): return bundle_adjustment_residuals(x, marker_ids, marker_observations, cameras, constraints, lambda_constraint) print(f"[INFO] Starting optimization with {len(x0)} variables and {len(constraints)} constraints...") result = least_squares( residuals_fn, x0, max_nfev=max_iterations * len(marker_ids), verbose=1 ) optimized_flat = result.x optimized = {} for i, marker_id in enumerate(marker_ids): optimized[marker_id] = optimized_flat[i*3:(i+1)*3] print(f"[INFO] Optimization complete. Final cost: {np.sum(result.fun**2):.6f}") return optimized # =================================================================== # Main # =================================================================== def main() -> None: parser = argparse.ArgumentParser( description="Multi-view bundle adjustment with geometric constraints" ) parser.add_argument( "-det", "--detections", action="append", required=True, help="*_aruco_detection.json files" ) parser.add_argument( "-pose", "--poses", action="append", required=True, help="*_camera_pose.json files" ) parser.add_argument( "-robot", "--robot", required=True, help="robot.json" ) parser.add_argument( "-outDir", "--outDir", default=None, help="Output directory" ) parser.add_argument( "-lambdaWeight", "--lambdaWeight", type=float, default=100.0, help="Constraint weight" ) args = parser.parse_args() if len(args.detections) != len(args.poses): print(f"[ERROR] Mismatch: {len(args.detections)} vs {len(args.poses)}") sys.exit(1) # Load robot robot_data = load_json(args.robot) length_scale = get_length_scale(robot_data) print("[STEP 1] Extract constraint definitions from robot.json hierarchy...") marker_to_link, constraints = extract_constraints_from_hierarchy(robot_data, length_scale) print(f"[INFO] Extracted {len(constraints)} constraints") print("\n[INFO] Constraint list:") for idx, constraint in enumerate(constraints): if isinstance(constraint, MarkerDistanceConstraint): print( f" [{idx:03d}] DISTANCE | " f"Link='{constraint.link_name}' | " f"Marker {constraint.marker_id_a} <-> {constraint.marker_id_b} | " f"TargetDistance={constraint.target_distance:.6f} m | " f"Weight={constraint.weight}" ) elif isinstance(constraint, JointAxisConstraint): axis_str = np.array2string( constraint.joint_axis, precision=3, suppress_small=True ) print( f" [{idx:03d}] JOINT_AXIS | " f"{constraint.parent_link}({constraint.marker_id_parent}) -> " f"{constraint.child_link}({constraint.marker_id_child}) | " f"Axis={axis_str} | " f"TargetDelta={constraint.target_delta_along_axis:.6f} m | " f"Weight={constraint.weight}" ) print("\n[STEP 2] Load observations and camera poses...") marker_observations, cameras, _ = load_observations_and_poses(args.detections, args.poses) print(f"[INFO] {len(cameras)} cameras, {len(marker_observations)} observed markers") print("\n[STEP 3] Initial triangulation...") initial_pos = initial_triangulation(marker_observations, cameras) print(f"[INFO] Triangulated {len(initial_pos)} markers") # ------------------------------------------------------------ # Save initial triangulated positions (before optimization) # ------------------------------------------------------------ initial_output_markers = [] for marker_id, position in sorted(initial_pos.items()): initial_output_markers.append({ "marker_id": int(marker_id), "position_m": [float(x) for x in position], "position_mm": [float(x * 1000.0) for x in position], "link": marker_to_link.get(marker_id, "unknown") }) initial_output = { "schema_version": "1.0", "created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "summary": { "stage": "initial_triangulation", "num_cameras": len(cameras), "num_markers": len(initial_pos) }, "markers": initial_output_markers } out_dir = args.outDir or os.path.dirname(args.detections[0]) or "." os.makedirs(resolve_path(out_dir), exist_ok=True) initial_out_file = os.path.join( out_dir, "aruco_positions_initial.json" ) save_json(initial_out_file, initial_output) print(f"[INFO] Initial triangulation saved to {initial_out_file}") print("\n[STEP 4] Bundle adjustment with constraints...") optimized_pos = optimize_with_constraints( initial_pos, marker_observations, cameras, constraints, lambda_constraint=args.lambdaWeight ) print_constraints_with_errors( "Constraint list AFTER optimization", constraints, optimized_pos ) # Output output_markers = [] for marker_id, position in sorted(optimized_pos.items()): output_markers.append({ "marker_id": int(marker_id), "position_m": [float(x) for x in position], "position_mm": [float(x * 1000.0) for x in position], "link": marker_to_link.get(marker_id, "unknown") }) output = { "schema_version": "1.0", "created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "summary": { "num_cameras": len(cameras), "num_markers": len(optimized_pos), "num_constraints": len(constraints) }, "markers": output_markers } out_dir = args.outDir or os.path.dirname(args.detections[0]) or "." os.makedirs(resolve_path(out_dir), exist_ok=True) out_file = os.path.join(out_dir, "aruco_positions_optimized.json") save_json(out_file, output) print(f"\n[INFO] Saved to {out_file}") if __name__ == "__main__": main()