4b kind-marker für winkel beachten

This commit is contained in:
chk
2026-06-16 15:28:14 +02:00
parent 0234c1ef1d
commit 578b955508
7 changed files with 1155 additions and 70 deletions

View File

@@ -5,10 +5,12 @@
Generic revolute-joint angle estimator.
For each movable link (Arm1, Ellbow, Arm2 …) whose joint type is 'revolute',
this script estimates the rotation angle using the pairwise-vector method
(PRIMARY), with a single-marker pivot method as FALLBACK:
this script estimates the rotation angle using one of three methods, tried
in order — each next TIER is a pure fallback, only used when the previous
one found NOT A SINGLE usable (non axis-degenerate) pair:
PRIMARY for every PAIR (m1, m2) of markers belonging to the target link:
TIER 0 — PRIMARY: for every PAIR (m1, m2) of markers belonging to the
target link itself:
v_model = spoke_world(m2) - spoke_world(m1) (model, world-oriented)
v_obs = world_pos_m2 - world_pos_m1 (observed, world frame)
@@ -26,15 +28,28 @@ this script estimates the rotation angle using the pairwise-vector method
Pair weights = baseline_model × baseline_obs (longer baselines → more reliable).
FALLBACK — only used when the PRIMARY method has no usable pair at all
(e.g. just one marker visible, or every visible pair happens to lie
parallel to the joint axis, as for two markers spaced along a forearm):
the joint PIVOT itself stands in for the missing second marker, i.e. the
"pair" becomes (pivot, m1). This needs only ONE matched marker, but —
unlike the primary method — its accuracy additionally depends on the
already-estimated PARENT joint *values* being correct (not just their
axis direction), since the pivot's world position comes from FK. See
`PIVOT_FALLBACK_ID` / `used_fallback` in the code.
TIER 1 — FALLBACK-1 (child-axis): only entered when TIER 0 has nothing.
Uses a PAIR of markers on the DIRECT CHILD link instead of the target
link, picking only pairs whose LOCAL connecting vector is (nearly)
parallel to the CHILD's OWN joint axis. A rotation about an axis never
moves a vector parallel to that very axis, so such a pair is invariant
to the child's own (still-unknown) rotation and transforms purely under
the chain up to and including the TARGET joint — exactly like a TIER-0
pair, just sourced one link further down. Like TIER 0 (and unlike
TIER 2), this only needs the axis DIRECTION to be correct, not the
pivot's position, so it is preferred over TIER 2 whenever available.
Example: Ellbow (axis X) ← Arm2 markers 144/148 or 143/146 (Arm2's own
axis Y, ⟂ to X, both pairs exactly axis-aligned in Arm2's local frame).
TIER 2 — FALLBACK-2 (pivot): only entered when TIER 1 ALSO has nothing
(e.g. no markers visible at all besides one on the target link itself,
or no child link exists). The joint PIVOT itself stands in for a
missing second marker, i.e. the "pair" becomes (pivot, m1). This needs
only ONE matched marker on the target link, but — unlike TIER 0/1 —
its accuracy additionally depends on the already-estimated PARENT joint
*values* being correct (not just their axis direction), since the
pivot's world position comes from FK. See `PIVOT_FALLBACK_ID` /
`TIER_*` / `tier_used` in the code.
How to use sequentially
-----------------------
@@ -52,7 +67,7 @@ Output JSON
{
"link": "Arm1",
"joint": "y",
"method": "marker_pair", // or "pivot_fallback" — see FALLBACK above
"method": "primary", // or "fallback_1_child_axis" / "fallback_2_pivot" — see TIERs above
"mean_angle_deg": 86.3,
"circular_std_deg": 0.7,
"num_pairs": 6,
@@ -80,11 +95,19 @@ from robot_fk import RobotFK
STATE_KEYS = ("x", "y", "z", "a", "b", "c", "e")
# Sentinel "marker id" used in `per_pair` reports for the joint pivot.
# Only ever appears when the FALLBACK path (pivot vs. a single marker)
# was used instead of a real marker-to-marker pair — see the
# `used_fallback` block inside `estimate_revolute_angle()` below.
# Only ever appears in TIER_FALLBACK_2 entries (pivot vs. a single marker)
# — see the TIER_FALLBACK_2 block inside `estimate_revolute_angle()` below.
PIVOT_FALLBACK_ID = -1
# Tier labels — reported in `per_pair[].tier` and the top-level `method`
# field, so it's always traceable which method actually produced a given
# estimate. Tried in this order; each next one is a pure fallback (see
# module docstring above for what each tier means and why it's ordered
# this way).
TIER_PRIMARY = "primary" # pair of markers on the target link itself
TIER_FALLBACK_1 = "fallback_1_child_axis" # pair on a CHILD link, aligned with the child's OWN axis
TIER_FALLBACK_2 = "fallback_2_pivot" # single marker on the target link vs. the joint pivot
# ──────────────────────────────────────────────────────────────
# I/O
@@ -186,16 +209,18 @@ def _pair_estimate(v_model: np.ndarray,
axis_world: np.ndarray,
marker_ids: Tuple[int, int],
min_baseline_mm: float,
fallback: bool) -> Tuple[Optional[float], Optional[float], dict]:
tier: str,
source_link: str) -> Tuple[Optional[float], Optional[float], dict]:
"""
Project model/observed vectors perpendicular to the joint axis and
derive one angle estimate from them. Returns (angle_rad, weight,
per_pair_entry) — angle_rad/weight are None when skipped (baseline
too short).
`fallback=True` marks entries produced by the pivot FALLBACK (one of
the two "markers" is actually the joint pivot, see PIVOT_FALLBACK_ID)
so callers/reports can always tell primary and fallback data apart.
`tier` (one of the TIER_* constants) and `source_link` (the link the
two marker_ids actually belong to — may differ from the target link
for TIER_FALLBACK_1) are purely descriptive, so callers/reports can
always tell where a given estimate came from.
"""
v_model_perp = _project_perp(v_model, axis_world)
v_obs_perp = _project_perp(v_obs, axis_world)
@@ -206,7 +231,8 @@ def _pair_estimate(v_model: np.ndarray,
if bl_model < min_baseline_mm or bl_obs < min_baseline_mm:
return None, None, {
"marker_ids": list(marker_ids),
"fallback": fallback,
"link": source_link,
"tier": tier,
"skipped": True,
"reason": f"bl_model={bl_model:.1f} bl_obs={bl_obs:.1f} < {min_baseline_mm}",
}
@@ -215,7 +241,8 @@ def _pair_estimate(v_model: np.ndarray,
weight = bl_model * bl_obs
entry = {
"marker_ids": list(marker_ids),
"fallback": fallback,
"link": source_link,
"tier": tier,
"skipped": False,
"angle_deg": math.degrees(angle),
"baseline_model_mm": bl_model,
@@ -225,6 +252,36 @@ def _pair_estimate(v_model: np.ndarray,
return angle, weight, entry
def _child_links(fk: RobotFK, link_name: str) -> List[str]:
"""Direct children of `link_name` in the kinematic tree (robot.json `parent` field)."""
return [n for n, d in fk.links.items() if d.get("parent") == link_name]
def _axis_aligned_pairs(local_positions: Dict[int, np.ndarray],
own_axis_local: np.ndarray,
tol_mm: float) -> List[Tuple[int, int]]:
"""
Among marker pairs on a CHILD link, return those whose LOCAL connecting
vector is (nearly) parallel to the CHILD's OWN joint axis — i.e. the
component perpendicular to that axis is within `tol_mm` of zero.
Such a pair is invariant to the child's own (still-unknown) rotation
(a rotation about an axis never moves a vector parallel to that same
axis), which is exactly what TIER_FALLBACK_1 relies on. Pairs that
fail this check are skipped here — using them would silently mix in
the child's unknown rotation and bias the result (see module
docstring / TIER 1).
"""
a_hat = own_axis_local / (np.linalg.norm(own_axis_local) + 1e-15)
good: List[Tuple[int, int]] = []
for id1, id2 in combinations(sorted(local_positions.keys()), 2):
v_local = local_positions[id2] - local_positions[id1]
v_radial = v_local - np.dot(v_local, a_hat) * a_hat
if float(np.linalg.norm(v_radial)) <= tol_mm:
good.append((id1, id2))
return good
# ──────────────────────────────────────────────────────────────
# Core estimator (generic — works for any revolute joint)
# ──────────────────────────────────────────────────────────────
@@ -235,6 +292,7 @@ def estimate_revolute_angle(
link_name: str,
known_state: Dict[str, float],
min_baseline_mm: float = 15.0,
child_axis_tol_mm: float = 1.0,
verbose: bool = True,
) -> dict:
"""
@@ -247,7 +305,10 @@ def estimate_revolute_angle(
link_name : e.g. "Arm1", "Ellbow", "Arm2"
known_state : already-estimated joint values (e.g. {"x": 180.0, "y": 86.0})
The target joint variable should NOT be in this dict.
min_baseline_mm : skip pairs with model or observed baseline shorter than this
min_baseline_mm : skip pairs with model or observed baseline shorter than this
child_axis_tol_mm : TIER_FALLBACK_1 only — max perpendicular component (mm)
a child-link marker pair may have relative to the
child's OWN axis to still count as "axis-aligned"
verbose : print report
Returns
@@ -288,20 +349,14 @@ def estimate_revolute_angle(
matched = {mid: (model_local[mid], observed_mm[mid])
for mid in model_local if mid in observed_mm}
# Only 1 matched marker is enough to *attempt* an estimate — the
# PIVOT FALLBACK below can work with a single marker. With 0 there
# is nothing to go on at all.
if len(matched) < 1:
return {
"status": "failed",
"reason": (f"Need ≥1 matched marker, found {len(matched)}. "
f"Model marker IDs: {list(model_local.keys())}"),
}
# No early return here even if `matched` is empty: TIER_FALLBACK_1
# below needs zero markers on the TARGET link itself — only on its
# child. Whether ANY tier found anything is checked once, at the end.
def _spoke(local_pos: np.ndarray) -> np.ndarray:
return _model_spoke_world(fk, zero_transforms, link_name, origin_world, local_pos)
# ── PRIMARY: marker-to-marker pairs within this link ──────
# ── TIER 0 — PRIMARY: marker-to-marker pairs within this link ──
# Preferred whenever ≥2 markers with a usable (non axis-parallel)
# baseline are visible. Only the AXIS DIRECTION needs to be correct
# for this — not the pivot's position — so it is the more robust
@@ -319,45 +374,98 @@ def estimate_revolute_angle(
v_obs = o2 - o1 # observed, world frame
angle, weight, entry = _pair_estimate(
v_model, v_obs, axis_world, (id1, id2), min_baseline_mm, fallback=False)
v_model, v_obs, axis_world, (id1, id2), min_baseline_mm,
tier=TIER_PRIMARY, source_link=link_name)
per_pair.append(entry)
if angle is not None:
angle_rad_list.append(angle)
weight_list.append(weight)
# ── FALLBACK: pivot + single marker, axis from predecessor ────
# Only entered when the PRIMARY method above produced NOT A SINGLE
# usable pair (e.g. only one marker visible at all, or every visible
# pair happens to lie parallel to the joint axis — as for two
# markers spaced along a forearm). Each matched marker is paired
# with the joint PIVOT instead of another marker, using the
# rotation axis already known from the predecessor joints.
# This is strictly a fallback: compared to a real 2-marker baseline
# it additionally relies on the predecessor joints' *values* (not
# just their axis direction) being accurate, since the pivot's
# world position is computed via FK rather than observed directly.
used_fallback = False
if not angle_rad_list:
used_fallback = True
for mid in ids:
l, o = matched[mid]
v_model = _spoke(l) # pivot → marker, model, world-oriented
v_obs = o - origin_world # pivot → marker, observed
tier_used = TIER_PRIMARY
children_tried: List[str] = [] # for the diagnostic message if everything fails
angle, weight, entry = _pair_estimate(
v_model, v_obs, axis_world,
(PIVOT_FALLBACK_ID, mid), min_baseline_mm, fallback=True)
per_pair.append(entry)
if angle is not None:
angle_rad_list.append(angle)
weight_list.append(weight)
# ── TIER 1 — FALLBACK-1: axis-aligned pair on a CHILD link ────
# Only entered when TIER 0 produced NOT A SINGLE usable pair. Looks
# at every DIRECT child of this link and picks marker pairs whose
# local vector is parallel to the CHILD's OWN axis (see
# `_axis_aligned_pairs()`) — those are invariant to the child's own
# still-unknown rotation, so they can stand in for a TIER-0 pair.
# Like TIER 0, this needs only the axis DIRECTION, not the pivot's
# position, so it is preferred over TIER 2.
if not angle_rad_list:
tier_used = TIER_FALLBACK_1
children_tried = _child_links(fk, link_name)
for child_name in children_tried:
child = links[child_name]
child_ji = child.get("jointToParent", {}) or {}
child_axis_local = np.asarray(child_ji.get("axis", [1, 0, 0]), dtype=float)
child_model_local: Dict[int, np.ndarray] = {}
for m in child.get("markers", []):
mid = int(m.get("id", -1))
if mid >= 0 and "position" in m:
child_model_local[mid] = np.array(m["position"], dtype=float)
child_matched = {mid: (child_model_local[mid], observed_mm[mid])
for mid in child_model_local if mid in observed_mm}
if len(child_matched) < 2:
continue
aligned_pairs = _axis_aligned_pairs(
{mid: l for mid, (l, _o) in child_matched.items()},
child_axis_local, child_axis_tol_mm)
for id1, id2 in aligned_pairs:
l1, o1 = child_matched[id1]
l2, o2 = child_matched[id2]
v_model = (_model_spoke_world(fk, zero_transforms, child_name, origin_world, l2)
- _model_spoke_world(fk, zero_transforms, child_name, origin_world, l1))
v_obs = o2 - o1
angle, weight, entry = _pair_estimate(
v_model, v_obs, axis_world, (id1, id2), min_baseline_mm,
tier=TIER_FALLBACK_1, source_link=child_name)
per_pair.append(entry)
if angle is not None:
angle_rad_list.append(angle)
weight_list.append(weight)
# ── TIER 2 — FALLBACK-2: pivot + single marker on the target link ──
# Only entered when TIER 1 ALSO produced nothing (e.g. no child
# link, or its markers aren't visible/aligned either). Each
# matched marker on the TARGET link is paired with the joint
# PIVOT instead of another marker, using the rotation axis
# already known from the predecessor joints. This is the last
# resort: unlike TIER 0/1 it additionally relies on the
# predecessor joints' *values* (not just their axis direction)
# being accurate, since the pivot's world position comes from FK
# rather than being observed directly.
if not angle_rad_list:
tier_used = TIER_FALLBACK_2
for mid in ids:
l, o = matched[mid]
v_model = _spoke(l) # pivot → marker, model, world-oriented
v_obs = o - origin_world # pivot → marker, observed
angle, weight, entry = _pair_estimate(
v_model, v_obs, axis_world,
(PIVOT_FALLBACK_ID, mid), min_baseline_mm,
tier=TIER_FALLBACK_2, source_link=link_name)
per_pair.append(entry)
if angle is not None:
angle_rad_list.append(angle)
weight_list.append(weight)
if not angle_rad_list:
return {
"status": "failed",
"reason": "All pairs below min_baseline_mm, including the "
"pivot fallback. Try --min-baseline 5 or check "
"step-3 output.",
"reason": (f"No usable pair at any tier: primary ({len(matched)} "
f"marker(s) on '{link_name}'), fallback-1 (children "
f"tried: {children_tried or 'none'}), fallback-2 "
f"(pivot, same {len(matched)} marker(s)). Try "
f"--min-baseline / --child-axis-tol, or check step-3 output."),
}
mean_deg, c_var, c_std_deg = _circular_mean_deg(
@@ -370,9 +478,14 @@ def estimate_revolute_angle(
print(f" Joint origin (world): [{', '.join(f'{v:.1f}' for v in origin_world)}] mm")
print(f" Joint axis (world): [{', '.join(f'{v:.3f}' for v in axis_world)}]")
print(f" Matched markers: {list(matched.keys())}")
if used_fallback:
print(f" [FALLBACK] No usable marker-marker pair — estimating from "
f"pivot + predecessor axis instead (single-marker spokes).")
if tier_used == TIER_FALLBACK_1:
print(f" [FALLBACK-1] No usable same-link pair — estimating from "
f"axis-aligned marker pair(s) on child link(s) "
f"{children_tried} instead.")
elif tier_used == TIER_FALLBACK_2:
print(f" [FALLBACK-2] No usable pair on this link or its children — "
f"estimating from pivot + predecessor axis instead "
f"(single-marker spokes).")
print(f" Pairs used: {len(angle_rad_list)} / {len(per_pair)}")
print(f" Angle: {mean_deg:+.2f} ° circular_σ {c_std_deg:.2f} °")
if c_std_deg > 5.0:
@@ -382,11 +495,13 @@ def estimate_revolute_angle(
id0, id1_ = pp["marker_ids"]
m0 = "PIVOT" if id0 == PIVOT_FALLBACK_ID else f"M{id0}"
m1 = "PIVOT" if id1_ == PIVOT_FALLBACK_ID else f"M{id1_}"
tag = " [fallback]" if pp.get("fallback") else ""
link_prefix = f"{pp['link']}:" if pp["link"] != link_name else ""
tag = {TIER_PRIMARY: "", TIER_FALLBACK_1: " [fallback-1]",
TIER_FALLBACK_2: " [fallback-2]"}.get(pp.get("tier"), "")
if pp["skipped"]:
print(f" {m0}{m1}{tag}: SKIPPED {pp['reason']}")
print(f" {link_prefix}{m0}{link_prefix}{m1}{tag}: SKIPPED {pp['reason']}")
else:
print(f" {m0}{m1}{tag}: "
print(f" {link_prefix}{m0}{link_prefix}{m1}{tag}: "
f"{pp['angle_deg']:+7.2f}° "
f"bl_model={pp['baseline_model_mm']:.1f} "
f"bl_obs={pp['baseline_obs_mm']:.1f}")
@@ -399,7 +514,7 @@ def estimate_revolute_angle(
"status": "ok",
"link": link_name,
"joint": var,
"method": "pivot_fallback" if used_fallback else "marker_pair",
"method": tier_used,
"joint_origin_world_mm": origin_world.tolist(),
"joint_axis_world": axis_world.tolist(),
"mean_angle_deg": mean_deg,
@@ -436,6 +551,10 @@ def main() -> int:
p.add_argument("--min-baseline", type=float, default=15.0,
help="Skip pairs with perpendicular baseline < this (mm)")
p.add_argument("--child-axis-tol", type=float, default=1.0,
help="FALLBACK-1 only: max perpendicular component (mm) a "
"child-link marker pair may have relative to the "
"child's own axis to still count as axis-aligned")
p.add_argument("--output", type=Path, default=None,
help="Save result JSON (readable by next 4b as --from-state)")
args = p.parse_args()
@@ -463,7 +582,8 @@ def main() -> int:
observed_mm = observed_mm,
link_name = args.link,
known_state = known_state,
min_baseline_mm = args.min_baseline,
min_baseline_mm = args.min_baseline,
child_axis_tol_mm = args.child_axis_tol,
verbose = True,
)

View File

@@ -0,0 +1,539 @@
#!/usr/bin/env python3
"""
pose_estimation.py
==================
Estimate robot joint angles (x, y, z, a, b, c, e) from triangulated marker
poses, using the kinematic model in robot.json (via robot_fk.py).
Design
------
The estimator is parametrised over JOINT VARIABLES, not links. This handles the
tricky cases of this robot family generically:
* Links with zero own markers (Base/x, Hand/b, Palm/c) — their variable is
observable only through descendant markers.
* A variable shared by several links (FingerA & FingerB share 'e').
* Occluded middle links — global BA reconstructs them from the fingers.
Four switchable methods (robot.json -> pose_estimation.method):
sequential_vector : analytic per joint from marker-pair / normal vectors (fast)
sequential_fk : block-wise least squares along the chain (robust, 1 marker ok)
global_ba : all variables jointly, position + normal residuals, robust loss
hybrid : sequential_fk init -> global_ba refine (default, most stable)
Observation input:
marker_observation = "corner_pose" -> aruco_marker_poses.json (pos + measured normal)
marker_observation = "center_point" -> aruco_positions_*.json (pos only)
Both the engine (estimate_pose) and a CLI (main) live here.
"""
from __future__ import annotations
import argparse
import json
import math
import os
import sys
import time
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
sys.path.insert(0, str(Path(__file__).parent))
from robot_fk import RobotFK, STATE_KEYS # noqa: E402
try:
from scipy.optimize import least_squares
HAVE_SCIPY = True
except ImportError:
HAVE_SCIPY = False
# ==================================================================
# Config
# ==================================================================
DEFAULT_CFG: Dict[str, Any] = {
"method": "hybrid",
"marker_observation": "corner_pose",
"use_normals": True,
"normal_weight": 100.0,
"robust_loss": "huber",
"huber_delta_mm": 8.0,
"max_iterations": 200,
"min_cameras_per_marker": 2,
"finger_block_joints": ["b", "c", "e"],
"per_link_method": {},
}
def load_pose_cfg(robot_data: Dict[str, Any]) -> Dict[str, Any]:
cfg = dict(DEFAULT_CFG)
cfg.update(robot_data.get("pose_estimation", {}) or {})
return cfg
# ==================================================================
# Observations
# ==================================================================
def load_observations(path: str, use_normals: bool, min_cams: int = 2) -> Dict[int, Dict[str, Any]]:
"""
Load marker observations. Accepts aruco_marker_poses.json (with measured
normal + num_cameras) or aruco_positions_*.json (position only).
Returns: marker_id -> {pos_mm:(3,), normal:(3,)|None, link:str, n_cams:int}
"""
data = json.load(open(path, "r", encoding="utf-8"))
out: Dict[int, Dict[str, Any]] = {}
for m in data.get("markers", []):
mid = int(m.get("marker_id", m.get("id", -1)))
if mid < 0:
continue
n_cams = int(m.get("num_cameras", 99))
if n_cams < min_cams:
continue
if "position_mm" in m:
pos = np.array(m["position_mm"], dtype=float)
elif "position_m" in m:
pos = np.array(m["position_m"], dtype=float) * 1000.0
else:
continue
nrm = None
if use_normals and m.get("normal") is not None:
nv = np.array(m["normal"], dtype=float)
nn = np.linalg.norm(nv)
if nn > 1e-9:
nrm = nv / nn
out[mid] = {"pos_mm": pos, "normal": nrm, "link": m.get("link", "?"), "n_cams": n_cams}
return out
# ==================================================================
# Kinematic chain analysis
# ==================================================================
def analyze_chain(fk: RobotFK) -> Dict[str, Any]:
"""
Derive, generically from the FK topology:
ordered_vars : movable joint variables, root->tip order, de-duplicated
var_links : variable -> list of links it drives
link_markers : link -> [model marker ids]
blocks : sequential estimation blocks; each block groups the
zero-marker ancestor variables with the next marker-
bearing joint variable, estimated from that link's own
markers (+ siblings sharing the same variable).
"""
links = fk.links
topo = fk._topo
link_markers: Dict[str, List[int]] = {}
for ln, ld in links.items():
ids = []
for mk in ld.get("markers", []) or []:
if "id" in mk and "position" in mk:
ids.append(int(mk["id"]))
link_markers[ln] = ids
link_var: Dict[str, str] = {}
for ln, ld in links.items():
j = ld.get("jointToParent", {}) or {}
if str(j.get("type", "")).lower() in ("revolute", "linear"):
v = str(j.get("variable", "")).lower()
if v:
link_var[ln] = v
var_type: Dict[str, str] = {}
var_links: Dict[str, List[str]] = defaultdict(list)
for ln, v in link_var.items():
var_links[v].append(ln)
var_type[v] = str(links[ln].get("jointToParent", {}).get("type", "")).lower()
ordered_vars: List[str] = []
for ln in topo:
if ln in link_var and link_var[ln] not in ordered_vars:
ordered_vars.append(link_var[ln])
# ---- build blocks ----
blocks: List[Dict[str, Any]] = []
var_block: Dict[str, int] = {}
pending: List[str] = []
for ln in topo:
if ln not in link_var:
continue
v = link_var[ln]
own = link_markers.get(ln, [])
if v in var_block:
# shared variable already in a block -> add this link's markers there
if own:
blocks[var_block[v]]["markers"].extend(own)
continue
if own:
bvars = []
for x in pending + [v]:
if x not in bvars and x not in var_block:
bvars.append(x)
blocks.append({"vars": bvars, "markers": list(own), "anchor": ln})
for x in bvars:
var_block[x] = len(blocks) - 1
pending = []
else:
if v not in pending:
pending.append(v)
if pending:
blocks.append({"vars": pending, "markers": [], "anchor": None})
for x in pending:
var_block[x] = len(blocks) - 1
return {
"ordered_vars": ordered_vars,
"var_type": var_type,
"var_links": dict(var_links),
"link_markers": link_markers,
"blocks": blocks,
}
# ==================================================================
# Residuals
# ==================================================================
def model_markers(fk: RobotFK, state: Dict[str, float]) -> Dict[int, Dict[str, np.ndarray]]:
T = fk.compute(state)
return fk.all_markers_world(T) # mid -> {world_mm, normal_world, link, local_mm}
def residual_vector(state: Dict[str, float], fk: RobotFK, obs: Dict[int, Dict[str, Any]],
marker_ids: List[int], cfg: Dict[str, Any]) -> np.ndarray:
"""Position (mm) + optional normal (scaled) residuals over the given markers."""
model = model_markers(fk, state)
res: List[float] = []
w_n = float(cfg.get("normal_weight", 30.0))
use_n = bool(cfg.get("use_normals", True))
for mid in marker_ids:
if mid not in model or mid not in obs:
continue
mm = model[mid]
dp = np.asarray(mm["world_mm"], float) - obs[mid]["pos_mm"]
res.extend(dp.tolist())
if use_n and obs[mid]["normal"] is not None and "normal_world" in mm:
dn = (np.asarray(mm["normal_world"], float) - obs[mid]["normal"]) * w_n
res.extend(dn.tolist())
return np.asarray(res, dtype=float)
def _state_from_vec(var_names: List[str], vec: np.ndarray, base: Dict[str, float]) -> Dict[str, float]:
s = dict(base)
for name, val in zip(var_names, vec):
s[name] = float(val)
return s
# ==================================================================
# Method: global bundle adjustment
# ==================================================================
def estimate_global_ba(fk: RobotFK, obs: Dict[int, Dict[str, Any]], var_names: List[str],
x0: Dict[str, float], cfg: Dict[str, Any]) -> Dict[str, float]:
if not HAVE_SCIPY:
print("[WARN] scipy missing — global_ba skipped, returning init")
return dict(x0)
marker_ids = list(obs.keys())
base = {k: 0.0 for k in STATE_KEYS}
base.update(x0)
vec0 = np.array([base.get(v, 0.0) for v in var_names], dtype=float)
def fun(vec):
st = _state_from_vec(var_names, vec, base)
return residual_vector(st, fk, obs, marker_ids, cfg)
loss = cfg.get("robust_loss", "huber")
f_scale = float(cfg.get("huber_delta_mm", 8.0))
try:
sol = least_squares(fun, vec0, loss=loss, f_scale=f_scale,
max_nfev=int(cfg.get("max_iterations", 200)) * max(1, len(var_names)))
return _state_from_vec(var_names, sol.x, base)
except Exception as exc:
print(f"[WARN] global_ba failed: {exc}")
return dict(base)
# ==================================================================
# Method: sequential block-wise FK fit
# ==================================================================
def _multistart_values(vtype: str) -> List[float]:
# revolute: scan the circle to escape local minima at large angles
if vtype == "revolute":
return [0.0, 60.0, 120.0, 180.0, 240.0, 300.0]
return [0.0]
def estimate_sequential_fk(fk: RobotFK, obs: Dict[int, Dict[str, Any]], chain: Dict[str, Any],
cfg: Dict[str, Any]) -> Dict[str, float]:
"""Estimate block by block along the chain, freezing already-solved variables."""
state = {k: 0.0 for k in STATE_KEYS}
var_type = chain["var_type"]
for block in chain["blocks"]:
bvars = block["vars"]
bmarkers = [m for m in block["markers"] if m in obs]
if not bvars:
continue
if not bmarkers:
# unobservable block: leave at 0, flag later
continue
if not HAVE_SCIPY:
continue
base = dict(state)
def fun(vec, _bvars=bvars, _bm=bmarkers, _base=base):
st = _state_from_vec(_bvars, vec, _base)
return residual_vector(st, fk, obs, _bm, cfg)
# multi-start over the first revolute variable in the block
starts = [[0.0] * len(bvars)]
lead_type = var_type.get(bvars[0], "linear")
if lead_type == "revolute":
starts = []
for a0 in _multistart_values("revolute"):
s = [0.0] * len(bvars)
s[0] = a0
starts.append(s)
best, best_cost = None, float("inf")
for s0 in starts:
try:
sol = least_squares(fun, np.array(s0, dtype=float),
loss=cfg.get("robust_loss", "huber"),
f_scale=float(cfg.get("huber_delta_mm", 8.0)),
max_nfev=200 * max(1, len(bvars)))
if sol.cost < best_cost:
best_cost, best = sol.cost, sol.x
except Exception:
continue
if best is not None:
for name, val in zip(bvars, best):
state[name] = float(val)
# wrap revolute angles to (-180, 180]
for v, vt in var_type.items():
if vt == "revolute":
state[v] = (state[v] + 180.0) % 360.0 - 180.0
return state
# ==================================================================
# Method: sequential analytic vector (per revolute joint)
# ==================================================================
def estimate_sequential_vector(fk: RobotFK, obs: Dict[int, Dict[str, Any]], chain: Dict[str, Any],
cfg: Dict[str, Any]) -> Dict[str, float]:
"""
Analytic angle from marker geometry where possible. For revolute joints with
>=2 markers on the link, use the perpendicular marker-pair vector. Falls back
to the FK block solver for linear / zero-marker / single-marker cases, so it
always returns a full state (still cheaper than full sequential_fk because
well-populated joints are solved in closed form).
"""
state = {k: 0.0 for k in STATE_KEYS}
var_type = chain["var_type"]
link_markers = chain["link_markers"]
var_links = chain["var_links"]
for block in chain["blocks"]:
bvars = block["vars"]
if len(bvars) == 1 and var_type.get(bvars[0]) == "revolute":
v = bvars[0]
ln = var_links[v][0]
mids = [m for m in link_markers.get(ln, []) if m in obs]
if len(mids) >= 2:
# model vectors must be expressed in the WORLD frame at angle=0
# (the link frame is already rotated by the parents y,z,...), so
# use FK marker world positions with this joint set to 0.
state_v0 = dict(state)
state_v0[v] = 0.0
model_v0 = model_markers(fk, state_v0)
axis_world = fk.joint_axis_world(ln, state_v0)
ang = _angle_from_pairs_world(mids, model_v0, obs, axis_world)
if ang is not None:
state[v] = ang
continue
# fallback: block FK fit for this single block
_fit_single_block(fk, obs, block, var_type, cfg, state)
for v, vt in var_type.items():
if vt == "revolute":
state[v] = (state[v] + 180.0) % 360.0 - 180.0
return state
def _angle_from_pairs_world(mids: List[int], model_v0: Dict[int, Dict[str, np.ndarray]],
obs: Dict[int, Dict[str, Any]], axis_world: np.ndarray) -> Optional[float]:
from itertools import combinations
a = np.asarray(axis_world, float)
a = a / (np.linalg.norm(a) + 1e-12)
angs, ws = [], []
for i, j in combinations(mids, 2):
if i not in model_v0 or j not in model_v0:
continue
vm = np.asarray(model_v0[j]["world_mm"], float) - np.asarray(model_v0[i]["world_mm"], float) # world @ angle 0
vo = obs[j]["pos_mm"] - obs[i]["pos_mm"] # observed vector (world, mm)
vm_p = vm - np.dot(vm, a) * a
vo_p = vo - np.dot(vo, a) * a
if np.linalg.norm(vm_p) < 5 or np.linalg.norm(vo_p) < 5:
continue
ang = math.atan2(float(np.dot(a, np.cross(vm_p, vo_p))), float(np.dot(vm_p, vo_p)))
angs.append(ang)
ws.append(np.linalg.norm(vm_p) * np.linalg.norm(vo_p))
if not angs:
return None
s = sum(w * math.sin(x) for w, x in zip(ws, angs))
c = sum(w * math.cos(x) for w, x in zip(ws, angs))
return math.degrees(math.atan2(s, c))
def _fit_single_block(fk, obs, block, var_type, cfg, state):
if not HAVE_SCIPY:
return
bvars = block["vars"]
bmarkers = [m for m in block["markers"] if m in obs]
if not bvars or not bmarkers:
return
base = dict(state)
def fun(vec):
return residual_vector(_state_from_vec(bvars, vec, base), fk, obs, bmarkers, cfg)
starts = [[0.0] * len(bvars)]
if var_type.get(bvars[0]) == "revolute":
starts = [[a0] + [0.0] * (len(bvars) - 1) for a0 in _multistart_values("revolute")]
best, best_cost = None, float("inf")
for s0 in starts:
try:
sol = least_squares(fun, np.array(s0, float), loss=cfg.get("robust_loss", "huber"),
f_scale=float(cfg.get("huber_delta_mm", 8.0)), max_nfev=200 * max(1, len(bvars)))
if sol.cost < best_cost:
best_cost, best = sol.cost, sol.x
except Exception:
continue
if best is not None:
for name, val in zip(bvars, best):
state[name] = float(val)
# ==================================================================
# Dispatch
# ==================================================================
def observability(chain: Dict[str, Any], obs: Dict[int, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
"""
Per-variable confidence from how well its estimation block is determined.
A block groups coupled variables (e.g. b,c,e on the fingers); confidence is
driven by markers-per-variable in that block:
high : >= 2 markers per variable (well over-determined)
medium : >= 1 marker per variable
low : fewer markers than variables (under-determined — distrust!)
none : no markers at all (variable left at 0)
"""
info: Dict[str, Dict[str, Any]] = {}
for block in chain["blocks"]:
seen = [m for m in block["markers"] if m in obs]
nvars = max(1, len(block["vars"]))
ratio = len(seen) / nvars
if len(seen) == 0:
conf = "none"
elif ratio >= 2.0:
conf = "high"
elif ratio >= 1.0:
conf = "medium"
else:
conf = "low"
for v in block["vars"]:
info[v] = {"observable": len(seen) > 0, "n_markers": len(seen),
"block_vars": len(block["vars"]), "confidence": conf,
"block_anchor": block["anchor"]}
return info
def estimate_pose(fk: RobotFK, obs: Dict[int, Dict[str, Any]], cfg: Dict[str, Any]) -> Dict[str, Any]:
chain = analyze_chain(fk)
var_names = chain["ordered_vars"]
method = str(cfg.get("method", "hybrid")).lower()
obsv = observability(chain, obs)
if method == "sequential_vector":
state = estimate_sequential_vector(fk, obs, chain, cfg)
elif method == "sequential_fk":
state = estimate_sequential_fk(fk, obs, chain, cfg)
elif method == "global_ba":
init = estimate_sequential_fk(fk, obs, chain, cfg) # cheap robust init
state = estimate_global_ba(fk, obs, var_names, init, cfg)
else: # hybrid (default)
init = estimate_sequential_fk(fk, obs, chain, cfg)
state = estimate_global_ba(fk, obs, var_names, init, cfg)
# final residual stats over all observed markers
final_res = residual_vector(state, fk, obs, list(obs.keys()), cfg)
rms = float(np.sqrt(np.mean(final_res ** 2))) if final_res.size else 0.0
return {"state": state, "method": method, "observability": obsv,
"residual_rms": rms, "num_markers": len(obs)}
# ==================================================================
# CLI
# ==================================================================
def main() -> None:
ap = argparse.ArgumentParser(description="Estimate robot joint angles from marker poses")
ap.add_argument("markers", help="aruco_marker_poses.json (corner_pose) or aruco_positions_*.json (center)")
ap.add_argument("-robot", "--robot", required=True)
ap.add_argument("-out", "--out", default=None)
ap.add_argument("--method", default=None, help="override robot.json method")
args = ap.parse_args()
robot_data = json.load(open(args.robot, "r", encoding="utf-8"))
cfg = load_pose_cfg(robot_data)
if args.method:
cfg["method"] = args.method
fk = RobotFK(robot_data)
obs = load_observations(args.markers, cfg.get("use_normals", True),
int(cfg.get("min_cameras_per_marker", 2)))
print(f"[INFO] method={cfg['method']} | observed markers={len(obs)} | use_normals={cfg.get('use_normals')}")
result = estimate_pose(fk, obs, cfg)
st = result["state"]
print("\nEstimated joint values:")
for v in ["x", "y", "z", "a", "b", "c", "e"]:
ob = result["observability"].get(v, {})
unit = "mm" if v in ("x", "e") else "deg"
conf = ob.get("confidence", "?")
tag = "" if ob.get("observable", False) else " [UNOBSERVABLE -> 0]"
print(f" {v}: {st.get(v, 0.0):8.2f} {unit} (markers={ob.get('n_markers','?')}, conf={conf}){tag}")
print(f"\n[INFO] residual RMS over {result['num_markers']} markers: {result['residual_rms']:.3f}")
out = {
"schema_version": "1.0",
"created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"method": result["method"],
"movements": {v: {"value": st.get(v, 0.0),
"unit": "mm" if v in ("x", "e") else "deg",
"observable": result["observability"].get(v, {}).get("observable", False),
"confidence": result["observability"].get(v, {}).get("confidence", "none"),
"n_markers": result["observability"].get(v, {}).get("n_markers", 0)}
for v in ["x", "y", "z", "a", "b", "c", "e"]},
"residual_rms": result["residual_rms"],
"num_markers": result["num_markers"],
}
out_path = args.out or os.path.join(os.path.dirname(args.markers), "robot_state.json")
json.dump(out, open(out_path, "w", encoding="utf-8"), indent=2)
print(f"[INFO] wrote {out_path}")
if __name__ == "__main__":
main()

Binary file not shown.