1331 lines
48 KiB
Python
1331 lines
48 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
4_robotState_estimation_v5.py
|
|
|
|
Sequential robot-state estimation from optimized 3D ArUco marker positions.
|
|
|
|
Mathematical idea
|
|
-----------------
|
|
This script estimates a robot state q directly from already optimized marker
|
|
positions p_i^obs (from aruco_positions_optimized*.json).
|
|
|
|
The state is built incrementally along the kinematic chain:
|
|
1) estimate the root link pose from observed root-link markers
|
|
2) extend the active prefix link-by-link
|
|
3) for each newly activated joint, estimate its scalar variable by a geometric
|
|
rule that matches the robot's degrees of freedom
|
|
|
|
For each stage we keep the already estimated prefix fixed and only add the next
|
|
link(s) from the chain. This is intentionally *not* a generic global optimizer
|
|
by default. It is a deterministic geometric initializer that uses the robot
|
|
structure and marker geometry directly.
|
|
|
|
The stage logic is controlled by robot.json::state_pose_params:
|
|
- numbers_of_Elements_to_consider_start
|
|
- numbers_of_Elements_to_consider_final
|
|
- solver_in_between_geometrical
|
|
- solver_after_geometrical
|
|
|
|
Geometric rules used here
|
|
-------------------------
|
|
Rigid root / early prefix:
|
|
- root link pose is estimated from its observed markers with weighted Kabsch
|
|
|
|
Linear joint (slider):
|
|
- the joint translates its whole descendant subtree along the joint axis
|
|
- q is estimated as the weighted mean projection of observed minus predicted
|
|
marker positions onto the world-space joint axis
|
|
|
|
Revolute joint:
|
|
- the joint rotates its descendant subtree around the joint axis
|
|
- q is estimated by a coarse-to-fine 1D angular search that minimizes the
|
|
weighted marker residual over the active prefix
|
|
|
|
Optional solver
|
|
---------------
|
|
If enabled in state_pose_params, a least-squares refinement can be run after a
|
|
geometric stage or at the end. The geometric estimator remains the default and
|
|
is the first-class method in this script.
|
|
|
|
Important limitation
|
|
--------------------
|
|
This script works from marker positions only. It does not yet use image-space
|
|
marker orientation / normal / visibility cues. Those can be added later by
|
|
feeding in raw detections and camera poses.
|
|
|
|
Dependencies
|
|
------------
|
|
numpy, scipy (optional for refinement)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import copy
|
|
import json
|
|
import math
|
|
import os
|
|
import sys
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
import numpy as np
|
|
|
|
|
|
# =============================================================================
|
|
# Path / JSON helpers
|
|
# =============================================================================
|
|
|
|
def resolve_path(path: str) -> str:
|
|
path = os.path.expanduser(path)
|
|
if os.path.isabs(path):
|
|
return path
|
|
return os.path.abspath(path)
|
|
|
|
|
|
def load_json(path: str) -> Any:
|
|
with open(resolve_path(path), 'r', encoding='utf-8') as f:
|
|
return json.load(f)
|
|
|
|
|
|
def save_json(path: str, data: Any) -> None:
|
|
with open(resolve_path(path), 'w', encoding='utf-8') as f:
|
|
json.dump(data, f, indent=2)
|
|
|
|
|
|
# =============================================================================
|
|
# Small math helpers
|
|
# =============================================================================
|
|
|
|
def safe_norm(v: np.ndarray, eps: float = 1e-12) -> float:
|
|
return float(np.linalg.norm(v) + eps)
|
|
|
|
|
|
def normalize(v: np.ndarray, eps: float = 1e-12) -> np.ndarray:
|
|
v = np.asarray(v, dtype=np.float64)
|
|
return v / safe_norm(v, eps)
|
|
|
|
|
|
def wrap_angle_rad(a: float) -> float:
|
|
return (a + math.pi) % (2.0 * math.pi) - math.pi
|
|
|
|
|
|
def get_length_scale(robot_data: Dict[str, Any]) -> float:
|
|
units = robot_data.get('units', {}) or {}
|
|
length_unit = str(units.get('length', '')).strip().lower()
|
|
if length_unit in ('mm', 'millimeter', 'millimeters'):
|
|
return 1.0 / 1000.0
|
|
if length_unit in ('cm', 'centimeter', 'centimeters'):
|
|
return 1.0 / 100.0
|
|
return 1.0
|
|
|
|
|
|
def rotation_matrix_xyz(rx: float, ry: float, rz: float, degrees: bool = False) -> np.ndarray:
|
|
"""Rotation order: X then Y then Z."""
|
|
if degrees:
|
|
rx, ry, rz = math.radians(rx), math.radians(ry), math.radians(rz)
|
|
|
|
cx, sx = math.cos(rx), math.sin(rx)
|
|
cy, sy = math.cos(ry), math.sin(ry)
|
|
cz, sz = math.cos(rz), math.sin(rz)
|
|
|
|
rx_m = np.array([[1.0, 0.0, 0.0],
|
|
[0.0, cx, -sx],
|
|
[0.0, sx, cx]], dtype=np.float64)
|
|
ry_m = np.array([[cy, 0.0, sy],
|
|
[0.0, 1.0, 0.0],
|
|
[-sy, 0.0, cy]], dtype=np.float64)
|
|
rz_m = np.array([[cz, -sz, 0.0],
|
|
[sz, cz, 0.0],
|
|
[0.0, 0.0, 1.0]], dtype=np.float64)
|
|
return rz_m @ ry_m @ rx_m
|
|
|
|
|
|
def axis_angle_rotation(axis: np.ndarray, angle_rad: float) -> np.ndarray:
|
|
axis = normalize(axis)
|
|
x, y, z = axis
|
|
c = math.cos(angle_rad)
|
|
s = math.sin(angle_rad)
|
|
C = 1.0 - c
|
|
return np.array([
|
|
[c + x * x * C, x * y * C - z * s, x * z * C + y * s],
|
|
[y * x * C + z * s, c + y * y * C, y * z * C - x * s],
|
|
[z * x * C - y * s, z * y * C + x * s, c + z * z * C]
|
|
], dtype=np.float64)
|
|
|
|
|
|
def make_transform(R: Optional[np.ndarray] = None, t: Optional[np.ndarray] = None) -> np.ndarray:
|
|
T = np.eye(4, dtype=np.float64)
|
|
if R is not None:
|
|
T[:3, :3] = np.asarray(R, dtype=np.float64)
|
|
if t is not None:
|
|
T[:3, 3] = np.asarray(t, dtype=np.float64).reshape(3)
|
|
return T
|
|
|
|
|
|
def transform_point(T: np.ndarray, p: np.ndarray) -> np.ndarray:
|
|
p4 = np.ones(4, dtype=np.float64)
|
|
p4[:3] = np.asarray(p, dtype=np.float64).reshape(3)
|
|
return (T @ p4)[:3]
|
|
|
|
|
|
def matrix_to_euler_xyz(R: np.ndarray) -> np.ndarray:
|
|
R = np.asarray(R, dtype=np.float64)
|
|
sy = math.sqrt(R[0, 0] * R[0, 0] + R[1, 0] * R[1, 0])
|
|
singular = sy < 1e-9
|
|
|
|
if not singular:
|
|
x = math.atan2(R[2, 1], R[2, 2])
|
|
y = math.atan2(-R[2, 0], sy)
|
|
z = math.atan2(R[1, 0], R[0, 0])
|
|
else:
|
|
x = math.atan2(-R[1, 2], R[1, 1])
|
|
y = math.atan2(-R[2, 0], sy)
|
|
z = 0.0
|
|
return np.degrees([x, y, z])
|
|
|
|
|
|
def kabsch(P: np.ndarray, Q: np.ndarray, W: Optional[np.ndarray] = None) -> Tuple[np.ndarray, np.ndarray]:
|
|
"""Find R, t such that R @ P + t ≈ Q."""
|
|
P = np.asarray(P, dtype=np.float64)
|
|
Q = np.asarray(Q, dtype=np.float64)
|
|
assert P.shape == Q.shape and P.shape[1] == 3
|
|
|
|
if W is None:
|
|
W = np.ones(len(P), dtype=np.float64)
|
|
W = np.asarray(W, dtype=np.float64).reshape(-1)
|
|
W = np.maximum(W, 1e-12)
|
|
W = W / np.sum(W)
|
|
|
|
p_cent = np.sum(P * W[:, None], axis=0)
|
|
q_cent = np.sum(Q * W[:, None], axis=0)
|
|
|
|
P0 = P - p_cent
|
|
Q0 = Q - q_cent
|
|
H = (P0 * W[:, None]).T @ Q0
|
|
|
|
U, S, Vt = np.linalg.svd(H)
|
|
R = Vt.T @ U.T
|
|
if np.linalg.det(R) < 0:
|
|
Vt[-1, :] *= -1.0
|
|
R = Vt.T @ U.T
|
|
t = q_cent - R @ p_cent
|
|
return R, t
|
|
|
|
|
|
# =============================================================================
|
|
# Data structures
|
|
# =============================================================================
|
|
|
|
@dataclass
|
|
class MarkerInfo:
|
|
marker_id: int
|
|
link_name: str
|
|
local_pos_m: np.ndarray
|
|
size: Optional[float] = None
|
|
normal: Optional[np.ndarray] = None
|
|
name: str = ''
|
|
|
|
|
|
@dataclass
|
|
class LinkInfo:
|
|
name: str
|
|
parent: Optional[str]
|
|
joint_type: Optional[str]
|
|
joint_axis: Optional[np.ndarray]
|
|
joint_origin_m: np.ndarray
|
|
joint_rotation_deg: np.ndarray
|
|
joint_variable: Optional[str]
|
|
mount_position_m: np.ndarray
|
|
mount_rotation_deg: np.ndarray
|
|
markers: List[MarkerInfo]
|
|
|
|
|
|
@dataclass
|
|
class StageResult:
|
|
stage_idx: int
|
|
active_links: List[str]
|
|
active_joint_vars: List[str]
|
|
num_markers_used: int
|
|
mean_error_m: Optional[float]
|
|
rms_error_m: Optional[float]
|
|
worst_error_m: Optional[float]
|
|
method: str
|
|
optimizer_info: Dict[str, Any]
|
|
|
|
|
|
@dataclass
|
|
class StatePoseParams:
|
|
numbers_of_elements_to_consider_start: int = 4
|
|
numbers_of_elements_to_consider_final: int = 5
|
|
solver_in_between_geometrical: bool = False
|
|
solver_after_geometrical: bool = False
|
|
geometric_passes_per_stage: int = 2
|
|
revolute_search_coarse_deg: float = 5.0
|
|
revolute_search_fine_deg: float = 1.0
|
|
root_pose_min_markers: int = 3
|
|
|
|
|
|
@dataclass
|
|
class EstimationConfig:
|
|
marker_base_weight: float = 1.0
|
|
root_pose_prior_weight: float = 0.0
|
|
joint_prior_weight: float = 0.0
|
|
robust_loss: str = 'soft_l1'
|
|
max_iterations: int = 120
|
|
show_stage_reports: bool = True
|
|
use_geometric_prefix: bool = True
|
|
|
|
|
|
# =============================================================================
|
|
# Robot parsing
|
|
# =============================================================================
|
|
|
|
def parse_robot(robot_data: Dict[str, Any]) -> Tuple[Dict[str, LinkInfo], Dict[int, MarkerInfo], List[str]]:
|
|
length_scale = get_length_scale(robot_data)
|
|
links = robot_data.get('links', {}) or {}
|
|
|
|
link_infos: Dict[str, LinkInfo] = {}
|
|
marker_by_id: Dict[int, MarkerInfo] = {}
|
|
issues: List[str] = []
|
|
|
|
for link_name, link_data in links.items():
|
|
parent = link_data.get('parent', None)
|
|
|
|
joint = link_data.get('jointToParent', {}) or {}
|
|
joint_type = joint.get('type', None)
|
|
|
|
axis = joint.get('axis', None)
|
|
if axis is not None:
|
|
axis = np.asarray(axis, dtype=np.float64)
|
|
if safe_norm(axis) > 1e-12:
|
|
axis = normalize(axis)
|
|
else:
|
|
axis = None
|
|
|
|
joint_origin = np.asarray(joint.get('origin', [0, 0, 0]), dtype=np.float64) * float(length_scale)
|
|
joint_rotation_deg = np.asarray(joint.get('rotation', [0, 0, 0]), dtype=np.float64)
|
|
joint_variable = joint.get('variable', None)
|
|
|
|
mount_position = np.asarray(link_data.get('mountPosition', [0, 0, 0]), dtype=np.float64) * float(length_scale)
|
|
mount_rotation_deg = np.asarray(link_data.get('mountRotation', [0, 0, 0]), dtype=np.float64)
|
|
|
|
markers_data = link_data.get('markers', []) or []
|
|
markers: List[MarkerInfo] = []
|
|
for idx, m in enumerate(markers_data):
|
|
marker_id = int(m.get('id', -1))
|
|
pos = m.get('position', None)
|
|
|
|
if marker_id < 0 or pos is None or len(pos) != 3:
|
|
issues.append(f"[WARN] link='{link_name}': skipped invalid marker entry at index {idx}")
|
|
continue
|
|
|
|
if marker_id in marker_by_id:
|
|
issues.append(
|
|
f"[WARN] duplicate marker ID {marker_id} in link '{link_name}' "
|
|
f"(already in '{marker_by_id[marker_id].link_name}'); using first occurrence"
|
|
)
|
|
continue
|
|
|
|
normal = None
|
|
if isinstance(m.get('normal', None), list) and len(m['normal']) == 3:
|
|
normal = np.asarray(m['normal'], dtype=np.float64)
|
|
|
|
marker = MarkerInfo(
|
|
marker_id=marker_id,
|
|
link_name=link_name,
|
|
local_pos_m=np.asarray(pos, dtype=np.float64) * float(length_scale),
|
|
size=m.get('size', None),
|
|
normal=normal,
|
|
name=str(m.get('name', f'marker_{marker_id}')),
|
|
)
|
|
markers.append(marker)
|
|
marker_by_id[marker_id] = marker
|
|
|
|
link_infos[link_name] = LinkInfo(
|
|
name=link_name,
|
|
parent=parent,
|
|
joint_type=joint_type,
|
|
joint_axis=axis,
|
|
joint_origin_m=joint_origin,
|
|
joint_rotation_deg=joint_rotation_deg,
|
|
joint_variable=joint_variable,
|
|
mount_position_m=mount_position,
|
|
mount_rotation_deg=mount_rotation_deg,
|
|
markers=markers,
|
|
)
|
|
|
|
return link_infos, marker_by_id, issues
|
|
|
|
|
|
def topological_links(link_infos: Dict[str, LinkInfo]) -> List[str]:
|
|
children = {name: [] for name in link_infos}
|
|
roots = []
|
|
for name, info in link_infos.items():
|
|
if info.parent is None:
|
|
roots.append(name)
|
|
else:
|
|
children.setdefault(info.parent, []).append(name)
|
|
|
|
order: List[str] = []
|
|
queue = list(roots)
|
|
seen = set()
|
|
|
|
while queue:
|
|
cur = queue.pop(0)
|
|
if cur in seen:
|
|
continue
|
|
seen.add(cur)
|
|
order.append(cur)
|
|
for ch in children.get(cur, []):
|
|
queue.append(ch)
|
|
|
|
for name in link_infos:
|
|
if name not in seen:
|
|
order.append(name)
|
|
return order
|
|
|
|
|
|
def compute_children_map(link_infos: Dict[str, LinkInfo]) -> Dict[str, List[str]]:
|
|
children: Dict[str, List[str]] = {name: [] for name in link_infos}
|
|
for name, info in link_infos.items():
|
|
if info.parent is not None:
|
|
children.setdefault(info.parent, []).append(name)
|
|
return children
|
|
|
|
|
|
def compute_link_depths(link_infos: Dict[str, LinkInfo]) -> Dict[str, int]:
|
|
depths: Dict[str, int] = {}
|
|
|
|
def depth_of(name: str) -> int:
|
|
if name in depths:
|
|
return depths[name]
|
|
info = link_infos[name]
|
|
if info.parent is None:
|
|
depths[name] = 0
|
|
else:
|
|
depths[name] = depth_of(info.parent) + 1
|
|
return depths[name]
|
|
|
|
for name in link_infos:
|
|
depth_of(name)
|
|
return depths
|
|
|
|
|
|
# =============================================================================
|
|
# State pose params
|
|
# =============================================================================
|
|
|
|
def load_state_pose_params(robot_data: Dict[str, Any]) -> StatePoseParams:
|
|
raw = robot_data.get('state_pose_params', {}) or {}
|
|
return StatePoseParams(
|
|
numbers_of_elements_to_consider_start=int(raw.get('numbers_of_Elements_to_consider_start', 4)),
|
|
numbers_of_elements_to_consider_final=int(raw.get('numbers_of_Elements_to_consider_final', 5)),
|
|
solver_in_between_geometrical=bool(raw.get('solver_in_between_geometrical', False)),
|
|
solver_after_geometrical=bool(raw.get('solver_after_geometrical', False)),
|
|
geometric_passes_per_stage=max(1, int(raw.get('geometric_passes_per_stage', 2))),
|
|
revolute_search_coarse_deg=float(raw.get('revolute_search_coarse_deg', 5.0)),
|
|
revolute_search_fine_deg=float(raw.get('revolute_search_fine_deg', 1.0)),
|
|
root_pose_min_markers=max(1, int(raw.get('root_pose_min_markers', 3))),
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Observations
|
|
# =============================================================================
|
|
|
|
def load_observed_markers(optimized_markers_file: str) -> Dict[int, Dict[str, Any]]:
|
|
data = load_json(optimized_markers_file)
|
|
if isinstance(data, dict):
|
|
markers = data.get('markers', []) or []
|
|
elif isinstance(data, list):
|
|
markers = data
|
|
else:
|
|
markers = []
|
|
|
|
observed: Dict[int, Dict[str, Any]] = {}
|
|
for m in markers:
|
|
if not isinstance(m, dict):
|
|
continue
|
|
if 'marker_id' not in m:
|
|
continue
|
|
marker_id = int(m['marker_id'])
|
|
pos = m.get('position_m', None)
|
|
if pos is None or len(pos) != 3:
|
|
continue
|
|
obs = dict(m)
|
|
obs['marker_id'] = marker_id
|
|
obs['position_m'] = np.asarray(pos, dtype=np.float64)
|
|
observed[marker_id] = obs
|
|
return observed
|
|
|
|
|
|
def marker_weight(marker_info: MarkerInfo, base_weight: float = 1.0, ref_size: float = 25.0) -> float:
|
|
w = base_weight
|
|
if marker_info.size is not None:
|
|
try:
|
|
size = float(marker_info.size)
|
|
if size > 0:
|
|
w *= max(0.35, math.sqrt(size / ref_size))
|
|
except Exception:
|
|
pass
|
|
return w
|
|
|
|
|
|
# =============================================================================
|
|
# Forward kinematics
|
|
# =============================================================================
|
|
|
|
def link_static_transform(link: LinkInfo) -> np.ndarray:
|
|
Rm = rotation_matrix_xyz(*link.mount_rotation_deg, degrees=True)
|
|
return make_transform(Rm, link.mount_position_m)
|
|
|
|
|
|
def joint_motion_transform(link: LinkInfo, q_value: float) -> np.ndarray:
|
|
joint_type = (link.joint_type or '').strip().lower()
|
|
if link.joint_axis is None:
|
|
return np.eye(4, dtype=np.float64)
|
|
|
|
axis = normalize(link.joint_axis)
|
|
if joint_type == 'linear':
|
|
return make_transform(np.eye(3), axis * float(q_value))
|
|
if joint_type == 'revolute':
|
|
return make_transform(axis_angle_rotation(axis, float(q_value)), np.zeros(3))
|
|
return np.eye(4, dtype=np.float64)
|
|
|
|
|
|
def world_to_link_transform(
|
|
link_name: str,
|
|
link_infos: Dict[str, LinkInfo],
|
|
state: Dict[str, Any],
|
|
cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray],
|
|
joint_override: Optional[Dict[str, float]] = None,
|
|
) -> np.ndarray:
|
|
override_items = tuple(sorted((joint_override or {}).items()))
|
|
cache_key = (link_name, override_items)
|
|
if cache_key in cache:
|
|
return cache[cache_key]
|
|
|
|
link = link_infos[link_name]
|
|
if link.parent is None:
|
|
T = make_transform(state['root_R'], state['root_t'])
|
|
cache[cache_key] = T
|
|
return T
|
|
|
|
parent_T = world_to_link_transform(link.parent, link_infos, state, cache, joint_override=joint_override)
|
|
|
|
T = parent_T @ make_transform(np.eye(3), link.joint_origin_m)
|
|
|
|
joint_values = state['joint_values']
|
|
q = joint_override.get(link.name, joint_values.get(link.joint_variable, 0.0)) if joint_override else joint_values.get(link.joint_variable, 0.0)
|
|
T = T @ joint_motion_transform(link, q)
|
|
|
|
R_static = rotation_matrix_xyz(*link.joint_rotation_deg, degrees=True)
|
|
T = T @ make_transform(R_static, np.zeros(3))
|
|
|
|
T = T @ link_static_transform(link)
|
|
|
|
cache[cache_key] = T
|
|
return T
|
|
|
|
|
|
def predict_marker_positions(
|
|
link_infos: Dict[str, LinkInfo],
|
|
state: Dict[str, Any],
|
|
joint_override: Optional[Dict[str, float]] = None,
|
|
active_links: Optional[set[str]] = None,
|
|
) -> Dict[int, np.ndarray]:
|
|
pred: Dict[int, np.ndarray] = {}
|
|
cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray] = {}
|
|
for link_name, link in link_infos.items():
|
|
if active_links is not None and link_name not in active_links:
|
|
continue
|
|
T = world_to_link_transform(link_name, link_infos, state, cache, joint_override=joint_override)
|
|
for marker in link.markers:
|
|
pred[marker.marker_id] = transform_point(T, marker.local_pos_m)
|
|
return pred
|
|
|
|
|
|
def collect_observations_by_link(
|
|
observed_markers: Dict[int, Dict[str, Any]],
|
|
active_links: Optional[set[str]] = None,
|
|
) -> Dict[str, List[int]]:
|
|
by_link: Dict[str, List[int]] = {}
|
|
for marker_id, obs in observed_markers.items():
|
|
link = obs.get('link', None)
|
|
if link is None or link == 'unknown':
|
|
continue
|
|
if active_links is not None and link not in active_links:
|
|
continue
|
|
by_link.setdefault(link, []).append(marker_id)
|
|
return by_link
|
|
|
|
|
|
# =============================================================================
|
|
# Initial geometric root pose
|
|
# =============================================================================
|
|
|
|
def initial_root_pose(
|
|
root_link: LinkInfo,
|
|
observed_markers: Dict[int, Dict[str, Any]],
|
|
min_markers: int = 3,
|
|
) -> Tuple[np.ndarray, np.ndarray]:
|
|
P = []
|
|
Q = []
|
|
W = []
|
|
|
|
for marker in root_link.markers:
|
|
if marker.marker_id in observed_markers:
|
|
obs = np.asarray(observed_markers[marker.marker_id]['position_m'], dtype=np.float64)
|
|
P.append(marker.local_pos_m)
|
|
Q.append(obs)
|
|
W.append(marker_weight(marker))
|
|
|
|
if len(P) >= min_markers:
|
|
return kabsch(np.asarray(P), np.asarray(Q), np.asarray(W))
|
|
|
|
if len(P) >= 1:
|
|
# Weak fallback: keep world axes and place root so the first marker matches.
|
|
marker = root_link.markers[0]
|
|
obs = np.asarray(observed_markers[marker.marker_id]['position_m'], dtype=np.float64)
|
|
return np.eye(3, dtype=np.float64), obs - marker.local_pos_m
|
|
|
|
return np.eye(3, dtype=np.float64), np.zeros(3, dtype=np.float64)
|
|
|
|
|
|
# =============================================================================
|
|
# Subtree helpers
|
|
# =============================================================================
|
|
|
|
def subtree_links(link_name: str, children_map: Dict[str, List[str]], active_links: set[str]) -> List[str]:
|
|
out: List[str] = []
|
|
queue = [link_name]
|
|
while queue:
|
|
cur = queue.pop(0)
|
|
if cur not in active_links:
|
|
continue
|
|
out.append(cur)
|
|
for ch in children_map.get(cur, []):
|
|
if ch in active_links:
|
|
queue.append(ch)
|
|
return out
|
|
|
|
|
|
def marker_ids_in_links(link_infos: Dict[str, LinkInfo], links: List[str]) -> List[int]:
|
|
ids: List[int] = []
|
|
for link_name in links:
|
|
for m in link_infos[link_name].markers:
|
|
ids.append(m.marker_id)
|
|
return ids
|
|
|
|
|
|
# =============================================================================
|
|
# Geometric joint estimation
|
|
# =============================================================================
|
|
|
|
def current_joint_value(state: Dict[str, Any], link: LinkInfo) -> float:
|
|
return float(state['joint_values'].get(link.joint_variable, 0.0)) if link.joint_variable else 0.0
|
|
|
|
|
|
def world_joint_axis_for_link(
|
|
link_name: str,
|
|
link_infos: Dict[str, LinkInfo],
|
|
state: Dict[str, Any],
|
|
) -> np.ndarray:
|
|
link = link_infos[link_name]
|
|
if link.parent is None or link.joint_axis is None:
|
|
return np.array([1.0, 0.0, 0.0], dtype=np.float64)
|
|
|
|
cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray] = {}
|
|
parent_T = world_to_link_transform(link.parent, link_infos, state, cache)
|
|
axis_world = parent_T[:3, :3] @ normalize(link.joint_axis)
|
|
return normalize(axis_world)
|
|
|
|
|
|
def estimate_linear_joint_value(
|
|
link_name: str,
|
|
link_infos: Dict[str, LinkInfo],
|
|
state: Dict[str, Any],
|
|
observed_markers: Dict[int, Dict[str, Any]],
|
|
active_links: set[str],
|
|
children_map: Dict[str, List[str]],
|
|
) -> Tuple[float, Dict[str, Any]]:
|
|
link = link_infos[link_name]
|
|
if link.joint_variable is None:
|
|
return 0.0, {'reason': 'no_joint_variable'}
|
|
|
|
subtree = subtree_links(link_name, children_map, active_links)
|
|
obs_ids = [mid for mid in marker_ids_in_links(link_infos, subtree) if mid in observed_markers]
|
|
if not obs_ids:
|
|
return current_joint_value(state, link), {'reason': 'no_observations'}
|
|
|
|
axis_world = world_joint_axis_for_link(link_name, link_infos, state)
|
|
|
|
# Evaluate current prediction with this joint forced to zero.
|
|
override = {link_name: 0.0}
|
|
pred0 = predict_marker_positions(link_infos, state, joint_override=override, active_links=active_links)
|
|
|
|
num = 0.0
|
|
den = 0.0
|
|
used = 0
|
|
per_marker = []
|
|
for mid in obs_ids:
|
|
if mid not in pred0:
|
|
continue
|
|
marker = next((m for m in link_infos[observed_markers[mid]['link']].markers if m.marker_id == mid), None)
|
|
w = marker_weight(marker) if marker is not None else 1.0
|
|
p_obs = np.asarray(observed_markers[mid]['position_m'], dtype=np.float64)
|
|
p0 = pred0[mid]
|
|
q_i = float(np.dot(axis_world, p_obs - p0))
|
|
num += w * q_i
|
|
den += w
|
|
used += 1
|
|
per_marker.append({'marker_id': mid, 'q_i': q_i, 'weight': w})
|
|
|
|
if den <= 1e-12:
|
|
return current_joint_value(state, link), {'reason': 'zero_weight'}
|
|
|
|
q = num / den
|
|
return float(q), {
|
|
'reason': 'weighted_projection',
|
|
'used_markers': used,
|
|
'axis_world': [float(x) for x in axis_world],
|
|
'per_marker': per_marker,
|
|
}
|
|
|
|
|
|
def marker_residual_error_for_state(
|
|
link_infos: Dict[str, LinkInfo],
|
|
state: Dict[str, Any],
|
|
observed_markers: Dict[int, Dict[str, Any]],
|
|
active_links: Optional[set[str]] = None,
|
|
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
|
|
pred = predict_marker_positions(link_infos, state, active_links=active_links)
|
|
|
|
marker_reports: List[Dict[str, Any]] = []
|
|
errors = []
|
|
for marker_id, obs in observed_markers.items():
|
|
if marker_id not in pred:
|
|
continue
|
|
p_obs = np.asarray(obs['position_m'], dtype=np.float64)
|
|
p_pred = pred[marker_id]
|
|
err = p_pred - p_obs
|
|
err_norm = float(np.linalg.norm(err))
|
|
errors.append(err_norm)
|
|
marker_reports.append({
|
|
'marker_id': int(marker_id),
|
|
'link': obs.get('link', 'unknown'),
|
|
'observed_position_m': [float(x) for x in p_obs],
|
|
'predicted_position_m': [float(x) for x in p_pred],
|
|
'error_m': [float(x) for x in err],
|
|
'error_norm_m': err_norm,
|
|
'error_norm_mm': err_norm * 1000.0,
|
|
'marker_size': obs.get('position_mm', None),
|
|
})
|
|
|
|
if errors:
|
|
arr = np.asarray(errors, dtype=np.float64)
|
|
stats = {
|
|
'num_markers_used': int(len(errors)),
|
|
'mean_error_m': float(np.mean(arr)),
|
|
'median_error_m': float(np.median(arr)),
|
|
'rms_error_m': float(np.sqrt(np.mean(arr ** 2))),
|
|
'worst_error_m': float(np.max(arr)),
|
|
'p80_error_m': float(np.quantile(arr, 0.80)),
|
|
'p90_error_m': float(np.quantile(arr, 0.90)),
|
|
}
|
|
else:
|
|
stats = {
|
|
'num_markers_used': 0,
|
|
'mean_error_m': None,
|
|
'median_error_m': None,
|
|
'rms_error_m': None,
|
|
'worst_error_m': None,
|
|
'p80_error_m': None,
|
|
'p90_error_m': None,
|
|
}
|
|
|
|
return marker_reports, stats
|
|
|
|
|
|
def revolute_weighted_error(
|
|
link_name: str,
|
|
q: float,
|
|
link_infos: Dict[str, LinkInfo],
|
|
state: Dict[str, Any],
|
|
observed_markers: Dict[int, Dict[str, Any]],
|
|
active_links: set[str],
|
|
children_map: Dict[str, List[str]],
|
|
) -> float:
|
|
link = link_infos[link_name]
|
|
if link.joint_variable is None:
|
|
return float('inf')
|
|
|
|
subtree = subtree_links(link_name, children_map, active_links)
|
|
obs_ids = [mid for mid in marker_ids_in_links(link_infos, subtree) if mid in observed_markers]
|
|
if not obs_ids:
|
|
return float('inf')
|
|
|
|
override = {link_name: q}
|
|
pred = predict_marker_positions(link_infos, state, joint_override=override, active_links=active_links)
|
|
|
|
err = 0.0
|
|
wsum = 0.0
|
|
for mid in obs_ids:
|
|
if mid not in pred:
|
|
continue
|
|
marker = next((m for m in link_infos[observed_markers[mid]['link']].markers if m.marker_id == mid), None)
|
|
w = marker_weight(marker) if marker is not None else 1.0
|
|
d = pred[mid] - np.asarray(observed_markers[mid]['position_m'], dtype=np.float64)
|
|
err += w * float(np.dot(d, d))
|
|
wsum += w
|
|
|
|
if wsum <= 1e-12:
|
|
return float('inf')
|
|
return err / wsum
|
|
|
|
|
|
def estimate_revolute_joint_value(
|
|
link_name: str,
|
|
link_infos: Dict[str, LinkInfo],
|
|
state: Dict[str, Any],
|
|
observed_markers: Dict[int, Dict[str, Any]],
|
|
active_links: set[str],
|
|
children_map: Dict[str, List[str]],
|
|
coarse_step_deg: float = 5.0,
|
|
fine_step_deg: float = 1.0,
|
|
) -> Tuple[float, Dict[str, Any]]:
|
|
link = link_infos[link_name]
|
|
if link.joint_variable is None:
|
|
return 0.0, {'reason': 'no_joint_variable'}
|
|
|
|
subtree = subtree_links(link_name, children_map, active_links)
|
|
obs_ids = [mid for mid in marker_ids_in_links(link_infos, subtree) if mid in observed_markers]
|
|
if not obs_ids:
|
|
return current_joint_value(state, link), {'reason': 'no_observations'}
|
|
|
|
q0 = current_joint_value(state, link)
|
|
best_q = q0
|
|
best_e = float('inf')
|
|
used = len(obs_ids)
|
|
|
|
# Coarse-to-fine search around the current value, but still wide enough to catch flips.
|
|
centers = [q0, 0.0]
|
|
span_list = [math.pi, math.pi / 4.0, math.pi / 16.0]
|
|
step_list = [math.radians(coarse_step_deg), math.radians(fine_step_deg), math.radians(max(0.25, fine_step_deg / 2.0))]
|
|
|
|
for center in centers:
|
|
for span, step in zip(span_list, step_list):
|
|
if step <= 0:
|
|
continue
|
|
qs = np.arange(center - span, center + span + 0.5 * step, step)
|
|
for q in qs:
|
|
e = revolute_weighted_error(link_name, float(q), link_infos, state, observed_markers, active_links, children_map)
|
|
if e < best_e:
|
|
best_e = e
|
|
best_q = float(q)
|
|
center = best_q
|
|
|
|
return wrap_angle_rad(best_q), {
|
|
'reason': 'coarse_to_fine_scan',
|
|
'used_markers': used,
|
|
'best_error': best_e,
|
|
'q0': q0,
|
|
'search_span_rad': math.pi,
|
|
}
|
|
|
|
|
|
# =============================================================================
|
|
# Geometric prefix estimation
|
|
# =============================================================================
|
|
|
|
def build_state_template(link_infos: Dict[str, LinkInfo]) -> Dict[str, Any]:
|
|
return {
|
|
'root_R': np.eye(3, dtype=np.float64),
|
|
'root_t': np.zeros(3, dtype=np.float64),
|
|
'joint_values': {li.joint_variable: 0.0 for li in link_infos.values() if li.joint_variable},
|
|
}
|
|
|
|
|
|
def copy_state(state: Dict[str, Any]) -> Dict[str, Any]:
|
|
return {
|
|
'root_R': np.asarray(state['root_R'], dtype=np.float64).copy(),
|
|
'root_t': np.asarray(state['root_t'], dtype=np.float64).copy(),
|
|
'joint_values': dict(state.get('joint_values', {})),
|
|
}
|
|
|
|
|
|
def active_observations_for_links(
|
|
observed_markers: Dict[int, Dict[str, Any]],
|
|
active_links: set[str],
|
|
) -> Dict[int, Dict[str, Any]]:
|
|
out: Dict[int, Dict[str, Any]] = {}
|
|
for marker_id, obs in observed_markers.items():
|
|
link_name = obs.get('link', None)
|
|
if link_name in active_links:
|
|
out[marker_id] = obs
|
|
return out
|
|
|
|
|
|
def optimize_geometric_prefix(
|
|
link_infos: Dict[str, LinkInfo],
|
|
observed_markers: Dict[int, Dict[str, Any]],
|
|
initial_state: Dict[str, Any],
|
|
active_links: List[str],
|
|
cfg: StatePoseParams,
|
|
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
|
|
active_set = set(active_links)
|
|
children_map = compute_children_map(link_infos)
|
|
stage_obs = active_observations_for_links(observed_markers, active_set)
|
|
|
|
state = copy_state(initial_state)
|
|
stage_info: Dict[str, Any] = {
|
|
'method': 'geometric_prefix',
|
|
'active_links': active_links,
|
|
'active_observations': len(stage_obs),
|
|
'joint_updates': [],
|
|
}
|
|
|
|
# Root pose is refreshed from the root markers if possible.
|
|
root_link_name = next((ln for ln in active_links if link_infos[ln].parent is None), None)
|
|
if root_link_name is not None:
|
|
root_link = link_infos[root_link_name]
|
|
root_R, root_t = initial_root_pose(root_link, stage_obs, min_markers=cfg.root_pose_min_markers)
|
|
state['root_R'] = root_R
|
|
state['root_t'] = root_t
|
|
stage_info['root_link'] = root_link_name
|
|
stage_info['root_pose_source'] = 'kabsch_root_markers' if len([m for m in root_link.markers if m.marker_id in stage_obs]) >= cfg.root_pose_min_markers else 'weak_single_marker_fallback'
|
|
|
|
# Forward geometric passes.
|
|
active_joint_links = [ln for ln in active_links if link_infos[ln].parent is not None]
|
|
for pass_idx in range(cfg.geometric_passes_per_stage):
|
|
pass_updates = []
|
|
for link_name in active_joint_links:
|
|
link = link_infos[link_name]
|
|
jt = (link.joint_type or '').strip().lower()
|
|
if jt == 'linear':
|
|
q_old = current_joint_value(state, link)
|
|
q_new, info = estimate_linear_joint_value(link_name, link_infos, state, stage_obs, active_set, children_map)
|
|
state['joint_values'][link.joint_variable] = q_new
|
|
pass_updates.append({'link': link_name, 'joint_variable': link.joint_variable, 'joint_type': jt, 'old': q_old, 'new': q_new, 'info': info})
|
|
elif jt == 'revolute':
|
|
q_old = current_joint_value(state, link)
|
|
q_new, info = estimate_revolute_joint_value(
|
|
link_name, link_infos, state, stage_obs, active_set, children_map,
|
|
coarse_step_deg=cfg.revolute_search_coarse_deg,
|
|
fine_step_deg=cfg.revolute_search_fine_deg,
|
|
)
|
|
state['joint_values'][link.joint_variable] = q_new
|
|
pass_updates.append({'link': link_name, 'joint_variable': link.joint_variable, 'joint_type': jt, 'old': q_old, 'new': q_new, 'info': info})
|
|
stage_info[f'pass_{pass_idx+1}_updates'] = pass_updates
|
|
|
|
marker_reports, stats = marker_residual_error_for_state(link_infos, state, stage_obs, active_links=active_set)
|
|
stage_info['fit_stats'] = stats
|
|
stage_info['num_markers_used'] = stats['num_markers_used']
|
|
return state, stage_info
|
|
|
|
|
|
# =============================================================================
|
|
# Optional least-squares refinement
|
|
# =============================================================================
|
|
|
|
def stage_variables_to_vector(state: Dict[str, Any], active_joint_vars: List[str]) -> np.ndarray:
|
|
try:
|
|
from scipy.spatial.transform import Rotation
|
|
root_rvec = Rotation.from_matrix(state['root_R']).as_rotvec()
|
|
except Exception:
|
|
root_rvec = np.zeros(3, dtype=np.float64)
|
|
|
|
root_t = np.asarray(state['root_t'], dtype=np.float64).reshape(3)
|
|
values = list(root_t) + list(np.asarray(root_rvec, dtype=np.float64))
|
|
for var in active_joint_vars:
|
|
values.append(float(state['joint_values'].get(var, 0.0)))
|
|
return np.asarray(values, dtype=np.float64)
|
|
|
|
|
|
def vector_to_state(x: np.ndarray, template_state: Dict[str, Any], active_joint_vars: List[str]) -> Dict[str, Any]:
|
|
try:
|
|
from scipy.spatial.transform import Rotation
|
|
except Exception:
|
|
Rotation = None
|
|
|
|
x = np.asarray(x, dtype=np.float64).reshape(-1)
|
|
out = {
|
|
'root_t': x[0:3].copy(),
|
|
'root_R': template_state['root_R'].copy(),
|
|
'joint_values': dict(template_state['joint_values']),
|
|
}
|
|
|
|
root_rvec = x[3:6].copy()
|
|
if Rotation is not None:
|
|
out['root_R'] = Rotation.from_rotvec(root_rvec).as_matrix()
|
|
else:
|
|
angle = safe_norm(root_rvec)
|
|
out['root_R'] = np.eye(3, dtype=np.float64) if angle < 1e-12 else axis_angle_rotation(root_rvec / angle, angle)
|
|
|
|
idx = 6
|
|
for var in active_joint_vars:
|
|
if idx >= len(x):
|
|
break
|
|
out['joint_values'][var] = float(x[idx])
|
|
idx += 1
|
|
return out
|
|
|
|
|
|
def residuals_stage(
|
|
x: np.ndarray,
|
|
link_infos: Dict[str, LinkInfo],
|
|
observed_markers: Dict[int, Dict[str, Any]],
|
|
template_state: Dict[str, Any],
|
|
active_joint_vars: List[str],
|
|
active_links: set[str],
|
|
cfg: EstimationConfig,
|
|
) -> np.ndarray:
|
|
state = vector_to_state(x, template_state, active_joint_vars)
|
|
pred = predict_marker_positions(link_infos, state, active_links=active_links)
|
|
res: List[float] = []
|
|
|
|
for marker_id, obs in observed_markers.items():
|
|
if marker_id not in pred:
|
|
continue
|
|
link_name = obs.get('link', None)
|
|
marker = None
|
|
if link_name is not None and link_name in link_infos:
|
|
marker = next((m for m in link_infos[link_name].markers if m.marker_id == marker_id), None)
|
|
w = marker_weight(marker, base_weight=cfg.marker_base_weight) if marker is not None else cfg.marker_base_weight
|
|
sqrt_w = math.sqrt(max(1e-12, w))
|
|
p_obs = np.asarray(obs['position_m'], dtype=np.float64)
|
|
p_pred = pred[marker_id]
|
|
d = (p_pred - p_obs) * sqrt_w
|
|
res.extend(d.tolist())
|
|
|
|
if cfg.root_pose_prior_weight > 0:
|
|
root_t_prior = np.asarray(template_state['root_t'], dtype=np.float64)
|
|
try:
|
|
from scipy.spatial.transform import Rotation
|
|
root_rvec_prior = Rotation.from_matrix(template_state['root_R']).as_rotvec()
|
|
except Exception:
|
|
root_rvec_prior = np.zeros(3, dtype=np.float64)
|
|
w = math.sqrt(max(1e-12, cfg.root_pose_prior_weight))
|
|
res.extend(((x[0:3] - root_t_prior) * w).tolist())
|
|
res.extend(((x[3:6] - root_rvec_prior) * w).tolist())
|
|
|
|
if cfg.joint_prior_weight > 0:
|
|
idx = 6
|
|
w = math.sqrt(max(1e-12, cfg.joint_prior_weight))
|
|
for var in active_joint_vars:
|
|
if idx >= len(x):
|
|
break
|
|
q = x[idx]
|
|
q0 = template_state['joint_values'].get(var, 0.0)
|
|
dq = wrap_angle_rad(float(q - q0)) if abs(q0) <= math.pi * 2.5 else float(q - q0)
|
|
res.append(dq * w)
|
|
idx += 1
|
|
|
|
return np.asarray(res, dtype=np.float64)
|
|
|
|
|
|
def optional_solver_refine(
|
|
link_infos: Dict[str, LinkInfo],
|
|
observed_markers: Dict[int, Dict[str, Any]],
|
|
state: Dict[str, Any],
|
|
active_links: List[str],
|
|
cfg: EstimationConfig,
|
|
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
|
|
try:
|
|
from scipy.optimize import least_squares
|
|
except Exception:
|
|
return state, {'success': False, 'message': 'scipy unavailable'}
|
|
|
|
active_joint_vars = [link_infos[ln].joint_variable for ln in active_links if link_infos[ln].joint_variable is not None]
|
|
x0 = stage_variables_to_vector(state, active_joint_vars)
|
|
active_set = set(active_links)
|
|
|
|
result = least_squares(
|
|
lambda x: residuals_stage(x, link_infos, observed_markers, state, active_joint_vars, active_set, cfg),
|
|
x0,
|
|
loss=cfg.robust_loss,
|
|
f_scale=1.0,
|
|
max_nfev=cfg.max_iterations,
|
|
verbose=1,
|
|
)
|
|
refined = vector_to_state(result.x, state, active_joint_vars)
|
|
info = {
|
|
'success': bool(result.success),
|
|
'status': int(result.status),
|
|
'message': str(result.message),
|
|
'nfev': int(result.nfev),
|
|
'cost': float(np.sum(result.fun ** 2)),
|
|
}
|
|
return refined, info
|
|
|
|
|
|
# =============================================================================
|
|
# Sequential estimation driver
|
|
# =============================================================================
|
|
|
|
def sequential_geometric_estimation(
|
|
link_infos: Dict[str, LinkInfo],
|
|
observed_markers: Dict[int, Dict[str, Any]],
|
|
params: StatePoseParams,
|
|
cfg: EstimationConfig,
|
|
) -> Tuple[Dict[str, Any], List[StageResult]]:
|
|
ordered = topological_links(link_infos)
|
|
if not ordered:
|
|
return build_state_template(link_infos), []
|
|
|
|
start_n = max(1, min(int(params.numbers_of_elements_to_consider_start), len(ordered)))
|
|
final_n = max(start_n, min(int(params.numbers_of_elements_to_consider_final), len(ordered)))
|
|
|
|
state = build_state_template(link_infos)
|
|
stage_results: List[StageResult] = []
|
|
|
|
for n in range(start_n, final_n + 1):
|
|
active_links = ordered[:n]
|
|
stage_state, stage_info = optimize_geometric_prefix(link_infos, observed_markers, state, active_links, params)
|
|
state = stage_state
|
|
|
|
if params.solver_in_between_geometrical:
|
|
state, solver_info = optional_solver_refine(link_infos, observed_markers, state, active_links, cfg)
|
|
stage_info['solver_in_between'] = solver_info
|
|
|
|
stage_obs = active_observations_for_links(observed_markers, set(active_links))
|
|
marker_reports, stats = marker_residual_error_for_state(link_infos, state, stage_obs, active_links=set(active_links))
|
|
|
|
active_joint_vars = [link_infos[ln].joint_variable for ln in active_links if link_infos[ln].joint_variable is not None]
|
|
stage_result = StageResult(
|
|
stage_idx=len(stage_results),
|
|
active_links=active_links,
|
|
active_joint_vars=active_joint_vars,
|
|
num_markers_used=stats['num_markers_used'],
|
|
mean_error_m=stats['mean_error_m'],
|
|
rms_error_m=stats['rms_error_m'],
|
|
worst_error_m=stats['worst_error_m'],
|
|
method='geometric_prefix' + ('+solver' if params.solver_in_between_geometrical else ''),
|
|
optimizer_info=stage_info,
|
|
)
|
|
stage_results.append(stage_result)
|
|
|
|
if cfg.show_stage_reports:
|
|
print(
|
|
f"[INFO] Stage {stage_result.stage_idx} | links={len(active_links)} | joints={len(active_joint_vars)} | "
|
|
f"markers={stage_result.num_markers_used} | mean={None if stats['mean_error_m'] is None else stats['mean_error_m']*1000.0:.2f} mm"
|
|
)
|
|
|
|
if params.solver_after_geometrical:
|
|
state, solver_info = optional_solver_refine(link_infos, observed_markers, state, ordered[:final_n], cfg)
|
|
if stage_results:
|
|
stage_results[-1].optimizer_info['solver_after_geometrical'] = solver_info
|
|
|
|
return state, stage_results
|
|
|
|
|
|
# =============================================================================
|
|
# Output
|
|
# =============================================================================
|
|
|
|
def state_to_json(
|
|
robot_data: Dict[str, Any],
|
|
link_infos: Dict[str, LinkInfo],
|
|
state: Dict[str, Any],
|
|
observed_markers: Dict[int, Dict[str, Any]],
|
|
marker_reports: List[Dict[str, Any]],
|
|
stats: Dict[str, Any],
|
|
input_file: str,
|
|
robot_file: str,
|
|
stage_results: List[StageResult],
|
|
params: StatePoseParams,
|
|
) -> Dict[str, Any]:
|
|
movements = {}
|
|
for link_name in topological_links(link_infos):
|
|
link = link_infos[link_name]
|
|
if not link.joint_variable:
|
|
continue
|
|
|
|
q = float(state['joint_values'].get(link.joint_variable, 0.0))
|
|
jt = (link.joint_type or '').strip().lower()
|
|
entry = {
|
|
'joint_type': jt,
|
|
'link': link_name,
|
|
'estimated': True,
|
|
}
|
|
if jt == 'revolute':
|
|
entry['value_rad'] = q
|
|
entry['value_deg'] = float(math.degrees(q))
|
|
elif jt == 'linear':
|
|
entry['value_m'] = q
|
|
entry['value_mm'] = q * 1000.0
|
|
else:
|
|
entry['value'] = q
|
|
movements[link.joint_variable] = entry
|
|
|
|
link_pose_entries = []
|
|
cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray] = {}
|
|
for link_name in topological_links(link_infos):
|
|
T = world_to_link_transform(link_name, link_infos, state, cache)
|
|
R = T[:3, :3]
|
|
t = T[:3, 3]
|
|
link = link_infos[link_name]
|
|
num_used = sum(1 for m in link.markers if m.marker_id in observed_markers)
|
|
link_pose_entries.append({
|
|
'link': link_name,
|
|
'parent': link.parent,
|
|
'position_m': [float(x) for x in t],
|
|
'rotation_matrix': [[float(v) for v in row] for row in R],
|
|
'euler_xyz_deg': [float(x) for x in matrix_to_euler_xyz(R)],
|
|
'num_observed_markers': int(num_used),
|
|
'num_markers_total': int(len(link.markers)),
|
|
})
|
|
|
|
root_link = next((name for name, info in link_infos.items() if info.parent is None), None)
|
|
root_T = make_transform(state['root_R'], state['root_t'])
|
|
|
|
out = {
|
|
'schema_version': '3.0',
|
|
'created_utc': time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime()),
|
|
'source': {
|
|
'marker_positions_file': input_file,
|
|
'robot_file': robot_file,
|
|
},
|
|
'state_pose_params': {
|
|
'numbers_of_Elements_to_consider_start': params.numbers_of_elements_to_consider_start,
|
|
'numbers_of_Elements_to_consider_final': params.numbers_of_elements_to_consider_final,
|
|
'solver_in_between_geometrical': params.solver_in_between_geometrical,
|
|
'solver_after_geometrical': params.solver_after_geometrical,
|
|
'geometric_passes_per_stage': params.geometric_passes_per_stage,
|
|
'revolute_search_coarse_deg': params.revolute_search_coarse_deg,
|
|
'revolute_search_fine_deg': params.revolute_search_fine_deg,
|
|
'root_pose_min_markers': params.root_pose_min_markers,
|
|
},
|
|
'summary': {
|
|
'num_links': int(len(link_infos)),
|
|
'num_observed_markers': int(len(observed_markers)),
|
|
'num_link_markers': int(sum(len(li.markers) for li in link_infos.values())),
|
|
'root_link': root_link,
|
|
'fit_stats': stats,
|
|
'optimizer': state.get('_optimizer', {}),
|
|
'stages': [
|
|
{
|
|
'stage_idx': int(s.stage_idx),
|
|
'active_links': s.active_links,
|
|
'active_joint_vars': s.active_joint_vars,
|
|
'num_markers_used': int(s.num_markers_used),
|
|
'mean_error_m': s.mean_error_m,
|
|
'rms_error_m': s.rms_error_m,
|
|
'worst_error_m': s.worst_error_m,
|
|
'method': s.method,
|
|
'optimizer_info': s.optimizer_info,
|
|
}
|
|
for s in stage_results
|
|
],
|
|
},
|
|
'world_pose': {
|
|
'root_translation_m': [float(x) for x in root_T[:3, 3]],
|
|
'root_rotation_matrix': [[float(v) for v in row] for row in root_T[:3, :3]],
|
|
'root_euler_xyz_deg': [float(x) for x in matrix_to_euler_xyz(root_T[:3, :3])],
|
|
},
|
|
'movements': movements,
|
|
'links': link_pose_entries,
|
|
'markers': marker_reports,
|
|
}
|
|
return out
|
|
|
|
|
|
# =============================================================================
|
|
# Main
|
|
# =============================================================================
|
|
|
|
def main() -> None:
|
|
parser = argparse.ArgumentParser(
|
|
description='Sequential geometric robot-state estimation from optimized ArUco marker positions'
|
|
)
|
|
parser.add_argument('optimized_markers', help='aruco_positions_optimized*.json')
|
|
parser.add_argument('-robot', '--robot', required=True, help='robot.json')
|
|
parser.add_argument('-out', '--out', default=None, help='Output robot_state.json path')
|
|
parser.add_argument('--maxIterations', type=int, default=120, help='Maximum least-squares iterations per optional refinement stage')
|
|
parser.add_argument('--jointPriorWeight', type=float, default=0.0, help='Prior weight for joint variables during optional solver refinement')
|
|
parser.add_argument('--rootPosePriorWeight', type=float, default=0.0, help='Prior weight for root pose during optional solver refinement')
|
|
parser.add_argument('--markerBaseWeight', type=float, default=1.0, help='Base weight for marker residuals')
|
|
parser.add_argument('--noSequential', action='store_true', help='Disable sequential prefix mode and run one prefix stage only')
|
|
parser.add_argument('--forceSolverAfter', action='store_true', help='Force solver after the geometric prefix regardless of robot.json')
|
|
parser.add_argument('--forceSolverBetween', action='store_true', help='Force solver between geometric stages regardless of robot.json')
|
|
|
|
args = parser.parse_args()
|
|
|
|
print('[STEP 1] Load robot.json and optimized marker positions...')
|
|
robot_data = load_json(args.robot)
|
|
link_infos, marker_by_id, issues = parse_robot(robot_data)
|
|
observed_map = load_observed_markers(args.optimized_markers)
|
|
params = load_state_pose_params(robot_data)
|
|
|
|
if args.noSequential:
|
|
params.numbers_of_elements_to_consider_start = len(topological_links(link_infos))
|
|
params.numbers_of_elements_to_consider_final = len(topological_links(link_infos))
|
|
|
|
if args.forceSolverBetween:
|
|
params.solver_in_between_geometrical = True
|
|
if args.forceSolverAfter:
|
|
params.solver_after_geometrical = True
|
|
|
|
for issue in issues:
|
|
print(issue)
|
|
|
|
print(f"[INFO] Links: {len(link_infos)}")
|
|
print(f"[INFO] Known robot markers: {len(marker_by_id)}")
|
|
print(f"[INFO] Observed optimized markers: {len(observed_map)}")
|
|
print(f"[INFO] state_pose_params: start={params.numbers_of_elements_to_consider_start}, final={params.numbers_of_elements_to_consider_final}, between={params.solver_in_between_geometrical}, after={params.solver_after_geometrical}")
|
|
|
|
cfg = EstimationConfig(
|
|
marker_base_weight=float(args.markerBaseWeight),
|
|
root_pose_prior_weight=float(args.rootPosePriorWeight),
|
|
joint_prior_weight=float(args.jointPriorWeight),
|
|
robust_loss='soft_l1',
|
|
max_iterations=int(args.maxIterations),
|
|
show_stage_reports=True,
|
|
use_geometric_prefix=True,
|
|
)
|
|
|
|
print('\n[STEP 2] Geometric sequential pose estimation...')
|
|
final_state, stage_results = sequential_geometric_estimation(
|
|
link_infos=link_infos,
|
|
observed_markers=observed_map,
|
|
params=params,
|
|
cfg=cfg,
|
|
)
|
|
|
|
print('\n[STEP 3] Build report and save robot_state.json...')
|
|
marker_reports, stats = marker_residual_error_for_state(link_infos, final_state, observed_map)
|
|
|
|
out_data = state_to_json(
|
|
robot_data=robot_data,
|
|
link_infos=link_infos,
|
|
state=final_state,
|
|
observed_markers=observed_map,
|
|
marker_reports=marker_reports,
|
|
stats=stats,
|
|
input_file=args.optimized_markers,
|
|
robot_file=args.robot,
|
|
stage_results=stage_results,
|
|
params=params,
|
|
)
|
|
|
|
out_path = args.out
|
|
if out_path is None:
|
|
out_dir = os.path.dirname(args.optimized_markers) or '.'
|
|
out_path = os.path.join(out_dir, 'robot_state.json')
|
|
|
|
save_json(out_path, out_data)
|
|
|
|
print(f"[INFO] Saved to {out_path}")
|
|
if stats['mean_error_m'] is not None:
|
|
print(f"[INFO] Mean marker error: {stats['mean_error_m'] * 1000.0:.2f} mm")
|
|
print(f"[INFO] RMS marker error : {stats['rms_error_m'] * 1000.0:.2f} mm")
|
|
print(f"[INFO] Worst marker error: {stats['worst_error_m'] * 1000.0:.2f} mm")
|
|
else:
|
|
print('[INFO] No marker error statistics available.')
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|