#!/usr/bin/env python3 """ 3_multiview_bundle_adjustment_v2.py Multi-view ArUco marker position optimization with explicit, programmable constraint families. Main ideas: 1) Triangulate initial marker positions from multi-view detections. 2) Compile constraints from robot.json structure using rule-based recognition: - rigid link constraints (within a link) - joint-axis projection constraints (between directly connected links) - chain-axis projection constraints (ancestor/descendant propagation) 3) Optimize marker positions with bundle adjustment: reprojection residuals + constraint residuals. This version is designed to be debuggable and switchable: - each constraint family can be enabled/disabled - robot.json may optionally include constraint_rules / constraint_overrides - duplicate marker IDs can be checked strictly or only warned about Dependencies: numpy, opencv-python, scipy (optional for optimization) Example: python 3_multiview_bundle_adjustment_v2.py ^ -det cam1_aruco_detection.json cam2_aruco_detection.json cam3_aruco_detection.json ^ -pose cam1_camera_pose.json cam2_camera_pose.json cam3_camera_pose.json ^ -robot robot.json ^ -lambdaWeight 100.0 """ from __future__ import annotations import argparse import json import os import sys import time from dataclasses import dataclass from itertools import combinations from typing import Any, Dict, List, Optional, Tuple import cv2 import numpy as np # =================================================================== # Path / JSON helpers # =================================================================== def resolve_path(path: str) -> str: path = os.path.expanduser(path) if os.path.isabs(path): return path return os.path.abspath(path) def load_json(path: str) -> Dict[str, Any]: with open(resolve_path(path), "r", encoding="utf-8") as f: return json.load(f) def save_json(path: str, data: Dict[str, Any]) -> None: with open(resolve_path(path), "w", encoding="utf-8") as f: json.dump(data, f, indent=2) # =================================================================== # Units # =================================================================== def get_length_scale(robot_data: Dict[str, Any]) -> float: units = robot_data.get("units", {}) length_unit = str(units.get("length", "")).strip().lower() if length_unit in ("mm", "millimeter", "millimeters"): return 1.0 / 1000.0 if length_unit in ("cm", "centimeter", "centimeters"): return 1.0 / 100.0 return 1.0 # =================================================================== # Small geometry helpers # =================================================================== def safe_norm(v: np.ndarray, eps: float = 1e-12) -> float: return float(np.linalg.norm(v) + eps) def normalize_vector(v: np.ndarray, eps: float = 1e-12) -> np.ndarray: n = safe_norm(v, eps) return np.asarray(v, dtype=np.float64) / n def principal_axis_id(axis: np.ndarray, threshold: float = 0.95) -> Optional[int]: """ Return 0,1,2 for x,y,z if the axis is close enough to a principal axis. Return None if it is not sufficiently aligned. """ a = normalize_vector(np.asarray(axis, dtype=np.float64)) idx = int(np.argmax(np.abs(a))) if abs(a[idx]) >= threshold: return idx return None def axis_name(axis_id: int) -> str: return ["x", "y", "z"][axis_id] def camera_center_from_world_to_cam(R_wc: np.ndarray, t_wc: np.ndarray) -> np.ndarray: """ world_to_camera: X_cam = R_wc * X_world + t_wc Camera center in world is -R^T t. """ return -R_wc.T @ t_wc # =================================================================== # Constraint rules / config # =================================================================== @dataclass class ConstraintRuleConfig: # Rigid link constraints rigid_distance_enabled: bool = True rigid_distance_mode: str = "mst" # mst | star | full rigid_distance_weight: float = 1.0 # Direct joint constraints (parent <-> child) joint_axis_enabled: bool = True joint_axis_max_pairs: int = 2 # number of anchor markers selected per side joint_axis_weight: float = 0.5 # Propagated ancestor-descendant constraints chain_axis_enabled: bool = True chain_axis_max_depth: int = 3 chain_axis_max_pairs: int = 2 chain_axis_weight: float = 0.3 # Axis recognition threshold axis_alignment_threshold: float = 0.95 # If True, link-marker duplicates in robot.json are fatal. strict_unique_marker_ids: bool = False # Print/skipped diagnostics show_skipped_constraints: bool = True def _bool_or_default(value: Any, default: bool) -> bool: if value is None: return default return bool(value) def _float_or_default(value: Any, default: float) -> float: if value is None: return default return float(value) def _int_or_default(value: Any, default: int) -> int: if value is None: return default return int(value) def load_constraint_rule_config(robot_data: Dict[str, Any], args: argparse.Namespace) -> ConstraintRuleConfig: """ Merge built-in defaults, optional robot.json constraint_rules, and CLI flags. """ cfg = ConstraintRuleConfig() rules = robot_data.get("constraint_rules", {}) or {} rigid = rules.get("rigid_distance", {}) or {} joint = rules.get("joint_axis_projection", {}) or {} chain = rules.get("chain_axis_projection", {}) or {} cfg.rigid_distance_enabled = _bool_or_default(rigid.get("enabled"), cfg.rigid_distance_enabled) cfg.rigid_distance_mode = str(rigid.get("mode", cfg.rigid_distance_mode)).strip().lower() cfg.rigid_distance_weight = _float_or_default(rigid.get("weight"), cfg.rigid_distance_weight) cfg.joint_axis_enabled = _bool_or_default(joint.get("enabled"), cfg.joint_axis_enabled) cfg.joint_axis_max_pairs = _int_or_default(joint.get("max_pairs"), cfg.joint_axis_max_pairs) cfg.joint_axis_weight = _float_or_default(joint.get("weight"), cfg.joint_axis_weight) cfg.chain_axis_enabled = _bool_or_default(chain.get("enabled"), cfg.chain_axis_enabled) cfg.chain_axis_max_depth = _int_or_default(chain.get("max_depth"), cfg.chain_axis_max_depth) cfg.chain_axis_max_pairs = _int_or_default(chain.get("max_pairs"), cfg.chain_axis_max_pairs) cfg.chain_axis_weight = _float_or_default(chain.get("weight"), cfg.chain_axis_weight) cfg.axis_alignment_threshold = _float_or_default( rules.get("axis_alignment_threshold"), cfg.axis_alignment_threshold ) if getattr(args, "strictUniqueMarkerIds", False): cfg.strict_unique_marker_ids = True if getattr(args, "showSkippedConstraints", None) is not None: cfg.show_skipped_constraints = bool(args.showSkippedConstraints) return cfg # =================================================================== # Constraint definitions # =================================================================== @dataclass class MarkerDistanceConstraint: marker_id_a: int marker_id_b: int link_name: str target_distance_m: float weight: float = 1.0 enabled: bool = True source: str = "rigid_distance" @dataclass class JointAxisConstraint: marker_id_parent: int marker_id_child: int parent_link: str child_link: str joint_axis: np.ndarray target_delta_along_axis_m: float weight: float = 1.0 enabled: bool = True source: str = "joint_axis_projection" Constraint = MarkerDistanceConstraint | JointAxisConstraint # =================================================================== # Robot parsing # =================================================================== def parse_robot_markers( robot_data: Dict[str, Any], length_scale: float, strict_unique_marker_ids: bool = False ) -> Tuple[Dict[int, str], Dict[str, List[Dict[str, Any]]], List[str]]: """ Build: marker_to_link: marker_id -> link_name link_markers: link_name -> [{id, position_m, position_raw, ...}, ...] """ links = robot_data.get("links", {}) or {} marker_to_link: Dict[int, str] = {} link_markers: Dict[str, List[Dict[str, Any]]] = {} issues: List[str] = [] seen_global: Dict[int, str] = {} for link_name, link_data in links.items(): markers = link_data.get("markers", []) or [] collected: List[Dict[str, Any]] = [] seen_local: set[int] = set() for idx, marker in enumerate(markers): marker_id = int(marker.get("id", -1)) pos = marker.get("position", None) if marker_id < 0 or pos is None or len(pos) != 3: issues.append(f"[WARN] link={link_name}: skipped invalid marker entry at index {idx}") continue if marker_id in seen_local: msg = f"[WARN] duplicate marker id {marker_id} inside link '{link_name}'" if strict_unique_marker_ids: raise ValueError(msg) issues.append(msg + " -> skipped duplicate inside same link") continue if marker_id in seen_global: msg = ( f"[WARN] duplicate marker id {marker_id} appears in link '{link_name}' " f"and already in link '{seen_global[marker_id]}'" ) if strict_unique_marker_ids: raise ValueError(msg) issues.append(msg + " -> skipped duplicate occurrence") continue seen_local.add(marker_id) seen_global[marker_id] = link_name pos_raw = np.array(pos, dtype=np.float64) pos_m = pos_raw * float(length_scale) collected.append( { "id": marker_id, "name": marker.get("name", f"marker_{marker_id}"), "position_raw": pos_raw, "position_m": pos_m, "normal": np.array(marker.get("normal", [0, 0, 1]), dtype=np.float64), "size": marker.get("size", None), "spin": marker.get("spin", None), } ) marker_to_link[marker_id] = link_name link_markers[link_name] = collected return marker_to_link, link_markers, issues def get_link_parent_map(robot_data: Dict[str, Any]) -> Dict[str, Optional[str]]: links = robot_data.get("links", {}) or {} parent_map: Dict[str, Optional[str]] = {} for link_name, link_data in links.items(): parent_map[link_name] = link_data.get("parent", None) return parent_map def get_joint_info(robot_data: Dict[str, Any], child_link: str) -> Dict[str, Any]: links = robot_data.get("links", {}) or {} return (links.get(child_link, {}) or {}).get("jointToParent", {}) or {} def get_joint_axis(robot_data: Dict[str, Any], child_link: str) -> Optional[np.ndarray]: joint = get_joint_info(robot_data, child_link) axis = joint.get("axis", None) if axis is None: return None axis = np.asarray(axis, dtype=np.float64) if safe_norm(axis) < 1e-12: return None return normalize_vector(axis) # =================================================================== # Constraint compilation helpers # =================================================================== def get_enabled_link_rule( robot_data: Dict[str, Any], link_name: str, rule_name: str, default_enabled: bool = True ) -> bool: """ Optional per-link override structure: "constraint_overrides": { "Arm2": { "rigid_distance": {"enabled": true}, "joint_axis_projection": {"enabled": false} } } """ overrides = robot_data.get("constraint_overrides", {}) or {} link_override = overrides.get(link_name, {}) or {} rule_override = link_override.get(rule_name, {}) or {} if "enabled" in rule_override: return bool(rule_override["enabled"]) return default_enabled def select_anchor_marker_ids( markers: List[Dict[str, Any]], axis: Optional[np.ndarray] = None, max_count: int = 2 ) -> List[int]: """ Deterministic anchor selection: - if axis is provided, choose markers with min/max projection on that axis - otherwise choose marker(s) closest to centroid and farthest from centroid Returns up to max_count ids, deduplicated. """ if not markers: return [] if len(markers) == 1: return [int(markers[0]["id"])] ids = [int(m["id"]) for m in markers] pos = np.stack([np.asarray(m["position_m"], dtype=np.float64) for m in markers], axis=0) selected: List[int] = [] if axis is not None and safe_norm(axis) > 1e-12: a = normalize_vector(axis) proj = pos @ a min_idx = int(np.argmin(proj)) max_idx = int(np.argmax(proj)) selected = [ids[min_idx], ids[max_idx]] else: centroid = np.mean(pos, axis=0) d = np.linalg.norm(pos - centroid, axis=1) min_idx = int(np.argmin(d)) max_idx = int(np.argmax(d)) selected = [ids[min_idx], ids[max_idx]] # Fill if needed if len(selected) < max_count: for mid in ids: if mid not in selected: selected.append(mid) if len(selected) >= max_count: break # Deduplicate while preserving order out: List[int] = [] seen: set[int] = set() for mid in selected: if mid not in seen: seen.add(mid) out.append(mid) if len(out) >= max_count: break return out def mst_edges_for_link(markers: List[Dict[str, Any]]) -> List[Tuple[int, int]]: """ Prim's algorithm on a complete graph of marker positions. Returns n-1 edges connecting all markers with minimal total distance. """ n = len(markers) if n < 2: return [] ids = [int(m["id"]) for m in markers] pos = np.stack([np.asarray(m["position_m"], dtype=np.float64) for m in markers], axis=0) in_tree = np.zeros(n, dtype=bool) in_tree[0] = True edges: List[Tuple[int, int]] = [] # pairwise distances dist = np.linalg.norm(pos[:, None, :] - pos[None, :, :], axis=2) for _ in range(n - 1): best = None best_d = float("inf") for i in range(n): if not in_tree[i]: continue for j in range(n): if in_tree[j]: continue d = float(dist[i, j]) if d < best_d: best_d = d best = (i, j) if best is None: break i, j = best in_tree[j] = True edges.append((ids[i], ids[j])) return edges def compile_rigid_distance_constraints( robot_data: Dict[str, Any], link_markers: Dict[str, List[Dict[str, Any]]], cfg: ConstraintRuleConfig ) -> List[MarkerDistanceConstraint]: constraints: List[MarkerDistanceConstraint] = [] links = robot_data.get("links", {}) or {} for link_name, markers in link_markers.items(): if not get_enabled_link_rule(robot_data, link_name, "rigid_distance", cfg.rigid_distance_enabled): continue if len(markers) < 2: continue mode = cfg.rigid_distance_mode if mode == "full": pairs = [(int(a["id"]), int(b["id"])) for a, b in combinations(markers, 2)] elif mode == "star": anchor_ids = select_anchor_marker_ids(markers, axis=None, max_count=1) anchor_id = anchor_ids[0] pairs = [] for m in markers: mid = int(m["id"]) if mid != anchor_id: pairs.append((anchor_id, mid)) elif mode == "mst": pairs = mst_edges_for_link(markers) else: raise ValueError(f"Unknown rigid_distance_mode='{mode}'. Use mst|star|full.") pos_map = {int(m["id"]): np.asarray(m["position_m"], dtype=np.float64) for m in markers} unique_pairs: List[Tuple[int, int]] = [] seen_pairs: set[Tuple[int, int]] = set() for mid_a, mid_b in pairs: if mid_a == mid_b: continue key = tuple(sorted((int(mid_a), int(mid_b)))) if key in seen_pairs: continue seen_pairs.add(key) unique_pairs.append((int(mid_a), int(mid_b))) for mid_a, mid_b in unique_pairs: pos_a = pos_map[mid_a] pos_b = pos_map[mid_b] target = float(np.linalg.norm(pos_b - pos_a)) constraints.append( MarkerDistanceConstraint( marker_id_a=mid_a, marker_id_b=mid_b, link_name=link_name, target_distance_m=target, weight=cfg.rigid_distance_weight, enabled=True, source=f"rigid_distance:{mode}", ) ) return constraints def compile_direct_joint_axis_constraints( robot_data: Dict[str, Any], link_markers: Dict[str, List[Dict[str, Any]]], cfg: ConstraintRuleConfig ) -> List[JointAxisConstraint]: """ Direct parent-child axis projection constraints. We keep them intentionally sparse: - choose up to 2 anchors per side (min/max along axis) - create cross-product pairs between anchors """ constraints: List[JointAxisConstraint] = [] links = robot_data.get("links", {}) or {} for child_link, child_data in links.items(): parent_link = child_data.get("parent", None) if not parent_link: continue if not get_enabled_link_rule(robot_data, child_link, "joint_axis_projection", cfg.joint_axis_enabled): continue joint_axis = get_joint_axis(robot_data, child_link) if joint_axis is None: continue axis_id = principal_axis_id(joint_axis, threshold=cfg.axis_alignment_threshold) if axis_id is None: # Still allow non-principal axes, but keep them with lower confidence: axis_id = int(np.argmax(np.abs(joint_axis))) axis_vec = np.zeros(3, dtype=np.float64) axis_vec[axis_id] = float(np.sign(joint_axis[axis_id]) if abs(joint_axis[axis_id]) > 1e-12 else 1.0) axis_vec = normalize_vector(axis_vec) parent_markers = link_markers.get(parent_link, []) child_markers = link_markers.get(child_link, []) if len(parent_markers) == 0 or len(child_markers) == 0: continue max_pairs = max(1, int(cfg.joint_axis_max_pairs)) parent_anchor_ids = select_anchor_marker_ids(parent_markers, axis=axis_vec, max_count=max_pairs) child_anchor_ids = select_anchor_marker_ids(child_markers, axis=axis_vec, max_count=max_pairs) parent_pos = {int(m["id"]): np.asarray(m["position_m"], dtype=np.float64) for m in parent_markers} child_pos = {int(m["id"]): np.asarray(m["position_m"], dtype=np.float64) for m in child_markers} seen: set[Tuple[int, int]] = set() for mid_p in parent_anchor_ids: for mid_c in child_anchor_ids: if mid_p == mid_c: continue key = (mid_p, mid_c) if key in seen: continue seen.add(key) delta = child_pos[mid_c] - parent_pos[mid_p] target = float(np.dot(delta, axis_vec)) constraints.append( JointAxisConstraint( marker_id_parent=mid_p, marker_id_child=mid_c, parent_link=parent_link, child_link=child_link, joint_axis=axis_vec, target_delta_along_axis_m=target, weight=cfg.joint_axis_weight, enabled=True, source="joint_axis_projection", ) ) return constraints def ancestors_of(link_name: str, parent_map: Dict[str, Optional[str]], max_depth: int) -> List[str]: out = [] cur = parent_map.get(link_name, None) depth = 0 while cur is not None and depth < max_depth: out.append(cur) cur = parent_map.get(cur, None) depth += 1 return out def path_axes_consistent( robot_data: Dict[str, Any], ancestor: str, descendant: str, parent_map: Dict[str, Optional[str]], threshold: float ) -> Optional[np.ndarray]: """ Check whether the chain from ancestor to descendant is consistent with a single principal axis. Returns the axis vector (unit, sign-preserving) if yes. """ # Build path descendant -> ancestor chain: List[str] = [] cur = descendant while cur is not None and cur != ancestor: chain.append(cur) cur = parent_map.get(cur, None) if cur != ancestor: return None # not an ancestor chain.reverse() # from ancestor child-step to descendant # Collect axes on each joint in the path axes: List[np.ndarray] = [] for link in chain: ax = get_joint_axis(robot_data, link) if ax is None: return None axes.append(ax) if not axes: return None axis_ids: List[int] = [] signs: List[float] = [] for ax in axes: aid = principal_axis_id(ax, threshold=threshold) if aid is None: return None axis_ids.append(aid) signs.append(float(np.sign(ax[aid])) if abs(ax[aid]) > 1e-12 else 1.0) if len(set(axis_ids)) != 1: return None axis_id = axis_ids[0] sign = 1.0 if np.sum(signs) >= 0 else -1.0 axis_vec = np.zeros(3, dtype=np.float64) axis_vec[axis_id] = sign return normalize_vector(axis_vec) def compile_chain_axis_constraints( robot_data: Dict[str, Any], link_markers: Dict[str, List[Dict[str, Any]]], cfg: ConstraintRuleConfig ) -> List[JointAxisConstraint]: """ Ancestor-descendant projection constraints for chains that preserve a stable principal axis over multiple joints. Example: Board -> Base -> Arm1 -> Ellbow with all joints x-aligned: add Board<->Arm1, Board<->Ellbow, Base<->Ellbow, ... """ constraints: List[JointAxisConstraint] = [] parent_map = get_link_parent_map(robot_data) links_with_markers = [ln for ln, mk in link_markers.items() if len(mk) > 0] max_depth = max(1, int(cfg.chain_axis_max_depth)) max_pairs = max(1, int(cfg.chain_axis_max_pairs)) # Cache positions for quick lookup pos_cache: Dict[int, np.ndarray] = {} for ln, mk in link_markers.items(): for m in mk: pos_cache[int(m["id"])] = np.asarray(m["position_m"], dtype=np.float64) for descendant in links_with_markers: if not get_enabled_link_rule(robot_data, descendant, "chain_axis_projection", cfg.chain_axis_enabled): continue for ancestor in ancestors_of(descendant, parent_map, max_depth=max_depth): if ancestor == descendant: continue axis_vec = path_axes_consistent( robot_data=robot_data, ancestor=ancestor, descendant=descendant, parent_map=parent_map, threshold=cfg.axis_alignment_threshold, ) if axis_vec is None: continue ancestor_markers = link_markers.get(ancestor, []) descendant_markers = link_markers.get(descendant, []) if len(ancestor_markers) == 0 or len(descendant_markers) == 0: continue anc_anchors = select_anchor_marker_ids(ancestor_markers, axis=axis_vec, max_count=max_pairs) des_anchors = select_anchor_marker_ids(descendant_markers, axis=axis_vec, max_count=max_pairs) seen: set[Tuple[int, int]] = set() for mid_a in anc_anchors: for mid_b in des_anchors: if mid_a == mid_b: continue key = (mid_a, mid_b) if key in seen: continue seen.add(key) target = float(np.dot(pos_cache[mid_b] - pos_cache[mid_a], axis_vec)) constraints.append( JointAxisConstraint( marker_id_parent=mid_a, marker_id_child=mid_b, parent_link=ancestor, child_link=descendant, joint_axis=axis_vec, target_delta_along_axis_m=target, weight=cfg.chain_axis_weight, enabled=True, source=f"chain_axis_projection:depth{len(ancestors_of(descendant, parent_map, max_depth))}", ) ) return constraints def compile_constraints( robot_data: Dict[str, Any], length_scale: float, cfg: ConstraintRuleConfig ) -> Tuple[Dict[int, str], Dict[str, List[Dict[str, Any]]], List[Constraint], List[str]]: """ Full rule-based constraint compiler. Returns: marker_to_link link_markers constraints issues """ marker_to_link, link_markers, issues = parse_robot_markers( robot_data, length_scale=length_scale, strict_unique_marker_ids=cfg.strict_unique_marker_ids, ) constraints: List[Constraint] = [] constraints.extend(compile_rigid_distance_constraints(robot_data, link_markers, cfg)) constraints.extend(compile_direct_joint_axis_constraints(robot_data, link_markers, cfg)) constraints.extend(compile_chain_axis_constraints(robot_data, link_markers, cfg)) # De-duplicate identical constraints unique_constraints: List[Constraint] = [] seen_keys: set[Tuple[Any, ...]] = set() for c in constraints: if isinstance(c, MarkerDistanceConstraint): key = ("d", min(c.marker_id_a, c.marker_id_b), max(c.marker_id_a, c.marker_id_b), c.link_name, round(c.target_distance_m, 9)) else: key = ( "j", c.parent_link, c.child_link, c.marker_id_parent, c.marker_id_child, tuple(np.round(c.joint_axis, 9).tolist()), round(c.target_delta_along_axis_m, 9), ) if key in seen_keys: continue seen_keys.add(key) unique_constraints.append(c) return marker_to_link, link_markers, unique_constraints, issues # =================================================================== # Constraint reporting # =================================================================== def print_constraint_summary(constraints: List[Constraint]) -> None: num_dist = sum(isinstance(c, MarkerDistanceConstraint) for c in constraints) num_joint = sum(isinstance(c, JointAxisConstraint) for c in constraints) print(f"[INFO] Constraint summary: total={len(constraints)} distance={num_dist} joint/chain={num_joint}") def print_constraint_list(constraints: List[Constraint]) -> None: print("\n[INFO] Constraint list:") for idx, constraint in enumerate(constraints): if isinstance(constraint, MarkerDistanceConstraint): print( f" [{idx:03d}] DISTANCE | " f"Link='{constraint.link_name}' | " f"M{constraint.marker_id_a} <-> M{constraint.marker_id_b} | " f"Target={constraint.target_distance_m:.6f} m | " f"Weight={constraint.weight} | " f"Source={constraint.source}" ) else: axis_str = np.array2string(constraint.joint_axis, precision=3, suppress_small=True) print( f" [{idx:03d}] JOINT_AXIS | " f"{constraint.parent_link}(M{constraint.marker_id_parent}) -> " f"{constraint.child_link}(M{constraint.marker_id_child}) | " f"Axis={axis_str} | " f"TargetDelta={constraint.target_delta_along_axis_m:.6f} m | " f"Weight={constraint.weight} | " f"Source={constraint.source}" ) def print_constraints_with_errors( title: str, constraints: List[Constraint], positions: Dict[int, np.ndarray], show_skipped: bool = True ) -> None: print(f"\n[INFO] {title}") active = 0 skipped = 0 for idx, constraint in enumerate(constraints): if isinstance(constraint, MarkerDistanceConstraint): if constraint.marker_id_a not in positions or constraint.marker_id_b not in positions: skipped += 1 if show_skipped: print( f" [{idx:03d}] DISTANCE | " f"M{constraint.marker_id_a} <-> M{constraint.marker_id_b} | SKIPPED (missing marker)" ) continue pos_a = positions[constraint.marker_id_a] pos_b = positions[constraint.marker_id_b] actual = float(np.linalg.norm(pos_b - pos_a)) error = actual - constraint.target_distance_m active += 1 print( f" [{idx:03d}] DISTANCE | " f"Link='{constraint.link_name}' | " f"M{constraint.marker_id_a} <-> M{constraint.marker_id_b} | " f"target={constraint.target_distance_m*1000:.2f} mm | " f"actual={actual*1000:.2f} mm | " f"error={error*1000:+.2f} mm" ) elif isinstance(constraint, JointAxisConstraint): if constraint.marker_id_parent not in positions or constraint.marker_id_child not in positions: skipped += 1 if show_skipped: print( f" [{idx:03d}] JOINT_AXIS | " f"M{constraint.marker_id_parent} -> M{constraint.marker_id_child} | SKIPPED (missing marker)" ) continue pos_parent = positions[constraint.marker_id_parent] pos_child = positions[constraint.marker_id_child] delta = pos_child - pos_parent actual = float(np.dot(delta, constraint.joint_axis)) error = actual - constraint.target_delta_along_axis_m active += 1 axis_str = np.array2string(constraint.joint_axis, precision=2, suppress_small=True) print( f" [{idx:03d}] JOINT_AXIS | " f"{constraint.parent_link}(M{constraint.marker_id_parent}) -> " f"{constraint.child_link}(M{constraint.marker_id_child}) | " f"axis={axis_str} | " f"target={constraint.target_delta_along_axis_m*1000:.2f} mm | " f"actual={actual*1000:.2f} mm | " f"error={error*1000:+.2f} mm" ) print(f"[INFO] Active constraints: {active} | Skipped: {skipped}") # =================================================================== # Multi-view loading # =================================================================== def load_observations_and_poses( detection_files: List[str], pose_files: List[str] ) -> Tuple[Dict[int, List[Tuple[int, np.ndarray]]], List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]], List[Dict[str, Any]]]: """Load detections and camera poses.""" if len(detection_files) != len(pose_files): raise ValueError(f"Mismatch: {len(detection_files)} detections vs {len(pose_files)} poses") marker_observations: Dict[int, List[Tuple[int, np.ndarray]]] = {} cameras: List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]] = [] obs_metadata: List[Dict[str, Any]] = [] for cam_idx, (det_file, pose_file) in enumerate(zip(detection_files, pose_files)): det = load_json(det_file) pose_data = load_json(pose_file) cam_section = det.get("camera", {}) or {} K = np.array(cam_section.get("camera_matrix", []), dtype=np.float64).reshape(3, 3) D = np.array(cam_section.get("distortion_coefficients", []), dtype=np.float64).reshape(-1, 1) pose_section = pose_data.get("camera_pose", {}) or {} world_to_cam = pose_section.get("world_to_camera", {}) or {} R_wc = np.array(world_to_cam.get("rotation_matrix", []), dtype=np.float64).reshape(3, 3) t_wc = np.array(world_to_cam.get("translation_m", []), dtype=np.float64).reshape(3) cameras.append((K, D, R_wc, t_wc)) detections = det.get("detections", []) or [] for det_obj in detections: marker_id = int(det_obj.get("marker_id", -1)) if marker_id < 0: continue center_px = np.array(det_obj.get("center_px", []), dtype=np.float64) if center_px.shape != (2,): continue pts = center_px.reshape(1, 1, 2).astype(np.float32) und = cv2.undistortPoints(pts, K.astype(np.float32), D.astype(np.float32), P=None) norm_coords = und.reshape(2).astype(np.float64) marker_observations.setdefault(marker_id, []).append((cam_idx, norm_coords)) obs_metadata.append( { "detection_file": det_file, "pose_file": pose_file, "num_detections": len(detections), } ) return marker_observations, cameras, obs_metadata # =================================================================== # Initial triangulation # =================================================================== def triangulate_marker_initial( marker_id: int, observations: List[Tuple[int, np.ndarray]], cameras: List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]] ) -> Optional[np.ndarray]: """ Triangulate using the best baseline pair among all available observations. This is more stable than using just the first two views. """ if len(observations) < 2: return None best_pair = None best_baseline = -1.0 for (cam_i, _), (cam_j, _) in combinations(observations, 2): K1, D1, R1, t1 = cameras[cam_i] K2, D2, R2, t2 = cameras[cam_j] c1 = camera_center_from_world_to_cam(R1, t1) c2 = camera_center_from_world_to_cam(R2, t2) baseline = float(np.linalg.norm(c2 - c1)) if baseline > best_baseline: best_baseline = baseline best_pair = (cam_i, cam_j) if best_pair is None: return None cam_i, cam_j = best_pair norm_coords_i = None norm_coords_j = None for ci, coords in observations: if ci == cam_i: norm_coords_i = coords elif ci == cam_j: norm_coords_j = coords if norm_coords_i is None or norm_coords_j is None: return None K1, D1, R1, t1 = cameras[cam_i] K2, D2, R2, t2 = cameras[cam_j] x1_px = K1[0, 0] * norm_coords_i[0] + K1[0, 2] y1_px = K1[1, 1] * norm_coords_i[1] + K1[1, 2] x2_px = K2[0, 0] * norm_coords_j[0] + K2[0, 2] y2_px = K2[1, 1] * norm_coords_j[1] + K2[1, 2] P1 = K1 @ np.hstack([R1, t1.reshape(3, 1)]) P2 = K2 @ np.hstack([R2, t2.reshape(3, 1)]) try: X_h = cv2.triangulatePoints( P1, P2, np.array([[x1_px], [y1_px]], dtype=np.float64), np.array([[x2_px], [y2_px]], dtype=np.float64), ) X = (X_h[:3] / X_h[3]).reshape(3).astype(np.float64) if not np.all(np.isfinite(X)): return None return X except Exception: return None def initial_triangulation( marker_observations: Dict[int, List[Tuple[int, np.ndarray]]], cameras: List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]] ) -> Dict[int, np.ndarray]: triangulated: Dict[int, np.ndarray] = {} for marker_id, observations in marker_observations.items(): X = triangulate_marker_initial(marker_id, observations, cameras) if X is not None: triangulated[marker_id] = X return triangulated # =================================================================== # Residuals / optimization # =================================================================== def bundle_adjustment_residuals( marker_positions_flat: np.ndarray, marker_ids: List[int], marker_observations: Dict[int, List[Tuple[int, np.ndarray]]], cameras: List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]], constraints: List[Constraint], lambda_constraint: float = 100.0 ) -> np.ndarray: marker_dict: Dict[int, np.ndarray] = {} for i, marker_id in enumerate(marker_ids): marker_dict[marker_id] = marker_positions_flat[i * 3:(i + 1) * 3] residuals: List[float] = [] # Reprojection residuals in normalized coordinates for marker_id, observations in marker_observations.items(): if marker_id not in marker_dict: continue X_world = marker_dict[marker_id] for cam_idx, norm_coords_obs in observations: K, D, R_wc, t_wc = cameras[cam_idx] X_cam = R_wc @ X_world + t_wc if X_cam[2] > 1e-6: proj_norm = X_cam[:2] / X_cam[2] r = proj_norm - norm_coords_obs residuals.append(float(r[0])) residuals.append(float(r[1])) # Constraint residuals for constraint in constraints: if isinstance(constraint, MarkerDistanceConstraint): if constraint.marker_id_a in marker_dict and constraint.marker_id_b in marker_dict: pos_a = marker_dict[constraint.marker_id_a] pos_b = marker_dict[constraint.marker_id_b] actual_dist = float(np.linalg.norm(pos_b - pos_a)) residuals.append( (actual_dist - constraint.target_distance_m) * constraint.weight * lambda_constraint ) elif isinstance(constraint, JointAxisConstraint): if constraint.marker_id_parent in marker_dict and constraint.marker_id_child in marker_dict: pos_parent = marker_dict[constraint.marker_id_parent] pos_child = marker_dict[constraint.marker_id_child] delta = pos_child - pos_parent actual_delta = float(np.dot(delta, constraint.joint_axis)) residuals.append( (actual_delta - constraint.target_delta_along_axis_m) * constraint.weight * lambda_constraint ) return np.asarray(residuals, dtype=np.float64) def optimize_with_constraints( initial_positions: Dict[int, np.ndarray], marker_observations: Dict[int, List[Tuple[int, np.ndarray]]], cameras: List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]], constraints: List[Constraint], lambda_constraint: float = 100.0, max_iterations: int = 50 ) -> Dict[int, np.ndarray]: try: from scipy.optimize import least_squares except ImportError: print("[WARN] scipy not available, skipping optimization.") return initial_positions marker_ids = sorted(initial_positions.keys()) if not marker_ids: return {} x0 = np.concatenate([initial_positions[mid] for mid in marker_ids]) def residuals_fn(x: np.ndarray) -> np.ndarray: return bundle_adjustment_residuals( x, marker_ids, marker_observations, cameras, constraints, lambda_constraint ) print(f"[INFO] Starting optimization with {len(x0)} variables and {len(constraints)} constraints...") result = least_squares( residuals_fn, x0, max_nfev=max_iterations * max(1, len(marker_ids)), verbose=1, ) optimized = {} for i, marker_id in enumerate(marker_ids): optimized[marker_id] = result.x[i * 3:(i + 1) * 3] print(f"[INFO] Optimization complete. Final cost: {float(np.sum(result.fun ** 2)):.6f}") return optimized # =================================================================== # Main # =================================================================== def main() -> None: parser = argparse.ArgumentParser( description="Multi-view bundle adjustment with rule-based geometric constraints" ) parser.add_argument( "-det", "--detections", action="append", required=True, help="*_aruco_detection.json files" ) parser.add_argument( "-pose", "--poses", action="append", required=True, help="*_camera_pose.json files" ) parser.add_argument( "-robot", "--robot", required=True, help="robot.json" ) parser.add_argument( "-outDir", "--outDir", default=None, help="Output directory" ) parser.add_argument( "-lambdaWeight", "--lambdaWeight", type=float, default=100.0, help="Constraint weight multiplier" ) parser.add_argument( "--strictUniqueMarkerIds", action="store_true", help="Fail if a marker ID appears more than once in robot.json" ) parser.add_argument( "--showSkippedConstraints", action="store_true", help="Print skipped constraints in the report" ) parser.add_argument( "--noShowSkippedConstraints", action="store_true", help="Hide skipped constraints in the report" ) parser.add_argument( "--saveConstraintReport", action="store_true", help="Save constraint report JSON files" ) args = parser.parse_args() if args.showSkippedConstraints and args.noShowSkippedConstraints: print("[ERROR] Choose only one of --showSkippedConstraints or --noShowSkippedConstraints") sys.exit(1) if len(args.detections) != len(args.poses): print(f"[ERROR] Mismatch: {len(args.detections)} detection files vs {len(args.poses)} pose files") sys.exit(1) robot_data = load_json(args.robot) length_scale = get_length_scale(robot_data) cfg = load_constraint_rule_config(robot_data, args) if args.noShowSkippedConstraints: cfg.show_skipped_constraints = False elif args.showSkippedConstraints: cfg.show_skipped_constraints = True print("[STEP 1] Compile constraints from robot.json structure...") marker_to_link, link_markers, constraints, issues = compile_constraints(robot_data, length_scale, cfg) for issue in issues: print(issue) print(f"[INFO] Links with markers: {sum(1 for v in link_markers.values() if len(v) > 0)}") print(f"[INFO] Unique marker IDs: {len(marker_to_link)}") print_constraint_summary(constraints) print_constraint_list(constraints) print("\n[STEP 2] Load observations and camera poses...") marker_observations, cameras, obs_metadata = load_observations_and_poses(args.detections, args.poses) print(f"[INFO] {len(cameras)} cameras, {len(marker_observations)} observed markers") print(f"[INFO] Detection files loaded: {len(obs_metadata)}") print("\n[STEP 3] Initial triangulation...") initial_pos = initial_triangulation(marker_observations, cameras) print(f"[INFO] Triangulated {len(initial_pos)} markers") out_dir = args.outDir or os.path.dirname(args.detections[0]) or "." os.makedirs(resolve_path(out_dir), exist_ok=True) # Save initial solution initial_output_markers = [] for marker_id, position in sorted(initial_pos.items()): initial_output_markers.append( { "marker_id": int(marker_id), "position_m": [float(x) for x in position], "position_mm": [float(x * 1000.0) for x in position], "link": marker_to_link.get(marker_id, "unknown"), } ) initial_output = { "schema_version": "1.1", "stage": "initial_triangulation", "created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "summary": { "num_cameras": len(cameras), "num_markers": len(initial_pos), "num_constraints": len(constraints), }, "markers": initial_output_markers, } initial_out_file = os.path.join(out_dir, "aruco_positions_initial.json") save_json(initial_out_file, initial_output) print(f"[INFO] Initial triangulation saved to {initial_out_file}") print_constraints_with_errors( "Constraint list BEFORE optimization", constraints, initial_pos, show_skipped=cfg.show_skipped_constraints, ) print("\n[STEP 4] Bundle adjustment with constraints...") optimized_pos = optimize_with_constraints( initial_pos, marker_observations, cameras, constraints, lambda_constraint=args.lambdaWeight, ) print_constraints_with_errors( "Constraint list AFTER optimization", constraints, optimized_pos, show_skipped=cfg.show_skipped_constraints, ) output_markers = [] for marker_id, position in sorted(optimized_pos.items()): output_markers.append( { "marker_id": int(marker_id), "position_m": [float(x) for x in position], "position_mm": [float(x * 1000.0) for x in position], "link": marker_to_link.get(marker_id, "unknown"), } ) output = { "schema_version": "1.1", "created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "summary": { "num_cameras": len(cameras), "num_markers": len(optimized_pos), "num_constraints": len(constraints), }, "markers": output_markers, } out_file = os.path.join(out_dir, "aruco_positions_optimized.json") save_json(out_file, output) print(f"\n[INFO] Saved to {out_file}") if args.saveConstraintReport: report = { "schema_version": "1.0", "created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "summary": { "num_constraints": len(constraints), "num_links_with_markers": sum(1 for v in link_markers.values() if len(v) > 0), "num_observed_markers": len(marker_observations), "num_triangulated_markers": len(initial_pos), "num_optimized_markers": len(optimized_pos), }, "constraints": [], } for c in constraints: if isinstance(c, MarkerDistanceConstraint): report["constraints"].append( { "kind": "distance", "link_name": c.link_name, "marker_id_a": c.marker_id_a, "marker_id_b": c.marker_id_b, "target_distance_m": c.target_distance_m, "weight": c.weight, "source": c.source, } ) else: report["constraints"].append( { "kind": "joint_axis", "parent_link": c.parent_link, "child_link": c.child_link, "marker_id_parent": c.marker_id_parent, "marker_id_child": c.marker_id_child, "joint_axis": [float(x) for x in c.joint_axis], "target_delta_along_axis_m": c.target_delta_along_axis_m, "weight": c.weight, "source": c.source, } ) report_file = os.path.join(out_dir, "constraint_report.json") save_json(report_file, report) print(f"[INFO] Constraint report saved to {report_file}") if __name__ == "__main__": main()