#!/usr/bin/env python3 """ 3_multiview_bundle_adjustment_v4.py Multi-view ArUco marker position optimization with explicit, switchable degrees-of-freedom constraints. Mathematical model ------------------ We estimate 3D marker positions X_i ∈ R^3 by minimizing E(X) = Σ_{i,c} w_ic || π_c(X_i) - u_ic ||² + λ_r Σ_j w_j^r || ||X_a - X_b|| - d_j ||² + λ_rev Σ_k w_k^rev || (X_b - X_a)·a_k - t_k ||² + λ_pri Σ_m w_m^pri ( ||(X_b - X_a)·u_m - t_u||² + ||(X_b - X_a)·v_m - t_v||² ) where: - u_ic are observed normalized image coordinates for marker i in camera c - π_c(.) is the normalized reprojection model - w_ic are observation weights from detection quality / marker priors / range - rigid-link constraints preserve internal marker geometry of a link - revolute joints keep the projection along the joint axis constant - prismatic joints keep the two orthogonal projection components constant Important design choices ------------------------ - robot.json is used as a kinematic description, not as a direct source of world-space marker positions. - constraint families are explicit, switchable, and easy to compare across versions. - legacy chain-propagation constraints are retained only as an optional family and are OFF by default. - observation weighting remains separate from constraint weighting so both can be tested independently. Dependencies: numpy, opencv-python, scipy (optional for optimization) Example: python 3_multiview_bundle_adjustment_v4.py ^ -det cam1_aruco_detection.json cam2_aruco_detection.json cam3_aruco_detection.json ^ -pose cam1_camera_pose.json cam2_camera_pose.json cam3_camera_pose.json ^ -robot robot.json ^ -lambdaWeight 100.0 """ from __future__ import annotations import argparse import json import os import sys import time from dataclasses import dataclass from itertools import combinations from typing import Any, Dict, List, Optional, Tuple import cv2 import numpy as np # =================================================================== # Path / JSON helpers # =================================================================== def resolve_path(path: str) -> str: path = os.path.expanduser(path) if os.path.isabs(path): return path return os.path.abspath(path) def load_json(path: str) -> Dict[str, Any]: with open(resolve_path(path), "r", encoding="utf-8") as f: return json.load(f) def save_json(path: str, data: Dict[str, Any]) -> None: with open(resolve_path(path), "w", encoding="utf-8") as f: json.dump(data, f, indent=2) # =================================================================== # Units # =================================================================== def get_length_scale(robot_data: Dict[str, Any]) -> float: units = robot_data.get("units", {}) or {} length_unit = str(units.get("length", "")).strip().lower() if length_unit in ("mm", "millimeter", "millimeters"): return 1.0 / 1000.0 if length_unit in ("cm", "centimeter", "centimeters"): return 1.0 / 100.0 return 1.0 # =================================================================== # Small geometry helpers # =================================================================== def safe_norm(v: np.ndarray, eps: float = 1e-12) -> float: return float(np.linalg.norm(v) + eps) def normalize_vector(v: np.ndarray, eps: float = 1e-12) -> np.ndarray: return np.asarray(v, dtype=np.float64) / safe_norm(v, eps) def clamp(v: float, lo: float, hi: float) -> float: return float(max(lo, min(hi, v))) def principal_axis_id(axis: np.ndarray, threshold: float = 0.95) -> Optional[int]: """Return 0,1,2 for x,y,z if axis is close enough to a principal axis.""" a = normalize_vector(np.asarray(axis, dtype=np.float64)) idx = int(np.argmax(np.abs(a))) if abs(a[idx]) >= threshold: return idx return None def camera_center_from_world_to_cam(R_wc: np.ndarray, t_wc: np.ndarray) -> np.ndarray: """world_to_camera: X_cam = R_wc * X_world + t_wc; camera center is -R^T t.""" return -R_wc.T @ t_wc def principal_axis_vector(axis: np.ndarray) -> np.ndarray: """Convert a near-principal axis to an exact signed principal axis vector.""" a = normalize_vector(axis) idx = int(np.argmax(np.abs(a))) out = np.zeros(3, dtype=np.float64) out[idx] = 1.0 if a[idx] >= 0 else -1.0 return normalize_vector(out) def orthonormal_basis_from_axis(axis: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: """ Build two unit vectors orthogonal to axis, with a deterministic orientation. """ a = normalize_vector(axis) ref = np.array([1.0, 0.0, 0.0], dtype=np.float64) if abs(float(np.dot(a, ref))) > 0.90: ref = np.array([0.0, 1.0, 0.0], dtype=np.float64) u = np.cross(a, ref) if np.linalg.norm(u) < 1e-12: ref = np.array([0.0, 0.0, 1.0], dtype=np.float64) u = np.cross(a, ref) u = normalize_vector(u) v = normalize_vector(np.cross(a, u)) return u, v # =================================================================== # Configuration # =================================================================== @dataclass class ConstraintRuleConfig: rigid_distance_enabled: bool = True rigid_distance_mode: str = "mst" # mst | star | full rigid_distance_weight: float = 1.0 # Revolute joints: keep the projection along the axis constant. revolute_axis_enabled: bool = True revolute_axis_max_pairs: int = 2 revolute_axis_weight: float = 0.5 # Prismatic joints: keep the two orthogonal projection components constant. prismatic_orthogonal_enabled: bool = True prismatic_orthogonal_max_pairs: int = 2 prismatic_orthogonal_weight: float = 0.35 # Legacy / optional chain propagation, disabled by default. chain_axis_enabled: bool = False chain_axis_max_depth: int = 3 chain_axis_max_pairs: int = 2 chain_axis_weight: float = 0.3 axis_alignment_threshold: float = 0.95 strict_unique_marker_ids: bool = False show_skipped_constraints: bool = True enable_observation_weights: bool = True weight_floor: float = 0.30 weight_ceiling: float = 3.00 ref_distance_m: float = 0.75 ref_marker_size_px: float = 50.0 use_detection_confidence: bool = True use_detection_size_px: bool = True use_initial_range: bool = True use_marker_size_prior: bool = True def _bool_or_default(value: Any, default: bool) -> bool: if value is None: return default return bool(value) def _float_or_default(value: Any, default: float) -> float: if value is None: return default return float(value) def _int_or_default(value: Any, default: int) -> int: if value is None: return default return int(value) def load_constraint_rule_config(robot_data: Dict[str, Any], args: argparse.Namespace) -> ConstraintRuleConfig: """ Merge built-in defaults with optional robot.json constraint_rules and CLI flags. Backward compatibility: - joint_axis_projection -> revolute_axis """ rules = robot_data.get("constraint_rules", {}) or {} cfg = ConstraintRuleConfig() rigid = rules.get("rigid_distance", {}) or {} revolute = rules.get("joint_revolute_axis", {}) or rules.get("joint_axis_projection", {}) or {} prismatic = rules.get("joint_prismatic_orthogonal", {}) or {} chain = rules.get("chain_axis_projection", {}) or {} obs = rules.get("observation_weights", {}) or {} cfg.rigid_distance_enabled = _bool_or_default(rigid.get("enabled"), cfg.rigid_distance_enabled) cfg.rigid_distance_mode = str(rigid.get("mode", cfg.rigid_distance_mode)).strip().lower() cfg.rigid_distance_weight = _float_or_default(rigid.get("weight"), cfg.rigid_distance_weight) cfg.revolute_axis_enabled = _bool_or_default(revolute.get("enabled"), cfg.revolute_axis_enabled) cfg.revolute_axis_max_pairs = _int_or_default(revolute.get("max_pairs"), cfg.revolute_axis_max_pairs) cfg.revolute_axis_weight = _float_or_default(revolute.get("weight"), cfg.revolute_axis_weight) cfg.prismatic_orthogonal_enabled = _bool_or_default(prismatic.get("enabled"), cfg.prismatic_orthogonal_enabled) cfg.prismatic_orthogonal_max_pairs = _int_or_default(prismatic.get("max_pairs"), cfg.prismatic_orthogonal_max_pairs) cfg.prismatic_orthogonal_weight = _float_or_default(prismatic.get("weight"), cfg.prismatic_orthogonal_weight) cfg.chain_axis_enabled = _bool_or_default(chain.get("enabled"), cfg.chain_axis_enabled) cfg.chain_axis_max_depth = _int_or_default(chain.get("max_depth"), cfg.chain_axis_max_depth) cfg.chain_axis_max_pairs = _int_or_default(chain.get("max_pairs"), cfg.chain_axis_max_pairs) cfg.chain_axis_weight = _float_or_default(chain.get("weight"), cfg.chain_axis_weight) cfg.axis_alignment_threshold = _float_or_default( rules.get("axis_alignment_threshold"), cfg.axis_alignment_threshold ) cfg.enable_observation_weights = _bool_or_default(obs.get("enabled"), cfg.enable_observation_weights) cfg.weight_floor = _float_or_default(obs.get("weight_floor"), cfg.weight_floor) cfg.weight_ceiling = _float_or_default(obs.get("weight_ceiling"), cfg.weight_ceiling) cfg.ref_distance_m = _float_or_default(obs.get("ref_distance_m"), cfg.ref_distance_m) cfg.ref_marker_size_px = _float_or_default(obs.get("ref_marker_size_px"), cfg.ref_marker_size_px) cfg.use_detection_confidence = _bool_or_default(obs.get("use_detection_confidence"), cfg.use_detection_confidence) cfg.use_detection_size_px = _bool_or_default(obs.get("use_detection_size_px"), cfg.use_detection_size_px) cfg.use_initial_range = _bool_or_default(obs.get("use_initial_range"), cfg.use_initial_range) cfg.use_marker_size_prior = _bool_or_default(obs.get("use_marker_size_prior"), cfg.use_marker_size_prior) if getattr(args, "strictUniqueMarkerIds", False): cfg.strict_unique_marker_ids = True if getattr(args, "showSkippedConstraints", False): cfg.show_skipped_constraints = True if getattr(args, "noShowSkippedConstraints", False): cfg.show_skipped_constraints = False return cfg # =================================================================== # Observation / constraint definitions # =================================================================== @dataclass class Observation: cam_idx: int norm_coords: np.ndarray meta: Dict[str, Any] @dataclass class MarkerDistanceConstraint: marker_id_a: int marker_id_b: int link_name: str target_distance_m: float weight: float = 1.0 enabled: bool = True source: str = "rigid_distance" @dataclass class JointAxisConstraint: marker_id_parent: int marker_id_child: int parent_link: str child_link: str joint_axis: np.ndarray target_delta_along_axis_m: float weight: float = 1.0 enabled: bool = True source: str = "joint_axis_projection" Constraint = MarkerDistanceConstraint | JointAxisConstraint # =================================================================== # Robot parsing # =================================================================== def parse_robot_markers( robot_data: Dict[str, Any], length_scale: float, strict_unique_marker_ids: bool = False ) -> Tuple[Dict[int, str], Dict[str, List[Dict[str, Any]]], List[str], Dict[int, Dict[str, Any]]]: links = robot_data.get("links", {}) or {} marker_to_link: Dict[int, str] = {} link_markers: Dict[str, List[Dict[str, Any]]] = {} issues: List[str] = [] marker_meta: Dict[int, Dict[str, Any]] = {} seen_global: Dict[int, str] = {} for link_name, link_data in links.items(): markers = link_data.get("markers", []) or [] collected: List[Dict[str, Any]] = [] seen_local: set[int] = set() for idx, marker in enumerate(markers): marker_id = int(marker.get("id", -1)) pos = marker.get("position", None) if marker_id < 0 or pos is None or len(pos) != 3: issues.append(f"[WARN] link={link_name}: skipped invalid marker entry at index {idx}") continue if marker_id in seen_local: msg = f"[WARN] duplicate marker id {marker_id} inside link '{link_name}'" if strict_unique_marker_ids: raise ValueError(msg) issues.append(msg + " -> skipped duplicate inside same link") continue if marker_id in seen_global: msg = ( f"[WARN] duplicate marker id {marker_id} appears in link '{link_name}' " f"and already in link '{seen_global[marker_id]}'" ) if strict_unique_marker_ids: raise ValueError(msg) issues.append(msg + " -> skipped duplicate occurrence") continue seen_local.add(marker_id) seen_global[marker_id] = link_name pos_raw = np.array(pos, dtype=np.float64) pos_m = pos_raw * float(length_scale) item = { "id": marker_id, "name": marker.get("name", f"marker_{marker_id}"), "position_raw": pos_raw, "position_m": pos_m, "normal": np.array(marker.get("normal", [0, 0, 1]), dtype=np.float64), "size": marker.get("size", None), "spin": marker.get("spin", None), } collected.append(item) marker_to_link[marker_id] = link_name marker_meta[marker_id] = { "link_name": link_name, "name": item["name"], "position_m": pos_m, "normal": item["normal"], "size": item["size"], "spin": item["spin"], } link_markers[link_name] = collected return marker_to_link, link_markers, issues, marker_meta def get_link_parent_map(robot_data: Dict[str, Any]) -> Dict[str, Optional[str]]: links = robot_data.get("links", {}) or {} return {link_name: (link_data.get("parent", None)) for link_name, link_data in links.items()} def get_joint_info(robot_data: Dict[str, Any], child_link: str) -> Dict[str, Any]: links = robot_data.get("links", {}) or {} return (links.get(child_link, {}) or {}).get("jointToParent", {}) or {} def get_joint_axis(robot_data: Dict[str, Any], child_link: str) -> Optional[np.ndarray]: joint = get_joint_info(robot_data, child_link) axis = joint.get("axis", None) if axis is None: return None axis = np.asarray(axis, dtype=np.float64) if safe_norm(axis) < 1e-12: return None return normalize_vector(axis) def get_vision_marker_size_default(robot_data: Dict[str, Any]) -> float: vision = robot_data.get("vision_config", {}) or {} ms = vision.get("MarkerSize", None) if ms is None: return 0.025 return float(ms) # =================================================================== # Constraint compilation helpers # =================================================================== def get_enabled_link_rule( robot_data: Dict[str, Any], link_name: str, rule_name: str, default_enabled: bool = True ) -> bool: overrides = robot_data.get("constraint_overrides", {}) or {} link_override = overrides.get(link_name, {}) or {} rule_override = link_override.get(rule_name, {}) or {} if "enabled" in rule_override: return bool(rule_override["enabled"]) return default_enabled def select_anchor_marker_ids( markers: List[Dict[str, Any]], axis: Optional[np.ndarray] = None, max_count: int = 2 ) -> List[int]: if not markers: return [] if len(markers) == 1: return [int(markers[0]["id"])] ids = [int(m["id"]) for m in markers] pos = np.stack([np.asarray(m["position_m"], dtype=np.float64) for m in markers], axis=0) selected: List[int] = [] if axis is not None and safe_norm(axis) > 1e-12: a = normalize_vector(axis) proj = pos @ a min_idx = int(np.argmin(proj)) max_idx = int(np.argmax(proj)) selected = [ids[min_idx], ids[max_idx]] else: centroid = np.mean(pos, axis=0) d = np.linalg.norm(pos - centroid, axis=1) min_idx = int(np.argmin(d)) max_idx = int(np.argmax(d)) selected = [ids[min_idx], ids[max_idx]] if len(selected) < max_count: for mid in ids: if mid not in selected: selected.append(mid) if len(selected) >= max_count: break out: List[int] = [] seen: set[int] = set() for mid in selected: if mid not in seen: seen.add(mid) out.append(mid) if len(out) >= max_count: break return out def mst_edges_for_link(markers: List[Dict[str, Any]]) -> List[Tuple[int, int]]: n = len(markers) if n < 2: return [] ids = [int(m["id"]) for m in markers] pos = np.stack([np.asarray(m["position_m"], dtype=np.float64) for m in markers], axis=0) in_tree = np.zeros(n, dtype=bool) in_tree[0] = True edges: List[Tuple[int, int]] = [] dist = np.linalg.norm(pos[:, None, :] - pos[None, :, :], axis=2) for _ in range(n - 1): best = None best_d = float("inf") for i in range(n): if not in_tree[i]: continue for j in range(n): if in_tree[j]: continue d = float(dist[i, j]) if d < best_d: best_d = d best = (i, j) if best is None: break i, j = best in_tree[j] = True edges.append((ids[i], ids[j])) return edges def compile_rigid_distance_constraints( robot_data: Dict[str, Any], link_markers: Dict[str, List[Dict[str, Any]]], cfg: ConstraintRuleConfig ) -> List[MarkerDistanceConstraint]: constraints: List[MarkerDistanceConstraint] = [] for link_name, markers in link_markers.items(): if not get_enabled_link_rule(robot_data, link_name, "rigid_distance", cfg.rigid_distance_enabled): continue if len(markers) < 2: continue mode = cfg.rigid_distance_mode if mode == "full": pairs = [(int(a["id"]), int(b["id"])) for a, b in combinations(markers, 2)] elif mode == "star": anchor_ids = select_anchor_marker_ids(markers, axis=None, max_count=1) anchor_id = anchor_ids[0] pairs = [] for m in markers: mid = int(m["id"]) if mid != anchor_id: pairs.append((anchor_id, mid)) elif mode == "mst": pairs = mst_edges_for_link(markers) else: raise ValueError(f"Unknown rigid_distance_mode='{mode}'. Use mst|star|full.") pos_map = {int(m["id"]): np.asarray(m["position_m"], dtype=np.float64) for m in markers} seen_pairs: set[Tuple[int, int]] = set() for mid_a, mid_b in pairs: if mid_a == mid_b: continue key = tuple(sorted((int(mid_a), int(mid_b)))) if key in seen_pairs: continue seen_pairs.add(key) pos_a = pos_map[mid_a] pos_b = pos_map[mid_b] target = float(np.linalg.norm(pos_b - pos_a)) constraints.append( MarkerDistanceConstraint( marker_id_a=mid_a, marker_id_b=mid_b, link_name=link_name, target_distance_m=target, weight=cfg.rigid_distance_weight, enabled=True, source=f"rigid_distance:{mode}", ) ) return constraints def compile_joint_dof_constraints( robot_data: Dict[str, Any], link_markers: Dict[str, List[Dict[str, Any]]], cfg: ConstraintRuleConfig ) -> List[JointAxisConstraint]: """ Compile local joint constraints from robot.json. Revolute joints: one scalar constraint per anchor pair (projection along the joint axis stays constant) Prismatic joints: two scalar constraints per anchor pair (the orthogonal projections stay constant) Both are emitted as JointAxisConstraint objects so the rest of the optimization pipeline remains unchanged. """ constraints: List[JointAxisConstraint] = [] links = robot_data.get("links", {}) or {} for child_link, child_data in links.items(): parent_link = child_data.get("parent", None) if not parent_link: continue joint_info = child_data.get("jointToParent", {}) or {} joint_type = str(joint_info.get("type", "")).strip().lower() joint_axis = get_joint_axis(robot_data, child_link) if joint_axis is None: continue axis_vec = principal_axis_vector(joint_axis) parent_markers = link_markers.get(parent_link, []) child_markers = link_markers.get(child_link, []) if len(parent_markers) == 0 or len(child_markers) == 0: continue parent_pos = {int(m["id"]): np.asarray(m["position_m"], dtype=np.float64) for m in parent_markers} child_pos = {int(m["id"]): np.asarray(m["position_m"], dtype=np.float64) for m in child_markers} seen: set[Tuple[int, int]] = set() if joint_type == "revolute": if not get_enabled_link_rule( robot_data, child_link, "joint_revolute_axis", cfg.revolute_axis_enabled ): continue max_pairs = max(1, int(cfg.revolute_axis_max_pairs)) parent_anchor_ids = select_anchor_marker_ids(parent_markers, axis=axis_vec, max_count=max_pairs) child_anchor_ids = select_anchor_marker_ids(child_markers, axis=axis_vec, max_count=max_pairs) for mid_p in parent_anchor_ids: for mid_c in child_anchor_ids: if mid_p == mid_c: continue key = (mid_p, mid_c) if key in seen: continue seen.add(key) delta = child_pos[mid_c] - parent_pos[mid_p] target = float(np.dot(delta, axis_vec)) constraints.append( JointAxisConstraint( marker_id_parent=mid_p, marker_id_child=mid_c, parent_link=parent_link, child_link=child_link, joint_axis=axis_vec, target_delta_along_axis_m=target, weight=cfg.revolute_axis_weight, enabled=True, source="revolute_axis_projection", ) ) elif joint_type == "linear": if not get_enabled_link_rule( robot_data, child_link, "joint_prismatic_orthogonal", cfg.prismatic_orthogonal_enabled ): continue max_pairs = max(1, int(cfg.prismatic_orthogonal_max_pairs)) parent_anchor_ids = select_anchor_marker_ids(parent_markers, axis=axis_vec, max_count=max_pairs) child_anchor_ids = select_anchor_marker_ids(child_markers, axis=axis_vec, max_count=max_pairs) basis_u, basis_v = orthonormal_basis_from_axis(axis_vec) for mid_p in parent_anchor_ids: for mid_c in child_anchor_ids: if mid_p == mid_c: continue key = (mid_p, mid_c) if key in seen: continue seen.add(key) delta = child_pos[mid_c] - parent_pos[mid_p] constraints.append( JointAxisConstraint( marker_id_parent=mid_p, marker_id_child=mid_c, parent_link=parent_link, child_link=child_link, joint_axis=basis_u, target_delta_along_axis_m=float(np.dot(delta, basis_u)), weight=cfg.prismatic_orthogonal_weight, enabled=True, source="prismatic_orthogonal_projection:u", ) ) constraints.append( JointAxisConstraint( marker_id_parent=mid_p, marker_id_child=mid_c, parent_link=parent_link, child_link=child_link, joint_axis=basis_v, target_delta_along_axis_m=float(np.dot(delta, basis_v)), weight=cfg.prismatic_orthogonal_weight, enabled=True, source="prismatic_orthogonal_projection:v", ) ) else: continue return constraints def compile_constraints( robot_data: Dict[str, Any], length_scale: float, cfg: ConstraintRuleConfig ) -> Tuple[Dict[int, str], Dict[str, List[Dict[str, Any]]], List[Constraint], List[str], Dict[int, Dict[str, Any]]]: marker_to_link, link_markers, issues, marker_meta = parse_robot_markers( robot_data, length_scale=length_scale, strict_unique_marker_ids=cfg.strict_unique_marker_ids, ) constraints: List[Constraint] = [] constraints.extend(compile_rigid_distance_constraints(robot_data, link_markers, cfg)) constraints.extend(compile_joint_dof_constraints(robot_data, link_markers, cfg)) # Legacy optional family, OFF by default. if cfg.chain_axis_enabled: constraints.extend(compile_chain_axis_constraints(robot_data, link_markers, cfg)) unique_constraints: List[Constraint] = [] seen_keys: set[Tuple[Any, ...]] = set() for c in constraints: if isinstance(c, MarkerDistanceConstraint): key = ( "d", min(c.marker_id_a, c.marker_id_b), max(c.marker_id_a, c.marker_id_b), c.link_name, round(c.target_distance_m, 9), ) else: key = ( "j", c.parent_link, c.child_link, c.marker_id_parent, c.marker_id_child, tuple(np.round(c.joint_axis, 9).tolist()), round(c.target_delta_along_axis_m, 9), ) if key in seen_keys: continue seen_keys.add(key) unique_constraints.append(c) return marker_to_link, link_markers, unique_constraints, issues, marker_meta # =================================================================== # Observation quality / weighting # =================================================================== def _optional_float(meta: Dict[str, Any], keys: List[str]) -> Optional[float]: for k in keys: if k in meta and meta[k] is not None: try: return float(meta[k]) except Exception: pass return None def detection_quality_from_metadata(det_obj: Dict[str, Any], cfg: ConstraintRuleConfig) -> float: q = 1.0 if cfg.use_detection_confidence: conf = _optional_float(det_obj, ["confidence", "score", "quality", "det_confidence"]) if conf is not None: q *= clamp(conf, 0.1, 1.0) if cfg.use_detection_size_px: size_px = _optional_float(det_obj, ["size_px", "marker_size_px", "side_px", "side_length_px"]) if size_px is None and "corners_px" in det_obj and isinstance(det_obj["corners_px"], list): try: corners = np.asarray(det_obj["corners_px"], dtype=np.float64).reshape(-1, 2) if len(corners) >= 4: edges = [] for i in range(len(corners)): p = corners[i] q2 = corners[(i + 1) % len(corners)] edges.append(float(np.linalg.norm(q2 - p))) size_px = float(np.mean(edges)) except Exception: size_px = None if size_px is not None: q *= clamp(size_px / max(cfg.ref_marker_size_px, 1e-6), 0.25, 3.0) sharpness = _optional_float(det_obj, ["sharpness", "corner_sharpness"]) if sharpness is not None: q *= clamp(sharpness / 2500.0, 0.5, 1.5) normal_alignment = _optional_float(det_obj, ["normal_alignment", "view_cosine", "cos_to_camera"]) if normal_alignment is not None: q *= clamp(normal_alignment, 0.3, 1.0) return float(q) def marker_size_prior_factor(marker_meta: Dict[str, Any], default_marker_size_m: float) -> float: size_val = marker_meta.get("size", None) if size_val is None: return 1.0 try: size_val = float(size_val) except Exception: return 1.0 size_m = size_val / 1000.0 if size_val > 1.0 else size_val ref = max(default_marker_size_m, 1e-6) return clamp(size_m / ref, 0.7, 1.3) def compute_observation_weights( marker_observations: Dict[int, List[Observation]], cameras: List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]], initial_positions: Dict[int, np.ndarray], marker_meta: Dict[int, Dict[str, Any]], cfg: ConstraintRuleConfig, robot_data: Dict[str, Any] ) -> Dict[Tuple[int, int], float]: weights: Dict[Tuple[int, int], float] = {} default_marker_size_m = get_vision_marker_size_default(robot_data) for marker_id, obs_list in marker_observations.items(): X = initial_positions.get(marker_id, None) size_prior = marker_size_prior_factor(marker_meta.get(marker_id, {}), default_marker_size_m) for obs_idx, obs in enumerate(obs_list): w = 1.0 q = detection_quality_from_metadata(obs.meta, cfg) w *= q if cfg.use_marker_size_prior: w *= size_prior if cfg.use_initial_range and X is not None: _, _, R_wc, t_wc = cameras[obs.cam_idx] C = camera_center_from_world_to_cam(R_wc, t_wc) dist = float(np.linalg.norm(X - C)) if np.isfinite(dist): w *= clamp(cfg.ref_distance_m / max(dist, 1e-6), 0.4, 2.0) weights[(marker_id, obs_idx)] = clamp(w, cfg.weight_floor, cfg.weight_ceiling) return weights # =================================================================== # Multi-view loading # =================================================================== def load_observations_and_poses( detection_files: List[str], pose_files: List[str] ) -> Tuple[ Dict[int, List[Observation]], List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]], List[Dict[str, Any]] ]: if len(detection_files) != len(pose_files): raise ValueError(f"Mismatch: {len(detection_files)} detections vs {len(pose_files)} poses") marker_observations: Dict[int, List[Observation]] = {} cameras: List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]] = [] obs_metadata: List[Dict[str, Any]] = [] for cam_idx, (det_file, pose_file) in enumerate(zip(detection_files, pose_files)): det = load_json(det_file) pose_data = load_json(pose_file) cam_section = det.get("camera", {}) or {} K = np.array(cam_section.get("camera_matrix", []), dtype=np.float64).reshape(3, 3) D = np.array(cam_section.get("distortion_coefficients", []), dtype=np.float64).reshape(-1, 1) pose_section = pose_data.get("camera_pose", {}) or {} world_to_cam = pose_section.get("world_to_camera", {}) or {} R_wc = np.array(world_to_cam.get("rotation_matrix", []), dtype=np.float64).reshape(3, 3) t_wc = np.array(world_to_cam.get("translation_m", []), dtype=np.float64).reshape(3) cameras.append((K, D, R_wc, t_wc)) detections = det.get("detections", []) or [] for det_obj in detections: marker_id = int(det_obj.get("marker_id", -1)) if marker_id < 0: continue center_px = np.array(det_obj.get("center_px", []), dtype=np.float64) if center_px.shape != (2,): continue pts = center_px.reshape(1, 1, 2).astype(np.float32) und = cv2.undistortPoints(pts, K.astype(np.float32), D.astype(np.float32), P=None) norm_coords = und.reshape(2).astype(np.float64) obs = Observation(cam_idx=cam_idx, norm_coords=norm_coords, meta=dict(det_obj)) marker_observations.setdefault(marker_id, []).append(obs) obs_metadata.append( { "detection_file": det_file, "pose_file": pose_file, "num_detections": len(detections), } ) return marker_observations, cameras, obs_metadata # =================================================================== # Initial triangulation # =================================================================== def triangulate_marker_initial( marker_id: int, observations: List[Observation], cameras: List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]] ) -> Optional[np.ndarray]: if len(observations) < 2: return None best_pair = None best_baseline = -1.0 for obs_i, obs_j in combinations(observations, 2): cam_i, cam_j = obs_i.cam_idx, obs_j.cam_idx _, _, R1, t1 = cameras[cam_i] _, _, R2, t2 = cameras[cam_j] c1 = camera_center_from_world_to_cam(R1, t1) c2 = camera_center_from_world_to_cam(R2, t2) baseline = float(np.linalg.norm(c2 - c1)) if baseline > best_baseline: best_baseline = baseline best_pair = (obs_i, obs_j) if best_pair is None: return None obs_i, obs_j = best_pair cam_i, cam_j = obs_i.cam_idx, obs_j.cam_idx norm_coords_i = obs_i.norm_coords norm_coords_j = obs_j.norm_coords K1, D1, R1, t1 = cameras[cam_i] K2, D2, R2, t2 = cameras[cam_j] x1_px = K1[0, 0] * norm_coords_i[0] + K1[0, 2] y1_px = K1[1, 1] * norm_coords_i[1] + K1[1, 2] x2_px = K2[0, 0] * norm_coords_j[0] + K2[0, 2] y2_px = K2[1, 1] * norm_coords_j[1] + K2[1, 2] P1 = K1 @ np.hstack([R1, t1.reshape(3, 1)]) P2 = K2 @ np.hstack([R2, t2.reshape(3, 1)]) try: X_h = cv2.triangulatePoints( P1, P2, np.array([[x1_px], [y1_px]], dtype=np.float64), np.array([[x2_px], [y2_px]], dtype=np.float64), ) X = (X_h[:3] / X_h[3]).reshape(3).astype(np.float64) if not np.all(np.isfinite(X)): return None return X except Exception: return None def initial_triangulation( marker_observations: Dict[int, List[Observation]], cameras: List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]] ) -> Dict[int, np.ndarray]: triangulated: Dict[int, np.ndarray] = {} for marker_id, observations in marker_observations.items(): X = triangulate_marker_initial(marker_id, observations, cameras) if X is not None: triangulated[marker_id] = X return triangulated # =================================================================== # Weighted residuals / optimization # =================================================================== def bundle_adjustment_residuals( marker_positions_flat: np.ndarray, marker_ids: List[int], marker_observations: Dict[int, List[Observation]], cameras: List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]], constraints: List[Constraint], obs_weights: Dict[Tuple[int, int], float], lambda_constraint: float = 100.0 ) -> np.ndarray: marker_dict: Dict[int, np.ndarray] = {} for i, marker_id in enumerate(marker_ids): marker_dict[marker_id] = marker_positions_flat[i * 3:(i + 1) * 3] residuals: List[float] = [] for marker_id, observations in marker_observations.items(): if marker_id not in marker_dict: continue X_world = marker_dict[marker_id] for obs_idx, obs in enumerate(observations): cam_idx, norm_coords_obs = obs.cam_idx, obs.norm_coords K, D, R_wc, t_wc = cameras[cam_idx] X_cam = R_wc @ X_world + t_wc if X_cam[2] > 1e-6: proj_norm = X_cam[:2] / X_cam[2] r = proj_norm - norm_coords_obs w = float(np.sqrt(obs_weights.get((marker_id, obs_idx), 1.0))) residuals.append(w * float(r[0])) residuals.append(w * float(r[1])) for constraint in constraints: if isinstance(constraint, MarkerDistanceConstraint): if constraint.marker_id_a in marker_dict and constraint.marker_id_b in marker_dict: pos_a = marker_dict[constraint.marker_id_a] pos_b = marker_dict[constraint.marker_id_b] actual_dist = float(np.linalg.norm(pos_b - pos_a)) residuals.append((actual_dist - constraint.target_distance_m) * constraint.weight * lambda_constraint) elif isinstance(constraint, JointAxisConstraint): if constraint.marker_id_parent in marker_dict and constraint.marker_id_child in marker_dict: pos_parent = marker_dict[constraint.marker_id_parent] pos_child = marker_dict[constraint.marker_id_child] delta = pos_child - pos_parent actual_delta = float(np.dot(delta, constraint.joint_axis)) residuals.append((actual_delta - constraint.target_delta_along_axis_m) * constraint.weight * lambda_constraint) return np.asarray(residuals, dtype=np.float64) def optimize_with_constraints( initial_positions: Dict[int, np.ndarray], marker_observations: Dict[int, List[Observation]], cameras: List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]], constraints: List[Constraint], obs_weights: Dict[Tuple[int, int], float], lambda_constraint: float = 100.0, max_iterations: int = 50 ) -> Dict[int, np.ndarray]: try: from scipy.optimize import least_squares except ImportError: print("[WARN] scipy not available, skipping optimization.") return initial_positions marker_ids = sorted(initial_positions.keys()) if not marker_ids: return {} x0 = np.concatenate([initial_positions[mid] for mid in marker_ids]) def residuals_fn(x: np.ndarray) -> np.ndarray: return bundle_adjustment_residuals( x, marker_ids, marker_observations, cameras, constraints, obs_weights, lambda_constraint ) print(f"[INFO] Starting optimization with {len(x0)} variables and {len(constraints)} constraints...") result = least_squares( residuals_fn, x0, max_nfev=max_iterations * max(1, len(marker_ids)), verbose=1, ) optimized = {} for i, marker_id in enumerate(marker_ids): optimized[marker_id] = result.x[i * 3:(i + 1) * 3] print(f"[INFO] Optimization complete. Final cost: {float(np.sum(result.fun ** 2)):.6f}") return optimized # =================================================================== # Reporting helpers # =================================================================== def print_constraint_summary(constraints: List[Constraint]) -> None: num_dist = sum(isinstance(c, MarkerDistanceConstraint) for c in constraints) num_joint = sum(isinstance(c, JointAxisConstraint) for c in constraints) num_other = len(constraints) - num_dist - num_joint extra = f" other={num_other}" if num_other else "" print(f"[INFO] Constraint summary: total={len(constraints)} distance={num_dist} joint/chain={num_joint}{extra}") def print_constraint_list(constraints: List[Constraint]) -> None: print("\n[INFO] Constraint list:") for idx, constraint in enumerate(constraints): if isinstance(constraint, MarkerDistanceConstraint): print( f" [{idx:03d}] DISTANCE | " f"Link='{constraint.link_name}' | " f"M{constraint.marker_id_a} <-> M{constraint.marker_id_b} | " f"Target={constraint.target_distance_m:.6f} m | " f"Weight={constraint.weight} | " f"Source={constraint.source}" ) elif isinstance(constraint, JointAxisConstraint): axis_str = np.array2string(constraint.joint_axis, precision=3, suppress_small=True) print( f" [{idx:03d}] JOINT_AXIS | " f"{constraint.parent_link}(M{constraint.marker_id_parent}) -> " f"{constraint.child_link}(M{constraint.marker_id_child}) | " f"Axis={axis_str} | " f"TargetDelta={constraint.target_delta_along_axis_m:.6f} m | " f"Weight={constraint.weight} | " f"Source={constraint.source}" ) else: print( f" [{idx:03d}] {type(constraint).__name__} | " f"weight={getattr(constraint, 'weight', '?')} | " f"source={getattr(constraint, 'source', '?')}" ) def print_constraints_with_errors( title: str, constraints: List[Constraint], positions: Dict[int, np.ndarray], show_skipped: bool = True ) -> None: print(f"\n[INFO] {title}") active = 0 skipped = 0 for idx, constraint in enumerate(constraints): if isinstance(constraint, MarkerDistanceConstraint): if constraint.marker_id_a not in positions or constraint.marker_id_b not in positions: skipped += 1 if show_skipped: print( f" [{idx:03d}] DISTANCE | " f"M{constraint.marker_id_a} <-> M{constraint.marker_id_b} | SKIPPED (missing marker)" ) continue pos_a = positions[constraint.marker_id_a] pos_b = positions[constraint.marker_id_b] actual = float(np.linalg.norm(pos_b - pos_a)) error = actual - constraint.target_distance_m active += 1 print( f" [{idx:03d}] DISTANCE | " f"Link='{constraint.link_name}' | " f"M{constraint.marker_id_a} <-> M{constraint.marker_id_b} | " f"target={constraint.target_distance_m*1000:.2f} mm | " f"actual={actual*1000:.2f} mm | " f"error={error*1000:+.2f} mm" ) elif isinstance(constraint, JointAxisConstraint): if constraint.marker_id_parent not in positions or constraint.marker_id_child not in positions: skipped += 1 if show_skipped: print( f" [{idx:03d}] JOINT_AXIS | " f"M{constraint.marker_id_parent} -> M{constraint.marker_id_child} | SKIPPED (missing marker)" ) continue pos_parent = positions[constraint.marker_id_parent] pos_child = positions[constraint.marker_id_child] delta = pos_child - pos_parent actual = float(np.dot(delta, constraint.joint_axis)) error = actual - constraint.target_delta_along_axis_m active += 1 axis_str = np.array2string(constraint.joint_axis, precision=2, suppress_small=True) print( f" [{idx:03d}] JOINT_AXIS | " f"{constraint.parent_link}(M{constraint.marker_id_parent}) -> " f"{constraint.child_link}(M{constraint.marker_id_child}) | " f"axis={axis_str} | " f"target={constraint.target_delta_along_axis_m*1000:.2f} mm | " f"actual={actual*1000:.2f} mm | " f"error={error*1000:+.2f} mm" ) print(f"[INFO] Active constraints: {active} | Skipped: {skipped}") def print_observation_weight_summary(obs_weights: Dict[Tuple[int, int], float]) -> None: if not obs_weights: print("[INFO] Observation weighting: disabled or empty") return values = np.array(list(obs_weights.values()), dtype=np.float64) print( "[INFO] Observation weights: " f"min={values.min():.3f} mean={values.mean():.3f} " f"median={np.median(values):.3f} max={values.max():.3f}" ) def serialize_vec3(v: Any) -> List[float]: arr = np.asarray(v, dtype=np.float64).reshape(3) n = np.linalg.norm(arr) if n > 1e-12: arr = arr / n return [float(x) for x in arr] # =================================================================== # Main # =================================================================== def main() -> None: parser = argparse.ArgumentParser( description="Multi-view bundle adjustment with rule-based geometric constraints" ) parser.add_argument( "-det", "--detections", action="append", required=True, help="*_aruco_detection.json files" ) parser.add_argument( "-pose", "--poses", action="append", required=True, help="*_camera_pose.json files" ) parser.add_argument( "-robot", "--robot", required=True, help="robot.json" ) parser.add_argument( "-outDir", "--outDir", default=None, help="Output directory" ) parser.add_argument( "-lambdaWeight", "--lambdaWeight", type=float, default=100.0, help="Constraint weight multiplier" ) parser.add_argument( "--strictUniqueMarkerIds", action="store_true", help="Fail if a marker ID appears more than once in robot.json" ) parser.add_argument( "--showSkippedConstraints", action="store_true", help="Print skipped constraints in the report" ) parser.add_argument( "--noShowSkippedConstraints", action="store_true", help="Hide skipped constraints in the report" ) parser.add_argument( "--saveConstraintReport", action="store_true", help="Save constraint report JSON files" ) parser.add_argument( "--saveObservationWeightReport", action="store_true", help="Save observation-weight report JSON file" ) args = parser.parse_args() if args.showSkippedConstraints and args.noShowSkippedConstraints: print("[ERROR] Choose only one of --showSkippedConstraints or --noShowSkippedConstraints") sys.exit(1) if len(args.detections) != len(args.poses): print(f"[ERROR] Mismatch: {len(args.detections)} detection files vs {len(args.poses)} pose files") sys.exit(1) robot_data = load_json(args.robot) length_scale = get_length_scale(robot_data) cfg = load_constraint_rule_config(robot_data, args) print("[STEP 1] Compile constraints from robot.json structure...") print( "[INFO] Constraint families: " f"rigid_distance={'on' if cfg.rigid_distance_enabled else 'off'}, " f"revolute={'on' if cfg.revolute_axis_enabled else 'off'}, " f"prismatic={'on' if cfg.prismatic_orthogonal_enabled else 'off'}, " f"chain_legacy={'on' if cfg.chain_axis_enabled else 'off'}, " f"observation_weights={'on' if cfg.enable_observation_weights else 'off'}" ) marker_to_link, link_markers, constraints, issues, marker_meta = compile_constraints(robot_data, length_scale, cfg) for issue in issues: print(issue) print(f"[INFO] Links with markers: {sum(1 for v in link_markers.values() if len(v) > 0)}") print(f"[INFO] Unique marker IDs: {len(marker_to_link)}") print_constraint_summary(constraints) print_constraint_list(constraints) print("\n[STEP 2] Load observations and camera poses...") marker_observations, cameras, obs_metadata = load_observations_and_poses(args.detections, args.poses) print(f"[INFO] {len(cameras)} cameras, {len(marker_observations)} observed markers") print(f"[INFO] Detection files loaded: {len(obs_metadata)}") print("\n[STEP 3] Initial triangulation...") initial_pos = initial_triangulation(marker_observations, cameras) print(f"[INFO] Triangulated {len(initial_pos)} markers") out_dir = args.outDir or os.path.dirname(args.detections[0]) or "." os.makedirs(resolve_path(out_dir), exist_ok=True) # camera poses in world (for viewer frusta): centre C = -R^T t, view axis = R[2] cameras_section = [] for idx, (K, D, R_wc, t_wc) in enumerate(cameras): C = -R_wc.T @ t_wc cam_id = str(idx) base = os.path.basename(str(obs_metadata[idx].get("pose_file", ""))) if idx < len(obs_metadata) else "" if base.startswith("render_") and "_camera_pose" in base: cam_id = base[len("render_"):base.index("_camera_pose")] cameras_section.append({ "camera_id": cam_id, "position_m": [float(v) for v in C], "position_mm": [float(v * 1000.0) for v in C], "direction": [float(v) for v in R_wc[2]], }) initial_output_markers = [] for marker_id, position in sorted(initial_pos.items()): normal = marker_meta.get(marker_id, {}).get("normal", None) initial_output_markers.append( { "marker_id": int(marker_id), "position_m": [float(x) for x in position], "position_mm": [float(x * 1000.0) for x in position], "link": marker_to_link.get(marker_id, "unknown"), "normal": serialize_vec3(normal) if normal is not None else None, } ) initial_output = { "schema_version": "1.2", "stage": "initial_triangulation", "created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "summary": { "num_cameras": len(cameras), "num_markers": len(initial_pos), "num_constraints": len(constraints), }, "cameras": cameras_section, "markers": initial_output_markers, } initial_out_file = os.path.join(out_dir, "aruco_positions_initial.json") save_json(initial_out_file, initial_output) print(f"[INFO] Initial triangulation saved to {initial_out_file}") obs_weights = compute_observation_weights( marker_observations=marker_observations, cameras=cameras, initial_positions=initial_pos, marker_meta=marker_meta, cfg=cfg, robot_data=robot_data, ) print_observation_weight_summary(obs_weights) print_constraints_with_errors( "Constraint list BEFORE optimization", constraints, initial_pos, show_skipped=cfg.show_skipped_constraints, ) print("\n[STEP 4] Bundle adjustment with constraints...") optimized_pos = optimize_with_constraints( initial_pos, marker_observations, cameras, constraints, obs_weights, lambda_constraint=args.lambdaWeight, ) print_constraints_with_errors( "Constraint list AFTER optimization", constraints, optimized_pos, show_skipped=cfg.show_skipped_constraints, ) output_markers = [] for marker_id, position in sorted(optimized_pos.items()): normal = marker_meta.get(marker_id, {}).get("normal", None) output_markers.append( { "marker_id": int(marker_id), "position_m": [float(x) for x in position], "position_mm": [float(x * 1000.0) for x in position], "link": marker_to_link.get(marker_id, "unknown"), "normal": serialize_vec3(normal) if normal is not None else None, } ) output = { "schema_version": "1.2", "created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "summary": { "num_cameras": len(cameras), "num_markers": len(optimized_pos), "num_constraints": len(constraints), }, "cameras": cameras_section, "markers": output_markers, } out_file = os.path.join(out_dir, "aruco_positions_optimized.json") save_json(out_file, output) print(f"\n[INFO] Saved to {out_file}") if args.saveConstraintReport: report = { "schema_version": "1.0", "created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "summary": { "num_constraints": len(constraints), "num_links_with_markers": sum(1 for v in link_markers.values() if len(v) > 0), "num_observed_markers": len(marker_observations), "num_triangulated_markers": len(initial_pos), "num_optimized_markers": len(optimized_pos), }, "constraints": [], } for c in constraints: if isinstance(c, MarkerDistanceConstraint): report["constraints"].append( { "kind": "distance", "link_name": c.link_name, "marker_id_a": c.marker_id_a, "marker_id_b": c.marker_id_b, "target_distance_m": c.target_distance_m, "weight": c.weight, "source": c.source, } ) else: report["constraints"].append( { "kind": "joint_axis", "parent_link": c.parent_link, "child_link": c.child_link, "marker_id_parent": c.marker_id_parent, "marker_id_child": c.marker_id_child, "joint_axis": [float(x) for x in c.joint_axis], "target_delta_along_axis_m": c.target_delta_along_axis_m, "weight": c.weight, "source": c.source, } ) report_file = os.path.join(out_dir, "constraint_report.json") save_json(report_file, report) print(f"[INFO] Constraint report saved to {report_file}") if args.saveObservationWeightReport: obs_report = { "schema_version": "1.0", "created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "summary": { "num_weighted_observations": len(obs_weights), }, "observation_weights": [ { "marker_id": int(mid), "observation_index": int(obs_idx), "weight": float(w), } for (mid, obs_idx), w in sorted(obs_weights.items()) ], } obs_file = os.path.join(out_dir, "observation_weight_report.json") save_json(obs_file, obs_report) print(f"[INFO] Observation-weight report saved to {obs_file}") if __name__ == "__main__": main()