#!/usr/bin/env python3 """ 4_robotState_estimation_v5.py Sequential robot-state estimation from optimized 3D ArUco marker positions. Mathematical idea ----------------- This script estimates a robot state q directly from already optimized marker positions p_i^obs (from aruco_positions_optimized*.json). The state is built incrementally along the kinematic chain: 1) estimate the root link pose from observed root-link markers 2) extend the active prefix link-by-link 3) for each newly activated joint, estimate its scalar variable by a geometric rule that matches the robot's degrees of freedom For each stage we keep the already estimated prefix fixed and only add the next link(s) from the chain. This is intentionally *not* a generic global optimizer by default. It is a deterministic geometric initializer that uses the robot structure and marker geometry directly. The stage logic is controlled by robot.json::state_pose_params: - numbers_of_Elements_to_consider_start - numbers_of_Elements_to_consider_final - solver_in_between_geometrical - solver_after_geometrical Geometric rules used here ------------------------- Rigid root / early prefix: - root link pose is estimated from its observed markers with weighted Kabsch Linear joint (slider): - the joint translates its whole descendant subtree along the joint axis - q is estimated as the weighted mean projection of observed minus predicted marker positions onto the world-space joint axis Revolute joint: - the joint rotates its descendant subtree around the joint axis - q is estimated by a coarse-to-fine 1D angular search that minimizes the weighted marker residual over the active prefix Optional solver --------------- If enabled in state_pose_params, a least-squares refinement can be run after a geometric stage or at the end. The geometric estimator remains the default and is the first-class method in this script. Important limitation -------------------- This script works from marker positions only. It does not yet use image-space marker orientation / normal / visibility cues. Those can be added later by feeding in raw detections and camera poses. Dependencies ------------ numpy, scipy (optional for refinement) """ from __future__ import annotations import argparse import copy import json import math import os import sys import time from dataclasses import dataclass from typing import Any, Dict, List, Optional, Tuple import numpy as np # ============================================================================= # Path / JSON helpers # ============================================================================= def resolve_path(path: str) -> str: path = os.path.expanduser(path) if os.path.isabs(path): return path return os.path.abspath(path) def load_json(path: str) -> Any: with open(resolve_path(path), 'r', encoding='utf-8') as f: return json.load(f) def save_json(path: str, data: Any) -> None: with open(resolve_path(path), 'w', encoding='utf-8') as f: json.dump(data, f, indent=2) # ============================================================================= # Small math helpers # ============================================================================= def safe_norm(v: np.ndarray, eps: float = 1e-12) -> float: return float(np.linalg.norm(v) + eps) def normalize(v: np.ndarray, eps: float = 1e-12) -> np.ndarray: v = np.asarray(v, dtype=np.float64) return v / safe_norm(v, eps) def wrap_angle_rad(a: float) -> float: return (a + math.pi) % (2.0 * math.pi) - math.pi def get_length_scale(robot_data: Dict[str, Any]) -> float: units = robot_data.get('units', {}) or {} length_unit = str(units.get('length', '')).strip().lower() if length_unit in ('mm', 'millimeter', 'millimeters'): return 1.0 / 1000.0 if length_unit in ('cm', 'centimeter', 'centimeters'): return 1.0 / 100.0 return 1.0 def rotation_matrix_xyz(rx: float, ry: float, rz: float, degrees: bool = False) -> np.ndarray: """Rotation order: X then Y then Z.""" if degrees: rx, ry, rz = math.radians(rx), math.radians(ry), math.radians(rz) cx, sx = math.cos(rx), math.sin(rx) cy, sy = math.cos(ry), math.sin(ry) cz, sz = math.cos(rz), math.sin(rz) rx_m = np.array([[1.0, 0.0, 0.0], [0.0, cx, -sx], [0.0, sx, cx]], dtype=np.float64) ry_m = np.array([[cy, 0.0, sy], [0.0, 1.0, 0.0], [-sy, 0.0, cy]], dtype=np.float64) rz_m = np.array([[cz, -sz, 0.0], [sz, cz, 0.0], [0.0, 0.0, 1.0]], dtype=np.float64) return rz_m @ ry_m @ rx_m def axis_angle_rotation(axis: np.ndarray, angle_rad: float) -> np.ndarray: axis = normalize(axis) x, y, z = axis c = math.cos(angle_rad) s = math.sin(angle_rad) C = 1.0 - c return np.array([ [c + x * x * C, x * y * C - z * s, x * z * C + y * s], [y * x * C + z * s, c + y * y * C, y * z * C - x * s], [z * x * C - y * s, z * y * C + x * s, c + z * z * C] ], dtype=np.float64) def make_transform(R: Optional[np.ndarray] = None, t: Optional[np.ndarray] = None) -> np.ndarray: T = np.eye(4, dtype=np.float64) if R is not None: T[:3, :3] = np.asarray(R, dtype=np.float64) if t is not None: T[:3, 3] = np.asarray(t, dtype=np.float64).reshape(3) return T def transform_point(T: np.ndarray, p: np.ndarray) -> np.ndarray: p4 = np.ones(4, dtype=np.float64) p4[:3] = np.asarray(p, dtype=np.float64).reshape(3) return (T @ p4)[:3] def matrix_to_euler_xyz(R: np.ndarray) -> np.ndarray: R = np.asarray(R, dtype=np.float64) sy = math.sqrt(R[0, 0] * R[0, 0] + R[1, 0] * R[1, 0]) singular = sy < 1e-9 if not singular: x = math.atan2(R[2, 1], R[2, 2]) y = math.atan2(-R[2, 0], sy) z = math.atan2(R[1, 0], R[0, 0]) else: x = math.atan2(-R[1, 2], R[1, 1]) y = math.atan2(-R[2, 0], sy) z = 0.0 return np.degrees([x, y, z]) def kabsch(P: np.ndarray, Q: np.ndarray, W: Optional[np.ndarray] = None) -> Tuple[np.ndarray, np.ndarray]: """Find R, t such that R @ P + t ≈ Q.""" P = np.asarray(P, dtype=np.float64) Q = np.asarray(Q, dtype=np.float64) assert P.shape == Q.shape and P.shape[1] == 3 if W is None: W = np.ones(len(P), dtype=np.float64) W = np.asarray(W, dtype=np.float64).reshape(-1) W = np.maximum(W, 1e-12) W = W / np.sum(W) p_cent = np.sum(P * W[:, None], axis=0) q_cent = np.sum(Q * W[:, None], axis=0) P0 = P - p_cent Q0 = Q - q_cent H = (P0 * W[:, None]).T @ Q0 U, S, Vt = np.linalg.svd(H) R = Vt.T @ U.T if np.linalg.det(R) < 0: Vt[-1, :] *= -1.0 R = Vt.T @ U.T t = q_cent - R @ p_cent return R, t # ============================================================================= # Data structures # ============================================================================= @dataclass class MarkerInfo: marker_id: int link_name: str local_pos_m: np.ndarray size: Optional[float] = None normal: Optional[np.ndarray] = None name: str = '' @dataclass class LinkInfo: name: str parent: Optional[str] joint_type: Optional[str] joint_axis: Optional[np.ndarray] joint_origin_m: np.ndarray joint_rotation_deg: np.ndarray joint_variable: Optional[str] mount_position_m: np.ndarray mount_rotation_deg: np.ndarray markers: List[MarkerInfo] @dataclass class StageResult: stage_idx: int active_links: List[str] active_joint_vars: List[str] num_markers_used: int mean_error_m: Optional[float] rms_error_m: Optional[float] worst_error_m: Optional[float] method: str optimizer_info: Dict[str, Any] @dataclass class StatePoseParams: numbers_of_elements_to_consider_start: int = 4 numbers_of_elements_to_consider_final: int = 5 solver_in_between_geometrical: bool = False solver_after_geometrical: bool = False geometric_passes_per_stage: int = 2 revolute_search_coarse_deg: float = 5.0 revolute_search_fine_deg: float = 1.0 root_pose_min_markers: int = 3 @dataclass class EstimationConfig: marker_base_weight: float = 1.0 root_pose_prior_weight: float = 0.0 joint_prior_weight: float = 0.0 robust_loss: str = 'soft_l1' max_iterations: int = 120 show_stage_reports: bool = True use_geometric_prefix: bool = True # ============================================================================= # Robot parsing # ============================================================================= def parse_robot(robot_data: Dict[str, Any]) -> Tuple[Dict[str, LinkInfo], Dict[int, MarkerInfo], List[str]]: length_scale = get_length_scale(robot_data) links = robot_data.get('links', {}) or {} link_infos: Dict[str, LinkInfo] = {} marker_by_id: Dict[int, MarkerInfo] = {} issues: List[str] = [] for link_name, link_data in links.items(): parent = link_data.get('parent', None) joint = link_data.get('jointToParent', {}) or {} joint_type = joint.get('type', None) axis = joint.get('axis', None) if axis is not None: axis = np.asarray(axis, dtype=np.float64) if safe_norm(axis) > 1e-12: axis = normalize(axis) else: axis = None joint_origin = np.asarray(joint.get('origin', [0, 0, 0]), dtype=np.float64) * float(length_scale) joint_rotation_deg = np.asarray(joint.get('rotation', [0, 0, 0]), dtype=np.float64) joint_variable = joint.get('variable', None) mount_position = np.asarray(link_data.get('mountPosition', [0, 0, 0]), dtype=np.float64) * float(length_scale) mount_rotation_deg = np.asarray(link_data.get('mountRotation', [0, 0, 0]), dtype=np.float64) markers_data = link_data.get('markers', []) or [] markers: List[MarkerInfo] = [] for idx, m in enumerate(markers_data): marker_id = int(m.get('id', -1)) pos = m.get('position', None) if marker_id < 0 or pos is None or len(pos) != 3: issues.append(f"[WARN] link='{link_name}': skipped invalid marker entry at index {idx}") continue if marker_id in marker_by_id: issues.append( f"[WARN] duplicate marker ID {marker_id} in link '{link_name}' " f"(already in '{marker_by_id[marker_id].link_name}'); using first occurrence" ) continue normal = None if isinstance(m.get('normal', None), list) and len(m['normal']) == 3: normal = np.asarray(m['normal'], dtype=np.float64) marker = MarkerInfo( marker_id=marker_id, link_name=link_name, local_pos_m=np.asarray(pos, dtype=np.float64) * float(length_scale), size=m.get('size', None), normal=normal, name=str(m.get('name', f'marker_{marker_id}')), ) markers.append(marker) marker_by_id[marker_id] = marker link_infos[link_name] = LinkInfo( name=link_name, parent=parent, joint_type=joint_type, joint_axis=axis, joint_origin_m=joint_origin, joint_rotation_deg=joint_rotation_deg, joint_variable=joint_variable, mount_position_m=mount_position, mount_rotation_deg=mount_rotation_deg, markers=markers, ) return link_infos, marker_by_id, issues def topological_links(link_infos: Dict[str, LinkInfo]) -> List[str]: children = {name: [] for name in link_infos} roots = [] for name, info in link_infos.items(): if info.parent is None: roots.append(name) else: children.setdefault(info.parent, []).append(name) order: List[str] = [] queue = list(roots) seen = set() while queue: cur = queue.pop(0) if cur in seen: continue seen.add(cur) order.append(cur) for ch in children.get(cur, []): queue.append(ch) for name in link_infos: if name not in seen: order.append(name) return order def compute_children_map(link_infos: Dict[str, LinkInfo]) -> Dict[str, List[str]]: children: Dict[str, List[str]] = {name: [] for name in link_infos} for name, info in link_infos.items(): if info.parent is not None: children.setdefault(info.parent, []).append(name) return children def compute_link_depths(link_infos: Dict[str, LinkInfo]) -> Dict[str, int]: depths: Dict[str, int] = {} def depth_of(name: str) -> int: if name in depths: return depths[name] info = link_infos[name] if info.parent is None: depths[name] = 0 else: depths[name] = depth_of(info.parent) + 1 return depths[name] for name in link_infos: depth_of(name) return depths # ============================================================================= # State pose params # ============================================================================= def load_state_pose_params(robot_data: Dict[str, Any]) -> StatePoseParams: raw = robot_data.get('state_pose_params', {}) or {} return StatePoseParams( numbers_of_elements_to_consider_start=int(raw.get('numbers_of_Elements_to_consider_start', 4)), numbers_of_elements_to_consider_final=int(raw.get('numbers_of_Elements_to_consider_final', 5)), solver_in_between_geometrical=bool(raw.get('solver_in_between_geometrical', False)), solver_after_geometrical=bool(raw.get('solver_after_geometrical', False)), geometric_passes_per_stage=max(1, int(raw.get('geometric_passes_per_stage', 2))), revolute_search_coarse_deg=float(raw.get('revolute_search_coarse_deg', 5.0)), revolute_search_fine_deg=float(raw.get('revolute_search_fine_deg', 1.0)), root_pose_min_markers=max(1, int(raw.get('root_pose_min_markers', 3))), ) # ============================================================================= # Observations # ============================================================================= def load_observed_markers(optimized_markers_file: str) -> Dict[int, Dict[str, Any]]: data = load_json(optimized_markers_file) if isinstance(data, dict): markers = data.get('markers', []) or [] elif isinstance(data, list): markers = data else: markers = [] observed: Dict[int, Dict[str, Any]] = {} for m in markers: if not isinstance(m, dict): continue if 'marker_id' not in m: continue marker_id = int(m['marker_id']) pos = m.get('position_m', None) if pos is None or len(pos) != 3: continue obs = dict(m) obs['marker_id'] = marker_id obs['position_m'] = np.asarray(pos, dtype=np.float64) observed[marker_id] = obs return observed def marker_weight(marker_info: MarkerInfo, base_weight: float = 1.0, ref_size: float = 25.0) -> float: w = base_weight if marker_info.size is not None: try: size = float(marker_info.size) if size > 0: w *= max(0.35, math.sqrt(size / ref_size)) except Exception: pass return w # ============================================================================= # Forward kinematics # ============================================================================= def link_static_transform(link: LinkInfo) -> np.ndarray: Rm = rotation_matrix_xyz(*link.mount_rotation_deg, degrees=True) return make_transform(Rm, link.mount_position_m) def joint_motion_transform(link: LinkInfo, q_value: float) -> np.ndarray: joint_type = (link.joint_type or '').strip().lower() if link.joint_axis is None: return np.eye(4, dtype=np.float64) axis = normalize(link.joint_axis) if joint_type == 'linear': return make_transform(np.eye(3), axis * float(q_value)) if joint_type == 'revolute': return make_transform(axis_angle_rotation(axis, float(q_value)), np.zeros(3)) return np.eye(4, dtype=np.float64) def world_to_link_transform( link_name: str, link_infos: Dict[str, LinkInfo], state: Dict[str, Any], cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray], joint_override: Optional[Dict[str, float]] = None, ) -> np.ndarray: override_items = tuple(sorted((joint_override or {}).items())) cache_key = (link_name, override_items) if cache_key in cache: return cache[cache_key] link = link_infos[link_name] if link.parent is None: T = make_transform(state['root_R'], state['root_t']) cache[cache_key] = T return T parent_T = world_to_link_transform(link.parent, link_infos, state, cache, joint_override=joint_override) T = parent_T @ make_transform(np.eye(3), link.joint_origin_m) joint_values = state['joint_values'] q = joint_override.get(link.name, joint_values.get(link.joint_variable, 0.0)) if joint_override else joint_values.get(link.joint_variable, 0.0) T = T @ joint_motion_transform(link, q) R_static = rotation_matrix_xyz(*link.joint_rotation_deg, degrees=True) T = T @ make_transform(R_static, np.zeros(3)) T = T @ link_static_transform(link) cache[cache_key] = T return T def predict_marker_positions( link_infos: Dict[str, LinkInfo], state: Dict[str, Any], joint_override: Optional[Dict[str, float]] = None, active_links: Optional[set[str]] = None, ) -> Dict[int, np.ndarray]: pred: Dict[int, np.ndarray] = {} cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray] = {} for link_name, link in link_infos.items(): if active_links is not None and link_name not in active_links: continue T = world_to_link_transform(link_name, link_infos, state, cache, joint_override=joint_override) for marker in link.markers: pred[marker.marker_id] = transform_point(T, marker.local_pos_m) return pred def collect_observations_by_link( observed_markers: Dict[int, Dict[str, Any]], active_links: Optional[set[str]] = None, ) -> Dict[str, List[int]]: by_link: Dict[str, List[int]] = {} for marker_id, obs in observed_markers.items(): link = obs.get('link', None) if link is None or link == 'unknown': continue if active_links is not None and link not in active_links: continue by_link.setdefault(link, []).append(marker_id) return by_link # ============================================================================= # Initial geometric root pose # ============================================================================= def initial_root_pose( root_link: LinkInfo, observed_markers: Dict[int, Dict[str, Any]], min_markers: int = 3, ) -> Tuple[np.ndarray, np.ndarray]: P = [] Q = [] W = [] for marker in root_link.markers: if marker.marker_id in observed_markers: obs = np.asarray(observed_markers[marker.marker_id]['position_m'], dtype=np.float64) P.append(marker.local_pos_m) Q.append(obs) W.append(marker_weight(marker)) if len(P) >= min_markers: return kabsch(np.asarray(P), np.asarray(Q), np.asarray(W)) if len(P) >= 1: # Weak fallback: keep world axes and place root so the first marker matches. marker = root_link.markers[0] obs = np.asarray(observed_markers[marker.marker_id]['position_m'], dtype=np.float64) return np.eye(3, dtype=np.float64), obs - marker.local_pos_m return np.eye(3, dtype=np.float64), np.zeros(3, dtype=np.float64) # ============================================================================= # Subtree helpers # ============================================================================= def subtree_links(link_name: str, children_map: Dict[str, List[str]], active_links: set[str]) -> List[str]: out: List[str] = [] queue = [link_name] while queue: cur = queue.pop(0) if cur not in active_links: continue out.append(cur) for ch in children_map.get(cur, []): if ch in active_links: queue.append(ch) return out def marker_ids_in_links(link_infos: Dict[str, LinkInfo], links: List[str]) -> List[int]: ids: List[int] = [] for link_name in links: for m in link_infos[link_name].markers: ids.append(m.marker_id) return ids # ============================================================================= # Geometric joint estimation # ============================================================================= def current_joint_value(state: Dict[str, Any], link: LinkInfo) -> float: return float(state['joint_values'].get(link.joint_variable, 0.0)) if link.joint_variable else 0.0 def world_joint_axis_for_link( link_name: str, link_infos: Dict[str, LinkInfo], state: Dict[str, Any], ) -> np.ndarray: link = link_infos[link_name] if link.parent is None or link.joint_axis is None: return np.array([1.0, 0.0, 0.0], dtype=np.float64) cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray] = {} parent_T = world_to_link_transform(link.parent, link_infos, state, cache) axis_world = parent_T[:3, :3] @ normalize(link.joint_axis) return normalize(axis_world) def estimate_linear_joint_value( link_name: str, link_infos: Dict[str, LinkInfo], state: Dict[str, Any], observed_markers: Dict[int, Dict[str, Any]], active_links: set[str], children_map: Dict[str, List[str]], ) -> Tuple[float, Dict[str, Any]]: link = link_infos[link_name] if link.joint_variable is None: return 0.0, {'reason': 'no_joint_variable'} subtree = subtree_links(link_name, children_map, active_links) obs_ids = [mid for mid in marker_ids_in_links(link_infos, subtree) if mid in observed_markers] if not obs_ids: return current_joint_value(state, link), {'reason': 'no_observations'} axis_world = world_joint_axis_for_link(link_name, link_infos, state) # Evaluate current prediction with this joint forced to zero. override = {link_name: 0.0} pred0 = predict_marker_positions(link_infos, state, joint_override=override, active_links=active_links) num = 0.0 den = 0.0 used = 0 per_marker = [] for mid in obs_ids: if mid not in pred0: continue marker = next((m for m in link_infos[observed_markers[mid]['link']].markers if m.marker_id == mid), None) w = marker_weight(marker) if marker is not None else 1.0 p_obs = np.asarray(observed_markers[mid]['position_m'], dtype=np.float64) p0 = pred0[mid] q_i = float(np.dot(axis_world, p_obs - p0)) num += w * q_i den += w used += 1 per_marker.append({'marker_id': mid, 'q_i': q_i, 'weight': w}) if den <= 1e-12: return current_joint_value(state, link), {'reason': 'zero_weight'} q = num / den return float(q), { 'reason': 'weighted_projection', 'used_markers': used, 'axis_world': [float(x) for x in axis_world], 'per_marker': per_marker, } def marker_residual_error_for_state( link_infos: Dict[str, LinkInfo], state: Dict[str, Any], observed_markers: Dict[int, Dict[str, Any]], active_links: Optional[set[str]] = None, ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: pred = predict_marker_positions(link_infos, state, active_links=active_links) marker_reports: List[Dict[str, Any]] = [] errors = [] for marker_id, obs in observed_markers.items(): if marker_id not in pred: continue p_obs = np.asarray(obs['position_m'], dtype=np.float64) p_pred = pred[marker_id] err = p_pred - p_obs err_norm = float(np.linalg.norm(err)) errors.append(err_norm) marker_reports.append({ 'marker_id': int(marker_id), 'link': obs.get('link', 'unknown'), 'observed_position_m': [float(x) for x in p_obs], 'predicted_position_m': [float(x) for x in p_pred], 'error_m': [float(x) for x in err], 'error_norm_m': err_norm, 'error_norm_mm': err_norm * 1000.0, 'marker_size': obs.get('position_mm', None), }) if errors: arr = np.asarray(errors, dtype=np.float64) stats = { 'num_markers_used': int(len(errors)), 'mean_error_m': float(np.mean(arr)), 'median_error_m': float(np.median(arr)), 'rms_error_m': float(np.sqrt(np.mean(arr ** 2))), 'worst_error_m': float(np.max(arr)), 'p80_error_m': float(np.quantile(arr, 0.80)), 'p90_error_m': float(np.quantile(arr, 0.90)), } else: stats = { 'num_markers_used': 0, 'mean_error_m': None, 'median_error_m': None, 'rms_error_m': None, 'worst_error_m': None, 'p80_error_m': None, 'p90_error_m': None, } return marker_reports, stats def revolute_weighted_error( link_name: str, q: float, link_infos: Dict[str, LinkInfo], state: Dict[str, Any], observed_markers: Dict[int, Dict[str, Any]], active_links: set[str], children_map: Dict[str, List[str]], ) -> float: link = link_infos[link_name] if link.joint_variable is None: return float('inf') subtree = subtree_links(link_name, children_map, active_links) obs_ids = [mid for mid in marker_ids_in_links(link_infos, subtree) if mid in observed_markers] if not obs_ids: return float('inf') override = {link_name: q} pred = predict_marker_positions(link_infos, state, joint_override=override, active_links=active_links) err = 0.0 wsum = 0.0 for mid in obs_ids: if mid not in pred: continue marker = next((m for m in link_infos[observed_markers[mid]['link']].markers if m.marker_id == mid), None) w = marker_weight(marker) if marker is not None else 1.0 d = pred[mid] - np.asarray(observed_markers[mid]['position_m'], dtype=np.float64) err += w * float(np.dot(d, d)) wsum += w if wsum <= 1e-12: return float('inf') return err / wsum def estimate_revolute_joint_value( link_name: str, link_infos: Dict[str, LinkInfo], state: Dict[str, Any], observed_markers: Dict[int, Dict[str, Any]], active_links: set[str], children_map: Dict[str, List[str]], coarse_step_deg: float = 5.0, fine_step_deg: float = 1.0, ) -> Tuple[float, Dict[str, Any]]: link = link_infos[link_name] if link.joint_variable is None: return 0.0, {'reason': 'no_joint_variable'} subtree = subtree_links(link_name, children_map, active_links) obs_ids = [mid for mid in marker_ids_in_links(link_infos, subtree) if mid in observed_markers] if not obs_ids: return current_joint_value(state, link), {'reason': 'no_observations'} q0 = current_joint_value(state, link) best_q = q0 best_e = float('inf') used = len(obs_ids) # Coarse-to-fine search around the current value, but still wide enough to catch flips. centers = [q0, 0.0] span_list = [math.pi, math.pi / 4.0, math.pi / 16.0] step_list = [math.radians(coarse_step_deg), math.radians(fine_step_deg), math.radians(max(0.25, fine_step_deg / 2.0))] for center in centers: for span, step in zip(span_list, step_list): if step <= 0: continue qs = np.arange(center - span, center + span + 0.5 * step, step) for q in qs: e = revolute_weighted_error(link_name, float(q), link_infos, state, observed_markers, active_links, children_map) if e < best_e: best_e = e best_q = float(q) center = best_q return wrap_angle_rad(best_q), { 'reason': 'coarse_to_fine_scan', 'used_markers': used, 'best_error': best_e, 'q0': q0, 'search_span_rad': math.pi, } # ============================================================================= # Geometric prefix estimation # ============================================================================= def build_state_template(link_infos: Dict[str, LinkInfo]) -> Dict[str, Any]: return { 'root_R': np.eye(3, dtype=np.float64), 'root_t': np.zeros(3, dtype=np.float64), 'joint_values': {li.joint_variable: 0.0 for li in link_infos.values() if li.joint_variable}, } def copy_state(state: Dict[str, Any]) -> Dict[str, Any]: return { 'root_R': np.asarray(state['root_R'], dtype=np.float64).copy(), 'root_t': np.asarray(state['root_t'], dtype=np.float64).copy(), 'joint_values': dict(state.get('joint_values', {})), } def active_observations_for_links( observed_markers: Dict[int, Dict[str, Any]], active_links: set[str], ) -> Dict[int, Dict[str, Any]]: out: Dict[int, Dict[str, Any]] = {} for marker_id, obs in observed_markers.items(): link_name = obs.get('link', None) if link_name in active_links: out[marker_id] = obs return out def optimize_geometric_prefix( link_infos: Dict[str, LinkInfo], observed_markers: Dict[int, Dict[str, Any]], initial_state: Dict[str, Any], active_links: List[str], cfg: StatePoseParams, ) -> Tuple[Dict[str, Any], Dict[str, Any]]: active_set = set(active_links) children_map = compute_children_map(link_infos) stage_obs = active_observations_for_links(observed_markers, active_set) state = copy_state(initial_state) stage_info: Dict[str, Any] = { 'method': 'geometric_prefix', 'active_links': active_links, 'active_observations': len(stage_obs), 'joint_updates': [], } # Root pose is refreshed from the root markers if possible. root_link_name = next((ln for ln in active_links if link_infos[ln].parent is None), None) if root_link_name is not None: root_link = link_infos[root_link_name] root_R, root_t = initial_root_pose(root_link, stage_obs, min_markers=cfg.root_pose_min_markers) state['root_R'] = root_R state['root_t'] = root_t stage_info['root_link'] = root_link_name stage_info['root_pose_source'] = 'kabsch_root_markers' if len([m for m in root_link.markers if m.marker_id in stage_obs]) >= cfg.root_pose_min_markers else 'weak_single_marker_fallback' # Forward geometric passes. active_joint_links = [ln for ln in active_links if link_infos[ln].parent is not None] for pass_idx in range(cfg.geometric_passes_per_stage): pass_updates = [] for link_name in active_joint_links: link = link_infos[link_name] jt = (link.joint_type or '').strip().lower() if jt == 'linear': q_old = current_joint_value(state, link) q_new, info = estimate_linear_joint_value(link_name, link_infos, state, stage_obs, active_set, children_map) state['joint_values'][link.joint_variable] = q_new pass_updates.append({'link': link_name, 'joint_variable': link.joint_variable, 'joint_type': jt, 'old': q_old, 'new': q_new, 'info': info}) elif jt == 'revolute': q_old = current_joint_value(state, link) q_new, info = estimate_revolute_joint_value( link_name, link_infos, state, stage_obs, active_set, children_map, coarse_step_deg=cfg.revolute_search_coarse_deg, fine_step_deg=cfg.revolute_search_fine_deg, ) state['joint_values'][link.joint_variable] = q_new pass_updates.append({'link': link_name, 'joint_variable': link.joint_variable, 'joint_type': jt, 'old': q_old, 'new': q_new, 'info': info}) stage_info[f'pass_{pass_idx+1}_updates'] = pass_updates marker_reports, stats = marker_residual_error_for_state(link_infos, state, stage_obs, active_links=active_set) stage_info['fit_stats'] = stats stage_info['num_markers_used'] = stats['num_markers_used'] return state, stage_info # ============================================================================= # Optional least-squares refinement # ============================================================================= def stage_variables_to_vector(state: Dict[str, Any], active_joint_vars: List[str]) -> np.ndarray: try: from scipy.spatial.transform import Rotation root_rvec = Rotation.from_matrix(state['root_R']).as_rotvec() except Exception: root_rvec = np.zeros(3, dtype=np.float64) root_t = np.asarray(state['root_t'], dtype=np.float64).reshape(3) values = list(root_t) + list(np.asarray(root_rvec, dtype=np.float64)) for var in active_joint_vars: values.append(float(state['joint_values'].get(var, 0.0))) return np.asarray(values, dtype=np.float64) def vector_to_state(x: np.ndarray, template_state: Dict[str, Any], active_joint_vars: List[str]) -> Dict[str, Any]: try: from scipy.spatial.transform import Rotation except Exception: Rotation = None x = np.asarray(x, dtype=np.float64).reshape(-1) out = { 'root_t': x[0:3].copy(), 'root_R': template_state['root_R'].copy(), 'joint_values': dict(template_state['joint_values']), } root_rvec = x[3:6].copy() if Rotation is not None: out['root_R'] = Rotation.from_rotvec(root_rvec).as_matrix() else: angle = safe_norm(root_rvec) out['root_R'] = np.eye(3, dtype=np.float64) if angle < 1e-12 else axis_angle_rotation(root_rvec / angle, angle) idx = 6 for var in active_joint_vars: if idx >= len(x): break out['joint_values'][var] = float(x[idx]) idx += 1 return out def residuals_stage( x: np.ndarray, link_infos: Dict[str, LinkInfo], observed_markers: Dict[int, Dict[str, Any]], template_state: Dict[str, Any], active_joint_vars: List[str], active_links: set[str], cfg: EstimationConfig, ) -> np.ndarray: state = vector_to_state(x, template_state, active_joint_vars) pred = predict_marker_positions(link_infos, state, active_links=active_links) res: List[float] = [] for marker_id, obs in observed_markers.items(): if marker_id not in pred: continue link_name = obs.get('link', None) marker = None if link_name is not None and link_name in link_infos: marker = next((m for m in link_infos[link_name].markers if m.marker_id == marker_id), None) w = marker_weight(marker, base_weight=cfg.marker_base_weight) if marker is not None else cfg.marker_base_weight sqrt_w = math.sqrt(max(1e-12, w)) p_obs = np.asarray(obs['position_m'], dtype=np.float64) p_pred = pred[marker_id] d = (p_pred - p_obs) * sqrt_w res.extend(d.tolist()) if cfg.root_pose_prior_weight > 0: root_t_prior = np.asarray(template_state['root_t'], dtype=np.float64) try: from scipy.spatial.transform import Rotation root_rvec_prior = Rotation.from_matrix(template_state['root_R']).as_rotvec() except Exception: root_rvec_prior = np.zeros(3, dtype=np.float64) w = math.sqrt(max(1e-12, cfg.root_pose_prior_weight)) res.extend(((x[0:3] - root_t_prior) * w).tolist()) res.extend(((x[3:6] - root_rvec_prior) * w).tolist()) if cfg.joint_prior_weight > 0: idx = 6 w = math.sqrt(max(1e-12, cfg.joint_prior_weight)) for var in active_joint_vars: if idx >= len(x): break q = x[idx] q0 = template_state['joint_values'].get(var, 0.0) dq = wrap_angle_rad(float(q - q0)) if abs(q0) <= math.pi * 2.5 else float(q - q0) res.append(dq * w) idx += 1 return np.asarray(res, dtype=np.float64) def optional_solver_refine( link_infos: Dict[str, LinkInfo], observed_markers: Dict[int, Dict[str, Any]], state: Dict[str, Any], active_links: List[str], cfg: EstimationConfig, ) -> Tuple[Dict[str, Any], Dict[str, Any]]: try: from scipy.optimize import least_squares except Exception: return state, {'success': False, 'message': 'scipy unavailable'} active_joint_vars = [link_infos[ln].joint_variable for ln in active_links if link_infos[ln].joint_variable is not None] x0 = stage_variables_to_vector(state, active_joint_vars) active_set = set(active_links) result = least_squares( lambda x: residuals_stage(x, link_infos, observed_markers, state, active_joint_vars, active_set, cfg), x0, loss=cfg.robust_loss, f_scale=1.0, max_nfev=cfg.max_iterations, verbose=1, ) refined = vector_to_state(result.x, state, active_joint_vars) info = { 'success': bool(result.success), 'status': int(result.status), 'message': str(result.message), 'nfev': int(result.nfev), 'cost': float(np.sum(result.fun ** 2)), } return refined, info # ============================================================================= # Sequential estimation driver # ============================================================================= def sequential_geometric_estimation( link_infos: Dict[str, LinkInfo], observed_markers: Dict[int, Dict[str, Any]], params: StatePoseParams, cfg: EstimationConfig, ) -> Tuple[Dict[str, Any], List[StageResult]]: ordered = topological_links(link_infos) if not ordered: return build_state_template(link_infos), [] start_n = max(1, min(int(params.numbers_of_elements_to_consider_start), len(ordered))) final_n = max(start_n, min(int(params.numbers_of_elements_to_consider_final), len(ordered))) state = build_state_template(link_infos) stage_results: List[StageResult] = [] for n in range(start_n, final_n + 1): active_links = ordered[:n] stage_state, stage_info = optimize_geometric_prefix(link_infos, observed_markers, state, active_links, params) state = stage_state if params.solver_in_between_geometrical: state, solver_info = optional_solver_refine(link_infos, observed_markers, state, active_links, cfg) stage_info['solver_in_between'] = solver_info stage_obs = active_observations_for_links(observed_markers, set(active_links)) marker_reports, stats = marker_residual_error_for_state(link_infos, state, stage_obs, active_links=set(active_links)) active_joint_vars = [link_infos[ln].joint_variable for ln in active_links if link_infos[ln].joint_variable is not None] stage_result = StageResult( stage_idx=len(stage_results), active_links=active_links, active_joint_vars=active_joint_vars, num_markers_used=stats['num_markers_used'], mean_error_m=stats['mean_error_m'], rms_error_m=stats['rms_error_m'], worst_error_m=stats['worst_error_m'], method='geometric_prefix' + ('+solver' if params.solver_in_between_geometrical else ''), optimizer_info=stage_info, ) stage_results.append(stage_result) if cfg.show_stage_reports: print( f"[INFO] Stage {stage_result.stage_idx} | links={len(active_links)} | joints={len(active_joint_vars)} | " f"markers={stage_result.num_markers_used} | mean={None if stats['mean_error_m'] is None else stats['mean_error_m']*1000.0:.2f} mm" ) if params.solver_after_geometrical: state, solver_info = optional_solver_refine(link_infos, observed_markers, state, ordered[:final_n], cfg) if stage_results: stage_results[-1].optimizer_info['solver_after_geometrical'] = solver_info return state, stage_results # ============================================================================= # Output # ============================================================================= def state_to_json( robot_data: Dict[str, Any], link_infos: Dict[str, LinkInfo], state: Dict[str, Any], observed_markers: Dict[int, Dict[str, Any]], marker_reports: List[Dict[str, Any]], stats: Dict[str, Any], input_file: str, robot_file: str, stage_results: List[StageResult], params: StatePoseParams, ) -> Dict[str, Any]: movements = {} for link_name in topological_links(link_infos): link = link_infos[link_name] if not link.joint_variable: continue q = float(state['joint_values'].get(link.joint_variable, 0.0)) jt = (link.joint_type or '').strip().lower() entry = { 'joint_type': jt, 'link': link_name, 'estimated': True, } if jt == 'revolute': entry['value_rad'] = q entry['value_deg'] = float(math.degrees(q)) elif jt == 'linear': entry['value_m'] = q entry['value_mm'] = q * 1000.0 else: entry['value'] = q movements[link.joint_variable] = entry link_pose_entries = [] cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray] = {} for link_name in topological_links(link_infos): T = world_to_link_transform(link_name, link_infos, state, cache) R = T[:3, :3] t = T[:3, 3] link = link_infos[link_name] num_used = sum(1 for m in link.markers if m.marker_id in observed_markers) link_pose_entries.append({ 'link': link_name, 'parent': link.parent, 'position_m': [float(x) for x in t], 'rotation_matrix': [[float(v) for v in row] for row in R], 'euler_xyz_deg': [float(x) for x in matrix_to_euler_xyz(R)], 'num_observed_markers': int(num_used), 'num_markers_total': int(len(link.markers)), }) root_link = next((name for name, info in link_infos.items() if info.parent is None), None) root_T = make_transform(state['root_R'], state['root_t']) out = { 'schema_version': '3.0', 'created_utc': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()), 'source': { 'marker_positions_file': input_file, 'robot_file': robot_file, }, 'state_pose_params': { 'numbers_of_Elements_to_consider_start': params.numbers_of_elements_to_consider_start, 'numbers_of_Elements_to_consider_final': params.numbers_of_elements_to_consider_final, 'solver_in_between_geometrical': params.solver_in_between_geometrical, 'solver_after_geometrical': params.solver_after_geometrical, 'geometric_passes_per_stage': params.geometric_passes_per_stage, 'revolute_search_coarse_deg': params.revolute_search_coarse_deg, 'revolute_search_fine_deg': params.revolute_search_fine_deg, 'root_pose_min_markers': params.root_pose_min_markers, }, 'summary': { 'num_links': int(len(link_infos)), 'num_observed_markers': int(len(observed_markers)), 'num_link_markers': int(sum(len(li.markers) for li in link_infos.values())), 'root_link': root_link, 'fit_stats': stats, 'optimizer': state.get('_optimizer', {}), 'stages': [ { 'stage_idx': int(s.stage_idx), 'active_links': s.active_links, 'active_joint_vars': s.active_joint_vars, 'num_markers_used': int(s.num_markers_used), 'mean_error_m': s.mean_error_m, 'rms_error_m': s.rms_error_m, 'worst_error_m': s.worst_error_m, 'method': s.method, 'optimizer_info': s.optimizer_info, } for s in stage_results ], }, 'world_pose': { 'root_translation_m': [float(x) for x in root_T[:3, 3]], 'root_rotation_matrix': [[float(v) for v in row] for row in root_T[:3, :3]], 'root_euler_xyz_deg': [float(x) for x in matrix_to_euler_xyz(root_T[:3, :3])], }, 'movements': movements, 'links': link_pose_entries, 'markers': marker_reports, } return out # ============================================================================= # Main # ============================================================================= def main() -> None: parser = argparse.ArgumentParser( description='Sequential geometric robot-state estimation from optimized ArUco marker positions' ) parser.add_argument('optimized_markers', help='aruco_positions_optimized*.json') parser.add_argument('-robot', '--robot', required=True, help='robot.json') parser.add_argument('-out', '--out', default=None, help='Output robot_state.json path') parser.add_argument('--maxIterations', type=int, default=120, help='Maximum least-squares iterations per optional refinement stage') parser.add_argument('--jointPriorWeight', type=float, default=0.0, help='Prior weight for joint variables during optional solver refinement') parser.add_argument('--rootPosePriorWeight', type=float, default=0.0, help='Prior weight for root pose during optional solver refinement') parser.add_argument('--markerBaseWeight', type=float, default=1.0, help='Base weight for marker residuals') parser.add_argument('--noSequential', action='store_true', help='Disable sequential prefix mode and run one prefix stage only') parser.add_argument('--forceSolverAfter', action='store_true', help='Force solver after the geometric prefix regardless of robot.json') parser.add_argument('--forceSolverBetween', action='store_true', help='Force solver between geometric stages regardless of robot.json') args = parser.parse_args() print('[STEP 1] Load robot.json and optimized marker positions...') robot_data = load_json(args.robot) link_infos, marker_by_id, issues = parse_robot(robot_data) observed_map = load_observed_markers(args.optimized_markers) params = load_state_pose_params(robot_data) if args.noSequential: params.numbers_of_elements_to_consider_start = len(topological_links(link_infos)) params.numbers_of_elements_to_consider_final = len(topological_links(link_infos)) if args.forceSolverBetween: params.solver_in_between_geometrical = True if args.forceSolverAfter: params.solver_after_geometrical = True for issue in issues: print(issue) print(f"[INFO] Links: {len(link_infos)}") print(f"[INFO] Known robot markers: {len(marker_by_id)}") print(f"[INFO] Observed optimized markers: {len(observed_map)}") print(f"[INFO] state_pose_params: start={params.numbers_of_elements_to_consider_start}, final={params.numbers_of_elements_to_consider_final}, between={params.solver_in_between_geometrical}, after={params.solver_after_geometrical}") cfg = EstimationConfig( marker_base_weight=float(args.markerBaseWeight), root_pose_prior_weight=float(args.rootPosePriorWeight), joint_prior_weight=float(args.jointPriorWeight), robust_loss='soft_l1', max_iterations=int(args.maxIterations), show_stage_reports=True, use_geometric_prefix=True, ) print('\n[STEP 2] Geometric sequential pose estimation...') final_state, stage_results = sequential_geometric_estimation( link_infos=link_infos, observed_markers=observed_map, params=params, cfg=cfg, ) print('\n[STEP 3] Build report and save robot_state.json...') marker_reports, stats = marker_residual_error_for_state(link_infos, final_state, observed_map) out_data = state_to_json( robot_data=robot_data, link_infos=link_infos, state=final_state, observed_markers=observed_map, marker_reports=marker_reports, stats=stats, input_file=args.optimized_markers, robot_file=args.robot, stage_results=stage_results, params=params, ) out_path = args.out if out_path is None: out_dir = os.path.dirname(args.optimized_markers) or '.' out_path = os.path.join(out_dir, 'robot_state.json') save_json(out_path, out_data) print(f"[INFO] Saved to {out_path}") if stats['mean_error_m'] is not None: print(f"[INFO] Mean marker error: {stats['mean_error_m'] * 1000.0:.2f} mm") print(f"[INFO] RMS marker error : {stats['rms_error_m'] * 1000.0:.2f} mm") print(f"[INFO] Worst marker error: {stats['worst_error_m'] * 1000.0:.2f} mm") else: print('[INFO] No marker error statistics available.') if __name__ == '__main__': main()