1218 lines
44 KiB
Python
1218 lines
44 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
4_robotState_estimation_v6.py
|
|
|
|
Deterministic geometric robot-state estimation from optimized 3D ArUco marker positions.
|
|
|
|
Mathematical idea
|
|
-----------------
|
|
This script does NOT run a generic minimizer by default. It estimates the robot
|
|
state q with closed-form geometric rules and sequential prefix expansion.
|
|
|
|
Input:
|
|
aruco_positions_optimized*.json
|
|
robot.json
|
|
|
|
Core geometric steps
|
|
--------------------
|
|
1) Root pose estimation
|
|
The root link pose (Board) is estimated from its observed markers using a
|
|
weighted Kabsch fit:
|
|
R_root, t_root = argmin || R P_i + t - Q_i ||^2
|
|
with weights driven by marker size.
|
|
|
|
2) Linear joints / sliders
|
|
For a prismatic joint along axis a, the joint variable q is estimated by
|
|
weighted averaging of the projection residuals:
|
|
q = mean_i w_i * dot(a_world, p_obs_i - p_pred_i(q=0)) / mean_i w_i
|
|
This is a closed-form geometry update, not a numeric optimizer.
|
|
|
|
3) Revolute joints
|
|
For a revolute joint around axis a, the joint angle is estimated by
|
|
projecting the marker vectors into the plane orthogonal to a and solving a
|
|
2D weighted rotation alignment:
|
|
theta = atan2( sum_i w_i * cross2(u_i, v_i),
|
|
sum_i w_i * dot2(u_i, v_i) )
|
|
where u_i are predicted vectors (q=0) and v_i are observed vectors.
|
|
|
|
To handle 180° flip ambiguities, the script optionally compares theta and
|
|
theta + pi and uses a weak normal-based tie-break. Marker normals from
|
|
robot.json are used as a geometric hint, not as hard image evidence.
|
|
|
|
4) Sequential prefix expansion
|
|
The chain is estimated link-by-link from the root outwards. Each stage only
|
|
activates the first N links (controlled by robot.json::state_pose_params),
|
|
re-estimates the active prefix, and keeps the previous prefix as the start
|
|
of the next stage.
|
|
|
|
This is intentionally designed as a fast, deterministic geometric initializer
|
|
for later refinement, not as a full bundle-adjustment solver.
|
|
|
|
Notes
|
|
-----
|
|
- The script uses the robot hierarchy, joint axes, joint origins, and marker
|
|
local positions from robot.json.
|
|
- No least-squares minimizer is used in the default path.
|
|
- Solver-related flags may be present in robot.json for compatibility but are
|
|
not used here.
|
|
|
|
Dependencies
|
|
------------
|
|
numpy only
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import math
|
|
import os
|
|
import sys
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
import numpy as np
|
|
|
|
|
|
# =============================================================================
|
|
# Path / JSON helpers
|
|
# =============================================================================
|
|
|
|
def resolve_path(path: str) -> str:
|
|
path = os.path.expanduser(path)
|
|
if os.path.isabs(path):
|
|
return path
|
|
return os.path.abspath(path)
|
|
|
|
|
|
def load_json(path: str) -> Any:
|
|
with open(resolve_path(path), "r", encoding="utf-8") as f:
|
|
return json.load(f)
|
|
|
|
|
|
def save_json(path: str, data: Any) -> None:
|
|
with open(resolve_path(path), "w", encoding="utf-8") as f:
|
|
json.dump(data, f, indent=2)
|
|
|
|
|
|
# =============================================================================
|
|
# Small geometry helpers
|
|
# =============================================================================
|
|
|
|
def safe_norm(v: np.ndarray, eps: float = 1e-12) -> float:
|
|
return float(np.linalg.norm(v) + eps)
|
|
|
|
|
|
def normalize(v: np.ndarray, eps: float = 1e-12) -> np.ndarray:
|
|
v = np.asarray(v, dtype=np.float64)
|
|
return v / safe_norm(v, eps)
|
|
|
|
|
|
def wrap_angle_rad(a: float) -> float:
|
|
return (a + math.pi) % (2.0 * math.pi) - math.pi
|
|
|
|
|
|
def get_length_scale(robot_data: Dict[str, Any]) -> float:
|
|
units = robot_data.get("units", {}) or {}
|
|
length_unit = str(units.get("length", "")).strip().lower()
|
|
if length_unit in ("mm", "millimeter", "millimeters"):
|
|
return 1.0 / 1000.0
|
|
if length_unit in ("cm", "centimeter", "centimeters"):
|
|
return 1.0 / 100.0
|
|
return 1.0
|
|
|
|
|
|
def rotation_matrix_xyz(rx: float, ry: float, rz: float, degrees: bool = False) -> np.ndarray:
|
|
"""Rotation order: X then Y then Z."""
|
|
if degrees:
|
|
rx, ry, rz = math.radians(rx), math.radians(ry), math.radians(rz)
|
|
|
|
cx, sx = math.cos(rx), math.sin(rx)
|
|
cy, sy = math.cos(ry), math.sin(ry)
|
|
cz, sz = math.cos(rz), math.sin(rz)
|
|
|
|
rx_m = np.array([[1.0, 0.0, 0.0],
|
|
[0.0, cx, -sx],
|
|
[0.0, sx, cx]], dtype=np.float64)
|
|
ry_m = np.array([[cy, 0.0, sy],
|
|
[0.0, 1.0, 0.0],
|
|
[-sy, 0.0, cy]], dtype=np.float64)
|
|
rz_m = np.array([[cz, -sz, 0.0],
|
|
[sz, cz, 0.0],
|
|
[0.0, 0.0, 1.0]], dtype=np.float64)
|
|
return rz_m @ ry_m @ rx_m
|
|
|
|
|
|
def axis_angle_rotation(axis: np.ndarray, angle_rad: float) -> np.ndarray:
|
|
axis = normalize(axis)
|
|
x, y, z = axis
|
|
c = math.cos(angle_rad)
|
|
s = math.sin(angle_rad)
|
|
C = 1.0 - c
|
|
return np.array([
|
|
[c + x * x * C, x * y * C - z * s, x * z * C + y * s],
|
|
[y * x * C + z * s, c + y * y * C, y * z * C - x * s],
|
|
[z * x * C - y * s, z * y * C + x * s, c + z * z * C],
|
|
], dtype=np.float64)
|
|
|
|
|
|
def make_transform(R: Optional[np.ndarray] = None, t: Optional[np.ndarray] = None) -> np.ndarray:
|
|
T = np.eye(4, dtype=np.float64)
|
|
if R is not None:
|
|
T[:3, :3] = np.asarray(R, dtype=np.float64)
|
|
if t is not None:
|
|
T[:3, 3] = np.asarray(t, dtype=np.float64).reshape(3)
|
|
return T
|
|
|
|
|
|
def transform_point(T: np.ndarray, p: np.ndarray) -> np.ndarray:
|
|
p4 = np.ones(4, dtype=np.float64)
|
|
p4[:3] = np.asarray(p, dtype=np.float64).reshape(3)
|
|
return (T @ p4)[:3]
|
|
|
|
|
|
def transform_vector(T: np.ndarray, v: np.ndarray) -> np.ndarray:
|
|
return T[:3, :3] @ np.asarray(v, dtype=np.float64).reshape(3)
|
|
|
|
|
|
def kabsch(P: np.ndarray, Q: np.ndarray, W: Optional[np.ndarray] = None) -> Tuple[np.ndarray, np.ndarray]:
|
|
"""Weighted Kabsch fit: find R, t so that R @ P + t ≈ Q."""
|
|
P = np.asarray(P, dtype=np.float64)
|
|
Q = np.asarray(Q, dtype=np.float64)
|
|
assert P.shape == Q.shape and P.shape[1] == 3
|
|
|
|
if W is None:
|
|
W = np.ones(len(P), dtype=np.float64)
|
|
W = np.asarray(W, dtype=np.float64).reshape(-1)
|
|
W = np.maximum(W, 1e-12)
|
|
W = W / np.sum(W)
|
|
|
|
p_cent = np.sum(P * W[:, None], axis=0)
|
|
q_cent = np.sum(Q * W[:, None], axis=0)
|
|
|
|
P0 = P - p_cent
|
|
Q0 = Q - q_cent
|
|
H = (P0 * W[:, None]).T @ Q0
|
|
|
|
U, S, Vt = np.linalg.svd(H)
|
|
R = Vt.T @ U.T
|
|
if np.linalg.det(R) < 0:
|
|
Vt[-1, :] *= -1.0
|
|
R = Vt.T @ U.T
|
|
t = q_cent - R @ p_cent
|
|
return R, t
|
|
|
|
|
|
def principal_axis_id(axis: np.ndarray, threshold: float = 0.95) -> Optional[int]:
|
|
a = normalize(np.asarray(axis, dtype=np.float64))
|
|
idx = int(np.argmax(np.abs(a)))
|
|
if abs(a[idx]) >= threshold:
|
|
return idx
|
|
return None
|
|
|
|
|
|
def axis_unit_from_id(axis_id: int, sign: float = 1.0) -> np.ndarray:
|
|
v = np.zeros(3, dtype=np.float64)
|
|
v[axis_id] = 1.0 if sign >= 0 else -1.0
|
|
return v
|
|
|
|
|
|
def plane_basis_for_axis(axis: np.ndarray) -> Tuple[np.ndarray, np.ndarray, int]:
|
|
"""
|
|
Return a 2D basis (e1, e2) spanning the plane orthogonal to axis.
|
|
Also return the principal axis index if axis is close to x/y/z.
|
|
"""
|
|
a = normalize(axis)
|
|
axis_id = int(np.argmax(np.abs(a)))
|
|
|
|
if axis_id == 0: # x -> yz-plane
|
|
e1 = np.array([0.0, 1.0, 0.0], dtype=np.float64)
|
|
e2 = np.array([0.0, 0.0, 1.0], dtype=np.float64)
|
|
elif axis_id == 1: # y -> zx-plane
|
|
e1 = np.array([0.0, 0.0, 1.0], dtype=np.float64)
|
|
e2 = np.array([1.0, 0.0, 0.0], dtype=np.float64)
|
|
else: # z -> xy-plane
|
|
e1 = np.array([1.0, 0.0, 0.0], dtype=np.float64)
|
|
e2 = np.array([0.0, 1.0, 0.0], dtype=np.float64)
|
|
|
|
return e1, e2, axis_id
|
|
|
|
|
|
def project_to_plane(v: np.ndarray, axis: np.ndarray) -> np.ndarray:
|
|
axis = normalize(axis)
|
|
return np.asarray(v, dtype=np.float64) - np.dot(v, axis) * axis
|
|
|
|
|
|
def weighted_mean(values: List[float], weights: List[float]) -> float:
|
|
if not values or not weights:
|
|
return 0.0
|
|
w = np.asarray(weights, dtype=np.float64)
|
|
v = np.asarray(values, dtype=np.float64)
|
|
s = float(np.sum(w))
|
|
if s <= 1e-12:
|
|
return float(np.mean(v))
|
|
return float(np.sum(v * w) / s)
|
|
|
|
|
|
def marker_quality_weight(marker: "MarkerInfo", ref_size: float = 25.0) -> float:
|
|
w = 1.0
|
|
if marker.size is not None:
|
|
try:
|
|
size = float(marker.size)
|
|
if size > 0:
|
|
w *= max(0.35, math.sqrt(size / ref_size))
|
|
except Exception:
|
|
pass
|
|
return float(w)
|
|
|
|
|
|
def normal_flip_bias(marker: "MarkerInfo", world_up: np.ndarray = np.array([0.0, 0.0, 1.0], dtype=np.float64)) -> float:
|
|
"""
|
|
Weak bias for top/bottom flip discrimination.
|
|
Only markers with a strong ±z local normal contribute.
|
|
"""
|
|
if marker.local_normal is None:
|
|
return 0.0
|
|
n = normalize(marker.local_normal)
|
|
if abs(n[2]) < 0.5:
|
|
return 0.0
|
|
pref = 1.0 if n[2] >= 0 else -1.0
|
|
# Positive score means "prefer world_up alignment" for top markers,
|
|
# negative score means "prefer world_down alignment" for bottom markers.
|
|
return pref
|
|
|
|
|
|
# =============================================================================
|
|
# Robot structures
|
|
# =============================================================================
|
|
|
|
@dataclass
|
|
class MarkerInfo:
|
|
marker_id: int
|
|
name: str
|
|
local_pos_m: np.ndarray
|
|
local_normal: np.ndarray
|
|
size: Optional[float]
|
|
spin: Optional[float]
|
|
link_name: str
|
|
|
|
|
|
@dataclass
|
|
class LinkInfo:
|
|
name: str
|
|
parent: Optional[str]
|
|
joint_type: str
|
|
joint_axis: Optional[np.ndarray]
|
|
joint_variable: Optional[str]
|
|
joint_origin_m: np.ndarray
|
|
joint_rotation_deg: np.ndarray
|
|
mount_position_m: np.ndarray
|
|
mount_rotation_deg: np.ndarray
|
|
markers: List[MarkerInfo]
|
|
|
|
|
|
@dataclass
|
|
class StatePoseParams:
|
|
numbers_of_elements_to_consider_start: int = 4
|
|
numbers_of_elements_to_consider_final: int = 5
|
|
geometric_passes_per_stage: int = 2
|
|
root_pose_min_markers: int = 2
|
|
use_marker_normals_flip_tiebreak: bool = True
|
|
normal_flip_weight: float = 0.05
|
|
|
|
|
|
def load_state_pose_params(robot_data: Dict[str, Any]) -> StatePoseParams:
|
|
raw = robot_data.get("state_pose_params", {}) or {}
|
|
return StatePoseParams(
|
|
numbers_of_elements_to_consider_start=max(1, int(raw.get("numbers_of_Elements_to_consider_start", 4))),
|
|
numbers_of_elements_to_consider_final=max(1, int(raw.get("numbers_of_Elements_to_consider_final", 5))),
|
|
geometric_passes_per_stage=max(1, int(raw.get("geometric_passes_per_stage", 2))),
|
|
root_pose_min_markers=max(1, int(raw.get("root_pose_min_markers", 2))),
|
|
use_marker_normals_flip_tiebreak=bool(raw.get("use_marker_normals_flip_tiebreak", True)),
|
|
normal_flip_weight=float(raw.get("normal_flip_weight", 0.05)),
|
|
)
|
|
|
|
|
|
def parse_robot(robot_data: Dict[str, Any], length_scale: float) -> Tuple[Dict[str, LinkInfo], Dict[int, str], List[str]]:
|
|
links = robot_data.get("links", {}) or {}
|
|
link_infos: Dict[str, LinkInfo] = {}
|
|
marker_by_id: Dict[int, str] = {}
|
|
issues: List[str] = []
|
|
|
|
for link_name, link_data in links.items():
|
|
parent = link_data.get("parent", None)
|
|
|
|
joint = link_data.get("jointToParent", {}) or {}
|
|
joint_type = str(joint.get("type", "")).strip().lower()
|
|
axis = joint.get("axis", None)
|
|
joint_axis = None
|
|
if axis is not None:
|
|
axis_arr = np.asarray(axis, dtype=np.float64)
|
|
if safe_norm(axis_arr) > 1e-12:
|
|
joint_axis = normalize(axis_arr)
|
|
|
|
joint_variable = joint.get("variable", None)
|
|
joint_origin_m = np.asarray(joint.get("origin", [0, 0, 0]), dtype=np.float64) * float(length_scale)
|
|
joint_rotation_deg = np.asarray(joint.get("rotation", [0, 0, 0]), dtype=np.float64)
|
|
|
|
mount_position_m = np.asarray(link_data.get("mountPosition", [0, 0, 0]), dtype=np.float64) * float(length_scale)
|
|
mount_rotation_deg = np.asarray(link_data.get("mountRotation", [0, 0, 0]), dtype=np.float64)
|
|
|
|
markers: List[MarkerInfo] = []
|
|
seen_local: set[int] = set()
|
|
for idx, marker in enumerate(link_data.get("markers", []) or []):
|
|
marker_id = int(marker.get("id", -1))
|
|
pos = marker.get("position", None)
|
|
if marker_id < 0 or pos is None or len(pos) != 3:
|
|
issues.append(f"[WARN] link={link_name}: skipped invalid marker entry at index {idx}")
|
|
continue
|
|
if marker_id in seen_local:
|
|
issues.append(f"[WARN] link={link_name}: duplicate marker id {marker_id} inside same link -> skipped")
|
|
continue
|
|
if marker_id in marker_by_id:
|
|
issues.append(
|
|
f"[WARN] marker id {marker_id} already assigned to link '{marker_by_id[marker_id]}', "
|
|
f"duplicate in link '{link_name}' -> skipped"
|
|
)
|
|
continue
|
|
|
|
seen_local.add(marker_id)
|
|
marker_by_id[marker_id] = link_name
|
|
markers.append(
|
|
MarkerInfo(
|
|
marker_id=marker_id,
|
|
name=str(marker.get("name", f"aruco_{marker_id}")),
|
|
local_pos_m=np.asarray(pos, dtype=np.float64) * float(length_scale),
|
|
local_normal=normalize(np.asarray(marker.get("normal", [0, 0, 1]), dtype=np.float64)),
|
|
size=marker.get("size", None),
|
|
spin=marker.get("spin", None),
|
|
link_name=link_name,
|
|
)
|
|
)
|
|
|
|
link_infos[link_name] = LinkInfo(
|
|
name=link_name,
|
|
parent=parent,
|
|
joint_type=joint_type,
|
|
joint_axis=joint_axis,
|
|
joint_variable=joint_variable,
|
|
joint_origin_m=joint_origin_m,
|
|
joint_rotation_deg=joint_rotation_deg,
|
|
mount_position_m=mount_position_m,
|
|
mount_rotation_deg=mount_rotation_deg,
|
|
markers=markers,
|
|
)
|
|
|
|
return link_infos, marker_by_id, issues
|
|
|
|
|
|
def topological_links(link_infos: Dict[str, LinkInfo]) -> List[str]:
|
|
roots = [ln for ln, li in link_infos.items() if li.parent is None]
|
|
if not roots:
|
|
return []
|
|
root = roots[0]
|
|
|
|
children: Dict[str, List[str]] = {ln: [] for ln in link_infos}
|
|
for ln, li in link_infos.items():
|
|
if li.parent is not None and li.parent in children:
|
|
children[li.parent].append(ln)
|
|
|
|
order: List[str] = []
|
|
queue = [root]
|
|
while queue:
|
|
cur = queue.pop(0)
|
|
if cur in order:
|
|
continue
|
|
order.append(cur)
|
|
for ch in children.get(cur, []):
|
|
queue.append(ch)
|
|
|
|
# Append disconnected subtrees if any.
|
|
for ln in link_infos:
|
|
if ln not in order:
|
|
order.append(ln)
|
|
|
|
return order
|
|
|
|
|
|
def compute_children_map(link_infos: Dict[str, LinkInfo]) -> Dict[str, List[str]]:
|
|
children: Dict[str, List[str]] = {ln: [] for ln in link_infos}
|
|
for ln, li in link_infos.items():
|
|
if li.parent is not None and li.parent in children:
|
|
children[li.parent].append(ln)
|
|
return children
|
|
|
|
|
|
def subtree_links(link_name: str, children_map: Dict[str, List[str]], active_links: set[str]) -> List[str]:
|
|
out: List[str] = []
|
|
queue = [link_name]
|
|
while queue:
|
|
cur = queue.pop(0)
|
|
if cur not in active_links:
|
|
continue
|
|
out.append(cur)
|
|
for ch in children_map.get(cur, []):
|
|
if ch in active_links:
|
|
queue.append(ch)
|
|
return out
|
|
|
|
|
|
def marker_ids_in_links(link_infos: Dict[str, LinkInfo], links: List[str]) -> List[int]:
|
|
ids: List[int] = []
|
|
for link_name in links:
|
|
for m in link_infos[link_name].markers:
|
|
ids.append(m.marker_id)
|
|
return ids
|
|
|
|
|
|
# =============================================================================
|
|
# Observations
|
|
# =============================================================================
|
|
|
|
def load_observed_markers(optimized_markers_file: str) -> Dict[int, Dict[str, Any]]:
|
|
data = load_json(optimized_markers_file)
|
|
if isinstance(data, dict):
|
|
markers = data.get("markers", []) or []
|
|
elif isinstance(data, list):
|
|
markers = data
|
|
else:
|
|
markers = []
|
|
|
|
observed: Dict[int, Dict[str, Any]] = {}
|
|
for m in markers:
|
|
if not isinstance(m, dict) or "marker_id" not in m:
|
|
continue
|
|
marker_id = int(m["marker_id"])
|
|
pos = m.get("position_m", None)
|
|
if pos is None or len(pos) != 3:
|
|
continue
|
|
obs = dict(m)
|
|
obs["marker_id"] = marker_id
|
|
obs["position_m"] = np.asarray(pos, dtype=np.float64)
|
|
observed[marker_id] = obs
|
|
return observed
|
|
|
|
|
|
def active_observations_for_links(
|
|
observed_markers: Dict[int, Dict[str, Any]],
|
|
link_infos: Dict[str, LinkInfo],
|
|
active_links: set[str],
|
|
) -> Dict[int, Dict[str, Any]]:
|
|
out: Dict[int, Dict[str, Any]] = {}
|
|
for marker_id, obs in observed_markers.items():
|
|
link = obs.get("link", None)
|
|
if link is None or link == "unknown":
|
|
continue
|
|
if link not in active_links:
|
|
continue
|
|
if link not in link_infos:
|
|
continue
|
|
out[marker_id] = obs
|
|
return out
|
|
|
|
|
|
# =============================================================================
|
|
# Forward kinematics
|
|
# =============================================================================
|
|
|
|
def link_static_transform(link: LinkInfo) -> np.ndarray:
|
|
Rm = rotation_matrix_xyz(*link.mount_rotation_deg, degrees=True)
|
|
return make_transform(Rm, link.mount_position_m)
|
|
|
|
|
|
def joint_motion_transform(link: LinkInfo, q_value: float) -> np.ndarray:
|
|
joint_type = (link.joint_type or "").strip().lower()
|
|
if link.joint_axis is None:
|
|
return np.eye(4, dtype=np.float64)
|
|
|
|
axis = normalize(link.joint_axis)
|
|
if joint_type == "linear":
|
|
return make_transform(np.eye(3), axis * float(q_value))
|
|
if joint_type == "revolute":
|
|
return make_transform(axis_angle_rotation(axis, float(q_value)), np.zeros(3))
|
|
return np.eye(4, dtype=np.float64)
|
|
|
|
|
|
def world_to_link_transform(
|
|
link_name: str,
|
|
link_infos: Dict[str, LinkInfo],
|
|
state: Dict[str, Any],
|
|
cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray],
|
|
joint_override: Optional[Dict[str, float]] = None,
|
|
) -> np.ndarray:
|
|
override_items = tuple(sorted((joint_override or {}).items()))
|
|
cache_key = (link_name, override_items)
|
|
if cache_key in cache:
|
|
return cache[cache_key]
|
|
|
|
link = link_infos[link_name]
|
|
if link.parent is None:
|
|
T = make_transform(state["root_R"], state["root_t"])
|
|
cache[cache_key] = T
|
|
return T
|
|
|
|
parent_T = world_to_link_transform(link.parent, link_infos, state, cache, joint_override=joint_override)
|
|
|
|
# Parent -> joint origin
|
|
T = parent_T @ make_transform(np.eye(3), link.joint_origin_m)
|
|
|
|
# Joint motion
|
|
joint_values = state["joint_values"]
|
|
q = joint_override.get(link.name, joint_values.get(link.joint_variable, 0.0)) if joint_override else joint_values.get(link.joint_variable, 0.0)
|
|
T = T @ joint_motion_transform(link, q)
|
|
|
|
# Static rotation after the joint
|
|
R_static = rotation_matrix_xyz(*link.joint_rotation_deg, degrees=True)
|
|
T = T @ make_transform(R_static, np.zeros(3))
|
|
|
|
# Mount transform / link-local offset
|
|
T = T @ link_static_transform(link)
|
|
|
|
cache[cache_key] = T
|
|
return T
|
|
|
|
|
|
def predict_marker_positions(
|
|
link_infos: Dict[str, LinkInfo],
|
|
state: Dict[str, Any],
|
|
joint_override: Optional[Dict[str, float]] = None,
|
|
active_links: Optional[set[str]] = None,
|
|
) -> Dict[int, np.ndarray]:
|
|
pred: Dict[int, np.ndarray] = {}
|
|
cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray] = {}
|
|
for link_name, link in link_infos.items():
|
|
if active_links is not None and link_name not in active_links:
|
|
continue
|
|
T = world_to_link_transform(link_name, link_infos, state, cache, joint_override=joint_override)
|
|
for marker in link.markers:
|
|
pred[marker.marker_id] = transform_point(T, marker.local_pos_m)
|
|
return pred
|
|
|
|
|
|
def world_joint_axis_for_link(
|
|
link_name: str,
|
|
link_infos: Dict[str, LinkInfo],
|
|
state: Dict[str, Any],
|
|
) -> np.ndarray:
|
|
link = link_infos[link_name]
|
|
if link.parent is None or link.joint_axis is None:
|
|
return np.array([1.0, 0.0, 0.0], dtype=np.float64)
|
|
|
|
cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray] = {}
|
|
parent_T = world_to_link_transform(link.parent, link_infos, state, cache)
|
|
axis_world = transform_vector(parent_T, link.joint_axis)
|
|
return normalize(axis_world)
|
|
|
|
|
|
def world_joint_origin_for_link(
|
|
link_name: str,
|
|
link_infos: Dict[str, LinkInfo],
|
|
state: Dict[str, Any],
|
|
) -> np.ndarray:
|
|
link = link_infos[link_name]
|
|
if link.parent is None:
|
|
return np.asarray(state["root_t"], dtype=np.float64).copy()
|
|
|
|
cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray] = {}
|
|
parent_T = world_to_link_transform(link.parent, link_infos, state, cache)
|
|
return transform_point(parent_T, link.joint_origin_m)
|
|
|
|
|
|
# =============================================================================
|
|
# Geometric estimation
|
|
# =============================================================================
|
|
|
|
def initial_root_pose(
|
|
root_link: LinkInfo,
|
|
observed_markers: Dict[int, Dict[str, Any]],
|
|
min_markers: int = 2,
|
|
) -> Tuple[np.ndarray, np.ndarray, Dict[str, Any]]:
|
|
P = []
|
|
Q = []
|
|
W = []
|
|
matched = []
|
|
|
|
for marker in root_link.markers:
|
|
if marker.marker_id in observed_markers:
|
|
obs = np.asarray(observed_markers[marker.marker_id]["position_m"], dtype=np.float64)
|
|
P.append(marker.local_pos_m)
|
|
Q.append(obs)
|
|
W.append(marker_quality_weight(marker))
|
|
matched.append(marker.marker_id)
|
|
|
|
if len(P) >= min_markers:
|
|
R, t = kabsch(np.asarray(P), np.asarray(Q), np.asarray(W))
|
|
return R, t, {"reason": "kabsch", "used_markers": matched}
|
|
|
|
if len(P) >= 1:
|
|
marker = root_link.markers[0]
|
|
obs = np.asarray(observed_markers[marker.marker_id]["position_m"], dtype=np.float64)
|
|
return np.eye(3, dtype=np.float64), obs - marker.local_pos_m, {"reason": "single_marker_fallback", "used_markers": matched}
|
|
|
|
return np.eye(3, dtype=np.float64), np.zeros(3, dtype=np.float64), {"reason": "no_root_markers"}
|
|
|
|
|
|
def estimate_linear_joint_value(
|
|
link_name: str,
|
|
link_infos: Dict[str, LinkInfo],
|
|
state: Dict[str, Any],
|
|
observed_markers: Dict[int, Dict[str, Any]],
|
|
active_links: set[str],
|
|
children_map: Dict[str, List[str]],
|
|
) -> Tuple[float, Dict[str, Any]]:
|
|
link = link_infos[link_name]
|
|
if link.joint_variable is None:
|
|
return 0.0, {"reason": "no_joint_variable"}
|
|
|
|
subtree = subtree_links(link_name, children_map, active_links)
|
|
obs_ids = [mid for mid in marker_ids_in_links(link_infos, subtree) if mid in observed_markers]
|
|
if not obs_ids:
|
|
return float(state["joint_values"].get(link.joint_variable, 0.0)), {"reason": "no_observations"}
|
|
|
|
axis_world = world_joint_axis_for_link(link_name, link_infos, state)
|
|
|
|
pred0 = predict_marker_positions(link_infos, state, joint_override={link_name: 0.0}, active_links=active_links)
|
|
|
|
num = 0.0
|
|
den = 0.0
|
|
per_marker = []
|
|
for mid in obs_ids:
|
|
if mid not in pred0:
|
|
continue
|
|
marker = next((m for m in link_infos[observed_markers[mid]["link"]].markers if m.marker_id == mid), None)
|
|
w = marker_quality_weight(marker) if marker is not None else 1.0
|
|
p_obs = np.asarray(observed_markers[mid]["position_m"], dtype=np.float64)
|
|
p0 = pred0[mid]
|
|
q_i = float(np.dot(axis_world, p_obs - p0))
|
|
num += w * q_i
|
|
den += w
|
|
per_marker.append({"marker_id": mid, "q_i": q_i, "weight": w})
|
|
|
|
if den <= 1e-12:
|
|
return float(state["joint_values"].get(link.joint_variable, 0.0)), {"reason": "zero_weight"}
|
|
|
|
q = num / den
|
|
return float(q), {
|
|
"reason": "weighted_projection",
|
|
"used_markers": len(per_marker),
|
|
"axis_world": [float(x) for x in axis_world],
|
|
"per_marker": per_marker,
|
|
}
|
|
|
|
|
|
def revolute_score_with_normals(
|
|
link_name: str,
|
|
q_value: float,
|
|
link_infos: Dict[str, LinkInfo],
|
|
state: Dict[str, Any],
|
|
observed_markers: Dict[int, Dict[str, Any]],
|
|
active_links: set[str],
|
|
) -> float:
|
|
"""
|
|
Weak tiebreak for q vs q + pi using marker normals.
|
|
Lower is better.
|
|
"""
|
|
link = link_infos[link_name]
|
|
if not link.markers:
|
|
return 0.0
|
|
|
|
pred = predict_marker_positions(link_infos, state, joint_override={link_name: q_value}, active_links=active_links)
|
|
T_link = world_to_link_transform(link_name, link_infos, state, cache={}, joint_override={link_name: q_value})
|
|
R_link = T_link[:3, :3]
|
|
up = np.array([0.0, 0.0, 1.0], dtype=np.float64)
|
|
|
|
score = 0.0
|
|
for marker in link.markers:
|
|
if marker.marker_id not in observed_markers:
|
|
continue
|
|
if marker.local_normal is None:
|
|
continue
|
|
n_local = normalize(marker.local_normal)
|
|
if abs(n_local[2]) < 0.5:
|
|
continue
|
|
n_world = normalize(R_link @ n_local)
|
|
sign_pref = 1.0 if n_local[2] >= 0 else -1.0
|
|
score += marker_quality_weight(marker) * (1.0 - sign_pref * float(np.dot(n_world, up)))
|
|
|
|
return float(score)
|
|
|
|
|
|
def estimate_revolute_joint_value(
|
|
link_name: str,
|
|
link_infos: Dict[str, LinkInfo],
|
|
state: Dict[str, Any],
|
|
observed_markers: Dict[int, Dict[str, Any]],
|
|
active_links: set[str],
|
|
children_map: Dict[str, List[str]],
|
|
use_normal_tiebreak: bool = True,
|
|
normal_weight: float = 0.05,
|
|
) -> Tuple[float, Dict[str, Any]]:
|
|
link = link_infos[link_name]
|
|
if link.joint_variable is None:
|
|
return 0.0, {"reason": "no_joint_variable"}
|
|
|
|
subtree = subtree_links(link_name, children_map, active_links)
|
|
obs_ids = [mid for mid in marker_ids_in_links(link_infos, subtree) if mid in observed_markers]
|
|
if not obs_ids:
|
|
return float(state["joint_values"].get(link.joint_variable, 0.0)), {"reason": "no_observations"}
|
|
|
|
q0 = float(state["joint_values"].get(link.joint_variable, 0.0))
|
|
|
|
# Prediction with joint set to zero (only upstream geometry active).
|
|
pred0 = predict_marker_positions(link_infos, state, joint_override={link_name: 0.0}, active_links=active_links)
|
|
origin_world = world_joint_origin_for_link(link_name, link_infos, state)
|
|
axis_world = world_joint_axis_for_link(link_name, link_infos, state)
|
|
e1, e2, axis_id = plane_basis_for_axis(axis_world)
|
|
|
|
S = 0.0
|
|
C = 0.0
|
|
used = 0
|
|
per_marker = []
|
|
|
|
for mid in obs_ids:
|
|
if mid not in pred0:
|
|
continue
|
|
marker = next((m for m in link_infos[observed_markers[mid]["link"]].markers if m.marker_id == mid), None)
|
|
w = marker_quality_weight(marker) if marker is not None else 1.0
|
|
|
|
p_obs = np.asarray(observed_markers[mid]["position_m"], dtype=np.float64)
|
|
p0 = pred0[mid]
|
|
|
|
u = project_to_plane(p0 - origin_world, axis_world)
|
|
v = project_to_plane(p_obs - origin_world, axis_world)
|
|
|
|
u2 = np.array([np.dot(u, e1), np.dot(u, e2)], dtype=np.float64)
|
|
v2 = np.array([np.dot(v, e1), np.dot(v, e2)], dtype=np.float64)
|
|
|
|
nu = float(np.linalg.norm(u2))
|
|
nv = float(np.linalg.norm(v2))
|
|
if nu <= 1e-9 or nv <= 1e-9:
|
|
continue
|
|
|
|
S += w * float(u2[0] * v2[1] - u2[1] * v2[0])
|
|
C += w * float(u2[0] * v2[0] + u2[1] * v2[1])
|
|
used += 1
|
|
per_marker.append({"marker_id": mid, "weight": w})
|
|
|
|
if used == 0:
|
|
return q0, {"reason": "no_valid_vectors"}
|
|
|
|
theta = math.atan2(S, C)
|
|
theta_alt = wrap_angle_rad(theta + math.pi)
|
|
|
|
# Evaluate both candidates deterministically, choose the better one.
|
|
def score_candidate(qcand: float) -> float:
|
|
pred = predict_marker_positions(link_infos, state, joint_override={link_name: qcand}, active_links=active_links)
|
|
err = 0.0
|
|
for mid in obs_ids:
|
|
if mid not in pred:
|
|
continue
|
|
marker = next((m for m in link_infos[observed_markers[mid]["link"]].markers if m.marker_id == mid), None)
|
|
w = marker_quality_weight(marker) if marker is not None else 1.0
|
|
p_obs = np.asarray(observed_markers[mid]["position_m"], dtype=np.float64)
|
|
d = pred[mid] - p_obs
|
|
err += w * float(np.dot(d, d))
|
|
if use_normal_tiebreak:
|
|
err += normal_weight * revolute_score_with_normals(link_name, qcand, link_infos, state, observed_markers, active_links)
|
|
return float(err)
|
|
|
|
score_theta = score_candidate(theta)
|
|
score_alt = score_candidate(theta_alt)
|
|
|
|
best_q = theta if score_theta <= score_alt else theta_alt
|
|
best_score = min(score_theta, score_alt)
|
|
|
|
return wrap_angle_rad(best_q), {
|
|
"reason": "2d_alignment" + ("+normal_tiebreak" if use_normal_tiebreak else ""),
|
|
"used_markers": used,
|
|
"axis_world": [float(x) for x in axis_world],
|
|
"axis_id": axis_id,
|
|
"theta_rad": float(theta),
|
|
"theta_alt_rad": float(theta_alt),
|
|
"score_theta": float(score_theta),
|
|
"score_theta_alt": float(score_alt),
|
|
"best_score": float(best_score),
|
|
"per_marker": per_marker,
|
|
}
|
|
|
|
|
|
def estimate_joint_geometrically(
|
|
link_name: str,
|
|
link_infos: Dict[str, LinkInfo],
|
|
state: Dict[str, Any],
|
|
observed_markers: Dict[int, Dict[str, Any]],
|
|
active_links: set[str],
|
|
children_map: Dict[str, List[str]],
|
|
params: StatePoseParams,
|
|
) -> Tuple[float, Dict[str, Any]]:
|
|
link = link_infos[link_name]
|
|
jt = (link.joint_type or "").strip().lower()
|
|
|
|
if jt == "linear":
|
|
return estimate_linear_joint_value(link_name, link_infos, state, observed_markers, active_links, children_map)
|
|
if jt == "revolute":
|
|
return estimate_revolute_joint_value(
|
|
link_name,
|
|
link_infos,
|
|
state,
|
|
observed_markers,
|
|
active_links,
|
|
children_map,
|
|
use_normal_tiebreak=params.use_marker_normals_flip_tiebreak,
|
|
normal_weight=params.normal_flip_weight,
|
|
)
|
|
|
|
return float(state["joint_values"].get(link.joint_variable, 0.0)) if link.joint_variable else 0.0, {"reason": "unsupported_joint_type"}
|
|
|
|
|
|
# =============================================================================
|
|
# Scoring / reporting
|
|
# =============================================================================
|
|
|
|
def build_state_template(link_infos: Dict[str, LinkInfo]) -> Dict[str, Any]:
|
|
return {
|
|
"root_R": np.eye(3, dtype=np.float64),
|
|
"root_t": np.zeros(3, dtype=np.float64),
|
|
"joint_values": {li.joint_variable: 0.0 for li in link_infos.values() if li.joint_variable},
|
|
}
|
|
|
|
|
|
def copy_state(state: Dict[str, Any]) -> Dict[str, Any]:
|
|
return {
|
|
"root_R": np.asarray(state["root_R"], dtype=np.float64).copy(),
|
|
"root_t": np.asarray(state["root_t"], dtype=np.float64).copy(),
|
|
"joint_values": dict(state["joint_values"]),
|
|
}
|
|
|
|
|
|
def marker_error_report(
|
|
link_infos: Dict[str, LinkInfo],
|
|
state: Dict[str, Any],
|
|
observed_markers: Dict[int, Dict[str, Any]],
|
|
active_links: Optional[set[str]] = None,
|
|
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
|
|
pred = predict_marker_positions(link_infos, state, active_links=active_links)
|
|
marker_reports: List[Dict[str, Any]] = []
|
|
errors = []
|
|
|
|
for marker_id, obs in observed_markers.items():
|
|
if marker_id not in pred:
|
|
continue
|
|
p_obs = np.asarray(obs["position_m"], dtype=np.float64)
|
|
p_pred = pred[marker_id]
|
|
err = p_pred - p_obs
|
|
err_norm = float(np.linalg.norm(err))
|
|
errors.append(err_norm)
|
|
marker_reports.append(
|
|
{
|
|
"marker_id": int(marker_id),
|
|
"link": obs.get("link", "unknown"),
|
|
"error_m": [float(x) for x in err],
|
|
"error_norm_m": err_norm,
|
|
"predicted_m": [float(x) for x in p_pred],
|
|
"observed_m": [float(x) for x in p_obs],
|
|
}
|
|
)
|
|
|
|
if errors:
|
|
arr = np.asarray(errors, dtype=np.float64)
|
|
stats = {
|
|
"num_markers_used": int(len(errors)),
|
|
"mean_error_m": float(np.mean(arr)),
|
|
"rms_error_m": float(math.sqrt(np.mean(arr ** 2))),
|
|
"median_error_m": float(np.median(arr)),
|
|
"worst_error_m": float(np.max(arr)),
|
|
}
|
|
else:
|
|
stats = {
|
|
"num_markers_used": 0,
|
|
"mean_error_m": None,
|
|
"rms_error_m": None,
|
|
"median_error_m": None,
|
|
"worst_error_m": None,
|
|
}
|
|
|
|
return marker_reports, stats
|
|
|
|
|
|
def summarize_state(state: Dict[str, Any], link_infos: Dict[str, LinkInfo]) -> Dict[str, Any]:
|
|
movements: Dict[str, Any] = {}
|
|
for link_name, link in link_infos.items():
|
|
q = state["joint_values"].get(link.joint_variable, None) if link.joint_variable else None
|
|
if q is None:
|
|
continue
|
|
jt = (link.joint_type or "").strip().lower()
|
|
if jt == "linear":
|
|
movements[link.joint_variable] = {
|
|
"value_m": float(q),
|
|
"value_mm": float(q * 1000.0),
|
|
"joint_type": jt,
|
|
"link": link_name,
|
|
}
|
|
elif jt == "revolute":
|
|
movements[link.joint_variable] = {
|
|
"value_rad": float(q),
|
|
"value_deg": float(math.degrees(q)),
|
|
"joint_type": jt,
|
|
"link": link_name,
|
|
}
|
|
else:
|
|
movements[link.joint_variable] = {
|
|
"value": float(q),
|
|
"joint_type": jt,
|
|
"link": link_name,
|
|
}
|
|
|
|
root_R = np.asarray(state["root_R"], dtype=np.float64)
|
|
root_t = np.asarray(state["root_t"], dtype=np.float64)
|
|
return {
|
|
"root_pose": {
|
|
"translation_m": [float(x) for x in root_t],
|
|
"rotation_matrix": [[float(x) for x in row] for row in root_R],
|
|
"euler_xyz_deg": [float(x) for x in np.degrees(np.array([
|
|
math.atan2(root_R[2, 1], root_R[2, 2]),
|
|
math.atan2(-root_R[2, 0], math.sqrt(root_R[0, 0] ** 2 + root_R[1, 0] ** 2)),
|
|
math.atan2(root_R[1, 0], root_R[0, 0]),
|
|
]))],
|
|
},
|
|
"movements": movements,
|
|
}
|
|
|
|
|
|
# =============================================================================
|
|
# Sequential geometric estimation
|
|
# =============================================================================
|
|
|
|
def optimize_geometric_prefix(
|
|
link_infos: Dict[str, LinkInfo],
|
|
observed_markers: Dict[int, Dict[str, Any]],
|
|
initial_state: Dict[str, Any],
|
|
active_links: List[str],
|
|
params: StatePoseParams,
|
|
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
|
|
active_set = set(active_links)
|
|
children_map = compute_children_map(link_infos)
|
|
stage_obs = active_observations_for_links(observed_markers, link_infos, active_set)
|
|
|
|
state = copy_state(initial_state)
|
|
stage_info: Dict[str, Any] = {
|
|
"method": "deterministic_geometric_prefix",
|
|
"active_links": active_links,
|
|
"active_observations": len(stage_obs),
|
|
"joint_updates": [],
|
|
}
|
|
|
|
root_link_name = next((ln for ln in active_links if link_infos[ln].parent is None), None)
|
|
if root_link_name is not None:
|
|
root_link = link_infos[root_link_name]
|
|
root_R, root_t, root_info = initial_root_pose(root_link, stage_obs, min_markers=params.root_pose_min_markers)
|
|
state["root_R"] = root_R
|
|
state["root_t"] = root_t
|
|
stage_info["root_link"] = root_link_name
|
|
stage_info["root_pose"] = root_info
|
|
|
|
active_joint_links = [ln for ln in active_links if link_infos[ln].parent is not None]
|
|
|
|
for pass_idx in range(params.geometric_passes_per_stage):
|
|
pass_updates = []
|
|
for link_name in active_joint_links:
|
|
link = link_infos[link_name]
|
|
if not link.joint_variable:
|
|
continue
|
|
q_old = float(state["joint_values"].get(link.joint_variable, 0.0))
|
|
q_new, info = estimate_joint_geometrically(
|
|
link_name,
|
|
link_infos,
|
|
state,
|
|
stage_obs,
|
|
active_set,
|
|
children_map,
|
|
params,
|
|
)
|
|
state["joint_values"][link.joint_variable] = q_new
|
|
pass_updates.append(
|
|
{
|
|
"link": link_name,
|
|
"joint_variable": link.joint_variable,
|
|
"joint_type": link.joint_type,
|
|
"old": q_old,
|
|
"new": q_new,
|
|
"info": info,
|
|
}
|
|
)
|
|
stage_info["joint_updates"].append({"pass": pass_idx, "updates": pass_updates})
|
|
|
|
marker_reports, stats = marker_error_report(link_infos, state, stage_obs, active_links=active_set)
|
|
stage_info["marker_stats"] = stats
|
|
stage_info["marker_reports"] = marker_reports[:]
|
|
|
|
return state, stage_info
|
|
|
|
|
|
def sequential_geometric_estimation(
|
|
link_infos: Dict[str, LinkInfo],
|
|
observed_markers: Dict[int, Dict[str, Any]],
|
|
params: StatePoseParams,
|
|
) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
|
|
ordered = topological_links(link_infos)
|
|
if not ordered:
|
|
return build_state_template(link_infos), []
|
|
|
|
start_n = max(1, min(int(params.numbers_of_elements_to_consider_start), len(ordered)))
|
|
final_n = max(start_n, min(int(params.numbers_of_elements_to_consider_final), len(ordered)))
|
|
|
|
state = build_state_template(link_infos)
|
|
stage_reports: List[Dict[str, Any]] = []
|
|
|
|
for n in range(start_n, final_n + 1):
|
|
active_links = ordered[:n]
|
|
state, stage_info = optimize_geometric_prefix(link_infos, observed_markers, state, active_links, params)
|
|
stage_info["stage_idx"] = len(stage_reports)
|
|
stage_info["num_active_links"] = len(active_links)
|
|
stage_info["active_links"] = active_links
|
|
|
|
stage_obs = active_observations_for_links(observed_markers, link_infos, set(active_links))
|
|
marker_reports, stats = marker_error_report(link_infos, state, stage_obs, active_links=set(active_links))
|
|
stage_info["marker_stats"] = stats
|
|
stage_info["marker_reports"] = marker_reports
|
|
stage_reports.append(stage_info)
|
|
|
|
mean_mm = None if stats["mean_error_m"] is None else stats["mean_error_m"] * 1000.0
|
|
print(
|
|
f"[INFO] Stage {stage_info['stage_idx']} | links={len(active_links)} | "
|
|
f"markers_used={stats['num_markers_used']} | "
|
|
f"mean_error={'n/a' if mean_mm is None else f'{mean_mm:.2f} mm'}"
|
|
)
|
|
|
|
return state, stage_reports
|
|
|
|
|
|
# =============================================================================
|
|
# Serialization
|
|
# =============================================================================
|
|
|
|
def link_world_poses(link_infos: Dict[str, LinkInfo], state: Dict[str, Any]) -> Dict[str, Any]:
|
|
poses: Dict[str, Any] = {}
|
|
cache: Dict[Tuple[str, Tuple[Tuple[str, float], ...]], np.ndarray] = {}
|
|
for link_name in link_infos:
|
|
T = world_to_link_transform(link_name, link_infos, state, cache)
|
|
R = T[:3, :3]
|
|
t = T[:3, 3]
|
|
poses[link_name] = {
|
|
"translation_m": [float(x) for x in t],
|
|
"rotation_matrix": [[float(x) for x in row] for row in R],
|
|
}
|
|
return poses
|
|
|
|
|
|
def state_to_json(
|
|
state: Dict[str, Any],
|
|
link_infos: Dict[str, LinkInfo],
|
|
stage_reports: List[Dict[str, Any]],
|
|
observed_markers: Dict[int, Dict[str, Any]],
|
|
) -> Dict[str, Any]:
|
|
movements = summarize_state(state, link_infos)["movements"]
|
|
root_pose = summarize_state(state, link_infos)["root_pose"]
|
|
predicted = predict_marker_positions(link_infos, state)
|
|
|
|
markers_out = []
|
|
for marker_id, obs in sorted(observed_markers.items()):
|
|
entry = {
|
|
"marker_id": int(marker_id),
|
|
"link": obs.get("link", "unknown"),
|
|
"observed_position_m": [float(x) for x in np.asarray(obs["position_m"], dtype=np.float64)],
|
|
}
|
|
if marker_id in predicted:
|
|
entry["predicted_position_m"] = [float(x) for x in predicted[marker_id]]
|
|
err = predicted[marker_id] - np.asarray(obs["position_m"], dtype=np.float64)
|
|
entry["error_m"] = [float(x) for x in err]
|
|
entry["error_norm_m"] = float(np.linalg.norm(err))
|
|
markers_out.append(entry)
|
|
|
|
return {
|
|
"schema_version": "1.0",
|
|
"method": "deterministic_geometric_sequential_prefix",
|
|
"created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
|
|
"root_pose": root_pose,
|
|
"movements": movements,
|
|
"link_poses": link_world_poses(link_infos, state),
|
|
"stage_reports": stage_reports,
|
|
"markers": markers_out,
|
|
}
|
|
|
|
|
|
# =============================================================================
|
|
# Main
|
|
# =============================================================================
|
|
|
|
def main() -> None:
|
|
parser = argparse.ArgumentParser(
|
|
description="Sequential deterministic geometric robot-state estimation from optimized ArUco marker positions"
|
|
)
|
|
parser.add_argument("optimized_markers", help="aruco_positions_optimized*.json")
|
|
parser.add_argument("-robot", "--robot", required=True, help="robot.json")
|
|
parser.add_argument("-out", "--out", default=None, help="Output robot_state.json path")
|
|
parser.add_argument("--prefixStart", type=int, default=None, help="Override state_pose_params start")
|
|
parser.add_argument("--prefixFinal", type=int, default=None, help="Override state_pose_params final")
|
|
parser.add_argument("--passes", type=int, default=None, help="Override geometric passes per stage")
|
|
|
|
args = parser.parse_args()
|
|
|
|
print("[STEP 1] Load robot.json and optimized marker positions...")
|
|
robot_data = load_json(args.robot)
|
|
length_scale = get_length_scale(robot_data)
|
|
params = load_state_pose_params(robot_data)
|
|
|
|
if args.prefixStart is not None:
|
|
params.numbers_of_elements_to_consider_start = max(1, int(args.prefixStart))
|
|
if args.prefixFinal is not None:
|
|
params.numbers_of_elements_to_consider_final = max(1, int(args.prefixFinal))
|
|
if args.passes is not None:
|
|
params.geometric_passes_per_stage = max(1, int(args.passes))
|
|
|
|
link_infos, marker_by_id, issues = parse_robot(robot_data, length_scale)
|
|
observed_map = load_observed_markers(args.optimized_markers)
|
|
|
|
for issue in issues:
|
|
print(issue)
|
|
|
|
print(f"[INFO] Links: {len(link_infos)}")
|
|
print(f"[INFO] Known robot markers: {len(marker_by_id)}")
|
|
print(f"[INFO] Observed optimized markers: {len(observed_map)}")
|
|
print(
|
|
f"[INFO] state_pose_params: start={params.numbers_of_elements_to_consider_start}, "
|
|
f"final={params.numbers_of_elements_to_consider_final}, "
|
|
f"passes={params.geometric_passes_per_stage}, "
|
|
f"normal_flip_tiebreak={params.use_marker_normals_flip_tiebreak}"
|
|
)
|
|
|
|
print("[STEP 2] Sequential geometric reconstruction...")
|
|
state, stage_reports = sequential_geometric_estimation(link_infos, observed_map, params)
|
|
|
|
out_data = state_to_json(state, link_infos, stage_reports, observed_map)
|
|
|
|
out_dir = os.path.dirname(resolve_path(args.out)) if args.out else os.path.dirname(resolve_path(args.optimized_markers))
|
|
if not out_dir:
|
|
out_dir = "."
|
|
os.makedirs(out_dir, exist_ok=True)
|
|
|
|
out_file = args.out or os.path.join(out_dir, "robot_state.json")
|
|
save_json(out_file, out_data)
|
|
|
|
print(f"[INFO] Saved robot state to {out_file}")
|
|
|
|
# Compact summary of the resulting movements
|
|
print("\n[INFO] Estimated movements:")
|
|
for key, info in out_data["movements"].items():
|
|
if "value_mm" in info:
|
|
print(f" {key}: {info['value_mm']:.3f} mm ({info['link']}, {info['joint_type']})")
|
|
elif "value_deg" in info:
|
|
print(f" {key}: {info['value_deg']:.3f} deg ({info['link']}, {info['joint_type']})")
|
|
else:
|
|
print(f" {key}: {info.get('value', 0.0):.6f} ({info.get('link', 'unknown')}, {info.get('joint_type', 'unknown')})")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|