601 lines
27 KiB
Python
601 lines
27 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
4b_revolute_angle.py
|
||
--------------------
|
||
Generic revolute-joint angle estimator.
|
||
|
||
For each movable link (Arm1, Ellbow, Arm2 …) whose joint type is 'revolute',
|
||
this script estimates the rotation angle using one of three methods, tried
|
||
in order — each next TIER is a pure fallback, only used when the previous
|
||
one found NOT A SINGLE usable (non axis-degenerate) pair:
|
||
|
||
TIER 0 — PRIMARY: for every PAIR (m1, m2) of markers belonging to the
|
||
target link itself:
|
||
v_model = spoke_world(m2) - spoke_world(m1) (model, world-oriented)
|
||
v_obs = world_pos_m2 - world_pos_m1 (observed, world frame)
|
||
|
||
where spoke_world(m) rotates the marker's local `position` (from
|
||
robot.json) through the already-known PARENT joints via FK, so it is
|
||
expressed in world orientation at this joint's own angle = 0 — exactly
|
||
the frame v_obs lives in.
|
||
|
||
Both vectors are projected perpendicular to the joint axis (in world frame),
|
||
and the signed angle from v_model_perp to v_obs_perp is measured.
|
||
|
||
The joint axis in world frame is computed via FK using the already-known
|
||
joint values (from 4a, 4b-prev …), so it is ALWAYS correct — even for
|
||
deeply-nested joints whose world-frame axis differs from their local axis.
|
||
|
||
Pair weights = baseline_model × baseline_obs (longer baselines → more reliable).
|
||
|
||
TIER 1 — FALLBACK-1 (child-axis): only entered when TIER 0 has nothing.
|
||
Uses a PAIR of markers on the DIRECT CHILD link instead of the target
|
||
link, picking only pairs whose LOCAL connecting vector is (nearly)
|
||
parallel to the CHILD's OWN joint axis. A rotation about an axis never
|
||
moves a vector parallel to that very axis, so such a pair is invariant
|
||
to the child's own (still-unknown) rotation and transforms purely under
|
||
the chain up to and including the TARGET joint — exactly like a TIER-0
|
||
pair, just sourced one link further down. Like TIER 0 (and unlike
|
||
TIER 2), this only needs the axis DIRECTION to be correct, not the
|
||
pivot's position, so it is preferred over TIER 2 whenever available.
|
||
Example: Ellbow (axis X) ← Arm2 markers 144/148 or 143/146 (Arm2's own
|
||
axis Y, ⟂ to X, both pairs exactly axis-aligned in Arm2's local frame).
|
||
|
||
TIER 2 — FALLBACK-2 (pivot): only entered when TIER 1 ALSO has nothing
|
||
(e.g. no markers visible at all besides one on the target link itself,
|
||
or no child link exists). The joint PIVOT itself stands in for a
|
||
missing second marker, i.e. the "pair" becomes (pivot, m1). This needs
|
||
only ONE matched marker on the target link, but — unlike TIER 0/1 —
|
||
its accuracy additionally depends on the already-estimated PARENT joint
|
||
*values* being correct (not just their axis direction), since the
|
||
pivot's world position comes from FK. See `PIVOT_FALLBACK_ID` /
|
||
`TIER_*` / `tier_used` in the code.
|
||
|
||
How to use sequentially
|
||
-----------------------
|
||
Run 4b once per revolute joint, from root to tip:
|
||
|
||
python 4b_revolute_angle.py --robot r.json --aruco obs.json --link Arm1 --x-mm 180
|
||
python 4b_revolute_angle.py --robot r.json --aruco obs.json --link Ellbow --from-state state.json
|
||
python 4b_revolute_angle.py --robot r.json --aruco obs.json --link Arm2 --from-state state.json
|
||
|
||
The --from-state flag reads the accumulated joint state JSON so you don't have
|
||
to pass every preceding value on the command line.
|
||
|
||
Output JSON
|
||
-----------
|
||
{
|
||
"link": "Arm1",
|
||
"joint": "y",
|
||
"method": "primary", // or "fallback_1_child_axis" / "fallback_2_pivot" — see TIERs above
|
||
"mean_angle_deg": 86.3,
|
||
"circular_std_deg": 0.7,
|
||
"num_pairs": 6,
|
||
"joint_origin_world_mm": [290, 108, 61],
|
||
"joint_axis_world": [-1, 0, 0],
|
||
...
|
||
}
|
||
|
||
The file also contains the full accumulated state so the next 4b invocation
|
||
can read it via --from-state.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import math
|
||
from itertools import combinations
|
||
from pathlib import Path
|
||
from typing import Dict, List, Optional, Sequence, Tuple
|
||
|
||
import numpy as np
|
||
from robot_fk import RobotFK
|
||
|
||
STATE_KEYS = ("x", "y", "z", "a", "b", "c", "e")
|
||
|
||
# Sentinel "marker id" used in `per_pair` reports for the joint pivot.
|
||
# Only ever appears in TIER_FALLBACK_2 entries (pivot vs. a single marker)
|
||
# — see the TIER_FALLBACK_2 block inside `estimate_revolute_angle()` below.
|
||
PIVOT_FALLBACK_ID = -1
|
||
|
||
# Tier labels — reported in `per_pair[].tier` and the top-level `method`
|
||
# field, so it's always traceable which method actually produced a given
|
||
# estimate. Tried in this order; each next one is a pure fallback (see
|
||
# module docstring above for what each tier means and why it's ordered
|
||
# this way).
|
||
TIER_PRIMARY = "primary" # pair of markers on the target link itself
|
||
TIER_FALLBACK_1 = "fallback_1_child_axis" # pair on a CHILD link, aligned with the child's OWN axis
|
||
TIER_FALLBACK_2 = "fallback_2_pivot" # single marker on the target link vs. the joint pivot
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────
|
||
# I/O
|
||
# ──────────────────────────────────────────────────────────────
|
||
|
||
def _load_json(path: Path) -> dict:
|
||
with path.open("r", encoding="utf-8") as f:
|
||
return json.load(f)
|
||
|
||
|
||
def _load_observed_mm(aruco_json: dict) -> Dict[int, np.ndarray]:
|
||
result: Dict[int, np.ndarray] = {}
|
||
for m in aruco_json.get("markers", []):
|
||
mid = int(m.get("marker_id", m.get("id", -1)))
|
||
if mid < 0:
|
||
continue
|
||
if "position_mm" in m:
|
||
result[mid] = np.array(m["position_mm"], dtype=float)
|
||
elif "position_m" in m:
|
||
result[mid] = np.array(m["position_m"], dtype=float) * 1000.0
|
||
return result
|
||
|
||
|
||
def _load_state(path: Path) -> Dict[str, float]:
|
||
"""Load accumulated joint state from a previous 4b output JSON."""
|
||
raw = _load_json(path)
|
||
state = raw.get("accumulated_state", raw.get("state", {}))
|
||
return {k: float(v) for k, v in state.items() if k in STATE_KEYS}
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────
|
||
# Angle maths
|
||
# ──────────────────────────────────────────────────────────────
|
||
|
||
def _project_perp(v: np.ndarray, axis: np.ndarray) -> np.ndarray:
|
||
"""Remove the component of v along axis."""
|
||
a = axis / (np.linalg.norm(axis) + 1e-15)
|
||
return v - np.dot(v, a) * a
|
||
|
||
|
||
def _signed_angle_rad(v_from: np.ndarray, v_to: np.ndarray,
|
||
axis: np.ndarray) -> float:
|
||
"""Signed angle rotating v_from onto v_to around axis (radians)."""
|
||
a = axis / (np.linalg.norm(axis) + 1e-15)
|
||
return math.atan2(float(np.dot(a, np.cross(v_from, v_to))),
|
||
float(np.dot(v_from, v_to)))
|
||
|
||
|
||
def _wrap(angle: float) -> float:
|
||
"""Wrap to (−π, π]."""
|
||
return (angle + math.pi) % (2.0 * math.pi) - math.pi
|
||
|
||
|
||
def _circular_mean_deg(angles_rad: np.ndarray,
|
||
weights: np.ndarray) -> Tuple[float, float, float]:
|
||
"""Returns mean_deg, circular_variance, circular_std_deg."""
|
||
w = np.clip(weights, 0, None)
|
||
if w.sum() < 1e-15:
|
||
w = np.ones_like(w)
|
||
s, c = np.sum(w * np.sin(angles_rad)), np.sum(w * np.cos(angles_rad))
|
||
mean = math.atan2(s, c)
|
||
R = math.sqrt(s*s + c*c) / w.sum()
|
||
R = float(np.clip(R, 0, 1))
|
||
c_var = 1.0 - R
|
||
c_std = math.sqrt(max(0.0, -2.0 * math.log(max(R, 1e-15))))
|
||
return math.degrees(mean), c_var, math.degrees(c_std)
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────
|
||
# Shared spoke/pair math
|
||
# (used by BOTH the primary marker-pair method and the pivot FALLBACK)
|
||
# ──────────────────────────────────────────────────────────────
|
||
|
||
def _model_spoke_world(fk: RobotFK,
|
||
zero_transforms: Dict[str, np.ndarray],
|
||
link_name: str,
|
||
origin_world: np.ndarray,
|
||
local_pos: np.ndarray) -> np.ndarray:
|
||
"""
|
||
Vector from the joint PIVOT to a marker, in WORLD ORIENTATION, as it
|
||
would be if this joint's own angle were 0 (only the already-known
|
||
PARENT rotations applied).
|
||
|
||
This is what `v_model` must be expressed as before comparing it to
|
||
`axis_world` / an observed vector: the raw `position` field from
|
||
robot.json lives in the link's own pre-rotation local frame and is
|
||
NOT yet rotated into world orientation by the parent chain. Skipping
|
||
this rotation silently biases the estimated angle by (roughly) the
|
||
parent joint's own angle — invisible whenever the parent chain has
|
||
no rotation yet (e.g. Arm1, whose parent 'Base' is purely linear),
|
||
but wrong for anything further down the chain (Ellbow, Arm2, Hand …).
|
||
"""
|
||
p0 = fk.marker_world(zero_transforms, link_name, local_pos)
|
||
return p0 - origin_world
|
||
|
||
|
||
def _pair_estimate(v_model: np.ndarray,
|
||
v_obs: np.ndarray,
|
||
axis_world: np.ndarray,
|
||
marker_ids: Tuple[int, int],
|
||
min_baseline_mm: float,
|
||
tier: str,
|
||
source_link: str) -> Tuple[Optional[float], Optional[float], dict]:
|
||
"""
|
||
Project model/observed vectors perpendicular to the joint axis and
|
||
derive one angle estimate from them. Returns (angle_rad, weight,
|
||
per_pair_entry) — angle_rad/weight are None when skipped (baseline
|
||
too short).
|
||
|
||
`tier` (one of the TIER_* constants) and `source_link` (the link the
|
||
two marker_ids actually belong to — may differ from the target link
|
||
for TIER_FALLBACK_1) are purely descriptive, so callers/reports can
|
||
always tell where a given estimate came from.
|
||
"""
|
||
v_model_perp = _project_perp(v_model, axis_world)
|
||
v_obs_perp = _project_perp(v_obs, axis_world)
|
||
|
||
bl_model = float(np.linalg.norm(v_model_perp))
|
||
bl_obs = float(np.linalg.norm(v_obs_perp))
|
||
|
||
if bl_model < min_baseline_mm or bl_obs < min_baseline_mm:
|
||
return None, None, {
|
||
"marker_ids": list(marker_ids),
|
||
"link": source_link,
|
||
"tier": tier,
|
||
"skipped": True,
|
||
"reason": f"bl_model={bl_model:.1f} bl_obs={bl_obs:.1f} < {min_baseline_mm}",
|
||
}
|
||
|
||
angle = _wrap(_signed_angle_rad(v_model_perp, v_obs_perp, axis_world))
|
||
weight = bl_model * bl_obs
|
||
entry = {
|
||
"marker_ids": list(marker_ids),
|
||
"link": source_link,
|
||
"tier": tier,
|
||
"skipped": False,
|
||
"angle_deg": math.degrees(angle),
|
||
"baseline_model_mm": bl_model,
|
||
"baseline_obs_mm": bl_obs,
|
||
"weight": weight,
|
||
}
|
||
return angle, weight, entry
|
||
|
||
|
||
def _child_links(fk: RobotFK, link_name: str) -> List[str]:
|
||
"""Direct children of `link_name` in the kinematic tree (robot.json `parent` field)."""
|
||
return [n for n, d in fk.links.items() if d.get("parent") == link_name]
|
||
|
||
|
||
def _axis_aligned_pairs(local_positions: Dict[int, np.ndarray],
|
||
own_axis_local: np.ndarray,
|
||
tol_mm: float) -> List[Tuple[int, int]]:
|
||
"""
|
||
Among marker pairs on a CHILD link, return those whose LOCAL connecting
|
||
vector is (nearly) parallel to the CHILD's OWN joint axis — i.e. the
|
||
component perpendicular to that axis is within `tol_mm` of zero.
|
||
|
||
Such a pair is invariant to the child's own (still-unknown) rotation
|
||
(a rotation about an axis never moves a vector parallel to that same
|
||
axis), which is exactly what TIER_FALLBACK_1 relies on. Pairs that
|
||
fail this check are skipped here — using them would silently mix in
|
||
the child's unknown rotation and bias the result (see module
|
||
docstring / TIER 1).
|
||
"""
|
||
a_hat = own_axis_local / (np.linalg.norm(own_axis_local) + 1e-15)
|
||
good: List[Tuple[int, int]] = []
|
||
for id1, id2 in combinations(sorted(local_positions.keys()), 2):
|
||
v_local = local_positions[id2] - local_positions[id1]
|
||
v_radial = v_local - np.dot(v_local, a_hat) * a_hat
|
||
if float(np.linalg.norm(v_radial)) <= tol_mm:
|
||
good.append((id1, id2))
|
||
return good
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────
|
||
# Core estimator (generic — works for any revolute joint)
|
||
# ──────────────────────────────────────────────────────────────
|
||
|
||
def estimate_revolute_angle(
|
||
fk: RobotFK,
|
||
observed_mm: Dict[int, np.ndarray],
|
||
link_name: str,
|
||
known_state: Dict[str, float],
|
||
min_baseline_mm: float = 15.0,
|
||
child_axis_tol_mm: float = 1.0,
|
||
verbose: bool = True,
|
||
) -> dict:
|
||
"""
|
||
Estimate the revolute joint angle for `link_name`.
|
||
|
||
Parameters
|
||
----------
|
||
fk : RobotFK
|
||
observed_mm : world marker positions from step 3
|
||
link_name : e.g. "Arm1", "Ellbow", "Arm2"
|
||
known_state : already-estimated joint values (e.g. {"x": 180.0, "y": 86.0})
|
||
The target joint variable should NOT be in this dict.
|
||
min_baseline_mm : skip pairs with model or observed baseline shorter than this
|
||
child_axis_tol_mm : TIER_FALLBACK_1 only — max perpendicular component (mm)
|
||
a child-link marker pair may have relative to the
|
||
child's OWN axis to still count as "axis-aligned"
|
||
verbose : print report
|
||
|
||
Returns
|
||
-------
|
||
dict with estimation results + updated accumulated_state
|
||
"""
|
||
|
||
# ── sanity checks ─────────────────────────────────────────
|
||
links = fk.links
|
||
if link_name not in links:
|
||
return {"status": "failed",
|
||
"reason": f"Link '{link_name}' not in robot.json"}
|
||
|
||
ji = links[link_name].get("jointToParent", {}) or {}
|
||
jtype = str(ji.get("type", "")).lower()
|
||
if jtype != "revolute":
|
||
return {"status": "failed",
|
||
"reason": f"Joint of '{link_name}' is not revolute (type='{jtype}')"}
|
||
|
||
var = str(ji.get("variable", "")).lower()
|
||
|
||
# ── FK: joint origin and axis in world ───────────────────
|
||
# Use known_state with the TARGET joint at 0
|
||
zero_state = dict(known_state)
|
||
zero_state[var] = 0.0
|
||
|
||
origin_world = fk.joint_origin_world(link_name, zero_state)
|
||
axis_world = fk.joint_axis_world(link_name, zero_state)
|
||
zero_transforms = fk.compute(zero_state) # needed to rotate model spokes into world orientation
|
||
|
||
# ── collect matched markers ───────────────────────────────
|
||
model_local: Dict[int, np.ndarray] = {}
|
||
for m in links[link_name].get("markers", []):
|
||
mid = int(m.get("id", -1))
|
||
if mid >= 0 and "position" in m:
|
||
model_local[mid] = np.array(m["position"], dtype=float)
|
||
|
||
matched = {mid: (model_local[mid], observed_mm[mid])
|
||
for mid in model_local if mid in observed_mm}
|
||
|
||
# No early return here even if `matched` is empty: TIER_FALLBACK_1
|
||
# below needs zero markers on the TARGET link itself — only on its
|
||
# child. Whether ANY tier found anything is checked once, at the end.
|
||
|
||
def _spoke(local_pos: np.ndarray) -> np.ndarray:
|
||
return _model_spoke_world(fk, zero_transforms, link_name, origin_world, local_pos)
|
||
|
||
# ── TIER 0 — PRIMARY: marker-to-marker pairs within this link ──
|
||
# Preferred whenever ≥2 markers with a usable (non axis-parallel)
|
||
# baseline are visible. Only the AXIS DIRECTION needs to be correct
|
||
# for this — not the pivot's position — so it is the more robust
|
||
# source of truth and is always tried first.
|
||
ids = sorted(matched.keys())
|
||
angle_rad_list: List[float] = []
|
||
weight_list: List[float] = []
|
||
per_pair: List[dict] = []
|
||
|
||
for id1, id2 in combinations(ids, 2):
|
||
l1, o1 = matched[id1]
|
||
l2, o2 = matched[id2]
|
||
|
||
v_model = _spoke(l2) - _spoke(l1) # model, world-oriented
|
||
v_obs = o2 - o1 # observed, world frame
|
||
|
||
angle, weight, entry = _pair_estimate(
|
||
v_model, v_obs, axis_world, (id1, id2), min_baseline_mm,
|
||
tier=TIER_PRIMARY, source_link=link_name)
|
||
per_pair.append(entry)
|
||
if angle is not None:
|
||
angle_rad_list.append(angle)
|
||
weight_list.append(weight)
|
||
|
||
tier_used = TIER_PRIMARY
|
||
children_tried: List[str] = [] # for the diagnostic message if everything fails
|
||
|
||
# ── TIER 1 — FALLBACK-1: axis-aligned pair on a CHILD link ────
|
||
# Only entered when TIER 0 produced NOT A SINGLE usable pair. Looks
|
||
# at every DIRECT child of this link and picks marker pairs whose
|
||
# local vector is parallel to the CHILD's OWN axis (see
|
||
# `_axis_aligned_pairs()`) — those are invariant to the child's own
|
||
# still-unknown rotation, so they can stand in for a TIER-0 pair.
|
||
# Like TIER 0, this needs only the axis DIRECTION, not the pivot's
|
||
# position, so it is preferred over TIER 2.
|
||
if not angle_rad_list:
|
||
tier_used = TIER_FALLBACK_1
|
||
children_tried = _child_links(fk, link_name)
|
||
|
||
for child_name in children_tried:
|
||
child = links[child_name]
|
||
child_ji = child.get("jointToParent", {}) or {}
|
||
child_axis_local = np.asarray(child_ji.get("axis", [1, 0, 0]), dtype=float)
|
||
|
||
child_model_local: Dict[int, np.ndarray] = {}
|
||
for m in child.get("markers", []):
|
||
mid = int(m.get("id", -1))
|
||
if mid >= 0 and "position" in m:
|
||
child_model_local[mid] = np.array(m["position"], dtype=float)
|
||
|
||
child_matched = {mid: (child_model_local[mid], observed_mm[mid])
|
||
for mid in child_model_local if mid in observed_mm}
|
||
if len(child_matched) < 2:
|
||
continue
|
||
|
||
aligned_pairs = _axis_aligned_pairs(
|
||
{mid: l for mid, (l, _o) in child_matched.items()},
|
||
child_axis_local, child_axis_tol_mm)
|
||
|
||
for id1, id2 in aligned_pairs:
|
||
l1, o1 = child_matched[id1]
|
||
l2, o2 = child_matched[id2]
|
||
|
||
v_model = (_model_spoke_world(fk, zero_transforms, child_name, origin_world, l2)
|
||
- _model_spoke_world(fk, zero_transforms, child_name, origin_world, l1))
|
||
v_obs = o2 - o1
|
||
|
||
angle, weight, entry = _pair_estimate(
|
||
v_model, v_obs, axis_world, (id1, id2), min_baseline_mm,
|
||
tier=TIER_FALLBACK_1, source_link=child_name)
|
||
per_pair.append(entry)
|
||
if angle is not None:
|
||
angle_rad_list.append(angle)
|
||
weight_list.append(weight)
|
||
|
||
# ── TIER 2 — FALLBACK-2: pivot + single marker on the target link ──
|
||
# Only entered when TIER 1 ALSO produced nothing (e.g. no child
|
||
# link, or its markers aren't visible/aligned either). Each
|
||
# matched marker on the TARGET link is paired with the joint
|
||
# PIVOT instead of another marker, using the rotation axis
|
||
# already known from the predecessor joints. This is the last
|
||
# resort: unlike TIER 0/1 it additionally relies on the
|
||
# predecessor joints' *values* (not just their axis direction)
|
||
# being accurate, since the pivot's world position comes from FK
|
||
# rather than being observed directly.
|
||
if not angle_rad_list:
|
||
tier_used = TIER_FALLBACK_2
|
||
for mid in ids:
|
||
l, o = matched[mid]
|
||
v_model = _spoke(l) # pivot → marker, model, world-oriented
|
||
v_obs = o - origin_world # pivot → marker, observed
|
||
|
||
angle, weight, entry = _pair_estimate(
|
||
v_model, v_obs, axis_world,
|
||
(PIVOT_FALLBACK_ID, mid), min_baseline_mm,
|
||
tier=TIER_FALLBACK_2, source_link=link_name)
|
||
per_pair.append(entry)
|
||
if angle is not None:
|
||
angle_rad_list.append(angle)
|
||
weight_list.append(weight)
|
||
|
||
if not angle_rad_list:
|
||
return {
|
||
"status": "failed",
|
||
"reason": (f"No usable pair at any tier: primary ({len(matched)} "
|
||
f"marker(s) on '{link_name}'), fallback-1 (children "
|
||
f"tried: {children_tried or 'none'}), fallback-2 "
|
||
f"(pivot, same {len(matched)} marker(s)). Try "
|
||
f"--min-baseline / --child-axis-tol, or check step-3 output."),
|
||
}
|
||
|
||
mean_deg, c_var, c_std_deg = _circular_mean_deg(
|
||
np.array(angle_rad_list), np.array(weight_list)
|
||
)
|
||
|
||
# ── verbose report ────────────────────────────────────────
|
||
if verbose:
|
||
print(f"\n── 4b: '{link_name}' angle ({var}) ──────────────────────")
|
||
print(f" Joint origin (world): [{', '.join(f'{v:.1f}' for v in origin_world)}] mm")
|
||
print(f" Joint axis (world): [{', '.join(f'{v:.3f}' for v in axis_world)}]")
|
||
print(f" Matched markers: {list(matched.keys())}")
|
||
if tier_used == TIER_FALLBACK_1:
|
||
print(f" [FALLBACK-1] No usable same-link pair — estimating from "
|
||
f"axis-aligned marker pair(s) on child link(s) "
|
||
f"{children_tried} instead.")
|
||
elif tier_used == TIER_FALLBACK_2:
|
||
print(f" [FALLBACK-2] No usable pair on this link or its children — "
|
||
f"estimating from pivot + predecessor axis instead "
|
||
f"(single-marker spokes).")
|
||
print(f" Pairs used: {len(angle_rad_list)} / {len(per_pair)}")
|
||
print(f" Angle: {mean_deg:+.2f} ° circular_σ {c_std_deg:.2f} °")
|
||
if c_std_deg > 5.0:
|
||
print(f" [WARN] high spread – step-3 errors or marker overlap")
|
||
print(f"\n Pair detail:")
|
||
for pp in per_pair:
|
||
id0, id1_ = pp["marker_ids"]
|
||
m0 = "PIVOT" if id0 == PIVOT_FALLBACK_ID else f"M{id0}"
|
||
m1 = "PIVOT" if id1_ == PIVOT_FALLBACK_ID else f"M{id1_}"
|
||
link_prefix = f"{pp['link']}:" if pp["link"] != link_name else ""
|
||
tag = {TIER_PRIMARY: "", TIER_FALLBACK_1: " [fallback-1]",
|
||
TIER_FALLBACK_2: " [fallback-2]"}.get(pp.get("tier"), "")
|
||
if pp["skipped"]:
|
||
print(f" {link_prefix}{m0}↔{link_prefix}{m1}{tag}: SKIPPED – {pp['reason']}")
|
||
else:
|
||
print(f" {link_prefix}{m0}↔{link_prefix}{m1}{tag}: "
|
||
f"{pp['angle_deg']:+7.2f}° "
|
||
f"bl_model={pp['baseline_model_mm']:.1f} "
|
||
f"bl_obs={pp['baseline_obs_mm']:.1f}")
|
||
|
||
# ── build accumulated state ───────────────────────────────
|
||
accumulated = dict(known_state)
|
||
accumulated[var] = mean_deg
|
||
|
||
return {
|
||
"status": "ok",
|
||
"link": link_name,
|
||
"joint": var,
|
||
"method": tier_used,
|
||
"joint_origin_world_mm": origin_world.tolist(),
|
||
"joint_axis_world": axis_world.tolist(),
|
||
"mean_angle_deg": mean_deg,
|
||
"circular_variance": c_var,
|
||
"circular_std_deg": c_std_deg,
|
||
"num_pairs_used": len(angle_rad_list),
|
||
"num_markers_matched": len(matched),
|
||
"per_pair": per_pair,
|
||
"accumulated_state": accumulated,
|
||
}
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────
|
||
# CLI
|
||
# ──────────────────────────────────────────────────────────────
|
||
|
||
def main() -> int:
|
||
p = argparse.ArgumentParser(
|
||
description="4b: estimate revolute joint angle for any link")
|
||
p.add_argument("--robot", type=Path, default=Path("robot.json"))
|
||
p.add_argument("--aruco", type=Path,
|
||
default=Path("aruco_positions_optimized.json"))
|
||
p.add_argument("--link", type=str, required=True,
|
||
help="Link name, e.g. Arm1, Ellbow, Arm2")
|
||
p.add_argument("--from-state", type=Path, default=None,
|
||
help="JSON from a previous 4b run (carries accumulated state)")
|
||
|
||
# Manual joint-value overrides (for use without --from-state)
|
||
for k in STATE_KEYS:
|
||
p.add_argument(f"--{k}-mm" if k in ("x", "e") else f"--{k}-deg",
|
||
type=float, default=None,
|
||
help=f"Known value for joint '{k}'"
|
||
+ (" (mm)" if k in ("x", "e") else " (deg)"))
|
||
|
||
p.add_argument("--min-baseline", type=float, default=15.0,
|
||
help="Skip pairs with perpendicular baseline < this (mm)")
|
||
p.add_argument("--child-axis-tol", type=float, default=1.0,
|
||
help="FALLBACK-1 only: max perpendicular component (mm) a "
|
||
"child-link marker pair may have relative to the "
|
||
"child's own axis to still count as axis-aligned")
|
||
p.add_argument("--output", type=Path, default=None,
|
||
help="Save result JSON (readable by next 4b as --from-state)")
|
||
args = p.parse_args()
|
||
|
||
# Assemble known state
|
||
if args.from_state:
|
||
known_state = _load_state(args.from_state)
|
||
print(f" Loaded state from {args.from_state}: {known_state}")
|
||
else:
|
||
known_state = {}
|
||
|
||
# CLI overrides
|
||
for k in STATE_KEYS:
|
||
attr = f"{k}_mm" if k in ("x", "e") else f"{k}_deg"
|
||
v = getattr(args, attr, None)
|
||
if v is not None:
|
||
known_state[k] = float(v)
|
||
|
||
fk = RobotFK.from_file(args.robot)
|
||
aruco_json = _load_json(args.aruco)
|
||
observed_mm = _load_observed_mm(aruco_json)
|
||
|
||
result = estimate_revolute_angle(
|
||
fk = fk,
|
||
observed_mm = observed_mm,
|
||
link_name = args.link,
|
||
known_state = known_state,
|
||
min_baseline_mm = args.min_baseline,
|
||
child_axis_tol_mm = args.child_axis_tol,
|
||
verbose = True,
|
||
)
|
||
|
||
if args.output and result["status"] == "ok":
|
||
args.output.parent.mkdir(parents=True, exist_ok=True)
|
||
with args.output.open("w", encoding="utf-8") as f:
|
||
json.dump(result, f, indent=2)
|
||
print(f"\n Saved → {args.output}")
|
||
|
||
return 0 if result["status"] == "ok" else 1
|
||
|
||
|
||
if __name__ == "__main__":
|
||
raise SystemExit(main())
|