Files
appRobotRender/pipeline/4_robotState_estimation_v6.py
2026-05-30 07:52:59 +02:00

1218 lines
44 KiB
Python

#!/usr/bin/env python3
"""
4_robotState_estimation_v6.py
Deterministic geometric robot-state estimation from optimized 3D ArUco marker positions.
Mathematical idea
-----------------
This script does NOT run a generic minimizer by default. It estimates the robot
state q with closed-form geometric rules and sequential prefix expansion.
Input:
aruco_positions_optimized*.json
robot.json
Core geometric steps
--------------------
1) Root pose estimation
The root link pose (Board) is estimated from its observed markers using a
weighted Kabsch fit:
R_root, t_root = argmin || R P_i + t - Q_i ||^2
with weights driven by marker size.
2) Linear joints / sliders
For a prismatic joint along axis a, the joint variable q is estimated by
weighted averaging of the projection residuals:
q = mean_i w_i * dot(a_world, p_obs_i - p_pred_i(q=0)) / mean_i w_i
This is a closed-form geometry update, not a numeric optimizer.
3) Revolute joints
For a revolute joint around axis a, the joint angle is estimated by
projecting the marker vectors into the plane orthogonal to a and solving a
2D weighted rotation alignment:
theta = atan2( sum_i w_i * cross2(u_i, v_i),
sum_i w_i * dot2(u_i, v_i) )
where u_i are predicted vectors (q=0) and v_i are observed vectors.
To handle 180° flip ambiguities, the script optionally compares theta and
theta + pi and uses a weak normal-based tie-break. Marker normals from
robot.json are used as a geometric hint, not as hard image evidence.
4) Sequential prefix expansion
The chain is estimated link-by-link from the root outwards. Each stage only
activates the first N links (controlled by robot.json::state_pose_params),
re-estimates the active prefix, and keeps the previous prefix as the start
of the next stage.
This is intentionally designed as a fast, deterministic geometric initializer
for later refinement, not as a full bundle-adjustment solver.
Notes
-----
- The script uses the robot hierarchy, joint axes, joint origins, and marker
local positions from robot.json.
- No least-squares minimizer is used in the default path.
- Solver-related flags may be present in robot.json for compatibility but are
not used here.
Dependencies
------------
numpy only
"""
from __future__ import annotations
import argparse
import json
import math
import os
import sys
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
# =============================================================================
# Path / JSON helpers
# =============================================================================
def resolve_path(path: str) -> str:
path = os.path.expanduser(path)
if os.path.isabs(path):
return path
return os.path.abspath(path)
def load_json(path: str) -> Any:
with open(resolve_path(path), "r", encoding="utf-8") as f:
return json.load(f)
def save_json(path: str, data: Any) -> None:
with open(resolve_path(path), "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
# =============================================================================
# Small geometry helpers
# =============================================================================
def safe_norm(v: np.ndarray, eps: float = 1e-12) -> float:
return float(np.linalg.norm(v) + eps)
def normalize(v: np.ndarray, eps: float = 1e-12) -> np.ndarray:
v = np.asarray(v, dtype=np.float64)
return v / safe_norm(v, eps)
def wrap_angle_rad(a: float) -> float:
return (a + math.pi) % (2.0 * math.pi) - math.pi
def get_length_scale(robot_data: Dict[str, Any]) -> float:
units = robot_data.get("units", {}) or {}
length_unit = str(units.get("length", "")).strip().lower()
if length_unit in ("mm", "millimeter", "millimeters"):
return 1.0 / 1000.0
if length_unit in ("cm", "centimeter", "centimeters"):
return 1.0 / 100.0
return 1.0
def rotation_matrix_xyz(rx: float, ry: float, rz: float, degrees: bool = False) -> np.ndarray:
"""Rotation order: X then Y then Z."""
if degrees:
rx, ry, rz = math.radians(rx), math.radians(ry), math.radians(rz)
cx, sx = math.cos(rx), math.sin(rx)
cy, sy = math.cos(ry), math.sin(ry)
cz, sz = math.cos(rz), math.sin(rz)
rx_m = np.array([[1.0, 0.0, 0.0],
[0.0, cx, -sx],
[0.0, sx, cx]], dtype=np.float64)
ry_m = np.array([[cy, 0.0, sy],
[0.0, 1.0, 0.0],
[-sy, 0.0, cy]], dtype=np.float64)
rz_m = np.array([[cz, -sz, 0.0],
[sz, cz, 0.0],
[0.0, 0.0, 1.0]], dtype=np.float64)
return rz_m @ ry_m @ rx_m
def axis_angle_rotation(axis: np.ndarray, angle_rad: float) -> np.ndarray:
axis = normalize(axis)
x, y, z = axis
c = math.cos(angle_rad)
s = math.sin(angle_rad)
C = 1.0 - c
return np.array([
[c + x * x * C, x * y * C - z * s, x * z * C + y * s],
[y * x * C + z * s, c + y * y * C, y * z * C - x * s],
[z * x * C - y * s, z * y * C + x * s, c + z * z * C],
], dtype=np.float64)
def make_transform(R: Optional[np.ndarray] = None, t: Optional[np.ndarray] = None) -> np.ndarray:
T = np.eye(4, dtype=np.float64)
if R is not None:
T[:3, :3] = np.asarray(R, dtype=np.float64)
if t is not None:
T[:3, 3] = np.asarray(t, dtype=np.float64).reshape(3)
return T
def transform_point(T: np.ndarray, p: np.ndarray) -> np.ndarray:
p4 = np.ones(4, dtype=np.float64)
p4[:3] = np.asarray(p, dtype=np.float64).reshape(3)
return (T @ p4)[:3]
def transform_vector(T: np.ndarray, v: np.ndarray) -> np.ndarray:
return T[:3, :3] @ np.asarray(v, dtype=np.float64).reshape(3)
def kabsch(P: np.ndarray, Q: np.ndarray, W: Optional[np.ndarray] = None) -> Tuple[np.ndarray, np.ndarray]:
"""Weighted Kabsch fit: find R, t so that R @ P + t ≈ Q."""
P = np.asarray(P, dtype=np.float64)
Q = np.asarray(Q, dtype=np.float64)
assert P.shape == Q.shape and P.shape[1] == 3
if W is None:
W = np.ones(len(P), dtype=np.float64)
W = np.asarray(W, dtype=np.float64).reshape(-1)
W = np.maximum(W, 1e-12)
W = W / np.sum(W)
p_cent = np.sum(P * W[:, None], axis=0)
q_cent = np.sum(Q * W[:, None], axis=0)
P0 = P - p_cent
Q0 = Q - q_cent
H = (P0 * W[:, None]).T @ Q0
U, S, Vt = np.linalg.svd(H)
R = Vt.T @ U.T
if np.linalg.det(R) < 0:
Vt[-1, :] *= -1.0
R = Vt.T @ U.T
t = q_cent - R @ p_cent
return R, t
def principal_axis_id(axis: np.ndarray, threshold: float = 0.95) -> Optional[int]:
a = normalize(np.asarray(axis, dtype=np.float64))
idx = int(np.argmax(np.abs(a)))
if abs(a[idx]) >= threshold:
return idx
return None
def axis_unit_from_id(axis_id: int, sign: float = 1.0) -> np.ndarray:
v = np.zeros(3, dtype=np.float64)
v[axis_id] = 1.0 if sign >= 0 else -1.0
return v
def plane_basis_for_axis(axis: np.ndarray) -> Tuple[np.ndarray, np.ndarray, int]:
"""
Return a 2D basis (e1, e2) spanning the plane orthogonal to axis.
Also return the principal axis index if axis is close to x/y/z.
"""
a = normalize(axis)
axis_id = int(np.argmax(np.abs(a)))
if axis_id == 0: # x -> yz-plane
e1 = np.array([0.0, 1.0, 0.0], dtype=np.float64)
e2 = np.array([0.0, 0.0, 1.0], dtype=np.float64)
elif axis_id == 1: # y -> zx-plane
e1 = np.array([0.0, 0.0, 1.0], dtype=np.float64)
e2 = np.array([1.0, 0.0, 0.0], dtype=np.float64)
else: # z -> xy-plane
e1 = np.array([1.0, 0.0, 0.0], dtype=np.float64)
e2 = np.array([0.0, 1.0, 0.0], dtype=np.float64)
return e1, e2, axis_id
def project_to_plane(v: np.ndarray, axis: np.ndarray) -> np.ndarray:
axis = normalize(axis)
return np.asarray(v, dtype=np.float64) - np.dot(v, axis) * axis
def weighted_mean(values: List[float], weights: List[float]) -> float:
if not values or not weights:
return 0.0
w = np.asarray(weights, dtype=np.float64)
v = np.asarray(values, dtype=np.float64)
s = float(np.sum(w))
if s <= 1e-12:
return float(np.mean(v))
return float(np.sum(v * w) / s)
def marker_quality_weight(marker: "MarkerInfo", ref_size: float = 25.0) -> float:
w = 1.0
if marker.size is not None:
try:
size = float(marker.size)
if size > 0:
w *= max(0.35, math.sqrt(size / ref_size))
except Exception:
pass
return float(w)
def normal_flip_bias(marker: "MarkerInfo", world_up: np.ndarray = np.array([0.0, 0.0, 1.0], dtype=np.float64)) -> float:
"""
Weak bias for top/bottom flip discrimination.
Only markers with a strong ±z local normal contribute.
"""
if marker.local_normal is None:
return 0.0
n = normalize(marker.local_normal)
if abs(n[2]) < 0.5:
return 0.0
pref = 1.0 if n[2] >= 0 else -1.0
# Positive score means "prefer world_up alignment" for top markers,
# negative score means "prefer world_down alignment" for bottom markers.
return pref
# =============================================================================
# Robot structures
# =============================================================================
@dataclass
class MarkerInfo:
marker_id: int
name: str
local_pos_m: np.ndarray
local_normal: np.ndarray
size: Optional[float]
spin: Optional[float]
link_name: str
@dataclass
class LinkInfo:
name: str
parent: Optional[str]
joint_type: str
joint_axis: Optional[np.ndarray]
joint_variable: Optional[str]
joint_origin_m: np.ndarray
joint_rotation_deg: np.ndarray
mount_position_m: np.ndarray
mount_rotation_deg: np.ndarray
markers: List[MarkerInfo]
@dataclass
class StatePoseParams:
numbers_of_elements_to_consider_start: int = 4
numbers_of_elements_to_consider_final: int = 5
geometric_passes_per_stage: int = 2
root_pose_min_markers: int = 2
use_marker_normals_flip_tiebreak: bool = True
normal_flip_weight: float = 0.05
def load_state_pose_params(robot_data: Dict[str, Any]) -> StatePoseParams:
raw = robot_data.get("state_pose_params", {}) or {}
return StatePoseParams(
numbers_of_elements_to_consider_start=max(1, int(raw.get("numbers_of_Elements_to_consider_start", 4))),
numbers_of_elements_to_consider_final=max(1, int(raw.get("numbers_of_Elements_to_consider_final", 5))),
geometric_passes_per_stage=max(1, int(raw.get("geometric_passes_per_stage", 2))),
root_pose_min_markers=max(1, int(raw.get("root_pose_min_markers", 2))),
use_marker_normals_flip_tiebreak=bool(raw.get("use_marker_normals_flip_tiebreak", True)),
normal_flip_weight=float(raw.get("normal_flip_weight", 0.05)),
)
def parse_robot(robot_data: Dict[str, Any], length_scale: float) -> Tuple[Dict[str, LinkInfo], Dict[int, str], List[str]]:
links = robot_data.get("links", {}) or {}
link_infos: Dict[str, LinkInfo] = {}
marker_by_id: Dict[int, str] = {}
issues: List[str] = []
for link_name, link_data in links.items():
parent = link_data.get("parent", None)
joint = link_data.get("jointToParent", {}) or {}
joint_type = str(joint.get("type", "")).strip().lower()
axis = joint.get("axis", None)
joint_axis = None
if axis is not None:
axis_arr = np.asarray(axis, dtype=np.float64)
if safe_norm(axis_arr) > 1e-12:
joint_axis = normalize(axis_arr)
joint_variable = joint.get("variable", None)
joint_origin_m = np.asarray(joint.get("origin", [0, 0, 0]), dtype=np.float64) * float(length_scale)
joint_rotation_deg = np.asarray(joint.get("rotation", [0, 0, 0]), dtype=np.float64)
mount_position_m = np.asarray(link_data.get("mountPosition", [0, 0, 0]), dtype=np.float64) * float(length_scale)
mount_rotation_deg = np.asarray(link_data.get("mountRotation", [0, 0, 0]), dtype=np.float64)
markers: List[MarkerInfo] = []
seen_local: set[int] = set()
for idx, marker in enumerate(link_data.get("markers", []) or []):
marker_id = int(marker.get("id", -1))
pos = marker.get("position", None)
if marker_id < 0 or pos is None or len(pos) != 3:
issues.append(f"[WARN] link={link_name}: skipped invalid marker entry at index {idx}")
continue
if marker_id in seen_local:
issues.append(f"[WARN] link={link_name}: duplicate marker id {marker_id} inside same link -> skipped")
continue
if marker_id in marker_by_id:
issues.append(
f"[WARN] marker id {marker_id} already assigned to link '{marker_by_id[marker_id]}', "
f"duplicate in link '{link_name}' -> skipped"
)
continue
seen_local.add(marker_id)
marker_by_id[marker_id] = link_name
markers.append(
MarkerInfo(
marker_id=marker_id,
name=str(marker.get("name", f"aruco_{marker_id}")),
local_pos_m=np.asarray(pos, dtype=np.float64) * float(length_scale),
local_normal=normalize(np.asarray(marker.get("normal", [0, 0, 1]), dtype=np.float64)),
size=marker.get("size", None),
spin=marker.get("spin", None),
link_name=link_name,
)
)
link_infos[link_name] = LinkInfo(
name=link_name,
parent=parent,
joint_type=joint_type,
joint_axis=joint_axis,
joint_variable=joint_variable,
joint_origin_m=joint_origin_m,
joint_rotation_deg=joint_rotation_deg,
mount_position_m=mount_position_m,
mount_rotation_deg=mount_rotation_deg,
markers=markers,
)
return link_infos, marker_by_id, issues
def topological_links(link_infos: Dict[str, LinkInfo]) -> List[str]:
roots = [ln for ln, li in link_infos.items() if li.parent is None]
if not roots:
return []
root = roots[0]
children: Dict[str, List[str]] = {ln: [] for ln in link_infos}
for ln, li in link_infos.items():
if li.parent is not None and li.parent in children:
children[li.parent].append(ln)
order: List[str] = []
queue = [root]
while queue:
cur = queue.pop(0)
if cur in order:
continue
order.append(cur)
for ch in children.get(cur, []):
queue.append(ch)
# Append disconnected subtrees if any.
for ln in link_infos:
if ln not in order:
order.append(ln)
return order
def compute_children_map(link_infos: Dict[str, LinkInfo]) -> Dict[str, List[str]]:
children: Dict[str, List[str]] = {ln: [] for ln in link_infos}
for ln, li in link_infos.items():
if li.parent is not None and li.parent in children:
children[li.parent].append(ln)
return children
def subtree_links(link_name: str, children_map: Dict[str, List[str]], active_links: set[str]) -> List[str]:
out: List[str] = []
queue = [link_name]
while queue:
cur = queue.pop(0)
if cur not in active_links:
continue
out.append(cur)
for ch in children_map.get(cur, []):
if ch in active_links:
queue.append(ch)
return out
def marker_ids_in_links(link_infos: Dict[str, LinkInfo], links: List[str]) -> List[int]:
ids: List[int] = []
for link_name in links:
for m in link_infos[link_name].markers:
ids.append(m.marker_id)
return ids
# =============================================================================
# Observations
# =============================================================================
def load_observed_markers(optimized_markers_file: str) -> Dict[int, Dict[str, Any]]:
data = load_json(optimized_markers_file)
if isinstance(data, dict):
markers = data.get("markers", []) or []
elif isinstance(data, list):
markers = data
else:
markers = []
observed: Dict[int, Dict[str, Any]] = {}
for m in markers:
if not isinstance(m, dict) or "marker_id" not in m:
continue
marker_id = int(m["marker_id"])
pos = m.get("position_m", None)
if pos is None or len(pos) != 3:
continue
obs = dict(m)
obs["marker_id"] = marker_id
obs["position_m"] = np.asarray(pos, dtype=np.float64)
observed[marker_id] = obs
return observed
def active_observations_for_links(
observed_markers: Dict[int, Dict[str, Any]],
link_infos: Dict[str, LinkInfo],
active_links: set[str],
) -> Dict[int, Dict[str, Any]]:
out: Dict[int, Dict[str, Any]] = {}
for marker_id, obs in observed_markers.items():
link = obs.get("link", None)
if link is None or link == "unknown":
continue
if link not in active_links:
continue
if link not in link_infos:
continue
out[marker_id] = obs
return out
# =============================================================================
# Forward kinematics
# =============================================================================
def link_static_transform(link: LinkInfo) -> np.ndarray:
Rm = rotation_matrix_xyz(*link.mount_rotation_deg, degrees=True)
return make_transform(Rm, link.mount_position_m)
def joint_motion_transform(link: LinkInfo, q_value: float) -> np.ndarray:
joint_type = (link.joint_type or "").strip().lower()
if link.joint_axis is None:
return np.eye(4, dtype=np.float64)
axis = normalize(link.joint_axis)
if joint_type == "linear":
return make_transform(np.eye(3), axis * float(q_value))
if joint_type == "revolute":
return make_transform(axis_angle_rotation(axis, float(q_value)), np.zeros(3))
return np.eye(4, dtype=np.float64)
def world_to_link_transform(
link_name: str,
link_infos: Dict[str, LinkInfo],
state: Dict[str, Any],
cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray],
joint_override: Optional[Dict[str, float]] = None,
) -> np.ndarray:
override_items = tuple(sorted((joint_override or {}).items()))
cache_key = (link_name, override_items)
if cache_key in cache:
return cache[cache_key]
link = link_infos[link_name]
if link.parent is None:
T = make_transform(state["root_R"], state["root_t"])
cache[cache_key] = T
return T
parent_T = world_to_link_transform(link.parent, link_infos, state, cache, joint_override=joint_override)
# Parent -> joint origin
T = parent_T @ make_transform(np.eye(3), link.joint_origin_m)
# Joint motion
joint_values = state["joint_values"]
q = joint_override.get(link.name, joint_values.get(link.joint_variable, 0.0)) if joint_override else joint_values.get(link.joint_variable, 0.0)
T = T @ joint_motion_transform(link, q)
# Static rotation after the joint
R_static = rotation_matrix_xyz(*link.joint_rotation_deg, degrees=True)
T = T @ make_transform(R_static, np.zeros(3))
# Mount transform / link-local offset
T = T @ link_static_transform(link)
cache[cache_key] = T
return T
def predict_marker_positions(
link_infos: Dict[str, LinkInfo],
state: Dict[str, Any],
joint_override: Optional[Dict[str, float]] = None,
active_links: Optional[set[str]] = None,
) -> Dict[int, np.ndarray]:
pred: Dict[int, np.ndarray] = {}
cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray] = {}
for link_name, link in link_infos.items():
if active_links is not None and link_name not in active_links:
continue
T = world_to_link_transform(link_name, link_infos, state, cache, joint_override=joint_override)
for marker in link.markers:
pred[marker.marker_id] = transform_point(T, marker.local_pos_m)
return pred
def world_joint_axis_for_link(
link_name: str,
link_infos: Dict[str, LinkInfo],
state: Dict[str, Any],
) -> np.ndarray:
link = link_infos[link_name]
if link.parent is None or link.joint_axis is None:
return np.array([1.0, 0.0, 0.0], dtype=np.float64)
cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray] = {}
parent_T = world_to_link_transform(link.parent, link_infos, state, cache)
axis_world = transform_vector(parent_T, link.joint_axis)
return normalize(axis_world)
def world_joint_origin_for_link(
link_name: str,
link_infos: Dict[str, LinkInfo],
state: Dict[str, Any],
) -> np.ndarray:
link = link_infos[link_name]
if link.parent is None:
return np.asarray(state["root_t"], dtype=np.float64).copy()
cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray] = {}
parent_T = world_to_link_transform(link.parent, link_infos, state, cache)
return transform_point(parent_T, link.joint_origin_m)
# =============================================================================
# Geometric estimation
# =============================================================================
def initial_root_pose(
root_link: LinkInfo,
observed_markers: Dict[int, Dict[str, Any]],
min_markers: int = 2,
) -> Tuple[np.ndarray, np.ndarray, Dict[str, Any]]:
P = []
Q = []
W = []
matched = []
for marker in root_link.markers:
if marker.marker_id in observed_markers:
obs = np.asarray(observed_markers[marker.marker_id]["position_m"], dtype=np.float64)
P.append(marker.local_pos_m)
Q.append(obs)
W.append(marker_quality_weight(marker))
matched.append(marker.marker_id)
if len(P) >= min_markers:
R, t = kabsch(np.asarray(P), np.asarray(Q), np.asarray(W))
return R, t, {"reason": "kabsch", "used_markers": matched}
if len(P) >= 1:
marker = root_link.markers[0]
obs = np.asarray(observed_markers[marker.marker_id]["position_m"], dtype=np.float64)
return np.eye(3, dtype=np.float64), obs - marker.local_pos_m, {"reason": "single_marker_fallback", "used_markers": matched}
return np.eye(3, dtype=np.float64), np.zeros(3, dtype=np.float64), {"reason": "no_root_markers"}
def estimate_linear_joint_value(
link_name: str,
link_infos: Dict[str, LinkInfo],
state: Dict[str, Any],
observed_markers: Dict[int, Dict[str, Any]],
active_links: set[str],
children_map: Dict[str, List[str]],
) -> Tuple[float, Dict[str, Any]]:
link = link_infos[link_name]
if link.joint_variable is None:
return 0.0, {"reason": "no_joint_variable"}
subtree = subtree_links(link_name, children_map, active_links)
obs_ids = [mid for mid in marker_ids_in_links(link_infos, subtree) if mid in observed_markers]
if not obs_ids:
return float(state["joint_values"].get(link.joint_variable, 0.0)), {"reason": "no_observations"}
axis_world = world_joint_axis_for_link(link_name, link_infos, state)
pred0 = predict_marker_positions(link_infos, state, joint_override={link_name: 0.0}, active_links=active_links)
num = 0.0
den = 0.0
per_marker = []
for mid in obs_ids:
if mid not in pred0:
continue
marker = next((m for m in link_infos[observed_markers[mid]["link"]].markers if m.marker_id == mid), None)
w = marker_quality_weight(marker) if marker is not None else 1.0
p_obs = np.asarray(observed_markers[mid]["position_m"], dtype=np.float64)
p0 = pred0[mid]
q_i = float(np.dot(axis_world, p_obs - p0))
num += w * q_i
den += w
per_marker.append({"marker_id": mid, "q_i": q_i, "weight": w})
if den <= 1e-12:
return float(state["joint_values"].get(link.joint_variable, 0.0)), {"reason": "zero_weight"}
q = num / den
return float(q), {
"reason": "weighted_projection",
"used_markers": len(per_marker),
"axis_world": [float(x) for x in axis_world],
"per_marker": per_marker,
}
def revolute_score_with_normals(
link_name: str,
q_value: float,
link_infos: Dict[str, LinkInfo],
state: Dict[str, Any],
observed_markers: Dict[int, Dict[str, Any]],
active_links: set[str],
) -> float:
"""
Weak tiebreak for q vs q + pi using marker normals.
Lower is better.
"""
link = link_infos[link_name]
if not link.markers:
return 0.0
pred = predict_marker_positions(link_infos, state, joint_override={link_name: q_value}, active_links=active_links)
T_link = world_to_link_transform(link_name, link_infos, state, cache={}, joint_override={link_name: q_value})
R_link = T_link[:3, :3]
up = np.array([0.0, 0.0, 1.0], dtype=np.float64)
score = 0.0
for marker in link.markers:
if marker.marker_id not in observed_markers:
continue
if marker.local_normal is None:
continue
n_local = normalize(marker.local_normal)
if abs(n_local[2]) < 0.5:
continue
n_world = normalize(R_link @ n_local)
sign_pref = 1.0 if n_local[2] >= 0 else -1.0
score += marker_quality_weight(marker) * (1.0 - sign_pref * float(np.dot(n_world, up)))
return float(score)
def estimate_revolute_joint_value(
link_name: str,
link_infos: Dict[str, LinkInfo],
state: Dict[str, Any],
observed_markers: Dict[int, Dict[str, Any]],
active_links: set[str],
children_map: Dict[str, List[str]],
use_normal_tiebreak: bool = True,
normal_weight: float = 0.05,
) -> Tuple[float, Dict[str, Any]]:
link = link_infos[link_name]
if link.joint_variable is None:
return 0.0, {"reason": "no_joint_variable"}
subtree = subtree_links(link_name, children_map, active_links)
obs_ids = [mid for mid in marker_ids_in_links(link_infos, subtree) if mid in observed_markers]
if not obs_ids:
return float(state["joint_values"].get(link.joint_variable, 0.0)), {"reason": "no_observations"}
q0 = float(state["joint_values"].get(link.joint_variable, 0.0))
# Prediction with joint set to zero (only upstream geometry active).
pred0 = predict_marker_positions(link_infos, state, joint_override={link_name: 0.0}, active_links=active_links)
origin_world = world_joint_origin_for_link(link_name, link_infos, state)
axis_world = world_joint_axis_for_link(link_name, link_infos, state)
e1, e2, axis_id = plane_basis_for_axis(axis_world)
S = 0.0
C = 0.0
used = 0
per_marker = []
for mid in obs_ids:
if mid not in pred0:
continue
marker = next((m for m in link_infos[observed_markers[mid]["link"]].markers if m.marker_id == mid), None)
w = marker_quality_weight(marker) if marker is not None else 1.0
p_obs = np.asarray(observed_markers[mid]["position_m"], dtype=np.float64)
p0 = pred0[mid]
u = project_to_plane(p0 - origin_world, axis_world)
v = project_to_plane(p_obs - origin_world, axis_world)
u2 = np.array([np.dot(u, e1), np.dot(u, e2)], dtype=np.float64)
v2 = np.array([np.dot(v, e1), np.dot(v, e2)], dtype=np.float64)
nu = float(np.linalg.norm(u2))
nv = float(np.linalg.norm(v2))
if nu <= 1e-9 or nv <= 1e-9:
continue
S += w * float(u2[0] * v2[1] - u2[1] * v2[0])
C += w * float(u2[0] * v2[0] + u2[1] * v2[1])
used += 1
per_marker.append({"marker_id": mid, "weight": w})
if used == 0:
return q0, {"reason": "no_valid_vectors"}
theta = math.atan2(S, C)
theta_alt = wrap_angle_rad(theta + math.pi)
# Evaluate both candidates deterministically, choose the better one.
def score_candidate(qcand: float) -> float:
pred = predict_marker_positions(link_infos, state, joint_override={link_name: qcand}, active_links=active_links)
err = 0.0
for mid in obs_ids:
if mid not in pred:
continue
marker = next((m for m in link_infos[observed_markers[mid]["link"]].markers if m.marker_id == mid), None)
w = marker_quality_weight(marker) if marker is not None else 1.0
p_obs = np.asarray(observed_markers[mid]["position_m"], dtype=np.float64)
d = pred[mid] - p_obs
err += w * float(np.dot(d, d))
if use_normal_tiebreak:
err += normal_weight * revolute_score_with_normals(link_name, qcand, link_infos, state, observed_markers, active_links)
return float(err)
score_theta = score_candidate(theta)
score_alt = score_candidate(theta_alt)
best_q = theta if score_theta <= score_alt else theta_alt
best_score = min(score_theta, score_alt)
return wrap_angle_rad(best_q), {
"reason": "2d_alignment" + ("+normal_tiebreak" if use_normal_tiebreak else ""),
"used_markers": used,
"axis_world": [float(x) for x in axis_world],
"axis_id": axis_id,
"theta_rad": float(theta),
"theta_alt_rad": float(theta_alt),
"score_theta": float(score_theta),
"score_theta_alt": float(score_alt),
"best_score": float(best_score),
"per_marker": per_marker,
}
def estimate_joint_geometrically(
link_name: str,
link_infos: Dict[str, LinkInfo],
state: Dict[str, Any],
observed_markers: Dict[int, Dict[str, Any]],
active_links: set[str],
children_map: Dict[str, List[str]],
params: StatePoseParams,
) -> Tuple[float, Dict[str, Any]]:
link = link_infos[link_name]
jt = (link.joint_type or "").strip().lower()
if jt == "linear":
return estimate_linear_joint_value(link_name, link_infos, state, observed_markers, active_links, children_map)
if jt == "revolute":
return estimate_revolute_joint_value(
link_name,
link_infos,
state,
observed_markers,
active_links,
children_map,
use_normal_tiebreak=params.use_marker_normals_flip_tiebreak,
normal_weight=params.normal_flip_weight,
)
return float(state["joint_values"].get(link.joint_variable, 0.0)) if link.joint_variable else 0.0, {"reason": "unsupported_joint_type"}
# =============================================================================
# Scoring / reporting
# =============================================================================
def build_state_template(link_infos: Dict[str, LinkInfo]) -> Dict[str, Any]:
return {
"root_R": np.eye(3, dtype=np.float64),
"root_t": np.zeros(3, dtype=np.float64),
"joint_values": {li.joint_variable: 0.0 for li in link_infos.values() if li.joint_variable},
}
def copy_state(state: Dict[str, Any]) -> Dict[str, Any]:
return {
"root_R": np.asarray(state["root_R"], dtype=np.float64).copy(),
"root_t": np.asarray(state["root_t"], dtype=np.float64).copy(),
"joint_values": dict(state["joint_values"]),
}
def marker_error_report(
link_infos: Dict[str, LinkInfo],
state: Dict[str, Any],
observed_markers: Dict[int, Dict[str, Any]],
active_links: Optional[set[str]] = None,
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
pred = predict_marker_positions(link_infos, state, active_links=active_links)
marker_reports: List[Dict[str, Any]] = []
errors = []
for marker_id, obs in observed_markers.items():
if marker_id not in pred:
continue
p_obs = np.asarray(obs["position_m"], dtype=np.float64)
p_pred = pred[marker_id]
err = p_pred - p_obs
err_norm = float(np.linalg.norm(err))
errors.append(err_norm)
marker_reports.append(
{
"marker_id": int(marker_id),
"link": obs.get("link", "unknown"),
"error_m": [float(x) for x in err],
"error_norm_m": err_norm,
"predicted_m": [float(x) for x in p_pred],
"observed_m": [float(x) for x in p_obs],
}
)
if errors:
arr = np.asarray(errors, dtype=np.float64)
stats = {
"num_markers_used": int(len(errors)),
"mean_error_m": float(np.mean(arr)),
"rms_error_m": float(math.sqrt(np.mean(arr ** 2))),
"median_error_m": float(np.median(arr)),
"worst_error_m": float(np.max(arr)),
}
else:
stats = {
"num_markers_used": 0,
"mean_error_m": None,
"rms_error_m": None,
"median_error_m": None,
"worst_error_m": None,
}
return marker_reports, stats
def summarize_state(state: Dict[str, Any], link_infos: Dict[str, LinkInfo]) -> Dict[str, Any]:
movements: Dict[str, Any] = {}
for link_name, link in link_infos.items():
q = state["joint_values"].get(link.joint_variable, None) if link.joint_variable else None
if q is None:
continue
jt = (link.joint_type or "").strip().lower()
if jt == "linear":
movements[link.joint_variable] = {
"value_m": float(q),
"value_mm": float(q * 1000.0),
"joint_type": jt,
"link": link_name,
}
elif jt == "revolute":
movements[link.joint_variable] = {
"value_rad": float(q),
"value_deg": float(math.degrees(q)),
"joint_type": jt,
"link": link_name,
}
else:
movements[link.joint_variable] = {
"value": float(q),
"joint_type": jt,
"link": link_name,
}
root_R = np.asarray(state["root_R"], dtype=np.float64)
root_t = np.asarray(state["root_t"], dtype=np.float64)
return {
"root_pose": {
"translation_m": [float(x) for x in root_t],
"rotation_matrix": [[float(x) for x in row] for row in root_R],
"euler_xyz_deg": [float(x) for x in np.degrees(np.array([
math.atan2(root_R[2, 1], root_R[2, 2]),
math.atan2(-root_R[2, 0], math.sqrt(root_R[0, 0] ** 2 + root_R[1, 0] ** 2)),
math.atan2(root_R[1, 0], root_R[0, 0]),
]))],
},
"movements": movements,
}
# =============================================================================
# Sequential geometric estimation
# =============================================================================
def optimize_geometric_prefix(
link_infos: Dict[str, LinkInfo],
observed_markers: Dict[int, Dict[str, Any]],
initial_state: Dict[str, Any],
active_links: List[str],
params: StatePoseParams,
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
active_set = set(active_links)
children_map = compute_children_map(link_infos)
stage_obs = active_observations_for_links(observed_markers, link_infos, active_set)
state = copy_state(initial_state)
stage_info: Dict[str, Any] = {
"method": "deterministic_geometric_prefix",
"active_links": active_links,
"active_observations": len(stage_obs),
"joint_updates": [],
}
root_link_name = next((ln for ln in active_links if link_infos[ln].parent is None), None)
if root_link_name is not None:
root_link = link_infos[root_link_name]
root_R, root_t, root_info = initial_root_pose(root_link, stage_obs, min_markers=params.root_pose_min_markers)
state["root_R"] = root_R
state["root_t"] = root_t
stage_info["root_link"] = root_link_name
stage_info["root_pose"] = root_info
active_joint_links = [ln for ln in active_links if link_infos[ln].parent is not None]
for pass_idx in range(params.geometric_passes_per_stage):
pass_updates = []
for link_name in active_joint_links:
link = link_infos[link_name]
if not link.joint_variable:
continue
q_old = float(state["joint_values"].get(link.joint_variable, 0.0))
q_new, info = estimate_joint_geometrically(
link_name,
link_infos,
state,
stage_obs,
active_set,
children_map,
params,
)
state["joint_values"][link.joint_variable] = q_new
pass_updates.append(
{
"link": link_name,
"joint_variable": link.joint_variable,
"joint_type": link.joint_type,
"old": q_old,
"new": q_new,
"info": info,
}
)
stage_info["joint_updates"].append({"pass": pass_idx, "updates": pass_updates})
marker_reports, stats = marker_error_report(link_infos, state, stage_obs, active_links=active_set)
stage_info["marker_stats"] = stats
stage_info["marker_reports"] = marker_reports[:]
return state, stage_info
def sequential_geometric_estimation(
link_infos: Dict[str, LinkInfo],
observed_markers: Dict[int, Dict[str, Any]],
params: StatePoseParams,
) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
ordered = topological_links(link_infos)
if not ordered:
return build_state_template(link_infos), []
start_n = max(1, min(int(params.numbers_of_elements_to_consider_start), len(ordered)))
final_n = max(start_n, min(int(params.numbers_of_elements_to_consider_final), len(ordered)))
state = build_state_template(link_infos)
stage_reports: List[Dict[str, Any]] = []
for n in range(start_n, final_n + 1):
active_links = ordered[:n]
state, stage_info = optimize_geometric_prefix(link_infos, observed_markers, state, active_links, params)
stage_info["stage_idx"] = len(stage_reports)
stage_info["num_active_links"] = len(active_links)
stage_info["active_links"] = active_links
stage_obs = active_observations_for_links(observed_markers, link_infos, set(active_links))
marker_reports, stats = marker_error_report(link_infos, state, stage_obs, active_links=set(active_links))
stage_info["marker_stats"] = stats
stage_info["marker_reports"] = marker_reports
stage_reports.append(stage_info)
mean_mm = None if stats["mean_error_m"] is None else stats["mean_error_m"] * 1000.0
print(
f"[INFO] Stage {stage_info['stage_idx']} | links={len(active_links)} | "
f"markers_used={stats['num_markers_used']} | "
f"mean_error={'n/a' if mean_mm is None else f'{mean_mm:.2f} mm'}"
)
return state, stage_reports
# =============================================================================
# Serialization
# =============================================================================
def link_world_poses(link_infos: Dict[str, LinkInfo], state: Dict[str, Any]) -> Dict[str, Any]:
poses: Dict[str, Any] = {}
cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray] = {}
for link_name in link_infos:
T = world_to_link_transform(link_name, link_infos, state, cache)
R = T[:3, :3]
t = T[:3, 3]
poses[link_name] = {
"translation_m": [float(x) for x in t],
"rotation_matrix": [[float(x) for x in row] for row in R],
}
return poses
def state_to_json(
state: Dict[str, Any],
link_infos: Dict[str, LinkInfo],
stage_reports: List[Dict[str, Any]],
observed_markers: Dict[int, Dict[str, Any]],
) -> Dict[str, Any]:
movements = summarize_state(state, link_infos)["movements"]
root_pose = summarize_state(state, link_infos)["root_pose"]
predicted = predict_marker_positions(link_infos, state)
markers_out = []
for marker_id, obs in sorted(observed_markers.items()):
entry = {
"marker_id": int(marker_id),
"link": obs.get("link", "unknown"),
"observed_position_m": [float(x) for x in np.asarray(obs["position_m"], dtype=np.float64)],
}
if marker_id in predicted:
entry["predicted_position_m"] = [float(x) for x in predicted[marker_id]]
err = predicted[marker_id] - np.asarray(obs["position_m"], dtype=np.float64)
entry["error_m"] = [float(x) for x in err]
entry["error_norm_m"] = float(np.linalg.norm(err))
markers_out.append(entry)
return {
"schema_version": "1.0",
"method": "deterministic_geometric_sequential_prefix",
"created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"root_pose": root_pose,
"movements": movements,
"link_poses": link_world_poses(link_infos, state),
"stage_reports": stage_reports,
"markers": markers_out,
}
# =============================================================================
# Main
# =============================================================================
def main() -> None:
parser = argparse.ArgumentParser(
description="Sequential deterministic geometric robot-state estimation from optimized ArUco marker positions"
)
parser.add_argument("optimized_markers", help="aruco_positions_optimized*.json")
parser.add_argument("-robot", "--robot", required=True, help="robot.json")
parser.add_argument("-out", "--out", default=None, help="Output robot_state.json path")
parser.add_argument("--prefixStart", type=int, default=None, help="Override state_pose_params start")
parser.add_argument("--prefixFinal", type=int, default=None, help="Override state_pose_params final")
parser.add_argument("--passes", type=int, default=None, help="Override geometric passes per stage")
args = parser.parse_args()
print("[STEP 1] Load robot.json and optimized marker positions...")
robot_data = load_json(args.robot)
length_scale = get_length_scale(robot_data)
params = load_state_pose_params(robot_data)
if args.prefixStart is not None:
params.numbers_of_elements_to_consider_start = max(1, int(args.prefixStart))
if args.prefixFinal is not None:
params.numbers_of_elements_to_consider_final = max(1, int(args.prefixFinal))
if args.passes is not None:
params.geometric_passes_per_stage = max(1, int(args.passes))
link_infos, marker_by_id, issues = parse_robot(robot_data, length_scale)
observed_map = load_observed_markers(args.optimized_markers)
for issue in issues:
print(issue)
print(f"[INFO] Links: {len(link_infos)}")
print(f"[INFO] Known robot markers: {len(marker_by_id)}")
print(f"[INFO] Observed optimized markers: {len(observed_map)}")
print(
f"[INFO] state_pose_params: start={params.numbers_of_elements_to_consider_start}, "
f"final={params.numbers_of_elements_to_consider_final}, "
f"passes={params.geometric_passes_per_stage}, "
f"normal_flip_tiebreak={params.use_marker_normals_flip_tiebreak}"
)
print("[STEP 2] Sequential geometric reconstruction...")
state, stage_reports = sequential_geometric_estimation(link_infos, observed_map, params)
out_data = state_to_json(state, link_infos, stage_reports, observed_map)
out_dir = os.path.dirname(resolve_path(args.out)) if args.out else os.path.dirname(resolve_path(args.optimized_markers))
if not out_dir:
out_dir = "."
os.makedirs(out_dir, exist_ok=True)
out_file = args.out or os.path.join(out_dir, "robot_state.json")
save_json(out_file, out_data)
print(f"[INFO] Saved robot state to {out_file}")
# Compact summary of the resulting movements
print("\n[INFO] Estimated movements:")
for key, info in out_data["movements"].items():
if "value_mm" in info:
print(f" {key}: {info['value_mm']:.3f} mm ({info['link']}, {info['joint_type']})")
elif "value_deg" in info:
print(f" {key}: {info['value_deg']:.3f} deg ({info['link']}, {info['joint_type']})")
else:
print(f" {key}: {info.get('value', 0.0):.6f} ({info.get('link', 'unknown')}, {info.get('joint_type', 'unknown')})")
if __name__ == "__main__":
main()