#!/usr/bin/env python3 """ 4_robotState_estimation_v6.py Deterministic geometric robot-state estimation from optimized 3D ArUco marker positions. Mathematical idea ----------------- This script does NOT run a generic minimizer by default. It estimates the robot state q with closed-form geometric rules and sequential prefix expansion. Input: aruco_positions_optimized*.json robot.json Core geometric steps -------------------- 1) Root pose estimation The root link pose (Board) is estimated from its observed markers using a weighted Kabsch fit: R_root, t_root = argmin || R P_i + t - Q_i ||^2 with weights driven by marker size. 2) Linear joints / sliders For a prismatic joint along axis a, the joint variable q is estimated by weighted averaging of the projection residuals: q = mean_i w_i * dot(a_world, p_obs_i - p_pred_i(q=0)) / mean_i w_i This is a closed-form geometry update, not a numeric optimizer. 3) Revolute joints For a revolute joint around axis a, the joint angle is estimated by projecting the marker vectors into the plane orthogonal to a and solving a 2D weighted rotation alignment: theta = atan2( sum_i w_i * cross2(u_i, v_i), sum_i w_i * dot2(u_i, v_i) ) where u_i are predicted vectors (q=0) and v_i are observed vectors. To handle 180° flip ambiguities, the script optionally compares theta and theta + pi and uses a weak normal-based tie-break. Marker normals from robot.json are used as a geometric hint, not as hard image evidence. 4) Sequential prefix expansion The chain is estimated link-by-link from the root outwards. Each stage only activates the first N links (controlled by robot.json::state_pose_params), re-estimates the active prefix, and keeps the previous prefix as the start of the next stage. This is intentionally designed as a fast, deterministic geometric initializer for later refinement, not as a full bundle-adjustment solver. Notes ----- - The script uses the robot hierarchy, joint axes, joint origins, and marker local positions from robot.json. - No least-squares minimizer is used in the default path. - Solver-related flags may be present in robot.json for compatibility but are not used here. Dependencies ------------ numpy only """ from __future__ import annotations import argparse import json import math import os import sys import time from dataclasses import dataclass from typing import Any, Dict, List, Optional, Tuple import numpy as np # ============================================================================= # Path / JSON helpers # ============================================================================= def resolve_path(path: str) -> str: path = os.path.expanduser(path) if os.path.isabs(path): return path return os.path.abspath(path) def load_json(path: str) -> Any: with open(resolve_path(path), "r", encoding="utf-8") as f: return json.load(f) def save_json(path: str, data: Any) -> None: with open(resolve_path(path), "w", encoding="utf-8") as f: json.dump(data, f, indent=2) # ============================================================================= # Small geometry helpers # ============================================================================= def safe_norm(v: np.ndarray, eps: float = 1e-12) -> float: return float(np.linalg.norm(v) + eps) def normalize(v: np.ndarray, eps: float = 1e-12) -> np.ndarray: v = np.asarray(v, dtype=np.float64) return v / safe_norm(v, eps) def wrap_angle_rad(a: float) -> float: return (a + math.pi) % (2.0 * math.pi) - math.pi def get_length_scale(robot_data: Dict[str, Any]) -> float: units = robot_data.get("units", {}) or {} length_unit = str(units.get("length", "")).strip().lower() if length_unit in ("mm", "millimeter", "millimeters"): return 1.0 / 1000.0 if length_unit in ("cm", "centimeter", "centimeters"): return 1.0 / 100.0 return 1.0 def rotation_matrix_xyz(rx: float, ry: float, rz: float, degrees: bool = False) -> np.ndarray: """Rotation order: X then Y then Z.""" if degrees: rx, ry, rz = math.radians(rx), math.radians(ry), math.radians(rz) cx, sx = math.cos(rx), math.sin(rx) cy, sy = math.cos(ry), math.sin(ry) cz, sz = math.cos(rz), math.sin(rz) rx_m = np.array([[1.0, 0.0, 0.0], [0.0, cx, -sx], [0.0, sx, cx]], dtype=np.float64) ry_m = np.array([[cy, 0.0, sy], [0.0, 1.0, 0.0], [-sy, 0.0, cy]], dtype=np.float64) rz_m = np.array([[cz, -sz, 0.0], [sz, cz, 0.0], [0.0, 0.0, 1.0]], dtype=np.float64) return rz_m @ ry_m @ rx_m def axis_angle_rotation(axis: np.ndarray, angle_rad: float) -> np.ndarray: axis = normalize(axis) x, y, z = axis c = math.cos(angle_rad) s = math.sin(angle_rad) C = 1.0 - c return np.array([ [c + x * x * C, x * y * C - z * s, x * z * C + y * s], [y * x * C + z * s, c + y * y * C, y * z * C - x * s], [z * x * C - y * s, z * y * C + x * s, c + z * z * C], ], dtype=np.float64) def make_transform(R: Optional[np.ndarray] = None, t: Optional[np.ndarray] = None) -> np.ndarray: T = np.eye(4, dtype=np.float64) if R is not None: T[:3, :3] = np.asarray(R, dtype=np.float64) if t is not None: T[:3, 3] = np.asarray(t, dtype=np.float64).reshape(3) return T def transform_point(T: np.ndarray, p: np.ndarray) -> np.ndarray: p4 = np.ones(4, dtype=np.float64) p4[:3] = np.asarray(p, dtype=np.float64).reshape(3) return (T @ p4)[:3] def transform_vector(T: np.ndarray, v: np.ndarray) -> np.ndarray: return T[:3, :3] @ np.asarray(v, dtype=np.float64).reshape(3) def kabsch(P: np.ndarray, Q: np.ndarray, W: Optional[np.ndarray] = None) -> Tuple[np.ndarray, np.ndarray]: """Weighted Kabsch fit: find R, t so that R @ P + t ≈ Q.""" P = np.asarray(P, dtype=np.float64) Q = np.asarray(Q, dtype=np.float64) assert P.shape == Q.shape and P.shape[1] == 3 if W is None: W = np.ones(len(P), dtype=np.float64) W = np.asarray(W, dtype=np.float64).reshape(-1) W = np.maximum(W, 1e-12) W = W / np.sum(W) p_cent = np.sum(P * W[:, None], axis=0) q_cent = np.sum(Q * W[:, None], axis=0) P0 = P - p_cent Q0 = Q - q_cent H = (P0 * W[:, None]).T @ Q0 U, S, Vt = np.linalg.svd(H) R = Vt.T @ U.T if np.linalg.det(R) < 0: Vt[-1, :] *= -1.0 R = Vt.T @ U.T t = q_cent - R @ p_cent return R, t def principal_axis_id(axis: np.ndarray, threshold: float = 0.95) -> Optional[int]: a = normalize(np.asarray(axis, dtype=np.float64)) idx = int(np.argmax(np.abs(a))) if abs(a[idx]) >= threshold: return idx return None def axis_unit_from_id(axis_id: int, sign: float = 1.0) -> np.ndarray: v = np.zeros(3, dtype=np.float64) v[axis_id] = 1.0 if sign >= 0 else -1.0 return v def plane_basis_for_axis(axis: np.ndarray) -> Tuple[np.ndarray, np.ndarray, int]: """ Return a 2D basis (e1, e2) spanning the plane orthogonal to axis. Also return the principal axis index if axis is close to x/y/z. """ a = normalize(axis) axis_id = int(np.argmax(np.abs(a))) if axis_id == 0: # x -> yz-plane e1 = np.array([0.0, 1.0, 0.0], dtype=np.float64) e2 = np.array([0.0, 0.0, 1.0], dtype=np.float64) elif axis_id == 1: # y -> zx-plane e1 = np.array([0.0, 0.0, 1.0], dtype=np.float64) e2 = np.array([1.0, 0.0, 0.0], dtype=np.float64) else: # z -> xy-plane e1 = np.array([1.0, 0.0, 0.0], dtype=np.float64) e2 = np.array([0.0, 1.0, 0.0], dtype=np.float64) return e1, e2, axis_id def project_to_plane(v: np.ndarray, axis: np.ndarray) -> np.ndarray: axis = normalize(axis) return np.asarray(v, dtype=np.float64) - np.dot(v, axis) * axis def weighted_mean(values: List[float], weights: List[float]) -> float: if not values or not weights: return 0.0 w = np.asarray(weights, dtype=np.float64) v = np.asarray(values, dtype=np.float64) s = float(np.sum(w)) if s <= 1e-12: return float(np.mean(v)) return float(np.sum(v * w) / s) def marker_quality_weight(marker: "MarkerInfo", ref_size: float = 25.0) -> float: w = 1.0 if marker.size is not None: try: size = float(marker.size) if size > 0: w *= max(0.35, math.sqrt(size / ref_size)) except Exception: pass return float(w) def normal_flip_bias(marker: "MarkerInfo", world_up: np.ndarray = np.array([0.0, 0.0, 1.0], dtype=np.float64)) -> float: """ Weak bias for top/bottom flip discrimination. Only markers with a strong ±z local normal contribute. """ if marker.local_normal is None: return 0.0 n = normalize(marker.local_normal) if abs(n[2]) < 0.5: return 0.0 pref = 1.0 if n[2] >= 0 else -1.0 # Positive score means "prefer world_up alignment" for top markers, # negative score means "prefer world_down alignment" for bottom markers. return pref # ============================================================================= # Robot structures # ============================================================================= @dataclass class MarkerInfo: marker_id: int name: str local_pos_m: np.ndarray local_normal: np.ndarray size: Optional[float] spin: Optional[float] link_name: str @dataclass class LinkInfo: name: str parent: Optional[str] joint_type: str joint_axis: Optional[np.ndarray] joint_variable: Optional[str] joint_origin_m: np.ndarray joint_rotation_deg: np.ndarray mount_position_m: np.ndarray mount_rotation_deg: np.ndarray markers: List[MarkerInfo] @dataclass class StatePoseParams: numbers_of_elements_to_consider_start: int = 4 numbers_of_elements_to_consider_final: int = 5 geometric_passes_per_stage: int = 2 root_pose_min_markers: int = 2 use_marker_normals_flip_tiebreak: bool = True normal_flip_weight: float = 0.05 def load_state_pose_params(robot_data: Dict[str, Any]) -> StatePoseParams: raw = robot_data.get("state_pose_params", {}) or {} return StatePoseParams( numbers_of_elements_to_consider_start=max(1, int(raw.get("numbers_of_Elements_to_consider_start", 4))), numbers_of_elements_to_consider_final=max(1, int(raw.get("numbers_of_Elements_to_consider_final", 5))), geometric_passes_per_stage=max(1, int(raw.get("geometric_passes_per_stage", 2))), root_pose_min_markers=max(1, int(raw.get("root_pose_min_markers", 2))), use_marker_normals_flip_tiebreak=bool(raw.get("use_marker_normals_flip_tiebreak", True)), normal_flip_weight=float(raw.get("normal_flip_weight", 0.05)), ) def parse_robot(robot_data: Dict[str, Any], length_scale: float) -> Tuple[Dict[str, LinkInfo], Dict[int, str], List[str]]: links = robot_data.get("links", {}) or {} link_infos: Dict[str, LinkInfo] = {} marker_by_id: Dict[int, str] = {} issues: List[str] = [] for link_name, link_data in links.items(): parent = link_data.get("parent", None) joint = link_data.get("jointToParent", {}) or {} joint_type = str(joint.get("type", "")).strip().lower() axis = joint.get("axis", None) joint_axis = None if axis is not None: axis_arr = np.asarray(axis, dtype=np.float64) if safe_norm(axis_arr) > 1e-12: joint_axis = normalize(axis_arr) joint_variable = joint.get("variable", None) joint_origin_m = np.asarray(joint.get("origin", [0, 0, 0]), dtype=np.float64) * float(length_scale) joint_rotation_deg = np.asarray(joint.get("rotation", [0, 0, 0]), dtype=np.float64) mount_position_m = np.asarray(link_data.get("mountPosition", [0, 0, 0]), dtype=np.float64) * float(length_scale) mount_rotation_deg = np.asarray(link_data.get("mountRotation", [0, 0, 0]), dtype=np.float64) markers: List[MarkerInfo] = [] seen_local: set[int] = set() for idx, marker in enumerate(link_data.get("markers", []) or []): marker_id = int(marker.get("id", -1)) pos = marker.get("position", None) if marker_id < 0 or pos is None or len(pos) != 3: issues.append(f"[WARN] link={link_name}: skipped invalid marker entry at index {idx}") continue if marker_id in seen_local: issues.append(f"[WARN] link={link_name}: duplicate marker id {marker_id} inside same link -> skipped") continue if marker_id in marker_by_id: issues.append( f"[WARN] marker id {marker_id} already assigned to link '{marker_by_id[marker_id]}', " f"duplicate in link '{link_name}' -> skipped" ) continue seen_local.add(marker_id) marker_by_id[marker_id] = link_name markers.append( MarkerInfo( marker_id=marker_id, name=str(marker.get("name", f"aruco_{marker_id}")), local_pos_m=np.asarray(pos, dtype=np.float64) * float(length_scale), local_normal=normalize(np.asarray(marker.get("normal", [0, 0, 1]), dtype=np.float64)), size=marker.get("size", None), spin=marker.get("spin", None), link_name=link_name, ) ) link_infos[link_name] = LinkInfo( name=link_name, parent=parent, joint_type=joint_type, joint_axis=joint_axis, joint_variable=joint_variable, joint_origin_m=joint_origin_m, joint_rotation_deg=joint_rotation_deg, mount_position_m=mount_position_m, mount_rotation_deg=mount_rotation_deg, markers=markers, ) return link_infos, marker_by_id, issues def topological_links(link_infos: Dict[str, LinkInfo]) -> List[str]: roots = [ln for ln, li in link_infos.items() if li.parent is None] if not roots: return [] root = roots[0] children: Dict[str, List[str]] = {ln: [] for ln in link_infos} for ln, li in link_infos.items(): if li.parent is not None and li.parent in children: children[li.parent].append(ln) order: List[str] = [] queue = [root] while queue: cur = queue.pop(0) if cur in order: continue order.append(cur) for ch in children.get(cur, []): queue.append(ch) # Append disconnected subtrees if any. for ln in link_infos: if ln not in order: order.append(ln) return order def compute_children_map(link_infos: Dict[str, LinkInfo]) -> Dict[str, List[str]]: children: Dict[str, List[str]] = {ln: [] for ln in link_infos} for ln, li in link_infos.items(): if li.parent is not None and li.parent in children: children[li.parent].append(ln) return children def subtree_links(link_name: str, children_map: Dict[str, List[str]], active_links: set[str]) -> List[str]: out: List[str] = [] queue = [link_name] while queue: cur = queue.pop(0) if cur not in active_links: continue out.append(cur) for ch in children_map.get(cur, []): if ch in active_links: queue.append(ch) return out def marker_ids_in_links(link_infos: Dict[str, LinkInfo], links: List[str]) -> List[int]: ids: List[int] = [] for link_name in links: for m in link_infos[link_name].markers: ids.append(m.marker_id) return ids # ============================================================================= # Observations # ============================================================================= def load_observed_markers(optimized_markers_file: str) -> Dict[int, Dict[str, Any]]: data = load_json(optimized_markers_file) if isinstance(data, dict): markers = data.get("markers", []) or [] elif isinstance(data, list): markers = data else: markers = [] observed: Dict[int, Dict[str, Any]] = {} for m in markers: if not isinstance(m, dict) or "marker_id" not in m: continue marker_id = int(m["marker_id"]) pos = m.get("position_m", None) if pos is None or len(pos) != 3: continue obs = dict(m) obs["marker_id"] = marker_id obs["position_m"] = np.asarray(pos, dtype=np.float64) observed[marker_id] = obs return observed def active_observations_for_links( observed_markers: Dict[int, Dict[str, Any]], link_infos: Dict[str, LinkInfo], active_links: set[str], ) -> Dict[int, Dict[str, Any]]: out: Dict[int, Dict[str, Any]] = {} for marker_id, obs in observed_markers.items(): link = obs.get("link", None) if link is None or link == "unknown": continue if link not in active_links: continue if link not in link_infos: continue out[marker_id] = obs return out # ============================================================================= # Forward kinematics # ============================================================================= def link_static_transform(link: LinkInfo) -> np.ndarray: Rm = rotation_matrix_xyz(*link.mount_rotation_deg, degrees=True) return make_transform(Rm, link.mount_position_m) def joint_motion_transform(link: LinkInfo, q_value: float) -> np.ndarray: joint_type = (link.joint_type or "").strip().lower() if link.joint_axis is None: return np.eye(4, dtype=np.float64) axis = normalize(link.joint_axis) if joint_type == "linear": return make_transform(np.eye(3), axis * float(q_value)) if joint_type == "revolute": return make_transform(axis_angle_rotation(axis, float(q_value)), np.zeros(3)) return np.eye(4, dtype=np.float64) def world_to_link_transform( link_name: str, link_infos: Dict[str, LinkInfo], state: Dict[str, Any], cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray], joint_override: Optional[Dict[str, float]] = None, ) -> np.ndarray: override_items = tuple(sorted((joint_override or {}).items())) cache_key = (link_name, override_items) if cache_key in cache: return cache[cache_key] link = link_infos[link_name] if link.parent is None: T = make_transform(state["root_R"], state["root_t"]) cache[cache_key] = T return T parent_T = world_to_link_transform(link.parent, link_infos, state, cache, joint_override=joint_override) # Parent -> joint origin T = parent_T @ make_transform(np.eye(3), link.joint_origin_m) # Joint motion joint_values = state["joint_values"] q = joint_override.get(link.name, joint_values.get(link.joint_variable, 0.0)) if joint_override else joint_values.get(link.joint_variable, 0.0) T = T @ joint_motion_transform(link, q) # Static rotation after the joint R_static = rotation_matrix_xyz(*link.joint_rotation_deg, degrees=True) T = T @ make_transform(R_static, np.zeros(3)) # Mount transform / link-local offset T = T @ link_static_transform(link) cache[cache_key] = T return T def predict_marker_positions( link_infos: Dict[str, LinkInfo], state: Dict[str, Any], joint_override: Optional[Dict[str, float]] = None, active_links: Optional[set[str]] = None, ) -> Dict[int, np.ndarray]: pred: Dict[int, np.ndarray] = {} cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray] = {} for link_name, link in link_infos.items(): if active_links is not None and link_name not in active_links: continue T = world_to_link_transform(link_name, link_infos, state, cache, joint_override=joint_override) for marker in link.markers: pred[marker.marker_id] = transform_point(T, marker.local_pos_m) return pred def world_joint_axis_for_link( link_name: str, link_infos: Dict[str, LinkInfo], state: Dict[str, Any], ) -> np.ndarray: link = link_infos[link_name] if link.parent is None or link.joint_axis is None: return np.array([1.0, 0.0, 0.0], dtype=np.float64) cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray] = {} parent_T = world_to_link_transform(link.parent, link_infos, state, cache) axis_world = transform_vector(parent_T, link.joint_axis) return normalize(axis_world) def world_joint_origin_for_link( link_name: str, link_infos: Dict[str, LinkInfo], state: Dict[str, Any], ) -> np.ndarray: link = link_infos[link_name] if link.parent is None: return np.asarray(state["root_t"], dtype=np.float64).copy() cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray] = {} parent_T = world_to_link_transform(link.parent, link_infos, state, cache) return transform_point(parent_T, link.joint_origin_m) # ============================================================================= # Geometric estimation # ============================================================================= def initial_root_pose( root_link: LinkInfo, observed_markers: Dict[int, Dict[str, Any]], min_markers: int = 2, ) -> Tuple[np.ndarray, np.ndarray, Dict[str, Any]]: P = [] Q = [] W = [] matched = [] for marker in root_link.markers: if marker.marker_id in observed_markers: obs = np.asarray(observed_markers[marker.marker_id]["position_m"], dtype=np.float64) P.append(marker.local_pos_m) Q.append(obs) W.append(marker_quality_weight(marker)) matched.append(marker.marker_id) if len(P) >= min_markers: R, t = kabsch(np.asarray(P), np.asarray(Q), np.asarray(W)) return R, t, {"reason": "kabsch", "used_markers": matched} if len(P) >= 1: marker = root_link.markers[0] obs = np.asarray(observed_markers[marker.marker_id]["position_m"], dtype=np.float64) return np.eye(3, dtype=np.float64), obs - marker.local_pos_m, {"reason": "single_marker_fallback", "used_markers": matched} return np.eye(3, dtype=np.float64), np.zeros(3, dtype=np.float64), {"reason": "no_root_markers"} def estimate_linear_joint_value( link_name: str, link_infos: Dict[str, LinkInfo], state: Dict[str, Any], observed_markers: Dict[int, Dict[str, Any]], active_links: set[str], children_map: Dict[str, List[str]], ) -> Tuple[float, Dict[str, Any]]: link = link_infos[link_name] if link.joint_variable is None: return 0.0, {"reason": "no_joint_variable"} subtree = subtree_links(link_name, children_map, active_links) obs_ids = [mid for mid in marker_ids_in_links(link_infos, subtree) if mid in observed_markers] if not obs_ids: return float(state["joint_values"].get(link.joint_variable, 0.0)), {"reason": "no_observations"} axis_world = world_joint_axis_for_link(link_name, link_infos, state) pred0 = predict_marker_positions(link_infos, state, joint_override={link_name: 0.0}, active_links=active_links) num = 0.0 den = 0.0 per_marker = [] for mid in obs_ids: if mid not in pred0: continue marker = next((m for m in link_infos[observed_markers[mid]["link"]].markers if m.marker_id == mid), None) w = marker_quality_weight(marker) if marker is not None else 1.0 p_obs = np.asarray(observed_markers[mid]["position_m"], dtype=np.float64) p0 = pred0[mid] q_i = float(np.dot(axis_world, p_obs - p0)) num += w * q_i den += w per_marker.append({"marker_id": mid, "q_i": q_i, "weight": w}) if den <= 1e-12: return float(state["joint_values"].get(link.joint_variable, 0.0)), {"reason": "zero_weight"} q = num / den return float(q), { "reason": "weighted_projection", "used_markers": len(per_marker), "axis_world": [float(x) for x in axis_world], "per_marker": per_marker, } def revolute_score_with_normals( link_name: str, q_value: float, link_infos: Dict[str, LinkInfo], state: Dict[str, Any], observed_markers: Dict[int, Dict[str, Any]], active_links: set[str], ) -> float: """ Weak tiebreak for q vs q + pi using marker normals. Lower is better. """ link = link_infos[link_name] if not link.markers: return 0.0 pred = predict_marker_positions(link_infos, state, joint_override={link_name: q_value}, active_links=active_links) T_link = world_to_link_transform(link_name, link_infos, state, cache={}, joint_override={link_name: q_value}) R_link = T_link[:3, :3] up = np.array([0.0, 0.0, 1.0], dtype=np.float64) score = 0.0 for marker in link.markers: if marker.marker_id not in observed_markers: continue if marker.local_normal is None: continue n_local = normalize(marker.local_normal) if abs(n_local[2]) < 0.5: continue n_world = normalize(R_link @ n_local) sign_pref = 1.0 if n_local[2] >= 0 else -1.0 score += marker_quality_weight(marker) * (1.0 - sign_pref * float(np.dot(n_world, up))) return float(score) def estimate_revolute_joint_value( link_name: str, link_infos: Dict[str, LinkInfo], state: Dict[str, Any], observed_markers: Dict[int, Dict[str, Any]], active_links: set[str], children_map: Dict[str, List[str]], use_normal_tiebreak: bool = True, normal_weight: float = 0.05, ) -> Tuple[float, Dict[str, Any]]: link = link_infos[link_name] if link.joint_variable is None: return 0.0, {"reason": "no_joint_variable"} subtree = subtree_links(link_name, children_map, active_links) obs_ids = [mid for mid in marker_ids_in_links(link_infos, subtree) if mid in observed_markers] if not obs_ids: return float(state["joint_values"].get(link.joint_variable, 0.0)), {"reason": "no_observations"} q0 = float(state["joint_values"].get(link.joint_variable, 0.0)) # Prediction with joint set to zero (only upstream geometry active). pred0 = predict_marker_positions(link_infos, state, joint_override={link_name: 0.0}, active_links=active_links) origin_world = world_joint_origin_for_link(link_name, link_infos, state) axis_world = world_joint_axis_for_link(link_name, link_infos, state) e1, e2, axis_id = plane_basis_for_axis(axis_world) S = 0.0 C = 0.0 used = 0 per_marker = [] for mid in obs_ids: if mid not in pred0: continue marker = next((m for m in link_infos[observed_markers[mid]["link"]].markers if m.marker_id == mid), None) w = marker_quality_weight(marker) if marker is not None else 1.0 p_obs = np.asarray(observed_markers[mid]["position_m"], dtype=np.float64) p0 = pred0[mid] u = project_to_plane(p0 - origin_world, axis_world) v = project_to_plane(p_obs - origin_world, axis_world) u2 = np.array([np.dot(u, e1), np.dot(u, e2)], dtype=np.float64) v2 = np.array([np.dot(v, e1), np.dot(v, e2)], dtype=np.float64) nu = float(np.linalg.norm(u2)) nv = float(np.linalg.norm(v2)) if nu <= 1e-9 or nv <= 1e-9: continue S += w * float(u2[0] * v2[1] - u2[1] * v2[0]) C += w * float(u2[0] * v2[0] + u2[1] * v2[1]) used += 1 per_marker.append({"marker_id": mid, "weight": w}) if used == 0: return q0, {"reason": "no_valid_vectors"} theta = math.atan2(S, C) theta_alt = wrap_angle_rad(theta + math.pi) # Evaluate both candidates deterministically, choose the better one. def score_candidate(qcand: float) -> float: pred = predict_marker_positions(link_infos, state, joint_override={link_name: qcand}, active_links=active_links) err = 0.0 for mid in obs_ids: if mid not in pred: continue marker = next((m for m in link_infos[observed_markers[mid]["link"]].markers if m.marker_id == mid), None) w = marker_quality_weight(marker) if marker is not None else 1.0 p_obs = np.asarray(observed_markers[mid]["position_m"], dtype=np.float64) d = pred[mid] - p_obs err += w * float(np.dot(d, d)) if use_normal_tiebreak: err += normal_weight * revolute_score_with_normals(link_name, qcand, link_infos, state, observed_markers, active_links) return float(err) score_theta = score_candidate(theta) score_alt = score_candidate(theta_alt) best_q = theta if score_theta <= score_alt else theta_alt best_score = min(score_theta, score_alt) return wrap_angle_rad(best_q), { "reason": "2d_alignment" + ("+normal_tiebreak" if use_normal_tiebreak else ""), "used_markers": used, "axis_world": [float(x) for x in axis_world], "axis_id": axis_id, "theta_rad": float(theta), "theta_alt_rad": float(theta_alt), "score_theta": float(score_theta), "score_theta_alt": float(score_alt), "best_score": float(best_score), "per_marker": per_marker, } def estimate_joint_geometrically( link_name: str, link_infos: Dict[str, LinkInfo], state: Dict[str, Any], observed_markers: Dict[int, Dict[str, Any]], active_links: set[str], children_map: Dict[str, List[str]], params: StatePoseParams, ) -> Tuple[float, Dict[str, Any]]: link = link_infos[link_name] jt = (link.joint_type or "").strip().lower() if jt == "linear": return estimate_linear_joint_value(link_name, link_infos, state, observed_markers, active_links, children_map) if jt == "revolute": return estimate_revolute_joint_value( link_name, link_infos, state, observed_markers, active_links, children_map, use_normal_tiebreak=params.use_marker_normals_flip_tiebreak, normal_weight=params.normal_flip_weight, ) return float(state["joint_values"].get(link.joint_variable, 0.0)) if link.joint_variable else 0.0, {"reason": "unsupported_joint_type"} # ============================================================================= # Scoring / reporting # ============================================================================= def build_state_template(link_infos: Dict[str, LinkInfo]) -> Dict[str, Any]: return { "root_R": np.eye(3, dtype=np.float64), "root_t": np.zeros(3, dtype=np.float64), "joint_values": {li.joint_variable: 0.0 for li in link_infos.values() if li.joint_variable}, } def copy_state(state: Dict[str, Any]) -> Dict[str, Any]: return { "root_R": np.asarray(state["root_R"], dtype=np.float64).copy(), "root_t": np.asarray(state["root_t"], dtype=np.float64).copy(), "joint_values": dict(state["joint_values"]), } def marker_error_report( link_infos: Dict[str, LinkInfo], state: Dict[str, Any], observed_markers: Dict[int, Dict[str, Any]], active_links: Optional[set[str]] = None, ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: pred = predict_marker_positions(link_infos, state, active_links=active_links) marker_reports: List[Dict[str, Any]] = [] errors = [] for marker_id, obs in observed_markers.items(): if marker_id not in pred: continue p_obs = np.asarray(obs["position_m"], dtype=np.float64) p_pred = pred[marker_id] err = p_pred - p_obs err_norm = float(np.linalg.norm(err)) errors.append(err_norm) marker_reports.append( { "marker_id": int(marker_id), "link": obs.get("link", "unknown"), "error_m": [float(x) for x in err], "error_norm_m": err_norm, "predicted_m": [float(x) for x in p_pred], "observed_m": [float(x) for x in p_obs], } ) if errors: arr = np.asarray(errors, dtype=np.float64) stats = { "num_markers_used": int(len(errors)), "mean_error_m": float(np.mean(arr)), "rms_error_m": float(math.sqrt(np.mean(arr ** 2))), "median_error_m": float(np.median(arr)), "worst_error_m": float(np.max(arr)), } else: stats = { "num_markers_used": 0, "mean_error_m": None, "rms_error_m": None, "median_error_m": None, "worst_error_m": None, } return marker_reports, stats def summarize_state(state: Dict[str, Any], link_infos: Dict[str, LinkInfo]) -> Dict[str, Any]: movements: Dict[str, Any] = {} for link_name, link in link_infos.items(): q = state["joint_values"].get(link.joint_variable, None) if link.joint_variable else None if q is None: continue jt = (link.joint_type or "").strip().lower() if jt == "linear": movements[link.joint_variable] = { "value_m": float(q), "value_mm": float(q * 1000.0), "joint_type": jt, "link": link_name, } elif jt == "revolute": movements[link.joint_variable] = { "value_rad": float(q), "value_deg": float(math.degrees(q)), "joint_type": jt, "link": link_name, } else: movements[link.joint_variable] = { "value": float(q), "joint_type": jt, "link": link_name, } root_R = np.asarray(state["root_R"], dtype=np.float64) root_t = np.asarray(state["root_t"], dtype=np.float64) return { "root_pose": { "translation_m": [float(x) for x in root_t], "rotation_matrix": [[float(x) for x in row] for row in root_R], "euler_xyz_deg": [float(x) for x in np.degrees(np.array([ math.atan2(root_R[2, 1], root_R[2, 2]), math.atan2(-root_R[2, 0], math.sqrt(root_R[0, 0] ** 2 + root_R[1, 0] ** 2)), math.atan2(root_R[1, 0], root_R[0, 0]), ]))], }, "movements": movements, } # ============================================================================= # Sequential geometric estimation # ============================================================================= def optimize_geometric_prefix( link_infos: Dict[str, LinkInfo], observed_markers: Dict[int, Dict[str, Any]], initial_state: Dict[str, Any], active_links: List[str], params: StatePoseParams, ) -> Tuple[Dict[str, Any], Dict[str, Any]]: active_set = set(active_links) children_map = compute_children_map(link_infos) stage_obs = active_observations_for_links(observed_markers, link_infos, active_set) state = copy_state(initial_state) stage_info: Dict[str, Any] = { "method": "deterministic_geometric_prefix", "active_links": active_links, "active_observations": len(stage_obs), "joint_updates": [], } root_link_name = next((ln for ln in active_links if link_infos[ln].parent is None), None) if root_link_name is not None: root_link = link_infos[root_link_name] root_R, root_t, root_info = initial_root_pose(root_link, stage_obs, min_markers=params.root_pose_min_markers) state["root_R"] = root_R state["root_t"] = root_t stage_info["root_link"] = root_link_name stage_info["root_pose"] = root_info active_joint_links = [ln for ln in active_links if link_infos[ln].parent is not None] for pass_idx in range(params.geometric_passes_per_stage): pass_updates = [] for link_name in active_joint_links: link = link_infos[link_name] if not link.joint_variable: continue q_old = float(state["joint_values"].get(link.joint_variable, 0.0)) q_new, info = estimate_joint_geometrically( link_name, link_infos, state, stage_obs, active_set, children_map, params, ) state["joint_values"][link.joint_variable] = q_new pass_updates.append( { "link": link_name, "joint_variable": link.joint_variable, "joint_type": link.joint_type, "old": q_old, "new": q_new, "info": info, } ) stage_info["joint_updates"].append({"pass": pass_idx, "updates": pass_updates}) marker_reports, stats = marker_error_report(link_infos, state, stage_obs, active_links=active_set) stage_info["marker_stats"] = stats stage_info["marker_reports"] = marker_reports[:] return state, stage_info def sequential_geometric_estimation( link_infos: Dict[str, LinkInfo], observed_markers: Dict[int, Dict[str, Any]], params: StatePoseParams, ) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]: ordered = topological_links(link_infos) if not ordered: return build_state_template(link_infos), [] start_n = max(1, min(int(params.numbers_of_elements_to_consider_start), len(ordered))) final_n = max(start_n, min(int(params.numbers_of_elements_to_consider_final), len(ordered))) state = build_state_template(link_infos) stage_reports: List[Dict[str, Any]] = [] for n in range(start_n, final_n + 1): active_links = ordered[:n] state, stage_info = optimize_geometric_prefix(link_infos, observed_markers, state, active_links, params) stage_info["stage_idx"] = len(stage_reports) stage_info["num_active_links"] = len(active_links) stage_info["active_links"] = active_links stage_obs = active_observations_for_links(observed_markers, link_infos, set(active_links)) marker_reports, stats = marker_error_report(link_infos, state, stage_obs, active_links=set(active_links)) stage_info["marker_stats"] = stats stage_info["marker_reports"] = marker_reports stage_reports.append(stage_info) mean_mm = None if stats["mean_error_m"] is None else stats["mean_error_m"] * 1000.0 print( f"[INFO] Stage {stage_info['stage_idx']} | links={len(active_links)} | " f"markers_used={stats['num_markers_used']} | " f"mean_error={'n/a' if mean_mm is None else f'{mean_mm:.2f} mm'}" ) return state, stage_reports # ============================================================================= # Serialization # ============================================================================= def link_world_poses(link_infos: Dict[str, LinkInfo], state: Dict[str, Any]) -> Dict[str, Any]: poses: Dict[str, Any] = {} cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray] = {} for link_name in link_infos: T = world_to_link_transform(link_name, link_infos, state, cache) R = T[:3, :3] t = T[:3, 3] poses[link_name] = { "translation_m": [float(x) for x in t], "rotation_matrix": [[float(x) for x in row] for row in R], } return poses def state_to_json( state: Dict[str, Any], link_infos: Dict[str, LinkInfo], stage_reports: List[Dict[str, Any]], observed_markers: Dict[int, Dict[str, Any]], ) -> Dict[str, Any]: movements = summarize_state(state, link_infos)["movements"] root_pose = summarize_state(state, link_infos)["root_pose"] predicted = predict_marker_positions(link_infos, state) markers_out = [] for marker_id, obs in sorted(observed_markers.items()): entry = { "marker_id": int(marker_id), "link": obs.get("link", "unknown"), "observed_position_m": [float(x) for x in np.asarray(obs["position_m"], dtype=np.float64)], } if marker_id in predicted: entry["predicted_position_m"] = [float(x) for x in predicted[marker_id]] err = predicted[marker_id] - np.asarray(obs["position_m"], dtype=np.float64) entry["error_m"] = [float(x) for x in err] entry["error_norm_m"] = float(np.linalg.norm(err)) markers_out.append(entry) return { "schema_version": "1.0", "method": "deterministic_geometric_sequential_prefix", "created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "root_pose": root_pose, "movements": movements, "link_poses": link_world_poses(link_infos, state), "stage_reports": stage_reports, "markers": markers_out, } # ============================================================================= # Main # ============================================================================= def main() -> None: parser = argparse.ArgumentParser( description="Sequential deterministic geometric robot-state estimation from optimized ArUco marker positions" ) parser.add_argument("optimized_markers", help="aruco_positions_optimized*.json") parser.add_argument("-robot", "--robot", required=True, help="robot.json") parser.add_argument("-out", "--out", default=None, help="Output robot_state.json path") parser.add_argument("--prefixStart", type=int, default=None, help="Override state_pose_params start") parser.add_argument("--prefixFinal", type=int, default=None, help="Override state_pose_params final") parser.add_argument("--passes", type=int, default=None, help="Override geometric passes per stage") args = parser.parse_args() print("[STEP 1] Load robot.json and optimized marker positions...") robot_data = load_json(args.robot) length_scale = get_length_scale(robot_data) params = load_state_pose_params(robot_data) if args.prefixStart is not None: params.numbers_of_elements_to_consider_start = max(1, int(args.prefixStart)) if args.prefixFinal is not None: params.numbers_of_elements_to_consider_final = max(1, int(args.prefixFinal)) if args.passes is not None: params.geometric_passes_per_stage = max(1, int(args.passes)) link_infos, marker_by_id, issues = parse_robot(robot_data, length_scale) observed_map = load_observed_markers(args.optimized_markers) for issue in issues: print(issue) print(f"[INFO] Links: {len(link_infos)}") print(f"[INFO] Known robot markers: {len(marker_by_id)}") print(f"[INFO] Observed optimized markers: {len(observed_map)}") print( f"[INFO] state_pose_params: start={params.numbers_of_elements_to_consider_start}, " f"final={params.numbers_of_elements_to_consider_final}, " f"passes={params.geometric_passes_per_stage}, " f"normal_flip_tiebreak={params.use_marker_normals_flip_tiebreak}" ) print("[STEP 2] Sequential geometric reconstruction...") state, stage_reports = sequential_geometric_estimation(link_infos, observed_map, params) out_data = state_to_json(state, link_infos, stage_reports, observed_map) out_dir = os.path.dirname(resolve_path(args.out)) if args.out else os.path.dirname(resolve_path(args.optimized_markers)) if not out_dir: out_dir = "." os.makedirs(out_dir, exist_ok=True) out_file = args.out or os.path.join(out_dir, "robot_state.json") save_json(out_file, out_data) print(f"[INFO] Saved robot state to {out_file}") # Compact summary of the resulting movements print("\n[INFO] Estimated movements:") for key, info in out_data["movements"].items(): if "value_mm" in info: print(f" {key}: {info['value_mm']:.3f} mm ({info['link']}, {info['joint_type']})") elif "value_deg" in info: print(f" {key}: {info['value_deg']:.3f} deg ({info['link']}, {info['joint_type']})") else: print(f" {key}: {info.get('value', 0.0):.6f} ({info.get('link', 'unknown')}, {info.get('joint_type', 'unknown')})") if __name__ == "__main__": main()