Files
appRobotHoming/scripts/4b_revolute_angle.py
2026-06-16 15:28:14 +02:00

601 lines
27 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
4b_revolute_angle.py
--------------------
Generic revolute-joint angle estimator.
For each movable link (Arm1, Ellbow, Arm2 …) whose joint type is 'revolute',
this script estimates the rotation angle using one of three methods, tried
in order — each next TIER is a pure fallback, only used when the previous
one found NOT A SINGLE usable (non axis-degenerate) pair:
TIER 0 — PRIMARY: for every PAIR (m1, m2) of markers belonging to the
target link itself:
v_model = spoke_world(m2) - spoke_world(m1) (model, world-oriented)
v_obs = world_pos_m2 - world_pos_m1 (observed, world frame)
where spoke_world(m) rotates the marker's local `position` (from
robot.json) through the already-known PARENT joints via FK, so it is
expressed in world orientation at this joint's own angle = 0 — exactly
the frame v_obs lives in.
Both vectors are projected perpendicular to the joint axis (in world frame),
and the signed angle from v_model_perp to v_obs_perp is measured.
The joint axis in world frame is computed via FK using the already-known
joint values (from 4a, 4b-prev …), so it is ALWAYS correct — even for
deeply-nested joints whose world-frame axis differs from their local axis.
Pair weights = baseline_model × baseline_obs (longer baselines → more reliable).
TIER 1 — FALLBACK-1 (child-axis): only entered when TIER 0 has nothing.
Uses a PAIR of markers on the DIRECT CHILD link instead of the target
link, picking only pairs whose LOCAL connecting vector is (nearly)
parallel to the CHILD's OWN joint axis. A rotation about an axis never
moves a vector parallel to that very axis, so such a pair is invariant
to the child's own (still-unknown) rotation and transforms purely under
the chain up to and including the TARGET joint — exactly like a TIER-0
pair, just sourced one link further down. Like TIER 0 (and unlike
TIER 2), this only needs the axis DIRECTION to be correct, not the
pivot's position, so it is preferred over TIER 2 whenever available.
Example: Ellbow (axis X) ← Arm2 markers 144/148 or 143/146 (Arm2's own
axis Y, ⟂ to X, both pairs exactly axis-aligned in Arm2's local frame).
TIER 2 — FALLBACK-2 (pivot): only entered when TIER 1 ALSO has nothing
(e.g. no markers visible at all besides one on the target link itself,
or no child link exists). The joint PIVOT itself stands in for a
missing second marker, i.e. the "pair" becomes (pivot, m1). This needs
only ONE matched marker on the target link, but — unlike TIER 0/1 —
its accuracy additionally depends on the already-estimated PARENT joint
*values* being correct (not just their axis direction), since the
pivot's world position comes from FK. See `PIVOT_FALLBACK_ID` /
`TIER_*` / `tier_used` in the code.
How to use sequentially
-----------------------
Run 4b once per revolute joint, from root to tip:
python 4b_revolute_angle.py --robot r.json --aruco obs.json --link Arm1 --x-mm 180
python 4b_revolute_angle.py --robot r.json --aruco obs.json --link Ellbow --from-state state.json
python 4b_revolute_angle.py --robot r.json --aruco obs.json --link Arm2 --from-state state.json
The --from-state flag reads the accumulated joint state JSON so you don't have
to pass every preceding value on the command line.
Output JSON
-----------
{
"link": "Arm1",
"joint": "y",
"method": "primary", // or "fallback_1_child_axis" / "fallback_2_pivot" — see TIERs above
"mean_angle_deg": 86.3,
"circular_std_deg": 0.7,
"num_pairs": 6,
"joint_origin_world_mm": [290, 108, 61],
"joint_axis_world": [-1, 0, 0],
...
}
The file also contains the full accumulated state so the next 4b invocation
can read it via --from-state.
"""
from __future__ import annotations
import argparse
import json
import math
from itertools import combinations
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Tuple
import numpy as np
from robot_fk import RobotFK
STATE_KEYS = ("x", "y", "z", "a", "b", "c", "e")
# Sentinel "marker id" used in `per_pair` reports for the joint pivot.
# Only ever appears in TIER_FALLBACK_2 entries (pivot vs. a single marker)
# — see the TIER_FALLBACK_2 block inside `estimate_revolute_angle()` below.
PIVOT_FALLBACK_ID = -1
# Tier labels — reported in `per_pair[].tier` and the top-level `method`
# field, so it's always traceable which method actually produced a given
# estimate. Tried in this order; each next one is a pure fallback (see
# module docstring above for what each tier means and why it's ordered
# this way).
TIER_PRIMARY = "primary" # pair of markers on the target link itself
TIER_FALLBACK_1 = "fallback_1_child_axis" # pair on a CHILD link, aligned with the child's OWN axis
TIER_FALLBACK_2 = "fallback_2_pivot" # single marker on the target link vs. the joint pivot
# ──────────────────────────────────────────────────────────────
# I/O
# ──────────────────────────────────────────────────────────────
def _load_json(path: Path) -> dict:
with path.open("r", encoding="utf-8") as f:
return json.load(f)
def _load_observed_mm(aruco_json: dict) -> Dict[int, np.ndarray]:
result: Dict[int, np.ndarray] = {}
for m in aruco_json.get("markers", []):
mid = int(m.get("marker_id", m.get("id", -1)))
if mid < 0:
continue
if "position_mm" in m:
result[mid] = np.array(m["position_mm"], dtype=float)
elif "position_m" in m:
result[mid] = np.array(m["position_m"], dtype=float) * 1000.0
return result
def _load_state(path: Path) -> Dict[str, float]:
"""Load accumulated joint state from a previous 4b output JSON."""
raw = _load_json(path)
state = raw.get("accumulated_state", raw.get("state", {}))
return {k: float(v) for k, v in state.items() if k in STATE_KEYS}
# ──────────────────────────────────────────────────────────────
# Angle maths
# ──────────────────────────────────────────────────────────────
def _project_perp(v: np.ndarray, axis: np.ndarray) -> np.ndarray:
"""Remove the component of v along axis."""
a = axis / (np.linalg.norm(axis) + 1e-15)
return v - np.dot(v, a) * a
def _signed_angle_rad(v_from: np.ndarray, v_to: np.ndarray,
axis: np.ndarray) -> float:
"""Signed angle rotating v_from onto v_to around axis (radians)."""
a = axis / (np.linalg.norm(axis) + 1e-15)
return math.atan2(float(np.dot(a, np.cross(v_from, v_to))),
float(np.dot(v_from, v_to)))
def _wrap(angle: float) -> float:
"""Wrap to (−π, π]."""
return (angle + math.pi) % (2.0 * math.pi) - math.pi
def _circular_mean_deg(angles_rad: np.ndarray,
weights: np.ndarray) -> Tuple[float, float, float]:
"""Returns mean_deg, circular_variance, circular_std_deg."""
w = np.clip(weights, 0, None)
if w.sum() < 1e-15:
w = np.ones_like(w)
s, c = np.sum(w * np.sin(angles_rad)), np.sum(w * np.cos(angles_rad))
mean = math.atan2(s, c)
R = math.sqrt(s*s + c*c) / w.sum()
R = float(np.clip(R, 0, 1))
c_var = 1.0 - R
c_std = math.sqrt(max(0.0, -2.0 * math.log(max(R, 1e-15))))
return math.degrees(mean), c_var, math.degrees(c_std)
# ──────────────────────────────────────────────────────────────
# Shared spoke/pair math
# (used by BOTH the primary marker-pair method and the pivot FALLBACK)
# ──────────────────────────────────────────────────────────────
def _model_spoke_world(fk: RobotFK,
zero_transforms: Dict[str, np.ndarray],
link_name: str,
origin_world: np.ndarray,
local_pos: np.ndarray) -> np.ndarray:
"""
Vector from the joint PIVOT to a marker, in WORLD ORIENTATION, as it
would be if this joint's own angle were 0 (only the already-known
PARENT rotations applied).
This is what `v_model` must be expressed as before comparing it to
`axis_world` / an observed vector: the raw `position` field from
robot.json lives in the link's own pre-rotation local frame and is
NOT yet rotated into world orientation by the parent chain. Skipping
this rotation silently biases the estimated angle by (roughly) the
parent joint's own angle — invisible whenever the parent chain has
no rotation yet (e.g. Arm1, whose parent 'Base' is purely linear),
but wrong for anything further down the chain (Ellbow, Arm2, Hand …).
"""
p0 = fk.marker_world(zero_transforms, link_name, local_pos)
return p0 - origin_world
def _pair_estimate(v_model: np.ndarray,
v_obs: np.ndarray,
axis_world: np.ndarray,
marker_ids: Tuple[int, int],
min_baseline_mm: float,
tier: str,
source_link: str) -> Tuple[Optional[float], Optional[float], dict]:
"""
Project model/observed vectors perpendicular to the joint axis and
derive one angle estimate from them. Returns (angle_rad, weight,
per_pair_entry) — angle_rad/weight are None when skipped (baseline
too short).
`tier` (one of the TIER_* constants) and `source_link` (the link the
two marker_ids actually belong to — may differ from the target link
for TIER_FALLBACK_1) are purely descriptive, so callers/reports can
always tell where a given estimate came from.
"""
v_model_perp = _project_perp(v_model, axis_world)
v_obs_perp = _project_perp(v_obs, axis_world)
bl_model = float(np.linalg.norm(v_model_perp))
bl_obs = float(np.linalg.norm(v_obs_perp))
if bl_model < min_baseline_mm or bl_obs < min_baseline_mm:
return None, None, {
"marker_ids": list(marker_ids),
"link": source_link,
"tier": tier,
"skipped": True,
"reason": f"bl_model={bl_model:.1f} bl_obs={bl_obs:.1f} < {min_baseline_mm}",
}
angle = _wrap(_signed_angle_rad(v_model_perp, v_obs_perp, axis_world))
weight = bl_model * bl_obs
entry = {
"marker_ids": list(marker_ids),
"link": source_link,
"tier": tier,
"skipped": False,
"angle_deg": math.degrees(angle),
"baseline_model_mm": bl_model,
"baseline_obs_mm": bl_obs,
"weight": weight,
}
return angle, weight, entry
def _child_links(fk: RobotFK, link_name: str) -> List[str]:
"""Direct children of `link_name` in the kinematic tree (robot.json `parent` field)."""
return [n for n, d in fk.links.items() if d.get("parent") == link_name]
def _axis_aligned_pairs(local_positions: Dict[int, np.ndarray],
own_axis_local: np.ndarray,
tol_mm: float) -> List[Tuple[int, int]]:
"""
Among marker pairs on a CHILD link, return those whose LOCAL connecting
vector is (nearly) parallel to the CHILD's OWN joint axis — i.e. the
component perpendicular to that axis is within `tol_mm` of zero.
Such a pair is invariant to the child's own (still-unknown) rotation
(a rotation about an axis never moves a vector parallel to that same
axis), which is exactly what TIER_FALLBACK_1 relies on. Pairs that
fail this check are skipped here — using them would silently mix in
the child's unknown rotation and bias the result (see module
docstring / TIER 1).
"""
a_hat = own_axis_local / (np.linalg.norm(own_axis_local) + 1e-15)
good: List[Tuple[int, int]] = []
for id1, id2 in combinations(sorted(local_positions.keys()), 2):
v_local = local_positions[id2] - local_positions[id1]
v_radial = v_local - np.dot(v_local, a_hat) * a_hat
if float(np.linalg.norm(v_radial)) <= tol_mm:
good.append((id1, id2))
return good
# ──────────────────────────────────────────────────────────────
# Core estimator (generic — works for any revolute joint)
# ──────────────────────────────────────────────────────────────
def estimate_revolute_angle(
fk: RobotFK,
observed_mm: Dict[int, np.ndarray],
link_name: str,
known_state: Dict[str, float],
min_baseline_mm: float = 15.0,
child_axis_tol_mm: float = 1.0,
verbose: bool = True,
) -> dict:
"""
Estimate the revolute joint angle for `link_name`.
Parameters
----------
fk : RobotFK
observed_mm : world marker positions from step 3
link_name : e.g. "Arm1", "Ellbow", "Arm2"
known_state : already-estimated joint values (e.g. {"x": 180.0, "y": 86.0})
The target joint variable should NOT be in this dict.
min_baseline_mm : skip pairs with model or observed baseline shorter than this
child_axis_tol_mm : TIER_FALLBACK_1 only — max perpendicular component (mm)
a child-link marker pair may have relative to the
child's OWN axis to still count as "axis-aligned"
verbose : print report
Returns
-------
dict with estimation results + updated accumulated_state
"""
# ── sanity checks ─────────────────────────────────────────
links = fk.links
if link_name not in links:
return {"status": "failed",
"reason": f"Link '{link_name}' not in robot.json"}
ji = links[link_name].get("jointToParent", {}) or {}
jtype = str(ji.get("type", "")).lower()
if jtype != "revolute":
return {"status": "failed",
"reason": f"Joint of '{link_name}' is not revolute (type='{jtype}')"}
var = str(ji.get("variable", "")).lower()
# ── FK: joint origin and axis in world ───────────────────
# Use known_state with the TARGET joint at 0
zero_state = dict(known_state)
zero_state[var] = 0.0
origin_world = fk.joint_origin_world(link_name, zero_state)
axis_world = fk.joint_axis_world(link_name, zero_state)
zero_transforms = fk.compute(zero_state) # needed to rotate model spokes into world orientation
# ── collect matched markers ───────────────────────────────
model_local: Dict[int, np.ndarray] = {}
for m in links[link_name].get("markers", []):
mid = int(m.get("id", -1))
if mid >= 0 and "position" in m:
model_local[mid] = np.array(m["position"], dtype=float)
matched = {mid: (model_local[mid], observed_mm[mid])
for mid in model_local if mid in observed_mm}
# No early return here even if `matched` is empty: TIER_FALLBACK_1
# below needs zero markers on the TARGET link itself — only on its
# child. Whether ANY tier found anything is checked once, at the end.
def _spoke(local_pos: np.ndarray) -> np.ndarray:
return _model_spoke_world(fk, zero_transforms, link_name, origin_world, local_pos)
# ── TIER 0 — PRIMARY: marker-to-marker pairs within this link ──
# Preferred whenever ≥2 markers with a usable (non axis-parallel)
# baseline are visible. Only the AXIS DIRECTION needs to be correct
# for this — not the pivot's position — so it is the more robust
# source of truth and is always tried first.
ids = sorted(matched.keys())
angle_rad_list: List[float] = []
weight_list: List[float] = []
per_pair: List[dict] = []
for id1, id2 in combinations(ids, 2):
l1, o1 = matched[id1]
l2, o2 = matched[id2]
v_model = _spoke(l2) - _spoke(l1) # model, world-oriented
v_obs = o2 - o1 # observed, world frame
angle, weight, entry = _pair_estimate(
v_model, v_obs, axis_world, (id1, id2), min_baseline_mm,
tier=TIER_PRIMARY, source_link=link_name)
per_pair.append(entry)
if angle is not None:
angle_rad_list.append(angle)
weight_list.append(weight)
tier_used = TIER_PRIMARY
children_tried: List[str] = [] # for the diagnostic message if everything fails
# ── TIER 1 — FALLBACK-1: axis-aligned pair on a CHILD link ────
# Only entered when TIER 0 produced NOT A SINGLE usable pair. Looks
# at every DIRECT child of this link and picks marker pairs whose
# local vector is parallel to the CHILD's OWN axis (see
# `_axis_aligned_pairs()`) — those are invariant to the child's own
# still-unknown rotation, so they can stand in for a TIER-0 pair.
# Like TIER 0, this needs only the axis DIRECTION, not the pivot's
# position, so it is preferred over TIER 2.
if not angle_rad_list:
tier_used = TIER_FALLBACK_1
children_tried = _child_links(fk, link_name)
for child_name in children_tried:
child = links[child_name]
child_ji = child.get("jointToParent", {}) or {}
child_axis_local = np.asarray(child_ji.get("axis", [1, 0, 0]), dtype=float)
child_model_local: Dict[int, np.ndarray] = {}
for m in child.get("markers", []):
mid = int(m.get("id", -1))
if mid >= 0 and "position" in m:
child_model_local[mid] = np.array(m["position"], dtype=float)
child_matched = {mid: (child_model_local[mid], observed_mm[mid])
for mid in child_model_local if mid in observed_mm}
if len(child_matched) < 2:
continue
aligned_pairs = _axis_aligned_pairs(
{mid: l for mid, (l, _o) in child_matched.items()},
child_axis_local, child_axis_tol_mm)
for id1, id2 in aligned_pairs:
l1, o1 = child_matched[id1]
l2, o2 = child_matched[id2]
v_model = (_model_spoke_world(fk, zero_transforms, child_name, origin_world, l2)
- _model_spoke_world(fk, zero_transforms, child_name, origin_world, l1))
v_obs = o2 - o1
angle, weight, entry = _pair_estimate(
v_model, v_obs, axis_world, (id1, id2), min_baseline_mm,
tier=TIER_FALLBACK_1, source_link=child_name)
per_pair.append(entry)
if angle is not None:
angle_rad_list.append(angle)
weight_list.append(weight)
# ── TIER 2 — FALLBACK-2: pivot + single marker on the target link ──
# Only entered when TIER 1 ALSO produced nothing (e.g. no child
# link, or its markers aren't visible/aligned either). Each
# matched marker on the TARGET link is paired with the joint
# PIVOT instead of another marker, using the rotation axis
# already known from the predecessor joints. This is the last
# resort: unlike TIER 0/1 it additionally relies on the
# predecessor joints' *values* (not just their axis direction)
# being accurate, since the pivot's world position comes from FK
# rather than being observed directly.
if not angle_rad_list:
tier_used = TIER_FALLBACK_2
for mid in ids:
l, o = matched[mid]
v_model = _spoke(l) # pivot → marker, model, world-oriented
v_obs = o - origin_world # pivot → marker, observed
angle, weight, entry = _pair_estimate(
v_model, v_obs, axis_world,
(PIVOT_FALLBACK_ID, mid), min_baseline_mm,
tier=TIER_FALLBACK_2, source_link=link_name)
per_pair.append(entry)
if angle is not None:
angle_rad_list.append(angle)
weight_list.append(weight)
if not angle_rad_list:
return {
"status": "failed",
"reason": (f"No usable pair at any tier: primary ({len(matched)} "
f"marker(s) on '{link_name}'), fallback-1 (children "
f"tried: {children_tried or 'none'}), fallback-2 "
f"(pivot, same {len(matched)} marker(s)). Try "
f"--min-baseline / --child-axis-tol, or check step-3 output."),
}
mean_deg, c_var, c_std_deg = _circular_mean_deg(
np.array(angle_rad_list), np.array(weight_list)
)
# ── verbose report ────────────────────────────────────────
if verbose:
print(f"\n── 4b: '{link_name}' angle ({var}) ──────────────────────")
print(f" Joint origin (world): [{', '.join(f'{v:.1f}' for v in origin_world)}] mm")
print(f" Joint axis (world): [{', '.join(f'{v:.3f}' for v in axis_world)}]")
print(f" Matched markers: {list(matched.keys())}")
if tier_used == TIER_FALLBACK_1:
print(f" [FALLBACK-1] No usable same-link pair — estimating from "
f"axis-aligned marker pair(s) on child link(s) "
f"{children_tried} instead.")
elif tier_used == TIER_FALLBACK_2:
print(f" [FALLBACK-2] No usable pair on this link or its children — "
f"estimating from pivot + predecessor axis instead "
f"(single-marker spokes).")
print(f" Pairs used: {len(angle_rad_list)} / {len(per_pair)}")
print(f" Angle: {mean_deg:+.2f} ° circular_σ {c_std_deg:.2f} °")
if c_std_deg > 5.0:
print(f" [WARN] high spread step-3 errors or marker overlap")
print(f"\n Pair detail:")
for pp in per_pair:
id0, id1_ = pp["marker_ids"]
m0 = "PIVOT" if id0 == PIVOT_FALLBACK_ID else f"M{id0}"
m1 = "PIVOT" if id1_ == PIVOT_FALLBACK_ID else f"M{id1_}"
link_prefix = f"{pp['link']}:" if pp["link"] != link_name else ""
tag = {TIER_PRIMARY: "", TIER_FALLBACK_1: " [fallback-1]",
TIER_FALLBACK_2: " [fallback-2]"}.get(pp.get("tier"), "")
if pp["skipped"]:
print(f" {link_prefix}{m0}{link_prefix}{m1}{tag}: SKIPPED {pp['reason']}")
else:
print(f" {link_prefix}{m0}{link_prefix}{m1}{tag}: "
f"{pp['angle_deg']:+7.2f}° "
f"bl_model={pp['baseline_model_mm']:.1f} "
f"bl_obs={pp['baseline_obs_mm']:.1f}")
# ── build accumulated state ───────────────────────────────
accumulated = dict(known_state)
accumulated[var] = mean_deg
return {
"status": "ok",
"link": link_name,
"joint": var,
"method": tier_used,
"joint_origin_world_mm": origin_world.tolist(),
"joint_axis_world": axis_world.tolist(),
"mean_angle_deg": mean_deg,
"circular_variance": c_var,
"circular_std_deg": c_std_deg,
"num_pairs_used": len(angle_rad_list),
"num_markers_matched": len(matched),
"per_pair": per_pair,
"accumulated_state": accumulated,
}
# ──────────────────────────────────────────────────────────────
# CLI
# ──────────────────────────────────────────────────────────────
def main() -> int:
p = argparse.ArgumentParser(
description="4b: estimate revolute joint angle for any link")
p.add_argument("--robot", type=Path, default=Path("robot.json"))
p.add_argument("--aruco", type=Path,
default=Path("aruco_positions_optimized.json"))
p.add_argument("--link", type=str, required=True,
help="Link name, e.g. Arm1, Ellbow, Arm2")
p.add_argument("--from-state", type=Path, default=None,
help="JSON from a previous 4b run (carries accumulated state)")
# Manual joint-value overrides (for use without --from-state)
for k in STATE_KEYS:
p.add_argument(f"--{k}-mm" if k in ("x", "e") else f"--{k}-deg",
type=float, default=None,
help=f"Known value for joint '{k}'"
+ (" (mm)" if k in ("x", "e") else " (deg)"))
p.add_argument("--min-baseline", type=float, default=15.0,
help="Skip pairs with perpendicular baseline < this (mm)")
p.add_argument("--child-axis-tol", type=float, default=1.0,
help="FALLBACK-1 only: max perpendicular component (mm) a "
"child-link marker pair may have relative to the "
"child's own axis to still count as axis-aligned")
p.add_argument("--output", type=Path, default=None,
help="Save result JSON (readable by next 4b as --from-state)")
args = p.parse_args()
# Assemble known state
if args.from_state:
known_state = _load_state(args.from_state)
print(f" Loaded state from {args.from_state}: {known_state}")
else:
known_state = {}
# CLI overrides
for k in STATE_KEYS:
attr = f"{k}_mm" if k in ("x", "e") else f"{k}_deg"
v = getattr(args, attr, None)
if v is not None:
known_state[k] = float(v)
fk = RobotFK.from_file(args.robot)
aruco_json = _load_json(args.aruco)
observed_mm = _load_observed_mm(aruco_json)
result = estimate_revolute_angle(
fk = fk,
observed_mm = observed_mm,
link_name = args.link,
known_state = known_state,
min_baseline_mm = args.min_baseline,
child_axis_tol_mm = args.child_axis_tol,
verbose = True,
)
if args.output and result["status"] == "ok":
args.output.parent.mkdir(parents=True, exist_ok=True)
with args.output.open("w", encoding="utf-8") as f:
json.dump(result, f, indent=2)
print(f"\n Saved → {args.output}")
return 0 if result["status"] == "ok" else 1
if __name__ == "__main__":
raise SystemExit(main())