Files
appRobotHoming/scripts/4b_revolute_angle.py
2026-06-14 16:03:15 +02:00

361 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
4b_revolute_angle.py
--------------------
Generic revolute-joint angle estimator.
For each movable link (Arm1, Ellbow, Arm2 …) whose joint type is 'revolute',
this script estimates the rotation angle using the pairwise-vector method:
For every PAIR (m1, m2) of markers belonging to the target link:
v_model = local_pos_m2 - local_pos_m1 (in link's own frame)
v_obs = world_pos_m2 - world_pos_m1 (in world frame)
Both vectors are projected perpendicular to the joint axis (in world frame),
and the signed angle from v_model_perp to v_obs_perp is measured.
The joint axis in world frame is computed via FK using the already-known
joint values (from 4a, 4b-prev …), so it is ALWAYS correct — even for
deeply-nested joints whose world-frame axis differs from their local axis.
Pair weights = baseline_model × baseline_obs (longer baselines → more reliable).
How to use sequentially
-----------------------
Run 4b once per revolute joint, from root to tip:
python 4b_revolute_angle.py --robot r.json --aruco obs.json --link Arm1 --x-mm 180
python 4b_revolute_angle.py --robot r.json --aruco obs.json --link Ellbow --from-state state.json
python 4b_revolute_angle.py --robot r.json --aruco obs.json --link Arm2 --from-state state.json
The --from-state flag reads the accumulated joint state JSON so you don't have
to pass every preceding value on the command line.
Output JSON
-----------
{
"link": "Arm1",
"joint": "y",
"mean_angle_deg": 86.3,
"circular_std_deg": 0.7,
"num_pairs": 6,
"joint_origin_world_mm": [290, 108, 61],
"joint_axis_world": [-1, 0, 0],
...
}
The file also contains the full accumulated state so the next 4b invocation
can read it via --from-state.
"""
from __future__ import annotations
import argparse
import json
import math
from itertools import combinations
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Tuple
import numpy as np
from robot_fk import RobotFK
STATE_KEYS = ("x", "y", "z", "a", "b", "c", "e")
# ──────────────────────────────────────────────────────────────
# I/O
# ──────────────────────────────────────────────────────────────
def _load_json(path: Path) -> dict:
with path.open("r", encoding="utf-8") as f:
return json.load(f)
def _load_observed_mm(aruco_json: dict) -> Dict[int, np.ndarray]:
result: Dict[int, np.ndarray] = {}
for m in aruco_json.get("markers", []):
mid = int(m.get("marker_id", m.get("id", -1)))
if mid < 0:
continue
if "position_mm" in m:
result[mid] = np.array(m["position_mm"], dtype=float)
elif "position_m" in m:
result[mid] = np.array(m["position_m"], dtype=float) * 1000.0
return result
def _load_state(path: Path) -> Dict[str, float]:
"""Load accumulated joint state from a previous 4b output JSON."""
raw = _load_json(path)
state = raw.get("accumulated_state", raw.get("state", {}))
return {k: float(v) for k, v in state.items() if k in STATE_KEYS}
# ──────────────────────────────────────────────────────────────
# Angle maths
# ──────────────────────────────────────────────────────────────
def _project_perp(v: np.ndarray, axis: np.ndarray) -> np.ndarray:
"""Remove the component of v along axis."""
a = axis / (np.linalg.norm(axis) + 1e-15)
return v - np.dot(v, a) * a
def _signed_angle_rad(v_from: np.ndarray, v_to: np.ndarray,
axis: np.ndarray) -> float:
"""Signed angle rotating v_from onto v_to around axis (radians)."""
a = axis / (np.linalg.norm(axis) + 1e-15)
return math.atan2(float(np.dot(a, np.cross(v_from, v_to))),
float(np.dot(v_from, v_to)))
def _wrap(angle: float) -> float:
"""Wrap to (−π, π]."""
return (angle + math.pi) % (2.0 * math.pi) - math.pi
def _circular_mean_deg(angles_rad: np.ndarray,
weights: np.ndarray) -> Tuple[float, float, float]:
"""Returns mean_deg, circular_variance, circular_std_deg."""
w = np.clip(weights, 0, None)
if w.sum() < 1e-15:
w = np.ones_like(w)
s, c = np.sum(w * np.sin(angles_rad)), np.sum(w * np.cos(angles_rad))
mean = math.atan2(s, c)
R = math.sqrt(s*s + c*c) / w.sum()
R = float(np.clip(R, 0, 1))
c_var = 1.0 - R
c_std = math.sqrt(max(0.0, -2.0 * math.log(max(R, 1e-15))))
return math.degrees(mean), c_var, math.degrees(c_std)
# ──────────────────────────────────────────────────────────────
# Core estimator (generic — works for any revolute joint)
# ──────────────────────────────────────────────────────────────
def estimate_revolute_angle(
fk: RobotFK,
observed_mm: Dict[int, np.ndarray],
link_name: str,
known_state: Dict[str, float],
min_baseline_mm: float = 15.0,
verbose: bool = True,
) -> dict:
"""
Estimate the revolute joint angle for `link_name`.
Parameters
----------
fk : RobotFK
observed_mm : world marker positions from step 3
link_name : e.g. "Arm1", "Ellbow", "Arm2"
known_state : already-estimated joint values (e.g. {"x": 180.0, "y": 86.0})
The target joint variable should NOT be in this dict.
min_baseline_mm : skip pairs with model or observed baseline shorter than this
verbose : print report
Returns
-------
dict with estimation results + updated accumulated_state
"""
# ── sanity checks ─────────────────────────────────────────
links = fk.links
if link_name not in links:
return {"status": "failed",
"reason": f"Link '{link_name}' not in robot.json"}
ji = links[link_name].get("jointToParent", {}) or {}
jtype = str(ji.get("type", "")).lower()
if jtype != "revolute":
return {"status": "failed",
"reason": f"Joint of '{link_name}' is not revolute (type='{jtype}')"}
var = str(ji.get("variable", "")).lower()
# ── FK: joint origin and axis in world ───────────────────
# Use known_state with the TARGET joint at 0
zero_state = dict(known_state)
zero_state[var] = 0.0
origin_world = fk.joint_origin_world(link_name, zero_state)
axis_world = fk.joint_axis_world(link_name, zero_state)
# ── collect matched markers ───────────────────────────────
model_local: Dict[int, np.ndarray] = {}
for m in links[link_name].get("markers", []):
mid = int(m.get("id", -1))
if mid >= 0 and "position" in m:
model_local[mid] = np.array(m["position"], dtype=float)
matched = {mid: (model_local[mid], observed_mm[mid])
for mid in model_local if mid in observed_mm}
if len(matched) < 2:
return {
"status": "failed",
"reason": (f"Need ≥2 matched markers, found {len(matched)}: "
f"{list(matched.keys())}. "
f"Model marker IDs: {list(model_local.keys())}"),
}
# ── pairwise estimation ───────────────────────────────────
ids = sorted(matched.keys())
angle_rad_list: List[float] = []
weight_list: List[float] = []
per_pair: List[dict] = []
for id1, id2 in combinations(ids, 2):
l1, o1 = matched[id1]
l2, o2 = matched[id2]
v_model = l2 - l1 # local frame, both in same link
v_obs = o2 - o1 # world frame
v_model_perp = _project_perp(v_model, axis_world)
v_obs_perp = _project_perp(v_obs, axis_world)
bl_model = float(np.linalg.norm(v_model_perp))
bl_obs = float(np.linalg.norm(v_obs_perp))
if bl_model < min_baseline_mm or bl_obs < min_baseline_mm:
per_pair.append({
"marker_ids": [id1, id2],
"skipped": True,
"reason": f"bl_model={bl_model:.1f} bl_obs={bl_obs:.1f} < {min_baseline_mm}",
})
continue
angle = _wrap(_signed_angle_rad(v_model_perp, v_obs_perp, axis_world))
w = bl_model * bl_obs
angle_rad_list.append(angle)
weight_list.append(w)
per_pair.append({
"marker_ids": [id1, id2],
"skipped": False,
"angle_deg": math.degrees(angle),
"baseline_model_mm": bl_model,
"baseline_obs_mm": bl_obs,
"weight": w,
})
if not angle_rad_list:
return {
"status": "failed",
"reason": "All pairs below min_baseline_mm. "
"Try --min-baseline 5 or check step-3 output.",
}
mean_deg, c_var, c_std_deg = _circular_mean_deg(
np.array(angle_rad_list), np.array(weight_list)
)
# ── verbose report ────────────────────────────────────────
if verbose:
print(f"\n── 4b: '{link_name}' angle ({var}) ──────────────────────")
print(f" Joint origin (world): [{', '.join(f'{v:.1f}' for v in origin_world)}] mm")
print(f" Joint axis (world): [{', '.join(f'{v:.3f}' for v in axis_world)}]")
print(f" Matched markers: {list(matched.keys())}")
print(f" Pairs used: {len(angle_rad_list)} / {len(list(combinations(ids, 2)))}")
print(f" Angle: {mean_deg:+.2f} ° circular_σ {c_std_deg:.2f} °")
if c_std_deg > 5.0:
print(f" [WARN] high spread step-3 errors or marker overlap")
print(f"\n Pair detail:")
for pp in per_pair:
if pp["skipped"]:
print(f" M{pp['marker_ids'][0]}↔M{pp['marker_ids'][1]}: SKIPPED {pp['reason']}")
else:
print(f" M{pp['marker_ids'][0]}↔M{pp['marker_ids'][1]}: "
f"{pp['angle_deg']:+7.2f}° "
f"bl_model={pp['baseline_model_mm']:.1f} "
f"bl_obs={pp['baseline_obs_mm']:.1f}")
# ── build accumulated state ───────────────────────────────
accumulated = dict(known_state)
accumulated[var] = mean_deg
return {
"status": "ok",
"link": link_name,
"joint": var,
"joint_origin_world_mm": origin_world.tolist(),
"joint_axis_world": axis_world.tolist(),
"mean_angle_deg": mean_deg,
"circular_variance": c_var,
"circular_std_deg": c_std_deg,
"num_pairs_used": len(angle_rad_list),
"num_markers_matched": len(matched),
"per_pair": per_pair,
"accumulated_state": accumulated,
}
# ──────────────────────────────────────────────────────────────
# CLI
# ──────────────────────────────────────────────────────────────
def main() -> int:
p = argparse.ArgumentParser(
description="4b: estimate revolute joint angle for any link")
p.add_argument("--robot", type=Path, default=Path("robot.json"))
p.add_argument("--aruco", type=Path,
default=Path("aruco_positions_optimized.json"))
p.add_argument("--link", type=str, required=True,
help="Link name, e.g. Arm1, Ellbow, Arm2")
p.add_argument("--from-state", type=Path, default=None,
help="JSON from a previous 4b run (carries accumulated state)")
# Manual joint-value overrides (for use without --from-state)
for k in STATE_KEYS:
p.add_argument(f"--{k}-mm" if k in ("x", "e") else f"--{k}-deg",
type=float, default=None,
help=f"Known value for joint '{k}'"
+ (" (mm)" if k in ("x", "e") else " (deg)"))
p.add_argument("--min-baseline", type=float, default=15.0,
help="Skip pairs with perpendicular baseline < this (mm)")
p.add_argument("--output", type=Path, default=None,
help="Save result JSON (readable by next 4b as --from-state)")
args = p.parse_args()
# Assemble known state
if args.from_state:
known_state = _load_state(args.from_state)
print(f" Loaded state from {args.from_state}: {known_state}")
else:
known_state = {}
# CLI overrides
for k in STATE_KEYS:
attr = f"{k}_mm" if k in ("x", "e") else f"{k}_deg"
v = getattr(args, attr, None)
if v is not None:
known_state[k] = float(v)
fk = RobotFK.from_file(args.robot)
aruco_json = _load_json(args.aruco)
observed_mm = _load_observed_mm(aruco_json)
result = estimate_revolute_angle(
fk = fk,
observed_mm = observed_mm,
link_name = args.link,
known_state = known_state,
min_baseline_mm = args.min_baseline,
verbose = True,
)
if args.output and result["status"] == "ok":
args.output.parent.mkdir(parents=True, exist_ok=True)
with args.output.open("w", encoding="utf-8") as f:
json.dump(result, f, indent=2)
print(f"\n Saved → {args.output}")
return 0 if result["status"] == "ok" else 1
if __name__ == "__main__":
raise SystemExit(main())