Files
appRobotRender/pipeline/4_robotState_estimation_v5.py
2026-05-30 07:52:59 +02:00

1331 lines
48 KiB
Python

#!/usr/bin/env python3
"""
4_robotState_estimation_v5.py
Sequential robot-state estimation from optimized 3D ArUco marker positions.
Mathematical idea
-----------------
This script estimates a robot state q directly from already optimized marker
positions p_i^obs (from aruco_positions_optimized*.json).
The state is built incrementally along the kinematic chain:
1) estimate the root link pose from observed root-link markers
2) extend the active prefix link-by-link
3) for each newly activated joint, estimate its scalar variable by a geometric
rule that matches the robot's degrees of freedom
For each stage we keep the already estimated prefix fixed and only add the next
link(s) from the chain. This is intentionally *not* a generic global optimizer
by default. It is a deterministic geometric initializer that uses the robot
structure and marker geometry directly.
The stage logic is controlled by robot.json::state_pose_params:
- numbers_of_Elements_to_consider_start
- numbers_of_Elements_to_consider_final
- solver_in_between_geometrical
- solver_after_geometrical
Geometric rules used here
-------------------------
Rigid root / early prefix:
- root link pose is estimated from its observed markers with weighted Kabsch
Linear joint (slider):
- the joint translates its whole descendant subtree along the joint axis
- q is estimated as the weighted mean projection of observed minus predicted
marker positions onto the world-space joint axis
Revolute joint:
- the joint rotates its descendant subtree around the joint axis
- q is estimated by a coarse-to-fine 1D angular search that minimizes the
weighted marker residual over the active prefix
Optional solver
---------------
If enabled in state_pose_params, a least-squares refinement can be run after a
geometric stage or at the end. The geometric estimator remains the default and
is the first-class method in this script.
Important limitation
--------------------
This script works from marker positions only. It does not yet use image-space
marker orientation / normal / visibility cues. Those can be added later by
feeding in raw detections and camera poses.
Dependencies
------------
numpy, scipy (optional for refinement)
"""
from __future__ import annotations
import argparse
import copy
import json
import math
import os
import sys
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
# =============================================================================
# Path / JSON helpers
# =============================================================================
def resolve_path(path: str) -> str:
path = os.path.expanduser(path)
if os.path.isabs(path):
return path
return os.path.abspath(path)
def load_json(path: str) -> Any:
with open(resolve_path(path), 'r', encoding='utf-8') as f:
return json.load(f)
def save_json(path: str, data: Any) -> None:
with open(resolve_path(path), 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2)
# =============================================================================
# Small math helpers
# =============================================================================
def safe_norm(v: np.ndarray, eps: float = 1e-12) -> float:
return float(np.linalg.norm(v) + eps)
def normalize(v: np.ndarray, eps: float = 1e-12) -> np.ndarray:
v = np.asarray(v, dtype=np.float64)
return v / safe_norm(v, eps)
def wrap_angle_rad(a: float) -> float:
return (a + math.pi) % (2.0 * math.pi) - math.pi
def get_length_scale(robot_data: Dict[str, Any]) -> float:
units = robot_data.get('units', {}) or {}
length_unit = str(units.get('length', '')).strip().lower()
if length_unit in ('mm', 'millimeter', 'millimeters'):
return 1.0 / 1000.0
if length_unit in ('cm', 'centimeter', 'centimeters'):
return 1.0 / 100.0
return 1.0
def rotation_matrix_xyz(rx: float, ry: float, rz: float, degrees: bool = False) -> np.ndarray:
"""Rotation order: X then Y then Z."""
if degrees:
rx, ry, rz = math.radians(rx), math.radians(ry), math.radians(rz)
cx, sx = math.cos(rx), math.sin(rx)
cy, sy = math.cos(ry), math.sin(ry)
cz, sz = math.cos(rz), math.sin(rz)
rx_m = np.array([[1.0, 0.0, 0.0],
[0.0, cx, -sx],
[0.0, sx, cx]], dtype=np.float64)
ry_m = np.array([[cy, 0.0, sy],
[0.0, 1.0, 0.0],
[-sy, 0.0, cy]], dtype=np.float64)
rz_m = np.array([[cz, -sz, 0.0],
[sz, cz, 0.0],
[0.0, 0.0, 1.0]], dtype=np.float64)
return rz_m @ ry_m @ rx_m
def axis_angle_rotation(axis: np.ndarray, angle_rad: float) -> np.ndarray:
axis = normalize(axis)
x, y, z = axis
c = math.cos(angle_rad)
s = math.sin(angle_rad)
C = 1.0 - c
return np.array([
[c + x * x * C, x * y * C - z * s, x * z * C + y * s],
[y * x * C + z * s, c + y * y * C, y * z * C - x * s],
[z * x * C - y * s, z * y * C + x * s, c + z * z * C]
], dtype=np.float64)
def make_transform(R: Optional[np.ndarray] = None, t: Optional[np.ndarray] = None) -> np.ndarray:
T = np.eye(4, dtype=np.float64)
if R is not None:
T[:3, :3] = np.asarray(R, dtype=np.float64)
if t is not None:
T[:3, 3] = np.asarray(t, dtype=np.float64).reshape(3)
return T
def transform_point(T: np.ndarray, p: np.ndarray) -> np.ndarray:
p4 = np.ones(4, dtype=np.float64)
p4[:3] = np.asarray(p, dtype=np.float64).reshape(3)
return (T @ p4)[:3]
def matrix_to_euler_xyz(R: np.ndarray) -> np.ndarray:
R = np.asarray(R, dtype=np.float64)
sy = math.sqrt(R[0, 0] * R[0, 0] + R[1, 0] * R[1, 0])
singular = sy < 1e-9
if not singular:
x = math.atan2(R[2, 1], R[2, 2])
y = math.atan2(-R[2, 0], sy)
z = math.atan2(R[1, 0], R[0, 0])
else:
x = math.atan2(-R[1, 2], R[1, 1])
y = math.atan2(-R[2, 0], sy)
z = 0.0
return np.degrees([x, y, z])
def kabsch(P: np.ndarray, Q: np.ndarray, W: Optional[np.ndarray] = None) -> Tuple[np.ndarray, np.ndarray]:
"""Find R, t such that R @ P + t ≈ Q."""
P = np.asarray(P, dtype=np.float64)
Q = np.asarray(Q, dtype=np.float64)
assert P.shape == Q.shape and P.shape[1] == 3
if W is None:
W = np.ones(len(P), dtype=np.float64)
W = np.asarray(W, dtype=np.float64).reshape(-1)
W = np.maximum(W, 1e-12)
W = W / np.sum(W)
p_cent = np.sum(P * W[:, None], axis=0)
q_cent = np.sum(Q * W[:, None], axis=0)
P0 = P - p_cent
Q0 = Q - q_cent
H = (P0 * W[:, None]).T @ Q0
U, S, Vt = np.linalg.svd(H)
R = Vt.T @ U.T
if np.linalg.det(R) < 0:
Vt[-1, :] *= -1.0
R = Vt.T @ U.T
t = q_cent - R @ p_cent
return R, t
# =============================================================================
# Data structures
# =============================================================================
@dataclass
class MarkerInfo:
marker_id: int
link_name: str
local_pos_m: np.ndarray
size: Optional[float] = None
normal: Optional[np.ndarray] = None
name: str = ''
@dataclass
class LinkInfo:
name: str
parent: Optional[str]
joint_type: Optional[str]
joint_axis: Optional[np.ndarray]
joint_origin_m: np.ndarray
joint_rotation_deg: np.ndarray
joint_variable: Optional[str]
mount_position_m: np.ndarray
mount_rotation_deg: np.ndarray
markers: List[MarkerInfo]
@dataclass
class StageResult:
stage_idx: int
active_links: List[str]
active_joint_vars: List[str]
num_markers_used: int
mean_error_m: Optional[float]
rms_error_m: Optional[float]
worst_error_m: Optional[float]
method: str
optimizer_info: Dict[str, Any]
@dataclass
class StatePoseParams:
numbers_of_elements_to_consider_start: int = 4
numbers_of_elements_to_consider_final: int = 5
solver_in_between_geometrical: bool = False
solver_after_geometrical: bool = False
geometric_passes_per_stage: int = 2
revolute_search_coarse_deg: float = 5.0
revolute_search_fine_deg: float = 1.0
root_pose_min_markers: int = 3
@dataclass
class EstimationConfig:
marker_base_weight: float = 1.0
root_pose_prior_weight: float = 0.0
joint_prior_weight: float = 0.0
robust_loss: str = 'soft_l1'
max_iterations: int = 120
show_stage_reports: bool = True
use_geometric_prefix: bool = True
# =============================================================================
# Robot parsing
# =============================================================================
def parse_robot(robot_data: Dict[str, Any]) -> Tuple[Dict[str, LinkInfo], Dict[int, MarkerInfo], List[str]]:
length_scale = get_length_scale(robot_data)
links = robot_data.get('links', {}) or {}
link_infos: Dict[str, LinkInfo] = {}
marker_by_id: Dict[int, MarkerInfo] = {}
issues: List[str] = []
for link_name, link_data in links.items():
parent = link_data.get('parent', None)
joint = link_data.get('jointToParent', {}) or {}
joint_type = joint.get('type', None)
axis = joint.get('axis', None)
if axis is not None:
axis = np.asarray(axis, dtype=np.float64)
if safe_norm(axis) > 1e-12:
axis = normalize(axis)
else:
axis = None
joint_origin = np.asarray(joint.get('origin', [0, 0, 0]), dtype=np.float64) * float(length_scale)
joint_rotation_deg = np.asarray(joint.get('rotation', [0, 0, 0]), dtype=np.float64)
joint_variable = joint.get('variable', None)
mount_position = np.asarray(link_data.get('mountPosition', [0, 0, 0]), dtype=np.float64) * float(length_scale)
mount_rotation_deg = np.asarray(link_data.get('mountRotation', [0, 0, 0]), dtype=np.float64)
markers_data = link_data.get('markers', []) or []
markers: List[MarkerInfo] = []
for idx, m in enumerate(markers_data):
marker_id = int(m.get('id', -1))
pos = m.get('position', None)
if marker_id < 0 or pos is None or len(pos) != 3:
issues.append(f"[WARN] link='{link_name}': skipped invalid marker entry at index {idx}")
continue
if marker_id in marker_by_id:
issues.append(
f"[WARN] duplicate marker ID {marker_id} in link '{link_name}' "
f"(already in '{marker_by_id[marker_id].link_name}'); using first occurrence"
)
continue
normal = None
if isinstance(m.get('normal', None), list) and len(m['normal']) == 3:
normal = np.asarray(m['normal'], dtype=np.float64)
marker = MarkerInfo(
marker_id=marker_id,
link_name=link_name,
local_pos_m=np.asarray(pos, dtype=np.float64) * float(length_scale),
size=m.get('size', None),
normal=normal,
name=str(m.get('name', f'marker_{marker_id}')),
)
markers.append(marker)
marker_by_id[marker_id] = marker
link_infos[link_name] = LinkInfo(
name=link_name,
parent=parent,
joint_type=joint_type,
joint_axis=axis,
joint_origin_m=joint_origin,
joint_rotation_deg=joint_rotation_deg,
joint_variable=joint_variable,
mount_position_m=mount_position,
mount_rotation_deg=mount_rotation_deg,
markers=markers,
)
return link_infos, marker_by_id, issues
def topological_links(link_infos: Dict[str, LinkInfo]) -> List[str]:
children = {name: [] for name in link_infos}
roots = []
for name, info in link_infos.items():
if info.parent is None:
roots.append(name)
else:
children.setdefault(info.parent, []).append(name)
order: List[str] = []
queue = list(roots)
seen = set()
while queue:
cur = queue.pop(0)
if cur in seen:
continue
seen.add(cur)
order.append(cur)
for ch in children.get(cur, []):
queue.append(ch)
for name in link_infos:
if name not in seen:
order.append(name)
return order
def compute_children_map(link_infos: Dict[str, LinkInfo]) -> Dict[str, List[str]]:
children: Dict[str, List[str]] = {name: [] for name in link_infos}
for name, info in link_infos.items():
if info.parent is not None:
children.setdefault(info.parent, []).append(name)
return children
def compute_link_depths(link_infos: Dict[str, LinkInfo]) -> Dict[str, int]:
depths: Dict[str, int] = {}
def depth_of(name: str) -> int:
if name in depths:
return depths[name]
info = link_infos[name]
if info.parent is None:
depths[name] = 0
else:
depths[name] = depth_of(info.parent) + 1
return depths[name]
for name in link_infos:
depth_of(name)
return depths
# =============================================================================
# State pose params
# =============================================================================
def load_state_pose_params(robot_data: Dict[str, Any]) -> StatePoseParams:
raw = robot_data.get('state_pose_params', {}) or {}
return StatePoseParams(
numbers_of_elements_to_consider_start=int(raw.get('numbers_of_Elements_to_consider_start', 4)),
numbers_of_elements_to_consider_final=int(raw.get('numbers_of_Elements_to_consider_final', 5)),
solver_in_between_geometrical=bool(raw.get('solver_in_between_geometrical', False)),
solver_after_geometrical=bool(raw.get('solver_after_geometrical', False)),
geometric_passes_per_stage=max(1, int(raw.get('geometric_passes_per_stage', 2))),
revolute_search_coarse_deg=float(raw.get('revolute_search_coarse_deg', 5.0)),
revolute_search_fine_deg=float(raw.get('revolute_search_fine_deg', 1.0)),
root_pose_min_markers=max(1, int(raw.get('root_pose_min_markers', 3))),
)
# =============================================================================
# Observations
# =============================================================================
def load_observed_markers(optimized_markers_file: str) -> Dict[int, Dict[str, Any]]:
data = load_json(optimized_markers_file)
if isinstance(data, dict):
markers = data.get('markers', []) or []
elif isinstance(data, list):
markers = data
else:
markers = []
observed: Dict[int, Dict[str, Any]] = {}
for m in markers:
if not isinstance(m, dict):
continue
if 'marker_id' not in m:
continue
marker_id = int(m['marker_id'])
pos = m.get('position_m', None)
if pos is None or len(pos) != 3:
continue
obs = dict(m)
obs['marker_id'] = marker_id
obs['position_m'] = np.asarray(pos, dtype=np.float64)
observed[marker_id] = obs
return observed
def marker_weight(marker_info: MarkerInfo, base_weight: float = 1.0, ref_size: float = 25.0) -> float:
w = base_weight
if marker_info.size is not None:
try:
size = float(marker_info.size)
if size > 0:
w *= max(0.35, math.sqrt(size / ref_size))
except Exception:
pass
return w
# =============================================================================
# Forward kinematics
# =============================================================================
def link_static_transform(link: LinkInfo) -> np.ndarray:
Rm = rotation_matrix_xyz(*link.mount_rotation_deg, degrees=True)
return make_transform(Rm, link.mount_position_m)
def joint_motion_transform(link: LinkInfo, q_value: float) -> np.ndarray:
joint_type = (link.joint_type or '').strip().lower()
if link.joint_axis is None:
return np.eye(4, dtype=np.float64)
axis = normalize(link.joint_axis)
if joint_type == 'linear':
return make_transform(np.eye(3), axis * float(q_value))
if joint_type == 'revolute':
return make_transform(axis_angle_rotation(axis, float(q_value)), np.zeros(3))
return np.eye(4, dtype=np.float64)
def world_to_link_transform(
link_name: str,
link_infos: Dict[str, LinkInfo],
state: Dict[str, Any],
cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray],
joint_override: Optional[Dict[str, float]] = None,
) -> np.ndarray:
override_items = tuple(sorted((joint_override or {}).items()))
cache_key = (link_name, override_items)
if cache_key in cache:
return cache[cache_key]
link = link_infos[link_name]
if link.parent is None:
T = make_transform(state['root_R'], state['root_t'])
cache[cache_key] = T
return T
parent_T = world_to_link_transform(link.parent, link_infos, state, cache, joint_override=joint_override)
T = parent_T @ make_transform(np.eye(3), link.joint_origin_m)
joint_values = state['joint_values']
q = joint_override.get(link.name, joint_values.get(link.joint_variable, 0.0)) if joint_override else joint_values.get(link.joint_variable, 0.0)
T = T @ joint_motion_transform(link, q)
R_static = rotation_matrix_xyz(*link.joint_rotation_deg, degrees=True)
T = T @ make_transform(R_static, np.zeros(3))
T = T @ link_static_transform(link)
cache[cache_key] = T
return T
def predict_marker_positions(
link_infos: Dict[str, LinkInfo],
state: Dict[str, Any],
joint_override: Optional[Dict[str, float]] = None,
active_links: Optional[set[str]] = None,
) -> Dict[int, np.ndarray]:
pred: Dict[int, np.ndarray] = {}
cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray] = {}
for link_name, link in link_infos.items():
if active_links is not None and link_name not in active_links:
continue
T = world_to_link_transform(link_name, link_infos, state, cache, joint_override=joint_override)
for marker in link.markers:
pred[marker.marker_id] = transform_point(T, marker.local_pos_m)
return pred
def collect_observations_by_link(
observed_markers: Dict[int, Dict[str, Any]],
active_links: Optional[set[str]] = None,
) -> Dict[str, List[int]]:
by_link: Dict[str, List[int]] = {}
for marker_id, obs in observed_markers.items():
link = obs.get('link', None)
if link is None or link == 'unknown':
continue
if active_links is not None and link not in active_links:
continue
by_link.setdefault(link, []).append(marker_id)
return by_link
# =============================================================================
# Initial geometric root pose
# =============================================================================
def initial_root_pose(
root_link: LinkInfo,
observed_markers: Dict[int, Dict[str, Any]],
min_markers: int = 3,
) -> Tuple[np.ndarray, np.ndarray]:
P = []
Q = []
W = []
for marker in root_link.markers:
if marker.marker_id in observed_markers:
obs = np.asarray(observed_markers[marker.marker_id]['position_m'], dtype=np.float64)
P.append(marker.local_pos_m)
Q.append(obs)
W.append(marker_weight(marker))
if len(P) >= min_markers:
return kabsch(np.asarray(P), np.asarray(Q), np.asarray(W))
if len(P) >= 1:
# Weak fallback: keep world axes and place root so the first marker matches.
marker = root_link.markers[0]
obs = np.asarray(observed_markers[marker.marker_id]['position_m'], dtype=np.float64)
return np.eye(3, dtype=np.float64), obs - marker.local_pos_m
return np.eye(3, dtype=np.float64), np.zeros(3, dtype=np.float64)
# =============================================================================
# Subtree helpers
# =============================================================================
def subtree_links(link_name: str, children_map: Dict[str, List[str]], active_links: set[str]) -> List[str]:
out: List[str] = []
queue = [link_name]
while queue:
cur = queue.pop(0)
if cur not in active_links:
continue
out.append(cur)
for ch in children_map.get(cur, []):
if ch in active_links:
queue.append(ch)
return out
def marker_ids_in_links(link_infos: Dict[str, LinkInfo], links: List[str]) -> List[int]:
ids: List[int] = []
for link_name in links:
for m in link_infos[link_name].markers:
ids.append(m.marker_id)
return ids
# =============================================================================
# Geometric joint estimation
# =============================================================================
def current_joint_value(state: Dict[str, Any], link: LinkInfo) -> float:
return float(state['joint_values'].get(link.joint_variable, 0.0)) if link.joint_variable else 0.0
def world_joint_axis_for_link(
link_name: str,
link_infos: Dict[str, LinkInfo],
state: Dict[str, Any],
) -> np.ndarray:
link = link_infos[link_name]
if link.parent is None or link.joint_axis is None:
return np.array([1.0, 0.0, 0.0], dtype=np.float64)
cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray] = {}
parent_T = world_to_link_transform(link.parent, link_infos, state, cache)
axis_world = parent_T[:3, :3] @ normalize(link.joint_axis)
return normalize(axis_world)
def estimate_linear_joint_value(
link_name: str,
link_infos: Dict[str, LinkInfo],
state: Dict[str, Any],
observed_markers: Dict[int, Dict[str, Any]],
active_links: set[str],
children_map: Dict[str, List[str]],
) -> Tuple[float, Dict[str, Any]]:
link = link_infos[link_name]
if link.joint_variable is None:
return 0.0, {'reason': 'no_joint_variable'}
subtree = subtree_links(link_name, children_map, active_links)
obs_ids = [mid for mid in marker_ids_in_links(link_infos, subtree) if mid in observed_markers]
if not obs_ids:
return current_joint_value(state, link), {'reason': 'no_observations'}
axis_world = world_joint_axis_for_link(link_name, link_infos, state)
# Evaluate current prediction with this joint forced to zero.
override = {link_name: 0.0}
pred0 = predict_marker_positions(link_infos, state, joint_override=override, active_links=active_links)
num = 0.0
den = 0.0
used = 0
per_marker = []
for mid in obs_ids:
if mid not in pred0:
continue
marker = next((m for m in link_infos[observed_markers[mid]['link']].markers if m.marker_id == mid), None)
w = marker_weight(marker) if marker is not None else 1.0
p_obs = np.asarray(observed_markers[mid]['position_m'], dtype=np.float64)
p0 = pred0[mid]
q_i = float(np.dot(axis_world, p_obs - p0))
num += w * q_i
den += w
used += 1
per_marker.append({'marker_id': mid, 'q_i': q_i, 'weight': w})
if den <= 1e-12:
return current_joint_value(state, link), {'reason': 'zero_weight'}
q = num / den
return float(q), {
'reason': 'weighted_projection',
'used_markers': used,
'axis_world': [float(x) for x in axis_world],
'per_marker': per_marker,
}
def marker_residual_error_for_state(
link_infos: Dict[str, LinkInfo],
state: Dict[str, Any],
observed_markers: Dict[int, Dict[str, Any]],
active_links: Optional[set[str]] = None,
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
pred = predict_marker_positions(link_infos, state, active_links=active_links)
marker_reports: List[Dict[str, Any]] = []
errors = []
for marker_id, obs in observed_markers.items():
if marker_id not in pred:
continue
p_obs = np.asarray(obs['position_m'], dtype=np.float64)
p_pred = pred[marker_id]
err = p_pred - p_obs
err_norm = float(np.linalg.norm(err))
errors.append(err_norm)
marker_reports.append({
'marker_id': int(marker_id),
'link': obs.get('link', 'unknown'),
'observed_position_m': [float(x) for x in p_obs],
'predicted_position_m': [float(x) for x in p_pred],
'error_m': [float(x) for x in err],
'error_norm_m': err_norm,
'error_norm_mm': err_norm * 1000.0,
'marker_size': obs.get('position_mm', None),
})
if errors:
arr = np.asarray(errors, dtype=np.float64)
stats = {
'num_markers_used': int(len(errors)),
'mean_error_m': float(np.mean(arr)),
'median_error_m': float(np.median(arr)),
'rms_error_m': float(np.sqrt(np.mean(arr ** 2))),
'worst_error_m': float(np.max(arr)),
'p80_error_m': float(np.quantile(arr, 0.80)),
'p90_error_m': float(np.quantile(arr, 0.90)),
}
else:
stats = {
'num_markers_used': 0,
'mean_error_m': None,
'median_error_m': None,
'rms_error_m': None,
'worst_error_m': None,
'p80_error_m': None,
'p90_error_m': None,
}
return marker_reports, stats
def revolute_weighted_error(
link_name: str,
q: float,
link_infos: Dict[str, LinkInfo],
state: Dict[str, Any],
observed_markers: Dict[int, Dict[str, Any]],
active_links: set[str],
children_map: Dict[str, List[str]],
) -> float:
link = link_infos[link_name]
if link.joint_variable is None:
return float('inf')
subtree = subtree_links(link_name, children_map, active_links)
obs_ids = [mid for mid in marker_ids_in_links(link_infos, subtree) if mid in observed_markers]
if not obs_ids:
return float('inf')
override = {link_name: q}
pred = predict_marker_positions(link_infos, state, joint_override=override, active_links=active_links)
err = 0.0
wsum = 0.0
for mid in obs_ids:
if mid not in pred:
continue
marker = next((m for m in link_infos[observed_markers[mid]['link']].markers if m.marker_id == mid), None)
w = marker_weight(marker) if marker is not None else 1.0
d = pred[mid] - np.asarray(observed_markers[mid]['position_m'], dtype=np.float64)
err += w * float(np.dot(d, d))
wsum += w
if wsum <= 1e-12:
return float('inf')
return err / wsum
def estimate_revolute_joint_value(
link_name: str,
link_infos: Dict[str, LinkInfo],
state: Dict[str, Any],
observed_markers: Dict[int, Dict[str, Any]],
active_links: set[str],
children_map: Dict[str, List[str]],
coarse_step_deg: float = 5.0,
fine_step_deg: float = 1.0,
) -> Tuple[float, Dict[str, Any]]:
link = link_infos[link_name]
if link.joint_variable is None:
return 0.0, {'reason': 'no_joint_variable'}
subtree = subtree_links(link_name, children_map, active_links)
obs_ids = [mid for mid in marker_ids_in_links(link_infos, subtree) if mid in observed_markers]
if not obs_ids:
return current_joint_value(state, link), {'reason': 'no_observations'}
q0 = current_joint_value(state, link)
best_q = q0
best_e = float('inf')
used = len(obs_ids)
# Coarse-to-fine search around the current value, but still wide enough to catch flips.
centers = [q0, 0.0]
span_list = [math.pi, math.pi / 4.0, math.pi / 16.0]
step_list = [math.radians(coarse_step_deg), math.radians(fine_step_deg), math.radians(max(0.25, fine_step_deg / 2.0))]
for center in centers:
for span, step in zip(span_list, step_list):
if step <= 0:
continue
qs = np.arange(center - span, center + span + 0.5 * step, step)
for q in qs:
e = revolute_weighted_error(link_name, float(q), link_infos, state, observed_markers, active_links, children_map)
if e < best_e:
best_e = e
best_q = float(q)
center = best_q
return wrap_angle_rad(best_q), {
'reason': 'coarse_to_fine_scan',
'used_markers': used,
'best_error': best_e,
'q0': q0,
'search_span_rad': math.pi,
}
# =============================================================================
# Geometric prefix estimation
# =============================================================================
def build_state_template(link_infos: Dict[str, LinkInfo]) -> Dict[str, Any]:
return {
'root_R': np.eye(3, dtype=np.float64),
'root_t': np.zeros(3, dtype=np.float64),
'joint_values': {li.joint_variable: 0.0 for li in link_infos.values() if li.joint_variable},
}
def copy_state(state: Dict[str, Any]) -> Dict[str, Any]:
return {
'root_R': np.asarray(state['root_R'], dtype=np.float64).copy(),
'root_t': np.asarray(state['root_t'], dtype=np.float64).copy(),
'joint_values': dict(state.get('joint_values', {})),
}
def active_observations_for_links(
observed_markers: Dict[int, Dict[str, Any]],
active_links: set[str],
) -> Dict[int, Dict[str, Any]]:
out: Dict[int, Dict[str, Any]] = {}
for marker_id, obs in observed_markers.items():
link_name = obs.get('link', None)
if link_name in active_links:
out[marker_id] = obs
return out
def optimize_geometric_prefix(
link_infos: Dict[str, LinkInfo],
observed_markers: Dict[int, Dict[str, Any]],
initial_state: Dict[str, Any],
active_links: List[str],
cfg: StatePoseParams,
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
active_set = set(active_links)
children_map = compute_children_map(link_infos)
stage_obs = active_observations_for_links(observed_markers, active_set)
state = copy_state(initial_state)
stage_info: Dict[str, Any] = {
'method': 'geometric_prefix',
'active_links': active_links,
'active_observations': len(stage_obs),
'joint_updates': [],
}
# Root pose is refreshed from the root markers if possible.
root_link_name = next((ln for ln in active_links if link_infos[ln].parent is None), None)
if root_link_name is not None:
root_link = link_infos[root_link_name]
root_R, root_t = initial_root_pose(root_link, stage_obs, min_markers=cfg.root_pose_min_markers)
state['root_R'] = root_R
state['root_t'] = root_t
stage_info['root_link'] = root_link_name
stage_info['root_pose_source'] = 'kabsch_root_markers' if len([m for m in root_link.markers if m.marker_id in stage_obs]) >= cfg.root_pose_min_markers else 'weak_single_marker_fallback'
# Forward geometric passes.
active_joint_links = [ln for ln in active_links if link_infos[ln].parent is not None]
for pass_idx in range(cfg.geometric_passes_per_stage):
pass_updates = []
for link_name in active_joint_links:
link = link_infos[link_name]
jt = (link.joint_type or '').strip().lower()
if jt == 'linear':
q_old = current_joint_value(state, link)
q_new, info = estimate_linear_joint_value(link_name, link_infos, state, stage_obs, active_set, children_map)
state['joint_values'][link.joint_variable] = q_new
pass_updates.append({'link': link_name, 'joint_variable': link.joint_variable, 'joint_type': jt, 'old': q_old, 'new': q_new, 'info': info})
elif jt == 'revolute':
q_old = current_joint_value(state, link)
q_new, info = estimate_revolute_joint_value(
link_name, link_infos, state, stage_obs, active_set, children_map,
coarse_step_deg=cfg.revolute_search_coarse_deg,
fine_step_deg=cfg.revolute_search_fine_deg,
)
state['joint_values'][link.joint_variable] = q_new
pass_updates.append({'link': link_name, 'joint_variable': link.joint_variable, 'joint_type': jt, 'old': q_old, 'new': q_new, 'info': info})
stage_info[f'pass_{pass_idx+1}_updates'] = pass_updates
marker_reports, stats = marker_residual_error_for_state(link_infos, state, stage_obs, active_links=active_set)
stage_info['fit_stats'] = stats
stage_info['num_markers_used'] = stats['num_markers_used']
return state, stage_info
# =============================================================================
# Optional least-squares refinement
# =============================================================================
def stage_variables_to_vector(state: Dict[str, Any], active_joint_vars: List[str]) -> np.ndarray:
try:
from scipy.spatial.transform import Rotation
root_rvec = Rotation.from_matrix(state['root_R']).as_rotvec()
except Exception:
root_rvec = np.zeros(3, dtype=np.float64)
root_t = np.asarray(state['root_t'], dtype=np.float64).reshape(3)
values = list(root_t) + list(np.asarray(root_rvec, dtype=np.float64))
for var in active_joint_vars:
values.append(float(state['joint_values'].get(var, 0.0)))
return np.asarray(values, dtype=np.float64)
def vector_to_state(x: np.ndarray, template_state: Dict[str, Any], active_joint_vars: List[str]) -> Dict[str, Any]:
try:
from scipy.spatial.transform import Rotation
except Exception:
Rotation = None
x = np.asarray(x, dtype=np.float64).reshape(-1)
out = {
'root_t': x[0:3].copy(),
'root_R': template_state['root_R'].copy(),
'joint_values': dict(template_state['joint_values']),
}
root_rvec = x[3:6].copy()
if Rotation is not None:
out['root_R'] = Rotation.from_rotvec(root_rvec).as_matrix()
else:
angle = safe_norm(root_rvec)
out['root_R'] = np.eye(3, dtype=np.float64) if angle < 1e-12 else axis_angle_rotation(root_rvec / angle, angle)
idx = 6
for var in active_joint_vars:
if idx >= len(x):
break
out['joint_values'][var] = float(x[idx])
idx += 1
return out
def residuals_stage(
x: np.ndarray,
link_infos: Dict[str, LinkInfo],
observed_markers: Dict[int, Dict[str, Any]],
template_state: Dict[str, Any],
active_joint_vars: List[str],
active_links: set[str],
cfg: EstimationConfig,
) -> np.ndarray:
state = vector_to_state(x, template_state, active_joint_vars)
pred = predict_marker_positions(link_infos, state, active_links=active_links)
res: List[float] = []
for marker_id, obs in observed_markers.items():
if marker_id not in pred:
continue
link_name = obs.get('link', None)
marker = None
if link_name is not None and link_name in link_infos:
marker = next((m for m in link_infos[link_name].markers if m.marker_id == marker_id), None)
w = marker_weight(marker, base_weight=cfg.marker_base_weight) if marker is not None else cfg.marker_base_weight
sqrt_w = math.sqrt(max(1e-12, w))
p_obs = np.asarray(obs['position_m'], dtype=np.float64)
p_pred = pred[marker_id]
d = (p_pred - p_obs) * sqrt_w
res.extend(d.tolist())
if cfg.root_pose_prior_weight > 0:
root_t_prior = np.asarray(template_state['root_t'], dtype=np.float64)
try:
from scipy.spatial.transform import Rotation
root_rvec_prior = Rotation.from_matrix(template_state['root_R']).as_rotvec()
except Exception:
root_rvec_prior = np.zeros(3, dtype=np.float64)
w = math.sqrt(max(1e-12, cfg.root_pose_prior_weight))
res.extend(((x[0:3] - root_t_prior) * w).tolist())
res.extend(((x[3:6] - root_rvec_prior) * w).tolist())
if cfg.joint_prior_weight > 0:
idx = 6
w = math.sqrt(max(1e-12, cfg.joint_prior_weight))
for var in active_joint_vars:
if idx >= len(x):
break
q = x[idx]
q0 = template_state['joint_values'].get(var, 0.0)
dq = wrap_angle_rad(float(q - q0)) if abs(q0) <= math.pi * 2.5 else float(q - q0)
res.append(dq * w)
idx += 1
return np.asarray(res, dtype=np.float64)
def optional_solver_refine(
link_infos: Dict[str, LinkInfo],
observed_markers: Dict[int, Dict[str, Any]],
state: Dict[str, Any],
active_links: List[str],
cfg: EstimationConfig,
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
try:
from scipy.optimize import least_squares
except Exception:
return state, {'success': False, 'message': 'scipy unavailable'}
active_joint_vars = [link_infos[ln].joint_variable for ln in active_links if link_infos[ln].joint_variable is not None]
x0 = stage_variables_to_vector(state, active_joint_vars)
active_set = set(active_links)
result = least_squares(
lambda x: residuals_stage(x, link_infos, observed_markers, state, active_joint_vars, active_set, cfg),
x0,
loss=cfg.robust_loss,
f_scale=1.0,
max_nfev=cfg.max_iterations,
verbose=1,
)
refined = vector_to_state(result.x, state, active_joint_vars)
info = {
'success': bool(result.success),
'status': int(result.status),
'message': str(result.message),
'nfev': int(result.nfev),
'cost': float(np.sum(result.fun ** 2)),
}
return refined, info
# =============================================================================
# Sequential estimation driver
# =============================================================================
def sequential_geometric_estimation(
link_infos: Dict[str, LinkInfo],
observed_markers: Dict[int, Dict[str, Any]],
params: StatePoseParams,
cfg: EstimationConfig,
) -> Tuple[Dict[str, Any], List[StageResult]]:
ordered = topological_links(link_infos)
if not ordered:
return build_state_template(link_infos), []
start_n = max(1, min(int(params.numbers_of_elements_to_consider_start), len(ordered)))
final_n = max(start_n, min(int(params.numbers_of_elements_to_consider_final), len(ordered)))
state = build_state_template(link_infos)
stage_results: List[StageResult] = []
for n in range(start_n, final_n + 1):
active_links = ordered[:n]
stage_state, stage_info = optimize_geometric_prefix(link_infos, observed_markers, state, active_links, params)
state = stage_state
if params.solver_in_between_geometrical:
state, solver_info = optional_solver_refine(link_infos, observed_markers, state, active_links, cfg)
stage_info['solver_in_between'] = solver_info
stage_obs = active_observations_for_links(observed_markers, set(active_links))
marker_reports, stats = marker_residual_error_for_state(link_infos, state, stage_obs, active_links=set(active_links))
active_joint_vars = [link_infos[ln].joint_variable for ln in active_links if link_infos[ln].joint_variable is not None]
stage_result = StageResult(
stage_idx=len(stage_results),
active_links=active_links,
active_joint_vars=active_joint_vars,
num_markers_used=stats['num_markers_used'],
mean_error_m=stats['mean_error_m'],
rms_error_m=stats['rms_error_m'],
worst_error_m=stats['worst_error_m'],
method='geometric_prefix' + ('+solver' if params.solver_in_between_geometrical else ''),
optimizer_info=stage_info,
)
stage_results.append(stage_result)
if cfg.show_stage_reports:
print(
f"[INFO] Stage {stage_result.stage_idx} | links={len(active_links)} | joints={len(active_joint_vars)} | "
f"markers={stage_result.num_markers_used} | mean={None if stats['mean_error_m'] is None else stats['mean_error_m']*1000.0:.2f} mm"
)
if params.solver_after_geometrical:
state, solver_info = optional_solver_refine(link_infos, observed_markers, state, ordered[:final_n], cfg)
if stage_results:
stage_results[-1].optimizer_info['solver_after_geometrical'] = solver_info
return state, stage_results
# =============================================================================
# Output
# =============================================================================
def state_to_json(
robot_data: Dict[str, Any],
link_infos: Dict[str, LinkInfo],
state: Dict[str, Any],
observed_markers: Dict[int, Dict[str, Any]],
marker_reports: List[Dict[str, Any]],
stats: Dict[str, Any],
input_file: str,
robot_file: str,
stage_results: List[StageResult],
params: StatePoseParams,
) -> Dict[str, Any]:
movements = {}
for link_name in topological_links(link_infos):
link = link_infos[link_name]
if not link.joint_variable:
continue
q = float(state['joint_values'].get(link.joint_variable, 0.0))
jt = (link.joint_type or '').strip().lower()
entry = {
'joint_type': jt,
'link': link_name,
'estimated': True,
}
if jt == 'revolute':
entry['value_rad'] = q
entry['value_deg'] = float(math.degrees(q))
elif jt == 'linear':
entry['value_m'] = q
entry['value_mm'] = q * 1000.0
else:
entry['value'] = q
movements[link.joint_variable] = entry
link_pose_entries = []
cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray] = {}
for link_name in topological_links(link_infos):
T = world_to_link_transform(link_name, link_infos, state, cache)
R = T[:3, :3]
t = T[:3, 3]
link = link_infos[link_name]
num_used = sum(1 for m in link.markers if m.marker_id in observed_markers)
link_pose_entries.append({
'link': link_name,
'parent': link.parent,
'position_m': [float(x) for x in t],
'rotation_matrix': [[float(v) for v in row] for row in R],
'euler_xyz_deg': [float(x) for x in matrix_to_euler_xyz(R)],
'num_observed_markers': int(num_used),
'num_markers_total': int(len(link.markers)),
})
root_link = next((name for name, info in link_infos.items() if info.parent is None), None)
root_T = make_transform(state['root_R'], state['root_t'])
out = {
'schema_version': '3.0',
'created_utc': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
'source': {
'marker_positions_file': input_file,
'robot_file': robot_file,
},
'state_pose_params': {
'numbers_of_Elements_to_consider_start': params.numbers_of_elements_to_consider_start,
'numbers_of_Elements_to_consider_final': params.numbers_of_elements_to_consider_final,
'solver_in_between_geometrical': params.solver_in_between_geometrical,
'solver_after_geometrical': params.solver_after_geometrical,
'geometric_passes_per_stage': params.geometric_passes_per_stage,
'revolute_search_coarse_deg': params.revolute_search_coarse_deg,
'revolute_search_fine_deg': params.revolute_search_fine_deg,
'root_pose_min_markers': params.root_pose_min_markers,
},
'summary': {
'num_links': int(len(link_infos)),
'num_observed_markers': int(len(observed_markers)),
'num_link_markers': int(sum(len(li.markers) for li in link_infos.values())),
'root_link': root_link,
'fit_stats': stats,
'optimizer': state.get('_optimizer', {}),
'stages': [
{
'stage_idx': int(s.stage_idx),
'active_links': s.active_links,
'active_joint_vars': s.active_joint_vars,
'num_markers_used': int(s.num_markers_used),
'mean_error_m': s.mean_error_m,
'rms_error_m': s.rms_error_m,
'worst_error_m': s.worst_error_m,
'method': s.method,
'optimizer_info': s.optimizer_info,
}
for s in stage_results
],
},
'world_pose': {
'root_translation_m': [float(x) for x in root_T[:3, 3]],
'root_rotation_matrix': [[float(v) for v in row] for row in root_T[:3, :3]],
'root_euler_xyz_deg': [float(x) for x in matrix_to_euler_xyz(root_T[:3, :3])],
},
'movements': movements,
'links': link_pose_entries,
'markers': marker_reports,
}
return out
# =============================================================================
# Main
# =============================================================================
def main() -> None:
parser = argparse.ArgumentParser(
description='Sequential geometric robot-state estimation from optimized ArUco marker positions'
)
parser.add_argument('optimized_markers', help='aruco_positions_optimized*.json')
parser.add_argument('-robot', '--robot', required=True, help='robot.json')
parser.add_argument('-out', '--out', default=None, help='Output robot_state.json path')
parser.add_argument('--maxIterations', type=int, default=120, help='Maximum least-squares iterations per optional refinement stage')
parser.add_argument('--jointPriorWeight', type=float, default=0.0, help='Prior weight for joint variables during optional solver refinement')
parser.add_argument('--rootPosePriorWeight', type=float, default=0.0, help='Prior weight for root pose during optional solver refinement')
parser.add_argument('--markerBaseWeight', type=float, default=1.0, help='Base weight for marker residuals')
parser.add_argument('--noSequential', action='store_true', help='Disable sequential prefix mode and run one prefix stage only')
parser.add_argument('--forceSolverAfter', action='store_true', help='Force solver after the geometric prefix regardless of robot.json')
parser.add_argument('--forceSolverBetween', action='store_true', help='Force solver between geometric stages regardless of robot.json')
args = parser.parse_args()
print('[STEP 1] Load robot.json and optimized marker positions...')
robot_data = load_json(args.robot)
link_infos, marker_by_id, issues = parse_robot(robot_data)
observed_map = load_observed_markers(args.optimized_markers)
params = load_state_pose_params(robot_data)
if args.noSequential:
params.numbers_of_elements_to_consider_start = len(topological_links(link_infos))
params.numbers_of_elements_to_consider_final = len(topological_links(link_infos))
if args.forceSolverBetween:
params.solver_in_between_geometrical = True
if args.forceSolverAfter:
params.solver_after_geometrical = True
for issue in issues:
print(issue)
print(f"[INFO] Links: {len(link_infos)}")
print(f"[INFO] Known robot markers: {len(marker_by_id)}")
print(f"[INFO] Observed optimized markers: {len(observed_map)}")
print(f"[INFO] state_pose_params: start={params.numbers_of_elements_to_consider_start}, final={params.numbers_of_elements_to_consider_final}, between={params.solver_in_between_geometrical}, after={params.solver_after_geometrical}")
cfg = EstimationConfig(
marker_base_weight=float(args.markerBaseWeight),
root_pose_prior_weight=float(args.rootPosePriorWeight),
joint_prior_weight=float(args.jointPriorWeight),
robust_loss='soft_l1',
max_iterations=int(args.maxIterations),
show_stage_reports=True,
use_geometric_prefix=True,
)
print('\n[STEP 2] Geometric sequential pose estimation...')
final_state, stage_results = sequential_geometric_estimation(
link_infos=link_infos,
observed_markers=observed_map,
params=params,
cfg=cfg,
)
print('\n[STEP 3] Build report and save robot_state.json...')
marker_reports, stats = marker_residual_error_for_state(link_infos, final_state, observed_map)
out_data = state_to_json(
robot_data=robot_data,
link_infos=link_infos,
state=final_state,
observed_markers=observed_map,
marker_reports=marker_reports,
stats=stats,
input_file=args.optimized_markers,
robot_file=args.robot,
stage_results=stage_results,
params=params,
)
out_path = args.out
if out_path is None:
out_dir = os.path.dirname(args.optimized_markers) or '.'
out_path = os.path.join(out_dir, 'robot_state.json')
save_json(out_path, out_data)
print(f"[INFO] Saved to {out_path}")
if stats['mean_error_m'] is not None:
print(f"[INFO] Mean marker error: {stats['mean_error_m'] * 1000.0:.2f} mm")
print(f"[INFO] RMS marker error : {stats['rms_error_m'] * 1000.0:.2f} mm")
print(f"[INFO] Worst marker error: {stats['worst_error_m'] * 1000.0:.2f} mm")
else:
print('[INFO] No marker error statistics available.')
if __name__ == '__main__':
main()