#!/usr/bin/env python3 """ 4b_revolute_angle.py -------------------- Generic revolute-joint angle estimator. For each movable link (Arm1, Ellbow, Arm2 …) whose joint type is 'revolute', this script estimates the rotation angle using the pairwise-vector method (PRIMARY), with a single-marker pivot method as FALLBACK: PRIMARY — for every PAIR (m1, m2) of markers belonging to the target link: v_model = spoke_world(m2) - spoke_world(m1) (model, world-oriented) v_obs = world_pos_m2 - world_pos_m1 (observed, world frame) where spoke_world(m) rotates the marker's local `position` (from robot.json) through the already-known PARENT joints via FK, so it is expressed in world orientation at this joint's own angle = 0 — exactly the frame v_obs lives in. Both vectors are projected perpendicular to the joint axis (in world frame), and the signed angle from v_model_perp to v_obs_perp is measured. The joint axis in world frame is computed via FK using the already-known joint values (from 4a, 4b-prev …), so it is ALWAYS correct — even for deeply-nested joints whose world-frame axis differs from their local axis. Pair weights = baseline_model × baseline_obs (longer baselines → more reliable). FALLBACK — only used when the PRIMARY method has no usable pair at all (e.g. just one marker visible, or every visible pair happens to lie parallel to the joint axis, as for two markers spaced along a forearm): the joint PIVOT itself stands in for the missing second marker, i.e. the "pair" becomes (pivot, m1). This needs only ONE matched marker, but — unlike the primary method — its accuracy additionally depends on the already-estimated PARENT joint *values* being correct (not just their axis direction), since the pivot's world position comes from FK. See `PIVOT_FALLBACK_ID` / `used_fallback` in the code. How to use sequentially ----------------------- Run 4b once per revolute joint, from root to tip: python 4b_revolute_angle.py --robot r.json --aruco obs.json --link Arm1 --x-mm 180 python 4b_revolute_angle.py --robot r.json --aruco obs.json --link Ellbow --from-state state.json python 4b_revolute_angle.py --robot r.json --aruco obs.json --link Arm2 --from-state state.json The --from-state flag reads the accumulated joint state JSON so you don't have to pass every preceding value on the command line. Output JSON ----------- { "link": "Arm1", "joint": "y", "method": "marker_pair", // or "pivot_fallback" — see FALLBACK above "mean_angle_deg": 86.3, "circular_std_deg": 0.7, "num_pairs": 6, "joint_origin_world_mm": [290, 108, 61], "joint_axis_world": [-1, 0, 0], ... } The file also contains the full accumulated state so the next 4b invocation can read it via --from-state. """ from __future__ import annotations import argparse import json import math from itertools import combinations from pathlib import Path from typing import Dict, List, Optional, Sequence, Tuple import numpy as np from robot_fk import RobotFK STATE_KEYS = ("x", "y", "z", "a", "b", "c", "e") # Sentinel "marker id" used in `per_pair` reports for the joint pivot. # Only ever appears when the FALLBACK path (pivot vs. a single marker) # was used instead of a real marker-to-marker pair — see the # `used_fallback` block inside `estimate_revolute_angle()` below. PIVOT_FALLBACK_ID = -1 # ────────────────────────────────────────────────────────────── # I/O # ────────────────────────────────────────────────────────────── def _load_json(path: Path) -> dict: with path.open("r", encoding="utf-8") as f: return json.load(f) def _load_observed_mm(aruco_json: dict) -> Dict[int, np.ndarray]: result: Dict[int, np.ndarray] = {} for m in aruco_json.get("markers", []): mid = int(m.get("marker_id", m.get("id", -1))) if mid < 0: continue if "position_mm" in m: result[mid] = np.array(m["position_mm"], dtype=float) elif "position_m" in m: result[mid] = np.array(m["position_m"], dtype=float) * 1000.0 return result def _load_state(path: Path) -> Dict[str, float]: """Load accumulated joint state from a previous 4b output JSON.""" raw = _load_json(path) state = raw.get("accumulated_state", raw.get("state", {})) return {k: float(v) for k, v in state.items() if k in STATE_KEYS} # ────────────────────────────────────────────────────────────── # Angle maths # ────────────────────────────────────────────────────────────── def _project_perp(v: np.ndarray, axis: np.ndarray) -> np.ndarray: """Remove the component of v along axis.""" a = axis / (np.linalg.norm(axis) + 1e-15) return v - np.dot(v, a) * a def _signed_angle_rad(v_from: np.ndarray, v_to: np.ndarray, axis: np.ndarray) -> float: """Signed angle rotating v_from onto v_to around axis (radians).""" a = axis / (np.linalg.norm(axis) + 1e-15) return math.atan2(float(np.dot(a, np.cross(v_from, v_to))), float(np.dot(v_from, v_to))) def _wrap(angle: float) -> float: """Wrap to (−π, π].""" return (angle + math.pi) % (2.0 * math.pi) - math.pi def _circular_mean_deg(angles_rad: np.ndarray, weights: np.ndarray) -> Tuple[float, float, float]: """Returns mean_deg, circular_variance, circular_std_deg.""" w = np.clip(weights, 0, None) if w.sum() < 1e-15: w = np.ones_like(w) s, c = np.sum(w * np.sin(angles_rad)), np.sum(w * np.cos(angles_rad)) mean = math.atan2(s, c) R = math.sqrt(s*s + c*c) / w.sum() R = float(np.clip(R, 0, 1)) c_var = 1.0 - R c_std = math.sqrt(max(0.0, -2.0 * math.log(max(R, 1e-15)))) return math.degrees(mean), c_var, math.degrees(c_std) # ────────────────────────────────────────────────────────────── # Shared spoke/pair math # (used by BOTH the primary marker-pair method and the pivot FALLBACK) # ────────────────────────────────────────────────────────────── def _model_spoke_world(fk: RobotFK, zero_transforms: Dict[str, np.ndarray], link_name: str, origin_world: np.ndarray, local_pos: np.ndarray) -> np.ndarray: """ Vector from the joint PIVOT to a marker, in WORLD ORIENTATION, as it would be if this joint's own angle were 0 (only the already-known PARENT rotations applied). This is what `v_model` must be expressed as before comparing it to `axis_world` / an observed vector: the raw `position` field from robot.json lives in the link's own pre-rotation local frame and is NOT yet rotated into world orientation by the parent chain. Skipping this rotation silently biases the estimated angle by (roughly) the parent joint's own angle — invisible whenever the parent chain has no rotation yet (e.g. Arm1, whose parent 'Base' is purely linear), but wrong for anything further down the chain (Ellbow, Arm2, Hand …). """ p0 = fk.marker_world(zero_transforms, link_name, local_pos) return p0 - origin_world def _pair_estimate(v_model: np.ndarray, v_obs: np.ndarray, axis_world: np.ndarray, marker_ids: Tuple[int, int], min_baseline_mm: float, fallback: bool) -> Tuple[Optional[float], Optional[float], dict]: """ Project model/observed vectors perpendicular to the joint axis and derive one angle estimate from them. Returns (angle_rad, weight, per_pair_entry) — angle_rad/weight are None when skipped (baseline too short). `fallback=True` marks entries produced by the pivot FALLBACK (one of the two "markers" is actually the joint pivot, see PIVOT_FALLBACK_ID) so callers/reports can always tell primary and fallback data apart. """ v_model_perp = _project_perp(v_model, axis_world) v_obs_perp = _project_perp(v_obs, axis_world) bl_model = float(np.linalg.norm(v_model_perp)) bl_obs = float(np.linalg.norm(v_obs_perp)) if bl_model < min_baseline_mm or bl_obs < min_baseline_mm: return None, None, { "marker_ids": list(marker_ids), "fallback": fallback, "skipped": True, "reason": f"bl_model={bl_model:.1f} bl_obs={bl_obs:.1f} < {min_baseline_mm}", } angle = _wrap(_signed_angle_rad(v_model_perp, v_obs_perp, axis_world)) weight = bl_model * bl_obs entry = { "marker_ids": list(marker_ids), "fallback": fallback, "skipped": False, "angle_deg": math.degrees(angle), "baseline_model_mm": bl_model, "baseline_obs_mm": bl_obs, "weight": weight, } return angle, weight, entry # ────────────────────────────────────────────────────────────── # Core estimator (generic — works for any revolute joint) # ────────────────────────────────────────────────────────────── def estimate_revolute_angle( fk: RobotFK, observed_mm: Dict[int, np.ndarray], link_name: str, known_state: Dict[str, float], min_baseline_mm: float = 15.0, verbose: bool = True, ) -> dict: """ Estimate the revolute joint angle for `link_name`. Parameters ---------- fk : RobotFK observed_mm : world marker positions from step 3 link_name : e.g. "Arm1", "Ellbow", "Arm2" known_state : already-estimated joint values (e.g. {"x": 180.0, "y": 86.0}) The target joint variable should NOT be in this dict. min_baseline_mm : skip pairs with model or observed baseline shorter than this verbose : print report Returns ------- dict with estimation results + updated accumulated_state """ # ── sanity checks ───────────────────────────────────────── links = fk.links if link_name not in links: return {"status": "failed", "reason": f"Link '{link_name}' not in robot.json"} ji = links[link_name].get("jointToParent", {}) or {} jtype = str(ji.get("type", "")).lower() if jtype != "revolute": return {"status": "failed", "reason": f"Joint of '{link_name}' is not revolute (type='{jtype}')"} var = str(ji.get("variable", "")).lower() # ── FK: joint origin and axis in world ─────────────────── # Use known_state with the TARGET joint at 0 zero_state = dict(known_state) zero_state[var] = 0.0 origin_world = fk.joint_origin_world(link_name, zero_state) axis_world = fk.joint_axis_world(link_name, zero_state) zero_transforms = fk.compute(zero_state) # needed to rotate model spokes into world orientation # ── collect matched markers ─────────────────────────────── model_local: Dict[int, np.ndarray] = {} for m in links[link_name].get("markers", []): mid = int(m.get("id", -1)) if mid >= 0 and "position" in m: model_local[mid] = np.array(m["position"], dtype=float) matched = {mid: (model_local[mid], observed_mm[mid]) for mid in model_local if mid in observed_mm} # Only 1 matched marker is enough to *attempt* an estimate — the # PIVOT FALLBACK below can work with a single marker. With 0 there # is nothing to go on at all. if len(matched) < 1: return { "status": "failed", "reason": (f"Need ≥1 matched marker, found {len(matched)}. " f"Model marker IDs: {list(model_local.keys())}"), } def _spoke(local_pos: np.ndarray) -> np.ndarray: return _model_spoke_world(fk, zero_transforms, link_name, origin_world, local_pos) # ── PRIMARY: marker-to-marker pairs within this link ────── # Preferred whenever ≥2 markers with a usable (non axis-parallel) # baseline are visible. Only the AXIS DIRECTION needs to be correct # for this — not the pivot's position — so it is the more robust # source of truth and is always tried first. ids = sorted(matched.keys()) angle_rad_list: List[float] = [] weight_list: List[float] = [] per_pair: List[dict] = [] for id1, id2 in combinations(ids, 2): l1, o1 = matched[id1] l2, o2 = matched[id2] v_model = _spoke(l2) - _spoke(l1) # model, world-oriented v_obs = o2 - o1 # observed, world frame angle, weight, entry = _pair_estimate( v_model, v_obs, axis_world, (id1, id2), min_baseline_mm, fallback=False) per_pair.append(entry) if angle is not None: angle_rad_list.append(angle) weight_list.append(weight) # ── FALLBACK: pivot + single marker, axis from predecessor ──── # Only entered when the PRIMARY method above produced NOT A SINGLE # usable pair (e.g. only one marker visible at all, or every visible # pair happens to lie parallel to the joint axis — as for two # markers spaced along a forearm). Each matched marker is paired # with the joint PIVOT instead of another marker, using the # rotation axis already known from the predecessor joints. # This is strictly a fallback: compared to a real 2-marker baseline # it additionally relies on the predecessor joints' *values* (not # just their axis direction) being accurate, since the pivot's # world position is computed via FK rather than observed directly. used_fallback = False if not angle_rad_list: used_fallback = True for mid in ids: l, o = matched[mid] v_model = _spoke(l) # pivot → marker, model, world-oriented v_obs = o - origin_world # pivot → marker, observed angle, weight, entry = _pair_estimate( v_model, v_obs, axis_world, (PIVOT_FALLBACK_ID, mid), min_baseline_mm, fallback=True) per_pair.append(entry) if angle is not None: angle_rad_list.append(angle) weight_list.append(weight) if not angle_rad_list: return { "status": "failed", "reason": "All pairs below min_baseline_mm, including the " "pivot fallback. Try --min-baseline 5 or check " "step-3 output.", } mean_deg, c_var, c_std_deg = _circular_mean_deg( np.array(angle_rad_list), np.array(weight_list) ) # ── verbose report ──────────────────────────────────────── if verbose: print(f"\n── 4b: '{link_name}' angle ({var}) ──────────────────────") print(f" Joint origin (world): [{', '.join(f'{v:.1f}' for v in origin_world)}] mm") print(f" Joint axis (world): [{', '.join(f'{v:.3f}' for v in axis_world)}]") print(f" Matched markers: {list(matched.keys())}") if used_fallback: print(f" [FALLBACK] No usable marker-marker pair — estimating from " f"pivot + predecessor axis instead (single-marker spokes).") print(f" Pairs used: {len(angle_rad_list)} / {len(per_pair)}") print(f" Angle: {mean_deg:+.2f} ° circular_σ {c_std_deg:.2f} °") if c_std_deg > 5.0: print(f" [WARN] high spread – step-3 errors or marker overlap") print(f"\n Pair detail:") for pp in per_pair: id0, id1_ = pp["marker_ids"] m0 = "PIVOT" if id0 == PIVOT_FALLBACK_ID else f"M{id0}" m1 = "PIVOT" if id1_ == PIVOT_FALLBACK_ID else f"M{id1_}" tag = " [fallback]" if pp.get("fallback") else "" if pp["skipped"]: print(f" {m0}↔{m1}{tag}: SKIPPED – {pp['reason']}") else: print(f" {m0}↔{m1}{tag}: " f"{pp['angle_deg']:+7.2f}° " f"bl_model={pp['baseline_model_mm']:.1f} " f"bl_obs={pp['baseline_obs_mm']:.1f}") # ── build accumulated state ─────────────────────────────── accumulated = dict(known_state) accumulated[var] = mean_deg return { "status": "ok", "link": link_name, "joint": var, "method": "pivot_fallback" if used_fallback else "marker_pair", "joint_origin_world_mm": origin_world.tolist(), "joint_axis_world": axis_world.tolist(), "mean_angle_deg": mean_deg, "circular_variance": c_var, "circular_std_deg": c_std_deg, "num_pairs_used": len(angle_rad_list), "num_markers_matched": len(matched), "per_pair": per_pair, "accumulated_state": accumulated, } # ────────────────────────────────────────────────────────────── # CLI # ────────────────────────────────────────────────────────────── def main() -> int: p = argparse.ArgumentParser( description="4b: estimate revolute joint angle for any link") p.add_argument("--robot", type=Path, default=Path("robot.json")) p.add_argument("--aruco", type=Path, default=Path("aruco_positions_optimized.json")) p.add_argument("--link", type=str, required=True, help="Link name, e.g. Arm1, Ellbow, Arm2") p.add_argument("--from-state", type=Path, default=None, help="JSON from a previous 4b run (carries accumulated state)") # Manual joint-value overrides (for use without --from-state) for k in STATE_KEYS: p.add_argument(f"--{k}-mm" if k in ("x", "e") else f"--{k}-deg", type=float, default=None, help=f"Known value for joint '{k}'" + (" (mm)" if k in ("x", "e") else " (deg)")) p.add_argument("--min-baseline", type=float, default=15.0, help="Skip pairs with perpendicular baseline < this (mm)") p.add_argument("--output", type=Path, default=None, help="Save result JSON (readable by next 4b as --from-state)") args = p.parse_args() # Assemble known state if args.from_state: known_state = _load_state(args.from_state) print(f" Loaded state from {args.from_state}: {known_state}") else: known_state = {} # CLI overrides for k in STATE_KEYS: attr = f"{k}_mm" if k in ("x", "e") else f"{k}_deg" v = getattr(args, attr, None) if v is not None: known_state[k] = float(v) fk = RobotFK.from_file(args.robot) aruco_json = _load_json(args.aruco) observed_mm = _load_observed_mm(aruco_json) result = estimate_revolute_angle( fk = fk, observed_mm = observed_mm, link_name = args.link, known_state = known_state, min_baseline_mm = args.min_baseline, verbose = True, ) if args.output and result["status"] == "ok": args.output.parent.mkdir(parents=True, exist_ok=True) with args.output.open("w", encoding="utf-8") as f: json.dump(result, f, indent=2) print(f"\n Saved → {args.output}") return 0 if result["status"] == "ok" else 1 if __name__ == "__main__": raise SystemExit(main())