361 lines
14 KiB
Python
361 lines
14 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
4b_revolute_angle.py
|
||
--------------------
|
||
Generic revolute-joint angle estimator.
|
||
|
||
For each movable link (Arm1, Ellbow, Arm2 …) whose joint type is 'revolute',
|
||
this script estimates the rotation angle using the pairwise-vector method:
|
||
|
||
For every PAIR (m1, m2) of markers belonging to the target link:
|
||
v_model = local_pos_m2 - local_pos_m1 (in link's own frame)
|
||
v_obs = world_pos_m2 - world_pos_m1 (in world frame)
|
||
|
||
Both vectors are projected perpendicular to the joint axis (in world frame),
|
||
and the signed angle from v_model_perp to v_obs_perp is measured.
|
||
|
||
The joint axis in world frame is computed via FK using the already-known
|
||
joint values (from 4a, 4b-prev …), so it is ALWAYS correct — even for
|
||
deeply-nested joints whose world-frame axis differs from their local axis.
|
||
|
||
Pair weights = baseline_model × baseline_obs (longer baselines → more reliable).
|
||
|
||
How to use sequentially
|
||
-----------------------
|
||
Run 4b once per revolute joint, from root to tip:
|
||
|
||
python 4b_revolute_angle.py --robot r.json --aruco obs.json --link Arm1 --x-mm 180
|
||
python 4b_revolute_angle.py --robot r.json --aruco obs.json --link Ellbow --from-state state.json
|
||
python 4b_revolute_angle.py --robot r.json --aruco obs.json --link Arm2 --from-state state.json
|
||
|
||
The --from-state flag reads the accumulated joint state JSON so you don't have
|
||
to pass every preceding value on the command line.
|
||
|
||
Output JSON
|
||
-----------
|
||
{
|
||
"link": "Arm1",
|
||
"joint": "y",
|
||
"mean_angle_deg": 86.3,
|
||
"circular_std_deg": 0.7,
|
||
"num_pairs": 6,
|
||
"joint_origin_world_mm": [290, 108, 61],
|
||
"joint_axis_world": [-1, 0, 0],
|
||
...
|
||
}
|
||
|
||
The file also contains the full accumulated state so the next 4b invocation
|
||
can read it via --from-state.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import math
|
||
from itertools import combinations
|
||
from pathlib import Path
|
||
from typing import Dict, List, Optional, Sequence, Tuple
|
||
|
||
import numpy as np
|
||
from robot_fk import RobotFK
|
||
|
||
STATE_KEYS = ("x", "y", "z", "a", "b", "c", "e")
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────
|
||
# I/O
|
||
# ──────────────────────────────────────────────────────────────
|
||
|
||
def _load_json(path: Path) -> dict:
|
||
with path.open("r", encoding="utf-8") as f:
|
||
return json.load(f)
|
||
|
||
|
||
def _load_observed_mm(aruco_json: dict) -> Dict[int, np.ndarray]:
|
||
result: Dict[int, np.ndarray] = {}
|
||
for m in aruco_json.get("markers", []):
|
||
mid = int(m.get("marker_id", m.get("id", -1)))
|
||
if mid < 0:
|
||
continue
|
||
if "position_mm" in m:
|
||
result[mid] = np.array(m["position_mm"], dtype=float)
|
||
elif "position_m" in m:
|
||
result[mid] = np.array(m["position_m"], dtype=float) * 1000.0
|
||
return result
|
||
|
||
|
||
def _load_state(path: Path) -> Dict[str, float]:
|
||
"""Load accumulated joint state from a previous 4b output JSON."""
|
||
raw = _load_json(path)
|
||
state = raw.get("accumulated_state", raw.get("state", {}))
|
||
return {k: float(v) for k, v in state.items() if k in STATE_KEYS}
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────
|
||
# Angle maths
|
||
# ──────────────────────────────────────────────────────────────
|
||
|
||
def _project_perp(v: np.ndarray, axis: np.ndarray) -> np.ndarray:
|
||
"""Remove the component of v along axis."""
|
||
a = axis / (np.linalg.norm(axis) + 1e-15)
|
||
return v - np.dot(v, a) * a
|
||
|
||
|
||
def _signed_angle_rad(v_from: np.ndarray, v_to: np.ndarray,
|
||
axis: np.ndarray) -> float:
|
||
"""Signed angle rotating v_from onto v_to around axis (radians)."""
|
||
a = axis / (np.linalg.norm(axis) + 1e-15)
|
||
return math.atan2(float(np.dot(a, np.cross(v_from, v_to))),
|
||
float(np.dot(v_from, v_to)))
|
||
|
||
|
||
def _wrap(angle: float) -> float:
|
||
"""Wrap to (−π, π]."""
|
||
return (angle + math.pi) % (2.0 * math.pi) - math.pi
|
||
|
||
|
||
def _circular_mean_deg(angles_rad: np.ndarray,
|
||
weights: np.ndarray) -> Tuple[float, float, float]:
|
||
"""Returns mean_deg, circular_variance, circular_std_deg."""
|
||
w = np.clip(weights, 0, None)
|
||
if w.sum() < 1e-15:
|
||
w = np.ones_like(w)
|
||
s, c = np.sum(w * np.sin(angles_rad)), np.sum(w * np.cos(angles_rad))
|
||
mean = math.atan2(s, c)
|
||
R = math.sqrt(s*s + c*c) / w.sum()
|
||
R = float(np.clip(R, 0, 1))
|
||
c_var = 1.0 - R
|
||
c_std = math.sqrt(max(0.0, -2.0 * math.log(max(R, 1e-15))))
|
||
return math.degrees(mean), c_var, math.degrees(c_std)
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────
|
||
# Core estimator (generic — works for any revolute joint)
|
||
# ──────────────────────────────────────────────────────────────
|
||
|
||
def estimate_revolute_angle(
|
||
fk: RobotFK,
|
||
observed_mm: Dict[int, np.ndarray],
|
||
link_name: str,
|
||
known_state: Dict[str, float],
|
||
min_baseline_mm: float = 15.0,
|
||
verbose: bool = True,
|
||
) -> dict:
|
||
"""
|
||
Estimate the revolute joint angle for `link_name`.
|
||
|
||
Parameters
|
||
----------
|
||
fk : RobotFK
|
||
observed_mm : world marker positions from step 3
|
||
link_name : e.g. "Arm1", "Ellbow", "Arm2"
|
||
known_state : already-estimated joint values (e.g. {"x": 180.0, "y": 86.0})
|
||
The target joint variable should NOT be in this dict.
|
||
min_baseline_mm : skip pairs with model or observed baseline shorter than this
|
||
verbose : print report
|
||
|
||
Returns
|
||
-------
|
||
dict with estimation results + updated accumulated_state
|
||
"""
|
||
|
||
# ── sanity checks ─────────────────────────────────────────
|
||
links = fk.links
|
||
if link_name not in links:
|
||
return {"status": "failed",
|
||
"reason": f"Link '{link_name}' not in robot.json"}
|
||
|
||
ji = links[link_name].get("jointToParent", {}) or {}
|
||
jtype = str(ji.get("type", "")).lower()
|
||
if jtype != "revolute":
|
||
return {"status": "failed",
|
||
"reason": f"Joint of '{link_name}' is not revolute (type='{jtype}')"}
|
||
|
||
var = str(ji.get("variable", "")).lower()
|
||
|
||
# ── FK: joint origin and axis in world ───────────────────
|
||
# Use known_state with the TARGET joint at 0
|
||
zero_state = dict(known_state)
|
||
zero_state[var] = 0.0
|
||
|
||
origin_world = fk.joint_origin_world(link_name, zero_state)
|
||
axis_world = fk.joint_axis_world(link_name, zero_state)
|
||
|
||
# ── collect matched markers ───────────────────────────────
|
||
model_local: Dict[int, np.ndarray] = {}
|
||
for m in links[link_name].get("markers", []):
|
||
mid = int(m.get("id", -1))
|
||
if mid >= 0 and "position" in m:
|
||
model_local[mid] = np.array(m["position"], dtype=float)
|
||
|
||
matched = {mid: (model_local[mid], observed_mm[mid])
|
||
for mid in model_local if mid in observed_mm}
|
||
|
||
if len(matched) < 2:
|
||
return {
|
||
"status": "failed",
|
||
"reason": (f"Need ≥2 matched markers, found {len(matched)}: "
|
||
f"{list(matched.keys())}. "
|
||
f"Model marker IDs: {list(model_local.keys())}"),
|
||
}
|
||
|
||
# ── pairwise estimation ───────────────────────────────────
|
||
ids = sorted(matched.keys())
|
||
angle_rad_list: List[float] = []
|
||
weight_list: List[float] = []
|
||
per_pair: List[dict] = []
|
||
|
||
for id1, id2 in combinations(ids, 2):
|
||
l1, o1 = matched[id1]
|
||
l2, o2 = matched[id2]
|
||
|
||
v_model = l2 - l1 # local frame, both in same link
|
||
v_obs = o2 - o1 # world frame
|
||
|
||
v_model_perp = _project_perp(v_model, axis_world)
|
||
v_obs_perp = _project_perp(v_obs, axis_world)
|
||
|
||
bl_model = float(np.linalg.norm(v_model_perp))
|
||
bl_obs = float(np.linalg.norm(v_obs_perp))
|
||
|
||
if bl_model < min_baseline_mm or bl_obs < min_baseline_mm:
|
||
per_pair.append({
|
||
"marker_ids": [id1, id2],
|
||
"skipped": True,
|
||
"reason": f"bl_model={bl_model:.1f} bl_obs={bl_obs:.1f} < {min_baseline_mm}",
|
||
})
|
||
continue
|
||
|
||
angle = _wrap(_signed_angle_rad(v_model_perp, v_obs_perp, axis_world))
|
||
w = bl_model * bl_obs
|
||
|
||
angle_rad_list.append(angle)
|
||
weight_list.append(w)
|
||
per_pair.append({
|
||
"marker_ids": [id1, id2],
|
||
"skipped": False,
|
||
"angle_deg": math.degrees(angle),
|
||
"baseline_model_mm": bl_model,
|
||
"baseline_obs_mm": bl_obs,
|
||
"weight": w,
|
||
})
|
||
|
||
if not angle_rad_list:
|
||
return {
|
||
"status": "failed",
|
||
"reason": "All pairs below min_baseline_mm. "
|
||
"Try --min-baseline 5 or check step-3 output.",
|
||
}
|
||
|
||
mean_deg, c_var, c_std_deg = _circular_mean_deg(
|
||
np.array(angle_rad_list), np.array(weight_list)
|
||
)
|
||
|
||
# ── verbose report ────────────────────────────────────────
|
||
if verbose:
|
||
print(f"\n── 4b: '{link_name}' angle ({var}) ──────────────────────")
|
||
print(f" Joint origin (world): [{', '.join(f'{v:.1f}' for v in origin_world)}] mm")
|
||
print(f" Joint axis (world): [{', '.join(f'{v:.3f}' for v in axis_world)}]")
|
||
print(f" Matched markers: {list(matched.keys())}")
|
||
print(f" Pairs used: {len(angle_rad_list)} / {len(list(combinations(ids, 2)))}")
|
||
print(f" Angle: {mean_deg:+.2f} ° circular_σ {c_std_deg:.2f} °")
|
||
if c_std_deg > 5.0:
|
||
print(f" [WARN] high spread – step-3 errors or marker overlap")
|
||
print(f"\n Pair detail:")
|
||
for pp in per_pair:
|
||
if pp["skipped"]:
|
||
print(f" M{pp['marker_ids'][0]}↔M{pp['marker_ids'][1]}: SKIPPED – {pp['reason']}")
|
||
else:
|
||
print(f" M{pp['marker_ids'][0]}↔M{pp['marker_ids'][1]}: "
|
||
f"{pp['angle_deg']:+7.2f}° "
|
||
f"bl_model={pp['baseline_model_mm']:.1f} "
|
||
f"bl_obs={pp['baseline_obs_mm']:.1f}")
|
||
|
||
# ── build accumulated state ───────────────────────────────
|
||
accumulated = dict(known_state)
|
||
accumulated[var] = mean_deg
|
||
|
||
return {
|
||
"status": "ok",
|
||
"link": link_name,
|
||
"joint": var,
|
||
"joint_origin_world_mm": origin_world.tolist(),
|
||
"joint_axis_world": axis_world.tolist(),
|
||
"mean_angle_deg": mean_deg,
|
||
"circular_variance": c_var,
|
||
"circular_std_deg": c_std_deg,
|
||
"num_pairs_used": len(angle_rad_list),
|
||
"num_markers_matched": len(matched),
|
||
"per_pair": per_pair,
|
||
"accumulated_state": accumulated,
|
||
}
|
||
|
||
|
||
# ──────────────────────────────────────────────────────────────
|
||
# CLI
|
||
# ──────────────────────────────────────────────────────────────
|
||
|
||
def main() -> int:
|
||
p = argparse.ArgumentParser(
|
||
description="4b: estimate revolute joint angle for any link")
|
||
p.add_argument("--robot", type=Path, default=Path("robot.json"))
|
||
p.add_argument("--aruco", type=Path,
|
||
default=Path("aruco_positions_optimized.json"))
|
||
p.add_argument("--link", type=str, required=True,
|
||
help="Link name, e.g. Arm1, Ellbow, Arm2")
|
||
p.add_argument("--from-state", type=Path, default=None,
|
||
help="JSON from a previous 4b run (carries accumulated state)")
|
||
|
||
# Manual joint-value overrides (for use without --from-state)
|
||
for k in STATE_KEYS:
|
||
p.add_argument(f"--{k}-mm" if k in ("x", "e") else f"--{k}-deg",
|
||
type=float, default=None,
|
||
help=f"Known value for joint '{k}'"
|
||
+ (" (mm)" if k in ("x", "e") else " (deg)"))
|
||
|
||
p.add_argument("--min-baseline", type=float, default=15.0,
|
||
help="Skip pairs with perpendicular baseline < this (mm)")
|
||
p.add_argument("--output", type=Path, default=None,
|
||
help="Save result JSON (readable by next 4b as --from-state)")
|
||
args = p.parse_args()
|
||
|
||
# Assemble known state
|
||
if args.from_state:
|
||
known_state = _load_state(args.from_state)
|
||
print(f" Loaded state from {args.from_state}: {known_state}")
|
||
else:
|
||
known_state = {}
|
||
|
||
# CLI overrides
|
||
for k in STATE_KEYS:
|
||
attr = f"{k}_mm" if k in ("x", "e") else f"{k}_deg"
|
||
v = getattr(args, attr, None)
|
||
if v is not None:
|
||
known_state[k] = float(v)
|
||
|
||
fk = RobotFK.from_file(args.robot)
|
||
aruco_json = _load_json(args.aruco)
|
||
observed_mm = _load_observed_mm(aruco_json)
|
||
|
||
result = estimate_revolute_angle(
|
||
fk = fk,
|
||
observed_mm = observed_mm,
|
||
link_name = args.link,
|
||
known_state = known_state,
|
||
min_baseline_mm = args.min_baseline,
|
||
verbose = True,
|
||
)
|
||
|
||
if args.output and result["status"] == "ok":
|
||
args.output.parent.mkdir(parents=True, exist_ok=True)
|
||
with args.output.open("w", encoding="utf-8") as f:
|
||
json.dump(result, f, indent=2)
|
||
print(f"\n Saved → {args.output}")
|
||
|
||
return 0 if result["status"] == "ok" else 1
|
||
|
||
|
||
if __name__ == "__main__":
|
||
raise SystemExit(main())
|