Python Scripte in HomingApp
This commit is contained in:
360
scripts/4b_revolute_angle.py
Normal file
360
scripts/4b_revolute_angle.py
Normal file
@@ -0,0 +1,360 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
4b_revolute_angle.py
|
||||
--------------------
|
||||
Generic revolute-joint angle estimator.
|
||||
|
||||
For each movable link (Arm1, Ellbow, Arm2 …) whose joint type is 'revolute',
|
||||
this script estimates the rotation angle using the pairwise-vector method:
|
||||
|
||||
For every PAIR (m1, m2) of markers belonging to the target link:
|
||||
v_model = local_pos_m2 - local_pos_m1 (in link's own frame)
|
||||
v_obs = world_pos_m2 - world_pos_m1 (in world frame)
|
||||
|
||||
Both vectors are projected perpendicular to the joint axis (in world frame),
|
||||
and the signed angle from v_model_perp to v_obs_perp is measured.
|
||||
|
||||
The joint axis in world frame is computed via FK using the already-known
|
||||
joint values (from 4a, 4b-prev …), so it is ALWAYS correct — even for
|
||||
deeply-nested joints whose world-frame axis differs from their local axis.
|
||||
|
||||
Pair weights = baseline_model × baseline_obs (longer baselines → more reliable).
|
||||
|
||||
How to use sequentially
|
||||
-----------------------
|
||||
Run 4b once per revolute joint, from root to tip:
|
||||
|
||||
python 4b_revolute_angle.py --robot r.json --aruco obs.json --link Arm1 --x-mm 180
|
||||
python 4b_revolute_angle.py --robot r.json --aruco obs.json --link Ellbow --from-state state.json
|
||||
python 4b_revolute_angle.py --robot r.json --aruco obs.json --link Arm2 --from-state state.json
|
||||
|
||||
The --from-state flag reads the accumulated joint state JSON so you don't have
|
||||
to pass every preceding value on the command line.
|
||||
|
||||
Output JSON
|
||||
-----------
|
||||
{
|
||||
"link": "Arm1",
|
||||
"joint": "y",
|
||||
"mean_angle_deg": 86.3,
|
||||
"circular_std_deg": 0.7,
|
||||
"num_pairs": 6,
|
||||
"joint_origin_world_mm": [290, 108, 61],
|
||||
"joint_axis_world": [-1, 0, 0],
|
||||
...
|
||||
}
|
||||
|
||||
The file also contains the full accumulated state so the next 4b invocation
|
||||
can read it via --from-state.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import math
|
||||
from itertools import combinations
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Sequence, Tuple
|
||||
|
||||
import numpy as np
|
||||
from robot_fk import RobotFK
|
||||
|
||||
STATE_KEYS = ("x", "y", "z", "a", "b", "c", "e")
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# I/O
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
def _load_json(path: Path) -> dict:
|
||||
with path.open("r", encoding="utf-8") as f:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
def _load_observed_mm(aruco_json: dict) -> Dict[int, np.ndarray]:
|
||||
result: Dict[int, np.ndarray] = {}
|
||||
for m in aruco_json.get("markers", []):
|
||||
mid = int(m.get("marker_id", m.get("id", -1)))
|
||||
if mid < 0:
|
||||
continue
|
||||
if "position_mm" in m:
|
||||
result[mid] = np.array(m["position_mm"], dtype=float)
|
||||
elif "position_m" in m:
|
||||
result[mid] = np.array(m["position_m"], dtype=float) * 1000.0
|
||||
return result
|
||||
|
||||
|
||||
def _load_state(path: Path) -> Dict[str, float]:
|
||||
"""Load accumulated joint state from a previous 4b output JSON."""
|
||||
raw = _load_json(path)
|
||||
state = raw.get("accumulated_state", raw.get("state", {}))
|
||||
return {k: float(v) for k, v in state.items() if k in STATE_KEYS}
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# Angle maths
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
def _project_perp(v: np.ndarray, axis: np.ndarray) -> np.ndarray:
|
||||
"""Remove the component of v along axis."""
|
||||
a = axis / (np.linalg.norm(axis) + 1e-15)
|
||||
return v - np.dot(v, a) * a
|
||||
|
||||
|
||||
def _signed_angle_rad(v_from: np.ndarray, v_to: np.ndarray,
|
||||
axis: np.ndarray) -> float:
|
||||
"""Signed angle rotating v_from onto v_to around axis (radians)."""
|
||||
a = axis / (np.linalg.norm(axis) + 1e-15)
|
||||
return math.atan2(float(np.dot(a, np.cross(v_from, v_to))),
|
||||
float(np.dot(v_from, v_to)))
|
||||
|
||||
|
||||
def _wrap(angle: float) -> float:
|
||||
"""Wrap to (−π, π]."""
|
||||
return (angle + math.pi) % (2.0 * math.pi) - math.pi
|
||||
|
||||
|
||||
def _circular_mean_deg(angles_rad: np.ndarray,
|
||||
weights: np.ndarray) -> Tuple[float, float, float]:
|
||||
"""Returns mean_deg, circular_variance, circular_std_deg."""
|
||||
w = np.clip(weights, 0, None)
|
||||
if w.sum() < 1e-15:
|
||||
w = np.ones_like(w)
|
||||
s, c = np.sum(w * np.sin(angles_rad)), np.sum(w * np.cos(angles_rad))
|
||||
mean = math.atan2(s, c)
|
||||
R = math.sqrt(s*s + c*c) / w.sum()
|
||||
R = float(np.clip(R, 0, 1))
|
||||
c_var = 1.0 - R
|
||||
c_std = math.sqrt(max(0.0, -2.0 * math.log(max(R, 1e-15))))
|
||||
return math.degrees(mean), c_var, math.degrees(c_std)
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# Core estimator (generic — works for any revolute joint)
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
def estimate_revolute_angle(
|
||||
fk: RobotFK,
|
||||
observed_mm: Dict[int, np.ndarray],
|
||||
link_name: str,
|
||||
known_state: Dict[str, float],
|
||||
min_baseline_mm: float = 15.0,
|
||||
verbose: bool = True,
|
||||
) -> dict:
|
||||
"""
|
||||
Estimate the revolute joint angle for `link_name`.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
fk : RobotFK
|
||||
observed_mm : world marker positions from step 3
|
||||
link_name : e.g. "Arm1", "Ellbow", "Arm2"
|
||||
known_state : already-estimated joint values (e.g. {"x": 180.0, "y": 86.0})
|
||||
The target joint variable should NOT be in this dict.
|
||||
min_baseline_mm : skip pairs with model or observed baseline shorter than this
|
||||
verbose : print report
|
||||
|
||||
Returns
|
||||
-------
|
||||
dict with estimation results + updated accumulated_state
|
||||
"""
|
||||
|
||||
# ── sanity checks ─────────────────────────────────────────
|
||||
links = fk.links
|
||||
if link_name not in links:
|
||||
return {"status": "failed",
|
||||
"reason": f"Link '{link_name}' not in robot.json"}
|
||||
|
||||
ji = links[link_name].get("jointToParent", {}) or {}
|
||||
jtype = str(ji.get("type", "")).lower()
|
||||
if jtype != "revolute":
|
||||
return {"status": "failed",
|
||||
"reason": f"Joint of '{link_name}' is not revolute (type='{jtype}')"}
|
||||
|
||||
var = str(ji.get("variable", "")).lower()
|
||||
|
||||
# ── FK: joint origin and axis in world ───────────────────
|
||||
# Use known_state with the TARGET joint at 0
|
||||
zero_state = dict(known_state)
|
||||
zero_state[var] = 0.0
|
||||
|
||||
origin_world = fk.joint_origin_world(link_name, zero_state)
|
||||
axis_world = fk.joint_axis_world(link_name, zero_state)
|
||||
|
||||
# ── collect matched markers ───────────────────────────────
|
||||
model_local: Dict[int, np.ndarray] = {}
|
||||
for m in links[link_name].get("markers", []):
|
||||
mid = int(m.get("id", -1))
|
||||
if mid >= 0 and "position" in m:
|
||||
model_local[mid] = np.array(m["position"], dtype=float)
|
||||
|
||||
matched = {mid: (model_local[mid], observed_mm[mid])
|
||||
for mid in model_local if mid in observed_mm}
|
||||
|
||||
if len(matched) < 2:
|
||||
return {
|
||||
"status": "failed",
|
||||
"reason": (f"Need ≥2 matched markers, found {len(matched)}: "
|
||||
f"{list(matched.keys())}. "
|
||||
f"Model marker IDs: {list(model_local.keys())}"),
|
||||
}
|
||||
|
||||
# ── pairwise estimation ───────────────────────────────────
|
||||
ids = sorted(matched.keys())
|
||||
angle_rad_list: List[float] = []
|
||||
weight_list: List[float] = []
|
||||
per_pair: List[dict] = []
|
||||
|
||||
for id1, id2 in combinations(ids, 2):
|
||||
l1, o1 = matched[id1]
|
||||
l2, o2 = matched[id2]
|
||||
|
||||
v_model = l2 - l1 # local frame, both in same link
|
||||
v_obs = o2 - o1 # world frame
|
||||
|
||||
v_model_perp = _project_perp(v_model, axis_world)
|
||||
v_obs_perp = _project_perp(v_obs, axis_world)
|
||||
|
||||
bl_model = float(np.linalg.norm(v_model_perp))
|
||||
bl_obs = float(np.linalg.norm(v_obs_perp))
|
||||
|
||||
if bl_model < min_baseline_mm or bl_obs < min_baseline_mm:
|
||||
per_pair.append({
|
||||
"marker_ids": [id1, id2],
|
||||
"skipped": True,
|
||||
"reason": f"bl_model={bl_model:.1f} bl_obs={bl_obs:.1f} < {min_baseline_mm}",
|
||||
})
|
||||
continue
|
||||
|
||||
angle = _wrap(_signed_angle_rad(v_model_perp, v_obs_perp, axis_world))
|
||||
w = bl_model * bl_obs
|
||||
|
||||
angle_rad_list.append(angle)
|
||||
weight_list.append(w)
|
||||
per_pair.append({
|
||||
"marker_ids": [id1, id2],
|
||||
"skipped": False,
|
||||
"angle_deg": math.degrees(angle),
|
||||
"baseline_model_mm": bl_model,
|
||||
"baseline_obs_mm": bl_obs,
|
||||
"weight": w,
|
||||
})
|
||||
|
||||
if not angle_rad_list:
|
||||
return {
|
||||
"status": "failed",
|
||||
"reason": "All pairs below min_baseline_mm. "
|
||||
"Try --min-baseline 5 or check step-3 output.",
|
||||
}
|
||||
|
||||
mean_deg, c_var, c_std_deg = _circular_mean_deg(
|
||||
np.array(angle_rad_list), np.array(weight_list)
|
||||
)
|
||||
|
||||
# ── verbose report ────────────────────────────────────────
|
||||
if verbose:
|
||||
print(f"\n── 4b: '{link_name}' angle ({var}) ──────────────────────")
|
||||
print(f" Joint origin (world): [{', '.join(f'{v:.1f}' for v in origin_world)}] mm")
|
||||
print(f" Joint axis (world): [{', '.join(f'{v:.3f}' for v in axis_world)}]")
|
||||
print(f" Matched markers: {list(matched.keys())}")
|
||||
print(f" Pairs used: {len(angle_rad_list)} / {len(list(combinations(ids, 2)))}")
|
||||
print(f" Angle: {mean_deg:+.2f} ° circular_σ {c_std_deg:.2f} °")
|
||||
if c_std_deg > 5.0:
|
||||
print(f" [WARN] high spread – step-3 errors or marker overlap")
|
||||
print(f"\n Pair detail:")
|
||||
for pp in per_pair:
|
||||
if pp["skipped"]:
|
||||
print(f" M{pp['marker_ids'][0]}↔M{pp['marker_ids'][1]}: SKIPPED – {pp['reason']}")
|
||||
else:
|
||||
print(f" M{pp['marker_ids'][0]}↔M{pp['marker_ids'][1]}: "
|
||||
f"{pp['angle_deg']:+7.2f}° "
|
||||
f"bl_model={pp['baseline_model_mm']:.1f} "
|
||||
f"bl_obs={pp['baseline_obs_mm']:.1f}")
|
||||
|
||||
# ── build accumulated state ───────────────────────────────
|
||||
accumulated = dict(known_state)
|
||||
accumulated[var] = mean_deg
|
||||
|
||||
return {
|
||||
"status": "ok",
|
||||
"link": link_name,
|
||||
"joint": var,
|
||||
"joint_origin_world_mm": origin_world.tolist(),
|
||||
"joint_axis_world": axis_world.tolist(),
|
||||
"mean_angle_deg": mean_deg,
|
||||
"circular_variance": c_var,
|
||||
"circular_std_deg": c_std_deg,
|
||||
"num_pairs_used": len(angle_rad_list),
|
||||
"num_markers_matched": len(matched),
|
||||
"per_pair": per_pair,
|
||||
"accumulated_state": accumulated,
|
||||
}
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# CLI
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
def main() -> int:
|
||||
p = argparse.ArgumentParser(
|
||||
description="4b: estimate revolute joint angle for any link")
|
||||
p.add_argument("--robot", type=Path, default=Path("robot.json"))
|
||||
p.add_argument("--aruco", type=Path,
|
||||
default=Path("aruco_positions_optimized.json"))
|
||||
p.add_argument("--link", type=str, required=True,
|
||||
help="Link name, e.g. Arm1, Ellbow, Arm2")
|
||||
p.add_argument("--from-state", type=Path, default=None,
|
||||
help="JSON from a previous 4b run (carries accumulated state)")
|
||||
|
||||
# Manual joint-value overrides (for use without --from-state)
|
||||
for k in STATE_KEYS:
|
||||
p.add_argument(f"--{k}-mm" if k in ("x", "e") else f"--{k}-deg",
|
||||
type=float, default=None,
|
||||
help=f"Known value for joint '{k}'"
|
||||
+ (" (mm)" if k in ("x", "e") else " (deg)"))
|
||||
|
||||
p.add_argument("--min-baseline", type=float, default=15.0,
|
||||
help="Skip pairs with perpendicular baseline < this (mm)")
|
||||
p.add_argument("--output", type=Path, default=None,
|
||||
help="Save result JSON (readable by next 4b as --from-state)")
|
||||
args = p.parse_args()
|
||||
|
||||
# Assemble known state
|
||||
if args.from_state:
|
||||
known_state = _load_state(args.from_state)
|
||||
print(f" Loaded state from {args.from_state}: {known_state}")
|
||||
else:
|
||||
known_state = {}
|
||||
|
||||
# CLI overrides
|
||||
for k in STATE_KEYS:
|
||||
attr = f"{k}_mm" if k in ("x", "e") else f"{k}_deg"
|
||||
v = getattr(args, attr, None)
|
||||
if v is not None:
|
||||
known_state[k] = float(v)
|
||||
|
||||
fk = RobotFK.from_file(args.robot)
|
||||
aruco_json = _load_json(args.aruco)
|
||||
observed_mm = _load_observed_mm(aruco_json)
|
||||
|
||||
result = estimate_revolute_angle(
|
||||
fk = fk,
|
||||
observed_mm = observed_mm,
|
||||
link_name = args.link,
|
||||
known_state = known_state,
|
||||
min_baseline_mm = args.min_baseline,
|
||||
verbose = True,
|
||||
)
|
||||
|
||||
if args.output and result["status"] == "ok":
|
||||
args.output.parent.mkdir(parents=True, exist_ok=True)
|
||||
with args.output.open("w", encoding="utf-8") as f:
|
||||
json.dump(result, f, indent=2)
|
||||
print(f"\n Saved → {args.output}")
|
||||
|
||||
return 0 if result["status"] == "ok" else 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user