Anthropic Claude
This commit is contained in:
0
pipeline/3_anchor_patch.py
Normal file
0
pipeline/3_anchor_patch.py
Normal file
File diff suppressed because it is too large
Load Diff
229
pipeline/3_multiview_bundle_adjustment_v5_board_anchor_patch.py
Normal file
229
pipeline/3_multiview_bundle_adjustment_v5_board_anchor_patch.py
Normal file
@@ -0,0 +1,229 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
3_board_anchor_patch.py
|
||||
-----------------------
|
||||
DROP-IN PATCH for 3_multiview_bundle_adjustment_v4.py
|
||||
|
||||
What it adds
|
||||
------------
|
||||
Board markers have KNOWN world positions (they are fixed to the physical board,
|
||||
which defines the coordinate frame). Letting the optimizer move them freely
|
||||
makes them drift, which degrades all arm-marker estimates.
|
||||
|
||||
This patch adds a PositionAnchorConstraint that pins each observed Board
|
||||
marker to its robot.json world position during bundle adjustment.
|
||||
|
||||
How to integrate
|
||||
----------------
|
||||
Copy the three sections labelled [PATCH – copy into v4] into the v4 file:
|
||||
|
||||
1. The PositionAnchorConstraint dataclass → after the JointAxisConstraint dataclass
|
||||
2. Update the Constraint type alias → replace the existing line
|
||||
3. The load_board_anchors() function → anywhere before main()
|
||||
4. The change to bundle_adjustment_residuals → add one branch inside the for-loop
|
||||
5. The three lines in main() → just before "STEP 4"
|
||||
|
||||
Quick-test without editing v4
|
||||
------------------------------
|
||||
You can also run this file standalone — it imports from v4 and patches it at
|
||||
runtime. Then call main_anchored() instead of main() for the patched run.
|
||||
|
||||
python 3_board_anchor_patch.py -det ... -pose ... -robot ... -outDir ...
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
# ══════════════════════════════════════════════════════════════
|
||||
# [PATCH section 1 – copy into v4 after JointAxisConstraint]
|
||||
# ══════════════════════════════════════════════════════════════
|
||||
from dataclasses import dataclass
|
||||
import numpy as np
|
||||
from typing import Dict, Any, List, Optional, Tuple
|
||||
|
||||
|
||||
@dataclass
|
||||
class PositionAnchorConstraint:
|
||||
"""
|
||||
Pins a single marker to a known world-space position.
|
||||
Used to anchor Board markers whose world positions are read from robot.json.
|
||||
"""
|
||||
marker_id: int
|
||||
target_world_m: np.ndarray # shape (3,), in metres
|
||||
weight: float = 100.0
|
||||
source: str = "board_anchor"
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════
|
||||
# [PATCH section 2 – replace the Constraint type alias in v4]
|
||||
# ══════════════════════════════════════════════════════════════
|
||||
# OLD: Constraint = MarkerDistanceConstraint | JointAxisConstraint
|
||||
# NEW:
|
||||
# from 3_board_anchor_patch import PositionAnchorConstraint
|
||||
# Constraint = MarkerDistanceConstraint | JointAxisConstraint | PositionAnchorConstraint
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════
|
||||
# [PATCH section 3 – new function, copy anywhere before main()]
|
||||
# ══════════════════════════════════════════════════════════════
|
||||
|
||||
def load_board_anchors(
|
||||
robot_data: Dict[str, Any],
|
||||
length_scale: float,
|
||||
weight: float = 100.0,
|
||||
) -> List[PositionAnchorConstraint]:
|
||||
"""
|
||||
Build PositionAnchorConstraints for every marker on the 'Board' link.
|
||||
|
||||
Board is the root link; its marker positions are in the Board (world) frame.
|
||||
length_scale = 1/1000 when robot.json uses mm (the standard).
|
||||
|
||||
Parameters
|
||||
----------
|
||||
robot_data : loaded robot.json
|
||||
length_scale : float (1/1000 to convert mm → m)
|
||||
weight : constraint weight (100 ≈ 1 mm anchor tolerance at λ=100)
|
||||
"""
|
||||
links = robot_data.get("links", {}) or {}
|
||||
board = links.get("Board", {}) or {}
|
||||
anchors: List[PositionAnchorConstraint] = []
|
||||
|
||||
for m in board.get("markers", []):
|
||||
mid = int(m.get("id", -1))
|
||||
pos = m.get("position", None)
|
||||
if mid < 0 or pos is None or len(pos) != 3:
|
||||
continue
|
||||
world_m = np.array(pos, dtype=np.float64) * float(length_scale)
|
||||
anchors.append(
|
||||
PositionAnchorConstraint(
|
||||
marker_id=mid,
|
||||
target_world_m=world_m,
|
||||
weight=weight,
|
||||
source="board_anchor",
|
||||
)
|
||||
)
|
||||
|
||||
return anchors
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════
|
||||
# [PATCH section 4 – add this branch inside bundle_adjustment_residuals,
|
||||
# in the "for constraint in constraints:" loop, after the JointAxisConstraint branch]
|
||||
# ══════════════════════════════════════════════════════════════
|
||||
|
||||
def _anchor_residuals(constraint: PositionAnchorConstraint,
|
||||
marker_dict: Dict[int, np.ndarray],
|
||||
lambda_constraint: float) -> List[float]:
|
||||
"""
|
||||
Returns 3 residual components w·λ·(X_observed - X_target) for each axis.
|
||||
|
||||
Paste the body of this function into bundle_adjustment_residuals like this:
|
||||
|
||||
elif isinstance(constraint, PositionAnchorConstraint):
|
||||
if constraint.marker_id in marker_dict:
|
||||
diff = marker_dict[constraint.marker_id] - constraint.target_world_m
|
||||
for d in diff:
|
||||
residuals.append(float(d) * constraint.weight * lambda_constraint)
|
||||
"""
|
||||
residuals: List[float] = []
|
||||
if constraint.marker_id in marker_dict:
|
||||
diff = marker_dict[constraint.marker_id] - constraint.target_world_m
|
||||
for d in diff:
|
||||
residuals.append(float(d) * constraint.weight * lambda_constraint)
|
||||
return residuals
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════
|
||||
# [PATCH section 5 – add these lines in main(), just BEFORE
|
||||
# "print('\n[STEP 4] Bundle adjustment with constraints...')"]
|
||||
# ══════════════════════════════════════════════════════════════
|
||||
#
|
||||
# board_anchors = load_board_anchors(robot_data, length_scale, weight=100.0)
|
||||
# constraints += board_anchors
|
||||
# print(f"[INFO] Board anchors added: {len(board_anchors)}")
|
||||
#
|
||||
# ══════════════════════════════════════════════════════════════
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# STANDALONE RUNTIME PATCH (alternative to editing v4)
|
||||
# Run this file directly and it patches v4 in memory.
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
def _monkey_patch_v4():
|
||||
"""
|
||||
Import the v4 module under an alias and patch it so Board markers are
|
||||
anchored without touching the original file.
|
||||
"""
|
||||
import importlib.util, sys, types, pathlib
|
||||
|
||||
v4_path = pathlib.Path(__file__).parent / "3_multiview_bundle_adjustment_v4.py"
|
||||
if not v4_path.exists():
|
||||
raise FileNotFoundError(
|
||||
f"Expected v4 at {v4_path}.\n"
|
||||
"Place this patch file alongside 3_multiview_bundle_adjustment_v4.py."
|
||||
)
|
||||
|
||||
spec = importlib.util.spec_from_file_location("ba_v4", v4_path)
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(mod)
|
||||
|
||||
# Inject PositionAnchorConstraint into the module namespace
|
||||
mod.PositionAnchorConstraint = PositionAnchorConstraint
|
||||
|
||||
# Patch bundle_adjustment_residuals ─────────────────────────
|
||||
_orig_residuals = mod.bundle_adjustment_residuals
|
||||
|
||||
def _patched_residuals(marker_positions_flat, marker_ids, marker_observations,
|
||||
cameras, constraints, obs_weights, lambda_constraint=100.0):
|
||||
r = _orig_residuals(marker_positions_flat, marker_ids, marker_observations,
|
||||
cameras, constraints, obs_weights, lambda_constraint)
|
||||
# Rebuild marker_dict (same as inside the original function)
|
||||
marker_dict = {
|
||||
mid: marker_positions_flat[i*3:(i+1)*3]
|
||||
for i, mid in enumerate(marker_ids)
|
||||
}
|
||||
extra = []
|
||||
for c in constraints:
|
||||
if isinstance(c, PositionAnchorConstraint):
|
||||
extra.extend(_anchor_residuals(c, marker_dict, lambda_constraint))
|
||||
if extra:
|
||||
r = np.concatenate([r, np.array(extra, dtype=np.float64)])
|
||||
return r
|
||||
|
||||
mod.bundle_adjustment_residuals = _patched_residuals
|
||||
|
||||
# Patch main to inject board anchors ────────────────────────
|
||||
_orig_main = mod.main
|
||||
|
||||
def _patched_main():
|
||||
import sys as _sys
|
||||
# We hijack the module-level function calls by patching compile_constraints
|
||||
_orig_compile = mod.compile_constraints
|
||||
|
||||
def _compile_with_anchors(robot_data, length_scale, cfg):
|
||||
marker_to_link, link_markers, constraints, issues, marker_meta = \
|
||||
_orig_compile(robot_data, length_scale, cfg)
|
||||
|
||||
anchors = load_board_anchors(robot_data, length_scale, weight=100.0)
|
||||
print(f"[PATCH] Board anchors added: {len(anchors)}")
|
||||
constraints = constraints + anchors
|
||||
return marker_to_link, link_markers, constraints, issues, marker_meta
|
||||
|
||||
mod.compile_constraints = _compile_with_anchors
|
||||
_orig_main()
|
||||
mod.compile_constraints = _orig_compile # restore
|
||||
|
||||
mod.main = _patched_main
|
||||
return mod
|
||||
|
||||
|
||||
def main():
|
||||
"""Run the patched v4 pipeline with Board marker anchoring."""
|
||||
import sys
|
||||
mod = _monkey_patch_v4()
|
||||
sys.argv[0] = "3_multiview_bundle_adjustment_v4.py" # cosmetic
|
||||
mod.main()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
207
pipeline/4_v8_4a_base_slider.py
Normal file
207
pipeline/4_v8_4a_base_slider.py
Normal file
@@ -0,0 +1,207 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
4a_base_slider.py
|
||||
-----------------
|
||||
Estimate the Base linear-slider position (variable 'x', in mm) from
|
||||
observed world-space marker positions produced by step 3.
|
||||
|
||||
Mathematical principle
|
||||
----------------------
|
||||
The Base slides along the world x-axis.
|
||||
Every joint between Base and the observable arm markers rotates around
|
||||
the x-axis (axis ≈ [-1,0,0] for Arm1, Ellbow, Arm2).
|
||||
|
||||
Rotation around the x-axis NEVER changes the x-coordinate of any point.
|
||||
Therefore, for every marker on Arm1, Ellbow, or Arm2:
|
||||
|
||||
world_x = x_slider + link_frame_x_at_zero + local_x
|
||||
|
||||
where link_frame_x_at_zero is the x of the link-frame origin in world
|
||||
when all joint variables are zero (constant, readable from robot.json).
|
||||
|
||||
Rearranging:
|
||||
x_slider = world_x - link_frame_x_at_zero - local_x
|
||||
|
||||
This holds for ANY rotation angle y, z, a of the arm links.
|
||||
|
||||
This gives one independent estimate per observed marker.
|
||||
We report their mean (and statistics) as the final estimate.
|
||||
|
||||
Usage
|
||||
-----
|
||||
python 4a_base_slider.py --robot robot.json --aruco aruco_positions_optimized.json
|
||||
|
||||
python 4a_base_slider.py --robot robot.json --aruco aruco_positions_optimized.json \\
|
||||
--links Arm1 Ellbow Arm2 --output 4a_result.json
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import math
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
import numpy as np
|
||||
from robot_fk import RobotFK
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# I/O helpers
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
def _load_json(path: Path) -> dict:
|
||||
with path.open("r", encoding="utf-8") as f:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
def _load_observed_mm(aruco_json: dict) -> Dict[int, np.ndarray]:
|
||||
"""marker_id → world position in mm, from aruco_positions_optimized.json."""
|
||||
result: Dict[int, np.ndarray] = {}
|
||||
for m in aruco_json.get("markers", []):
|
||||
mid = int(m.get("marker_id", m.get("id", -1)))
|
||||
if mid < 0:
|
||||
continue
|
||||
if "position_mm" in m:
|
||||
result[mid] = np.array(m["position_mm"], dtype=float)
|
||||
elif "position_m" in m:
|
||||
result[mid] = np.array(m["position_m"], dtype=float) * 1000.0
|
||||
return result
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# Core estimator
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
def estimate_base_slider(
|
||||
fk: RobotFK,
|
||||
observed_mm: Dict[int, np.ndarray],
|
||||
use_links: Optional[List[str]] = None,
|
||||
verbose: bool = True,
|
||||
) -> dict:
|
||||
"""
|
||||
Parameters
|
||||
----------
|
||||
fk : RobotFK instance
|
||||
observed_mm : dict marker_id -> world position [mm]
|
||||
use_links : links whose markers are used (default: Arm1, Ellbow, Arm2)
|
||||
All must be connected to Base via only x-axis rotations.
|
||||
verbose : print breakdown
|
||||
|
||||
Returns
|
||||
-------
|
||||
dict with keys:
|
||||
status, x_mean_mm, x_median_mm, x_std_mm,
|
||||
num_markers, per_marker
|
||||
"""
|
||||
if use_links is None:
|
||||
use_links = ["Arm1", "Ellbow", "Arm2"]
|
||||
|
||||
# Pre-compute link_frame_x at zero-state for each target link
|
||||
link_x0: Dict[str, float] = {}
|
||||
for lname in use_links:
|
||||
if lname in fk.links:
|
||||
link_x0[lname] = fk.link_x_at_zero_state(lname)
|
||||
|
||||
estimates: List[float] = []
|
||||
per_marker: List[dict] = []
|
||||
|
||||
for lname in use_links:
|
||||
if lname not in fk.links or lname not in link_x0:
|
||||
continue
|
||||
x0 = link_x0[lname]
|
||||
|
||||
for m in fk.links[lname].get("markers", []):
|
||||
mid = int(m.get("id", -1))
|
||||
if mid < 0 or "position" not in m:
|
||||
continue
|
||||
if mid not in observed_mm:
|
||||
continue # marker not seen in this pose
|
||||
|
||||
local_x = float(m["position"][0])
|
||||
world_x = float(observed_mm[mid][0])
|
||||
x_est = world_x - x0 - local_x
|
||||
|
||||
estimates.append(x_est)
|
||||
per_marker.append({
|
||||
"marker_id": mid,
|
||||
"link": lname,
|
||||
"local_x_mm": local_x,
|
||||
"link_x0_mm": x0,
|
||||
"world_x_mm": world_x,
|
||||
"x_estimate_mm": x_est,
|
||||
})
|
||||
|
||||
if not estimates:
|
||||
msg = (f"No observed markers found on links {use_links}. "
|
||||
"Check that aruco_positions_optimized.json contains "
|
||||
"markers from those links.")
|
||||
if verbose:
|
||||
print(f"[4a FAILED] {msg}")
|
||||
return {"status": "failed", "reason": msg, "use_links": use_links}
|
||||
|
||||
arr = np.array(estimates)
|
||||
x_mean = float(np.mean(arr))
|
||||
x_median = float(np.median(arr))
|
||||
x_std = float(np.std(arr, ddof=0))
|
||||
|
||||
if verbose:
|
||||
print("\n── 4a: Base slider (x) ──────────────────────────────────")
|
||||
print(f" Links used: {use_links}")
|
||||
print(f" Markers used: {len(estimates)}")
|
||||
print(f" x = {x_mean:+.2f} mm (median {x_median:+.2f}, σ {x_std:.2f})")
|
||||
if x_std > 5.0:
|
||||
print(f" [WARN] high spread ({x_std:.1f} mm) – step-3 errors or wrong link list")
|
||||
print(f"\n Per-marker:")
|
||||
print(f" {'ID':>5} {'Link':8} {'local_x':>8} {'world_x':>8} {'x_est':>8}")
|
||||
print(f" " + "-"*46)
|
||||
for pm in per_marker:
|
||||
print(f" {pm['marker_id']:>5} {pm['link']:8} "
|
||||
f"{pm['local_x_mm']:>8.2f} {pm['world_x_mm']:>8.2f} {pm['x_estimate_mm']:>8.2f}")
|
||||
|
||||
return {
|
||||
"status": "ok",
|
||||
"joint": "x",
|
||||
"description": "Base linear slider",
|
||||
"x_mean_mm": x_mean,
|
||||
"x_median_mm": x_median,
|
||||
"x_std_mm": x_std,
|
||||
"num_markers": len(estimates),
|
||||
"use_links": use_links,
|
||||
"per_marker": per_marker,
|
||||
}
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# CLI
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
def main() -> int:
|
||||
p = argparse.ArgumentParser(description="4a: estimate Base slider (x)")
|
||||
p.add_argument("--robot", type=Path, default=Path("robot.json"))
|
||||
p.add_argument("--aruco", type=Path, default=Path("aruco_positions_optimized.json"))
|
||||
p.add_argument("--links", nargs="+", default=["Arm1", "Ellbow", "Arm2"],
|
||||
dest="links",
|
||||
help="Links whose markers to use (must be x-axis-rotation chain)")
|
||||
p.add_argument("--output", type=Path, default=None,
|
||||
help="Optional JSON output path (e.g. 4a_result.json)")
|
||||
args = p.parse_args()
|
||||
|
||||
fk = RobotFK.from_file(args.robot)
|
||||
aruco_json = _load_json(args.aruco)
|
||||
observed_mm = _load_observed_mm(aruco_json)
|
||||
|
||||
result = estimate_base_slider(fk, observed_mm, use_links=args.links)
|
||||
|
||||
if args.output and result["status"] == "ok":
|
||||
args.output.parent.mkdir(parents=True, exist_ok=True)
|
||||
with args.output.open("w", encoding="utf-8") as f:
|
||||
json.dump(result, f, indent=2)
|
||||
print(f"\n Saved → {args.output}")
|
||||
|
||||
return 0 if result["status"] == "ok" else 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
360
pipeline/4_v8_4b_revolute_angle.py
Normal file
360
pipeline/4_v8_4b_revolute_angle.py
Normal file
@@ -0,0 +1,360 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
4b_revolute_angle.py
|
||||
--------------------
|
||||
Generic revolute-joint angle estimator.
|
||||
|
||||
For each movable link (Arm1, Ellbow, Arm2 …) whose joint type is 'revolute',
|
||||
this script estimates the rotation angle using the pairwise-vector method:
|
||||
|
||||
For every PAIR (m1, m2) of markers belonging to the target link:
|
||||
v_model = local_pos_m2 - local_pos_m1 (in link's own frame)
|
||||
v_obs = world_pos_m2 - world_pos_m1 (in world frame)
|
||||
|
||||
Both vectors are projected perpendicular to the joint axis (in world frame),
|
||||
and the signed angle from v_model_perp to v_obs_perp is measured.
|
||||
|
||||
The joint axis in world frame is computed via FK using the already-known
|
||||
joint values (from 4a, 4b-prev …), so it is ALWAYS correct — even for
|
||||
deeply-nested joints whose world-frame axis differs from their local axis.
|
||||
|
||||
Pair weights = baseline_model × baseline_obs (longer baselines → more reliable).
|
||||
|
||||
How to use sequentially
|
||||
-----------------------
|
||||
Run 4b once per revolute joint, from root to tip:
|
||||
|
||||
python 4b_revolute_angle.py --robot r.json --aruco obs.json --link Arm1 --x-mm 180
|
||||
python 4b_revolute_angle.py --robot r.json --aruco obs.json --link Ellbow --from-state state.json
|
||||
python 4b_revolute_angle.py --robot r.json --aruco obs.json --link Arm2 --from-state state.json
|
||||
|
||||
The --from-state flag reads the accumulated joint state JSON so you don't have
|
||||
to pass every preceding value on the command line.
|
||||
|
||||
Output JSON
|
||||
-----------
|
||||
{
|
||||
"link": "Arm1",
|
||||
"joint": "y",
|
||||
"mean_angle_deg": 86.3,
|
||||
"circular_std_deg": 0.7,
|
||||
"num_pairs": 6,
|
||||
"joint_origin_world_mm": [290, 108, 61],
|
||||
"joint_axis_world": [-1, 0, 0],
|
||||
...
|
||||
}
|
||||
|
||||
The file also contains the full accumulated state so the next 4b invocation
|
||||
can read it via --from-state.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import math
|
||||
from itertools import combinations
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Sequence, Tuple
|
||||
|
||||
import numpy as np
|
||||
from robot_fk import RobotFK
|
||||
|
||||
STATE_KEYS = ("x", "y", "z", "a", "b", "c", "e")
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# I/O
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
def _load_json(path: Path) -> dict:
|
||||
with path.open("r", encoding="utf-8") as f:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
def _load_observed_mm(aruco_json: dict) -> Dict[int, np.ndarray]:
|
||||
result: Dict[int, np.ndarray] = {}
|
||||
for m in aruco_json.get("markers", []):
|
||||
mid = int(m.get("marker_id", m.get("id", -1)))
|
||||
if mid < 0:
|
||||
continue
|
||||
if "position_mm" in m:
|
||||
result[mid] = np.array(m["position_mm"], dtype=float)
|
||||
elif "position_m" in m:
|
||||
result[mid] = np.array(m["position_m"], dtype=float) * 1000.0
|
||||
return result
|
||||
|
||||
|
||||
def _load_state(path: Path) -> Dict[str, float]:
|
||||
"""Load accumulated joint state from a previous 4b output JSON."""
|
||||
raw = _load_json(path)
|
||||
state = raw.get("accumulated_state", raw.get("state", {}))
|
||||
return {k: float(v) for k, v in state.items() if k in STATE_KEYS}
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# Angle maths
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
def _project_perp(v: np.ndarray, axis: np.ndarray) -> np.ndarray:
|
||||
"""Remove the component of v along axis."""
|
||||
a = axis / (np.linalg.norm(axis) + 1e-15)
|
||||
return v - np.dot(v, a) * a
|
||||
|
||||
|
||||
def _signed_angle_rad(v_from: np.ndarray, v_to: np.ndarray,
|
||||
axis: np.ndarray) -> float:
|
||||
"""Signed angle rotating v_from onto v_to around axis (radians)."""
|
||||
a = axis / (np.linalg.norm(axis) + 1e-15)
|
||||
return math.atan2(float(np.dot(a, np.cross(v_from, v_to))),
|
||||
float(np.dot(v_from, v_to)))
|
||||
|
||||
|
||||
def _wrap(angle: float) -> float:
|
||||
"""Wrap to (−π, π]."""
|
||||
return (angle + math.pi) % (2.0 * math.pi) - math.pi
|
||||
|
||||
|
||||
def _circular_mean_deg(angles_rad: np.ndarray,
|
||||
weights: np.ndarray) -> Tuple[float, float, float]:
|
||||
"""Returns mean_deg, circular_variance, circular_std_deg."""
|
||||
w = np.clip(weights, 0, None)
|
||||
if w.sum() < 1e-15:
|
||||
w = np.ones_like(w)
|
||||
s, c = np.sum(w * np.sin(angles_rad)), np.sum(w * np.cos(angles_rad))
|
||||
mean = math.atan2(s, c)
|
||||
R = math.sqrt(s*s + c*c) / w.sum()
|
||||
R = float(np.clip(R, 0, 1))
|
||||
c_var = 1.0 - R
|
||||
c_std = math.sqrt(max(0.0, -2.0 * math.log(max(R, 1e-15))))
|
||||
return math.degrees(mean), c_var, math.degrees(c_std)
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# Core estimator (generic — works for any revolute joint)
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
def estimate_revolute_angle(
|
||||
fk: RobotFK,
|
||||
observed_mm: Dict[int, np.ndarray],
|
||||
link_name: str,
|
||||
known_state: Dict[str, float],
|
||||
min_baseline_mm: float = 15.0,
|
||||
verbose: bool = True,
|
||||
) -> dict:
|
||||
"""
|
||||
Estimate the revolute joint angle for `link_name`.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
fk : RobotFK
|
||||
observed_mm : world marker positions from step 3
|
||||
link_name : e.g. "Arm1", "Ellbow", "Arm2"
|
||||
known_state : already-estimated joint values (e.g. {"x": 180.0, "y": 86.0})
|
||||
The target joint variable should NOT be in this dict.
|
||||
min_baseline_mm : skip pairs with model or observed baseline shorter than this
|
||||
verbose : print report
|
||||
|
||||
Returns
|
||||
-------
|
||||
dict with estimation results + updated accumulated_state
|
||||
"""
|
||||
|
||||
# ── sanity checks ─────────────────────────────────────────
|
||||
links = fk.links
|
||||
if link_name not in links:
|
||||
return {"status": "failed",
|
||||
"reason": f"Link '{link_name}' not in robot.json"}
|
||||
|
||||
ji = links[link_name].get("jointToParent", {}) or {}
|
||||
jtype = str(ji.get("type", "")).lower()
|
||||
if jtype != "revolute":
|
||||
return {"status": "failed",
|
||||
"reason": f"Joint of '{link_name}' is not revolute (type='{jtype}')"}
|
||||
|
||||
var = str(ji.get("variable", "")).lower()
|
||||
|
||||
# ── FK: joint origin and axis in world ───────────────────
|
||||
# Use known_state with the TARGET joint at 0
|
||||
zero_state = dict(known_state)
|
||||
zero_state[var] = 0.0
|
||||
|
||||
origin_world = fk.joint_origin_world(link_name, zero_state)
|
||||
axis_world = fk.joint_axis_world(link_name, zero_state)
|
||||
|
||||
# ── collect matched markers ───────────────────────────────
|
||||
model_local: Dict[int, np.ndarray] = {}
|
||||
for m in links[link_name].get("markers", []):
|
||||
mid = int(m.get("id", -1))
|
||||
if mid >= 0 and "position" in m:
|
||||
model_local[mid] = np.array(m["position"], dtype=float)
|
||||
|
||||
matched = {mid: (model_local[mid], observed_mm[mid])
|
||||
for mid in model_local if mid in observed_mm}
|
||||
|
||||
if len(matched) < 2:
|
||||
return {
|
||||
"status": "failed",
|
||||
"reason": (f"Need ≥2 matched markers, found {len(matched)}: "
|
||||
f"{list(matched.keys())}. "
|
||||
f"Model marker IDs: {list(model_local.keys())}"),
|
||||
}
|
||||
|
||||
# ── pairwise estimation ───────────────────────────────────
|
||||
ids = sorted(matched.keys())
|
||||
angle_rad_list: List[float] = []
|
||||
weight_list: List[float] = []
|
||||
per_pair: List[dict] = []
|
||||
|
||||
for id1, id2 in combinations(ids, 2):
|
||||
l1, o1 = matched[id1]
|
||||
l2, o2 = matched[id2]
|
||||
|
||||
v_model = l2 - l1 # local frame, both in same link
|
||||
v_obs = o2 - o1 # world frame
|
||||
|
||||
v_model_perp = _project_perp(v_model, axis_world)
|
||||
v_obs_perp = _project_perp(v_obs, axis_world)
|
||||
|
||||
bl_model = float(np.linalg.norm(v_model_perp))
|
||||
bl_obs = float(np.linalg.norm(v_obs_perp))
|
||||
|
||||
if bl_model < min_baseline_mm or bl_obs < min_baseline_mm:
|
||||
per_pair.append({
|
||||
"marker_ids": [id1, id2],
|
||||
"skipped": True,
|
||||
"reason": f"bl_model={bl_model:.1f} bl_obs={bl_obs:.1f} < {min_baseline_mm}",
|
||||
})
|
||||
continue
|
||||
|
||||
angle = _wrap(_signed_angle_rad(v_model_perp, v_obs_perp, axis_world))
|
||||
w = bl_model * bl_obs
|
||||
|
||||
angle_rad_list.append(angle)
|
||||
weight_list.append(w)
|
||||
per_pair.append({
|
||||
"marker_ids": [id1, id2],
|
||||
"skipped": False,
|
||||
"angle_deg": math.degrees(angle),
|
||||
"baseline_model_mm": bl_model,
|
||||
"baseline_obs_mm": bl_obs,
|
||||
"weight": w,
|
||||
})
|
||||
|
||||
if not angle_rad_list:
|
||||
return {
|
||||
"status": "failed",
|
||||
"reason": "All pairs below min_baseline_mm. "
|
||||
"Try --min-baseline 5 or check step-3 output.",
|
||||
}
|
||||
|
||||
mean_deg, c_var, c_std_deg = _circular_mean_deg(
|
||||
np.array(angle_rad_list), np.array(weight_list)
|
||||
)
|
||||
|
||||
# ── verbose report ────────────────────────────────────────
|
||||
if verbose:
|
||||
print(f"\n── 4b: '{link_name}' angle ({var}) ──────────────────────")
|
||||
print(f" Joint origin (world): [{', '.join(f'{v:.1f}' for v in origin_world)}] mm")
|
||||
print(f" Joint axis (world): [{', '.join(f'{v:.3f}' for v in axis_world)}]")
|
||||
print(f" Matched markers: {list(matched.keys())}")
|
||||
print(f" Pairs used: {len(angle_rad_list)} / {len(list(combinations(ids, 2)))}")
|
||||
print(f" Angle: {mean_deg:+.2f} ° circular_σ {c_std_deg:.2f} °")
|
||||
if c_std_deg > 5.0:
|
||||
print(f" [WARN] high spread – step-3 errors or marker overlap")
|
||||
print(f"\n Pair detail:")
|
||||
for pp in per_pair:
|
||||
if pp["skipped"]:
|
||||
print(f" M{pp['marker_ids'][0]}↔M{pp['marker_ids'][1]}: SKIPPED – {pp['reason']}")
|
||||
else:
|
||||
print(f" M{pp['marker_ids'][0]}↔M{pp['marker_ids'][1]}: "
|
||||
f"{pp['angle_deg']:+7.2f}° "
|
||||
f"bl_model={pp['baseline_model_mm']:.1f} "
|
||||
f"bl_obs={pp['baseline_obs_mm']:.1f}")
|
||||
|
||||
# ── build accumulated state ───────────────────────────────
|
||||
accumulated = dict(known_state)
|
||||
accumulated[var] = mean_deg
|
||||
|
||||
return {
|
||||
"status": "ok",
|
||||
"link": link_name,
|
||||
"joint": var,
|
||||
"joint_origin_world_mm": origin_world.tolist(),
|
||||
"joint_axis_world": axis_world.tolist(),
|
||||
"mean_angle_deg": mean_deg,
|
||||
"circular_variance": c_var,
|
||||
"circular_std_deg": c_std_deg,
|
||||
"num_pairs_used": len(angle_rad_list),
|
||||
"num_markers_matched": len(matched),
|
||||
"per_pair": per_pair,
|
||||
"accumulated_state": accumulated,
|
||||
}
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# CLI
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
def main() -> int:
|
||||
p = argparse.ArgumentParser(
|
||||
description="4b: estimate revolute joint angle for any link")
|
||||
p.add_argument("--robot", type=Path, default=Path("robot.json"))
|
||||
p.add_argument("--aruco", type=Path,
|
||||
default=Path("aruco_positions_optimized.json"))
|
||||
p.add_argument("--link", type=str, required=True,
|
||||
help="Link name, e.g. Arm1, Ellbow, Arm2")
|
||||
p.add_argument("--from-state", type=Path, default=None,
|
||||
help="JSON from a previous 4b run (carries accumulated state)")
|
||||
|
||||
# Manual joint-value overrides (for use without --from-state)
|
||||
for k in STATE_KEYS:
|
||||
p.add_argument(f"--{k}-mm" if k in ("x", "e") else f"--{k}-deg",
|
||||
type=float, default=None,
|
||||
help=f"Known value for joint '{k}'"
|
||||
+ (" (mm)" if k in ("x", "e") else " (deg)"))
|
||||
|
||||
p.add_argument("--min-baseline", type=float, default=15.0,
|
||||
help="Skip pairs with perpendicular baseline < this (mm)")
|
||||
p.add_argument("--output", type=Path, default=None,
|
||||
help="Save result JSON (readable by next 4b as --from-state)")
|
||||
args = p.parse_args()
|
||||
|
||||
# Assemble known state
|
||||
if args.from_state:
|
||||
known_state = _load_state(args.from_state)
|
||||
print(f" Loaded state from {args.from_state}: {known_state}")
|
||||
else:
|
||||
known_state = {}
|
||||
|
||||
# CLI overrides
|
||||
for k in STATE_KEYS:
|
||||
attr = f"{k}_mm" if k in ("x", "e") else f"{k}_deg"
|
||||
v = getattr(args, attr, None)
|
||||
if v is not None:
|
||||
known_state[k] = float(v)
|
||||
|
||||
fk = RobotFK.from_file(args.robot)
|
||||
aruco_json = _load_json(args.aruco)
|
||||
observed_mm = _load_observed_mm(aruco_json)
|
||||
|
||||
result = estimate_revolute_angle(
|
||||
fk = fk,
|
||||
observed_mm = observed_mm,
|
||||
link_name = args.link,
|
||||
known_state = known_state,
|
||||
min_baseline_mm = args.min_baseline,
|
||||
verbose = True,
|
||||
)
|
||||
|
||||
if args.output and result["status"] == "ok":
|
||||
args.output.parent.mkdir(parents=True, exist_ok=True)
|
||||
with args.output.open("w", encoding="utf-8") as f:
|
||||
json.dump(result, f, indent=2)
|
||||
print(f"\n Saved → {args.output}")
|
||||
|
||||
return 0 if result["status"] == "ok" else 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
464
pipeline/4_v8_4d_direct_hand_pose.py
Normal file
464
pipeline/4_v8_4d_direct_hand_pose.py
Normal file
@@ -0,0 +1,464 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
4d_direct_hand_pose.py
|
||||
----------------------
|
||||
Estimate Hand/Palm/Finger joint angles (b, c, e) directly from ArUco
|
||||
marker observations — WITHOUT relying on step-3 triangulated positions.
|
||||
|
||||
Why this is needed
|
||||
------------------
|
||||
Step 3 triangulates marker centres from multiple cameras. For markers
|
||||
far above the board (arm extended in Z), the depth estimation from
|
||||
triangulation degrades: all reference Board markers lie in a flat plane,
|
||||
so the cameras are poorly conditioned for Z estimation of out-of-plane points.
|
||||
|
||||
This script takes a completely different route:
|
||||
|
||||
1. For each camera that sees a finger or hand marker, run cv2.solvePnP
|
||||
on the four detected corners. solvePnP uses the apparent size /
|
||||
aspect ratio of the marker to recover depth — it is NOT affected
|
||||
by the flat-board reference plane problem.
|
||||
|
||||
2. Use the known camera-in-world pose (from step 2) to convert the
|
||||
per-camera marker pose to world frame.
|
||||
|
||||
3. Fuse all world-frame marker poses with quality weighting.
|
||||
|
||||
4. Given world-frame positions for all visible finger markers, and the
|
||||
known local positions in the FingerA/FingerB frames, solve a small
|
||||
nonlinear LS problem for (b, c, e) using scipy.
|
||||
|
||||
What you need
|
||||
-------------
|
||||
- robot.json
|
||||
- Step-2 camera-pose JSONs for all cameras
|
||||
- Step-1 aruco-detection JSONs for all cameras
|
||||
- Already-estimated x, y, z, a from 4a/4b (pass via --state-json or CLI)
|
||||
|
||||
Usage
|
||||
-----
|
||||
python 4d_direct_hand_pose.py \\
|
||||
--robot robot.json \\
|
||||
--det cam1_aruco_detection.json cam2_aruco_detection.json ... \\
|
||||
--pose cam1_camera_pose.json cam2_camera_pose.json ... \\
|
||||
--state 4b_arm2_result.json \\
|
||||
--output 4d_result.json
|
||||
|
||||
Output
|
||||
------
|
||||
{
|
||||
"b_deg": ..., "c_deg": ..., "e_mm": ...,
|
||||
"rms_mm": ...,
|
||||
"accumulated_state": {"x":..., "y":..., "z":..., "a":..., "b":..., "c":..., "e":...}
|
||||
}
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import math
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Sequence, Tuple
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
try:
|
||||
from scipy.optimize import least_squares
|
||||
_HAS_SCIPY = True
|
||||
except ImportError:
|
||||
_HAS_SCIPY = False
|
||||
|
||||
from robot_fk import RobotFK, STATE_KEYS
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# I/O
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
def _load(path: Path) -> dict:
|
||||
with path.open("r", encoding="utf-8") as f:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
def _load_state(path: Path) -> Dict[str, float]:
|
||||
raw = _load(path)
|
||||
state = raw.get("accumulated_state", raw.get("state", {}))
|
||||
return {k: float(v) for k, v in state.items() if k in STATE_KEYS}
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# ArUco / PnP helpers
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
def _marker_local_corners(size_m: float) -> np.ndarray:
|
||||
"""Four local corners of a marker (same order as OpenCV ArUco)."""
|
||||
h = size_m / 2.0
|
||||
return np.array([[-h, h, 0], [h, h, 0], [h, -h, 0], [-h, -h, 0]], dtype=np.float32)
|
||||
|
||||
|
||||
def _camera_pose_from_json(pose_json: dict) -> Tuple[np.ndarray, np.ndarray]:
|
||||
"""
|
||||
Load world-to-camera rotation and translation from step-2 JSON.
|
||||
Returns rvec (3,), tvec (3,) — the same convention used in step 2.
|
||||
"""
|
||||
wtc = pose_json["camera_pose"]["world_to_camera"]
|
||||
R = np.array(wtc["rotation_matrix"], dtype=np.float32).reshape(3, 3)
|
||||
t = np.array(wtc["translation_m"], dtype=np.float32).reshape(3)
|
||||
rvec, _ = cv2.Rodrigues(R)
|
||||
return rvec.reshape(3), t
|
||||
|
||||
|
||||
def _intrinsics_from_detection(det: dict) -> Tuple[np.ndarray, np.ndarray]:
|
||||
cam = det["camera"]
|
||||
K = np.array(cam["camera_matrix"], dtype=np.float32).reshape(3, 3)
|
||||
D = np.array(cam["distortion_coefficients"], dtype=np.float32).reshape(-1, 1)
|
||||
return K, D
|
||||
|
||||
|
||||
def _solve_marker_pose_in_camera(
|
||||
corners_px: np.ndarray,
|
||||
marker_size_m: float,
|
||||
K: np.ndarray,
|
||||
D: np.ndarray
|
||||
) -> Optional[Tuple[np.ndarray, np.ndarray]]:
|
||||
"""
|
||||
Estimate marker-in-camera pose via solvePnP (IPPE_SQUARE).
|
||||
Returns (rvec_cm, tvec_cm) or None on failure.
|
||||
"""
|
||||
obj = _marker_local_corners(marker_size_m)
|
||||
img = corners_px.astype(np.float32)
|
||||
ok, rvec, tvec = cv2.solvePnP(obj, img, K, D, flags=cv2.SOLVEPNP_IPPE_SQUARE)
|
||||
if not ok:
|
||||
ok, rvec, tvec = cv2.solvePnP(obj, img, K, D, flags=cv2.SOLVEPNP_ITERATIVE)
|
||||
if not ok:
|
||||
return None
|
||||
return rvec.reshape(3), tvec.reshape(3)
|
||||
|
||||
|
||||
def _marker_world_pose(
|
||||
rvec_cm: np.ndarray,
|
||||
tvec_cm: np.ndarray,
|
||||
rvec_wc: np.ndarray,
|
||||
tvec_wc: np.ndarray
|
||||
) -> Tuple[np.ndarray, np.ndarray]:
|
||||
"""
|
||||
Convert marker-in-camera pose to marker-in-world pose.
|
||||
|
||||
Convention (same as step 2):
|
||||
X_cam = R_wc @ X_world + t_wc
|
||||
So:
|
||||
R_wc = rvec→R(rvec_wc), camera origin in world = -R_wc^T @ t_wc
|
||||
|
||||
Marker pose in world:
|
||||
R_mw = R_wc^T @ R_cm (world_to_marker rotation)
|
||||
t_mw = R_wc^T @ (tvec_cm - tvec_wc)
|
||||
→ marker centre in world = -R_mw^T @ t_mw = R_mw^T @ (-t_mw)
|
||||
|
||||
We return (pos_world_m, R_marker_in_world) where
|
||||
R_marker_in_world maps local marker axes to world axes.
|
||||
"""
|
||||
R_wc, _ = cv2.Rodrigues(rvec_wc.reshape(3, 1))
|
||||
R_cm, _ = cv2.Rodrigues(rvec_cm.reshape(3, 1))
|
||||
|
||||
R_cw = R_wc.T # camera-to-world rotation
|
||||
|
||||
# Marker centre in world frame
|
||||
pos_w = R_cw @ tvec_cm.reshape(3) - R_cw @ tvec_wc.reshape(3)
|
||||
|
||||
# Marker orientation in world frame (columns = marker x,y,z axes in world)
|
||||
R_mw = R_cw @ R_cm
|
||||
|
||||
return pos_w.astype(np.float64), R_mw.astype(np.float64)
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# Multi-camera marker pose fusion
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
def _observation_weight(corners_px: np.ndarray, img_w: int, img_h: int) -> float:
|
||||
"""
|
||||
Simple quality weight: larger markers, more central = higher weight.
|
||||
"""
|
||||
area = float(cv2.contourArea(corners_px.astype(np.float32)))
|
||||
cx, cy = corners_px.mean(axis=0)
|
||||
dx = abs(cx - img_w / 2.0) / (img_w / 2.0)
|
||||
dy = abs(cy - img_h / 2.0) / (img_h / 2.0)
|
||||
centrality = 1.0 - 0.5 * (dx + dy)
|
||||
return max(0.05, (area / 500.0) * centrality)
|
||||
|
||||
|
||||
def collect_marker_world_poses(
|
||||
detection_files: List[Path],
|
||||
pose_files: List[Path],
|
||||
target_marker_ids: set,
|
||||
marker_size_m: float,
|
||||
verbose: bool = True,
|
||||
) -> Dict[int, List[Dict[str, Any]]]:
|
||||
"""
|
||||
For each target marker, collect world-frame pose estimates from all cameras.
|
||||
|
||||
Returns: marker_id → list of {pos_m, R, weight, camera_id}
|
||||
"""
|
||||
result: Dict[int, List[Dict[str, Any]]] = {}
|
||||
|
||||
for det_path, pose_path in zip(detection_files, pose_files):
|
||||
det = _load(det_path)
|
||||
pose_json = _load(pose_path)
|
||||
|
||||
K, D = _intrinsics_from_detection(det)
|
||||
rvec_wc, tvec_wc = _camera_pose_from_json(pose_json)
|
||||
|
||||
cam_id = det.get("camera", {}).get("camera_id", str(det_path))
|
||||
img_w = int(det.get("image", {}).get("width_px", 1280))
|
||||
img_h = int(det.get("image", {}).get("height_px", 720))
|
||||
|
||||
for d in det.get("detections", []):
|
||||
mid = int(d.get("marker_id", -1))
|
||||
if mid not in target_marker_ids:
|
||||
continue
|
||||
corners = np.array(d.get("image_points_px", []), dtype=np.float32).reshape(4, 2)
|
||||
if corners.shape != (4, 2):
|
||||
continue
|
||||
|
||||
pnp = _solve_marker_pose_in_camera(corners, marker_size_m, K, D)
|
||||
if pnp is None:
|
||||
continue
|
||||
rvec_cm, tvec_cm = pnp
|
||||
|
||||
pos_w, R_w = _marker_world_pose(rvec_cm, tvec_cm, rvec_wc, tvec_wc)
|
||||
w = _observation_weight(corners, img_w, img_h)
|
||||
|
||||
result.setdefault(mid, []).append({
|
||||
"pos_m": pos_w,
|
||||
"R": R_w,
|
||||
"weight": w,
|
||||
"camera_id": cam_id,
|
||||
})
|
||||
|
||||
if verbose:
|
||||
for mid, obs in result.items():
|
||||
print(f" M{mid}: {len(obs)} camera(s) — weights "
|
||||
f"{[f'{o[\"weight\"]:.2f}' for o in obs]}")
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def fuse_marker_positions(
|
||||
observations: Dict[int, List[Dict[str, Any]]]
|
||||
) -> Dict[int, np.ndarray]:
|
||||
"""
|
||||
Weighted-mean fusion of world-frame marker centre positions (metres).
|
||||
"""
|
||||
result: Dict[int, np.ndarray] = {}
|
||||
for mid, obs_list in observations.items():
|
||||
total_w = sum(o["weight"] for o in obs_list)
|
||||
if total_w < 1e-12:
|
||||
continue
|
||||
pos = sum(o["pos_m"] * o["weight"] for o in obs_list) / total_w
|
||||
result[mid] = pos.astype(np.float64)
|
||||
return result
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# FK-based optimizer for b, c, e
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
def _fk_finger_positions(
|
||||
fk: RobotFK,
|
||||
base_state: Dict[str, float],
|
||||
b: float,
|
||||
c: float,
|
||||
e: float
|
||||
) -> Dict[int, np.ndarray]:
|
||||
"""
|
||||
Compute world positions (m) of all FingerA and FingerB markers.
|
||||
base_state already contains x, y, z, a.
|
||||
"""
|
||||
state = dict(base_state)
|
||||
state.update({"b": b, "c": c, "e": e})
|
||||
transforms = fk.compute(state)
|
||||
result = {}
|
||||
for link_name in ("FingerA", "FingerB", "Hand", "Palm"):
|
||||
if link_name not in fk.links:
|
||||
continue
|
||||
for m in fk.links[link_name].get("markers", []):
|
||||
mid = int(m.get("id", -1))
|
||||
if mid < 0 or "position" not in m:
|
||||
continue
|
||||
# position in METRES
|
||||
local_mm = np.array(m["position"], dtype=float)
|
||||
world_m = fk.marker_world(transforms, link_name, local_mm) / 1000.0
|
||||
result[mid] = world_m
|
||||
return result
|
||||
|
||||
|
||||
def optimize_bce(
|
||||
fk: RobotFK,
|
||||
base_state: Dict[str, float],
|
||||
observed_m: Dict[int, np.ndarray],
|
||||
b_init: float = 0.0,
|
||||
c_init: float = 0.0,
|
||||
e_init: float = 0.0,
|
||||
verbose: bool = True,
|
||||
) -> dict:
|
||||
"""
|
||||
Find (b, c, e) minimising reprojection of FK finger-marker positions
|
||||
to observed world positions.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
fk : RobotFK
|
||||
base_state : already-estimated {x, y, z, a} (all in mm/deg)
|
||||
observed_m : {marker_id: world_position_m}
|
||||
"""
|
||||
|
||||
if not _HAS_SCIPY:
|
||||
return {"status": "failed", "reason": "scipy not available (pip install scipy)"}
|
||||
|
||||
def residuals(p):
|
||||
b, c, e = float(p[0]), float(p[1]), float(p[2])
|
||||
fk_pos = _fk_finger_positions(fk, base_state, b, c, e)
|
||||
r = []
|
||||
for mid, obs_p in observed_m.items():
|
||||
if mid in fk_pos:
|
||||
diff = fk_pos[mid] - obs_p
|
||||
r.extend(diff.tolist())
|
||||
return r
|
||||
|
||||
# Initial residual
|
||||
r0 = residuals([b_init, c_init, e_init])
|
||||
if len(r0) < 3:
|
||||
return {
|
||||
"status": "failed",
|
||||
"reason": f"Only {len(r0)//3} finger markers matched. Need ≥1.",
|
||||
}
|
||||
|
||||
res = least_squares(
|
||||
residuals,
|
||||
x0=[b_init, c_init, e_init],
|
||||
method="lm",
|
||||
max_nfev=2000,
|
||||
)
|
||||
|
||||
b_est, c_est, e_est = float(res.x[0]), float(res.x[1]), float(res.x[2])
|
||||
|
||||
rms_m = float(np.sqrt(np.mean(np.array(res.fun) ** 2)))
|
||||
rms_mm = rms_m * 1000.0
|
||||
|
||||
accum = dict(base_state)
|
||||
accum.update({"b": b_est, "c": c_est, "e": e_est})
|
||||
|
||||
if verbose:
|
||||
print(f"\n── 4d: Hand/Palm/Finger (b, c, e) ──────────────────────")
|
||||
print(f" Markers used: {len(observed_m)} fused positions")
|
||||
print(f" b = {b_est:+.2f} ° c = {c_est:+.2f} ° e = {e_est:+.2f} mm")
|
||||
print(f" RMS position error: {rms_mm:.2f} mm")
|
||||
if not res.success:
|
||||
print(f" [WARN] Optimizer: {res.message}")
|
||||
|
||||
return {
|
||||
"status": "ok",
|
||||
"b_deg": b_est,
|
||||
"c_deg": c_est,
|
||||
"e_mm": e_est,
|
||||
"rms_mm": rms_mm,
|
||||
"optimizer_success": bool(res.success),
|
||||
"num_markers": len(observed_m),
|
||||
"accumulated_state": accum,
|
||||
}
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# Main
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
def main() -> int:
|
||||
p = argparse.ArgumentParser(
|
||||
description="4d: Direct hand/finger pose from per-camera solvePnP")
|
||||
p.add_argument("--robot", type=Path, required=True)
|
||||
p.add_argument("--det", "-d", type=Path, nargs="+", required=True,
|
||||
help="*_aruco_detection.json files (one per camera)")
|
||||
p.add_argument("--pose", "-p", type=Path, nargs="+", required=True,
|
||||
help="*_camera_pose.json files (same order as --det)")
|
||||
p.add_argument("--state", type=Path, default=None,
|
||||
help="JSON from previous 4b step (accumulated_state)")
|
||||
for k in ("x", "y", "z", "a"):
|
||||
p.add_argument(f"--{k}", type=float, default=None,
|
||||
help=f"Known {k} value (overrides --state)")
|
||||
p.add_argument("--b-init", type=float, default=0.0)
|
||||
p.add_argument("--c-init", type=float, default=0.0)
|
||||
p.add_argument("--e-init", type=float, default=0.0)
|
||||
p.add_argument("--output", type=Path, default=None)
|
||||
args = p.parse_args()
|
||||
|
||||
if len(args.det) != len(args.pose):
|
||||
print("[ERROR] --det and --pose must have the same number of files")
|
||||
return 1
|
||||
|
||||
# Base state from file or CLI
|
||||
base_state: Dict[str, float] = {}
|
||||
if args.state:
|
||||
base_state = _load_state(args.state)
|
||||
print(f" Loaded state: {base_state}")
|
||||
for k in ("x", "y", "z", "a"):
|
||||
v = getattr(args, k, None)
|
||||
if v is not None:
|
||||
base_state[k] = float(v)
|
||||
|
||||
# Load FK
|
||||
fk = RobotFK.from_file(args.robot)
|
||||
|
||||
# Which markers belong to Hand/Palm/FingerA/FingerB?
|
||||
target_ids: set = set()
|
||||
for link_name in ("Hand", "Palm", "FingerA", "FingerB"):
|
||||
for m in fk.links.get(link_name, {}).get("markers", []):
|
||||
mid = int(m.get("id", -1))
|
||||
if mid >= 0:
|
||||
target_ids.add(mid)
|
||||
|
||||
marker_size_m = float(
|
||||
fk.robot.get("vision_config", {}).get("MarkerSize", 0.025)
|
||||
)
|
||||
|
||||
print(f"\n Target marker IDs: {sorted(target_ids)}")
|
||||
print(f" Marker size: {marker_size_m*1000:.1f} mm")
|
||||
print(f" Cameras: {len(args.det)}")
|
||||
|
||||
# Collect world-frame poses from all cameras via solvePnP
|
||||
print("\n Per-camera solvePnP:")
|
||||
world_pose_obs = collect_marker_world_poses(
|
||||
args.det, args.pose, target_ids, marker_size_m, verbose=True
|
||||
)
|
||||
|
||||
if not world_pose_obs:
|
||||
print("[ERROR] No target markers found in any detection file.")
|
||||
return 1
|
||||
|
||||
# Fuse positions
|
||||
observed_m = fuse_marker_positions(world_pose_obs)
|
||||
print(f"\n Fused {len(observed_m)} marker world positions")
|
||||
|
||||
# Optimise b, c, e
|
||||
result = optimize_bce(
|
||||
fk = fk,
|
||||
base_state = base_state,
|
||||
observed_m = observed_m,
|
||||
b_init = args.b_init,
|
||||
c_init = args.c_init,
|
||||
e_init = args.e_init,
|
||||
verbose = True,
|
||||
)
|
||||
|
||||
if args.output and result["status"] == "ok":
|
||||
args.output.parent.mkdir(parents=True, exist_ok=True)
|
||||
with args.output.open("w", encoding="utf-8") as f:
|
||||
json.dump(result, f, indent=2)
|
||||
print(f"\n Saved → {args.output}")
|
||||
|
||||
return 0 if result["status"] == "ok" else 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Binary file not shown.
BIN
pipeline/__pycache__/robot_fk.cpython-311.pyc
Normal file
BIN
pipeline/__pycache__/robot_fk.cpython-311.pyc
Normal file
Binary file not shown.
310
pipeline/robot_fk.py
Normal file
310
pipeline/robot_fk.py
Normal file
@@ -0,0 +1,310 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
robot_fk.py
|
||||
-----------
|
||||
Minimal forward kinematics engine for the robot.json format.
|
||||
|
||||
Matches the Blender hierarchy used by render_robot.py exactly:
|
||||
world_T_link = world_T_parent
|
||||
@ Translate(mountPosition) @ Rotate_xyz(mountRotation)
|
||||
@ Translate(jointOrigin) @ Rotate_xyz(joint.rotation)
|
||||
@ T_motion
|
||||
|
||||
T_motion = Rotate(axis, value_deg) for revolute joints
|
||||
Translate(axis * value_mm) for linear joints
|
||||
|
||||
Units throughout: millimetres, degrees.
|
||||
|
||||
Public API
|
||||
----------
|
||||
fk = RobotFK.from_file("robot.json")
|
||||
|
||||
transforms = fk.compute({"x": 180, "y": 86, "z": -120,
|
||||
"a": -60, "b": 22, "c": 91, "e": 10})
|
||||
# → dict link_name -> 4×4 np.ndarray (world frame, mm)
|
||||
|
||||
p_world = fk.marker_world(transforms, "Arm1", [0, -160, 35])
|
||||
# → np.ndarray shape (3,), in mm
|
||||
|
||||
all_m = fk.all_markers_world(transforms)
|
||||
# → dict marker_id -> {"world_mm", "link", "local_mm"}
|
||||
|
||||
# Cumulative x-offset for a link at all-zero state
|
||||
# (useful for 4a: x_slider = world_x - local_x - link_x_at_zero)
|
||||
x0 = fk.link_x_at_zero_state("Arm1") # → float mm
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import math
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Sequence, Tuple
|
||||
|
||||
import numpy as np
|
||||
|
||||
STATE_KEYS = ("x", "y", "z", "a", "b", "c", "e")
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# Low-level matrix helpers
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
def _rot_axis_angle(axis: Sequence[float], angle_deg: float) -> np.ndarray:
|
||||
"""3×3 rotation matrix via Rodrigues (axis need not be normalised)."""
|
||||
ax = np.asarray(axis, dtype=float)
|
||||
n = float(np.linalg.norm(ax))
|
||||
if n < 1e-12:
|
||||
return np.eye(3)
|
||||
ax = ax / n
|
||||
c = math.cos(math.radians(angle_deg))
|
||||
s = math.sin(math.radians(angle_deg))
|
||||
t = 1.0 - c
|
||||
x, y, z = ax
|
||||
return np.array([
|
||||
[t*x*x + c, t*x*y - s*z, t*x*z + s*y],
|
||||
[t*x*y + s*z, t*y*y + c, t*y*z - s*x],
|
||||
[t*x*z - s*y, t*y*z + s*x, t*z*z + c ],
|
||||
])
|
||||
|
||||
|
||||
def _rot_xyz_euler(rx: float, ry: float, rz: float) -> np.ndarray:
|
||||
"""XYZ Euler angles (degrees) → 3×3 — matches Blender XYZ Euler mode."""
|
||||
return (_rot_axis_angle([0, 0, 1], rz)
|
||||
@ _rot_axis_angle([0, 1, 0], ry)
|
||||
@ _rot_axis_angle([1, 0, 0], rx))
|
||||
|
||||
|
||||
def make_T(R: np.ndarray, t: Sequence[float]) -> np.ndarray:
|
||||
"""4×4 homogeneous transform."""
|
||||
T = np.eye(4)
|
||||
T[:3, :3] = R
|
||||
T[:3, 3] = np.asarray(t, dtype=float)
|
||||
return T
|
||||
|
||||
|
||||
def transform_point(T: np.ndarray, p: Sequence[float]) -> np.ndarray:
|
||||
"""Apply 4×4 transform to a 3-D point."""
|
||||
h = np.array([p[0], p[1], p[2], 1.0])
|
||||
return (T @ h)[:3]
|
||||
|
||||
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
# FK engine
|
||||
# ──────────────────────────────────────────────────────────────
|
||||
|
||||
class RobotFK:
|
||||
"""Forward kinematics for the robot.json format."""
|
||||
|
||||
def __init__(self, robot_data: Dict[str, Any]):
|
||||
self.robot = robot_data
|
||||
self.links: Dict[str, Any] = robot_data.get("links", {})
|
||||
self._topo: List[str] = self._topological_sort()
|
||||
|
||||
# ── construction ─────────────────────────────────────────
|
||||
|
||||
@classmethod
|
||||
def from_file(cls, path) -> "RobotFK":
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
return cls(json.load(f))
|
||||
|
||||
def _topological_sort(self) -> List[str]:
|
||||
parent_map = {n: d.get("parent") for n, d in self.links.items()}
|
||||
visited, order = set(), []
|
||||
|
||||
def visit(name: str) -> None:
|
||||
if name in visited:
|
||||
return
|
||||
visited.add(name)
|
||||
p = parent_map.get(name)
|
||||
if p and p in self.links:
|
||||
visit(p)
|
||||
order.append(name)
|
||||
|
||||
for name in self.links:
|
||||
visit(name)
|
||||
return order
|
||||
|
||||
# ── core computation ──────────────────────────────────────
|
||||
|
||||
def compute(self, joint_values: Dict[str, float]) -> Dict[str, np.ndarray]:
|
||||
"""
|
||||
Compute link world transforms for the given joint state.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
joint_values : dict variable_name -> value
|
||||
Linear joints (x, e): mm
|
||||
Revolute joints (y, z, a, b, c): degrees
|
||||
|
||||
Returns
|
||||
-------
|
||||
dict link_name -> 4×4 np.ndarray (world frame, mm)
|
||||
"""
|
||||
state = {k: 0.0 for k in STATE_KEYS}
|
||||
for k, v in joint_values.items():
|
||||
if k in state:
|
||||
state[k] = float(v)
|
||||
|
||||
transforms: Dict[str, np.ndarray] = {}
|
||||
|
||||
for link_name in self._topo:
|
||||
d = self.links[link_name]
|
||||
parent = d.get("parent")
|
||||
T_parent = transforms.get(parent, np.eye(4)) if parent else np.eye(4)
|
||||
|
||||
# 1 · Mount (static in parent frame)
|
||||
mp = d.get("mountPosition", [0, 0, 0])
|
||||
mr = d.get("mountRotation", [0, 0, 0])
|
||||
T_m = make_T(_rot_xyz_euler(*mr), mp)
|
||||
|
||||
# 2 · Joint origin (pivot point in mount frame)
|
||||
ji = d.get("jointToParent", {}) or {}
|
||||
jp = ji.get("origin", [0, 0, 0])
|
||||
jr = ji.get("rotation", [0, 0, 0])
|
||||
T_j = make_T(_rot_xyz_euler(*jr), jp)
|
||||
|
||||
# 3 · Motion
|
||||
jtype = str(ji.get("type", "fixed")).lower()
|
||||
var = str(ji.get("variable", "")).lower()
|
||||
axis = np.asarray(ji.get("axis", [1, 0, 0]), dtype=float)
|
||||
val = state.get(var, 0.0)
|
||||
|
||||
if jtype == "revolute":
|
||||
T_mot = make_T(_rot_axis_angle(axis, val), [0, 0, 0])
|
||||
elif jtype == "linear":
|
||||
n = float(np.linalg.norm(axis))
|
||||
u = axis / n if n > 1e-12 else np.array([1.0, 0, 0])
|
||||
T_mot = make_T(np.eye(3), u * val)
|
||||
else:
|
||||
T_mot = np.eye(4)
|
||||
|
||||
transforms[link_name] = T_parent @ T_m @ T_j @ T_mot
|
||||
|
||||
return transforms
|
||||
|
||||
# ── marker helpers ────────────────────────────────────────
|
||||
|
||||
@staticmethod
|
||||
def marker_world(transforms: Dict[str, np.ndarray],
|
||||
link_name: str,
|
||||
local_pos: Sequence[float]) -> np.ndarray:
|
||||
"""Transform a local marker position → world (mm)."""
|
||||
return transform_point(transforms.get(link_name, np.eye(4)), local_pos)
|
||||
|
||||
def all_markers_world(self,
|
||||
transforms: Dict[str, np.ndarray]
|
||||
) -> Dict[int, Dict[str, Any]]:
|
||||
"""
|
||||
Returns
|
||||
-------
|
||||
dict marker_id -> {world_mm, local_mm, link, normal_world}
|
||||
"""
|
||||
result: Dict[int, Dict[str, Any]] = {}
|
||||
for lname, ldata in self.links.items():
|
||||
T = transforms.get(lname, np.eye(4))
|
||||
R = T[:3, :3]
|
||||
for m in ldata.get("markers", []):
|
||||
mid = int(m.get("id", -1))
|
||||
if mid < 0 or "position" not in m:
|
||||
continue
|
||||
loc = np.array(m["position"], dtype=float)
|
||||
nor = np.array(m.get("normal", [0, 0, 1]), dtype=float)
|
||||
result[mid] = {
|
||||
"world_mm": transform_point(T, loc),
|
||||
"local_mm": loc,
|
||||
"link": lname,
|
||||
"normal_world": (R @ nor) / max(np.linalg.norm(R @ nor), 1e-12),
|
||||
}
|
||||
return result
|
||||
|
||||
# ── x-axis invariant helpers (used by 4a) ────────────────
|
||||
|
||||
def link_x_at_zero_state(self, link_name: str) -> float:
|
||||
"""
|
||||
Return the world x-coordinate of the link-frame origin
|
||||
when ALL joint variables are zero.
|
||||
|
||||
For any link reachable via only x-axis rotations (Arm1, Ellbow, Arm2),
|
||||
this value is constant regardless of the actual revolute angles.
|
||||
Adding the slider value x_mm gives the true link origin x:
|
||||
link_origin_world_x = x_mm + link_x_at_zero_state(link_name)
|
||||
|
||||
And for any marker in that link:
|
||||
marker_world_x = x_mm + link_x_at_zero_state(link_name) + marker_local_x
|
||||
"""
|
||||
T = self.compute({k: 0.0 for k in STATE_KEYS})
|
||||
return float(T[link_name][0, 3])
|
||||
|
||||
def joint_origin_world(self,
|
||||
link_name: str,
|
||||
joint_state: Dict[str, float]) -> np.ndarray:
|
||||
"""
|
||||
World position of the pivot that link_name rotates / slides around.
|
||||
"""
|
||||
d = self.links[link_name]
|
||||
parent = d.get("parent")
|
||||
T_all = self.compute(joint_state)
|
||||
T_parent = T_all.get(parent, np.eye(4)) if parent else np.eye(4)
|
||||
|
||||
mp = d.get("mountPosition", [0, 0, 0])
|
||||
mr = d.get("mountRotation", [0, 0, 0])
|
||||
T_m = make_T(_rot_xyz_euler(*mr), mp)
|
||||
|
||||
ji = d.get("jointToParent", {}) or {}
|
||||
jp = ji.get("origin", [0, 0, 0])
|
||||
jr = ji.get("rotation", [0, 0, 0])
|
||||
T_j = make_T(_rot_xyz_euler(*jr), jp)
|
||||
|
||||
return transform_point(T_parent @ T_m @ T_j, [0, 0, 0])
|
||||
|
||||
def joint_axis_world(self,
|
||||
link_name: str,
|
||||
joint_state: Dict[str, float]) -> np.ndarray:
|
||||
"""
|
||||
Joint axis of link_name expressed in world frame.
|
||||
"""
|
||||
d = self.links[link_name]
|
||||
parent = d.get("parent")
|
||||
T_all = self.compute(joint_state)
|
||||
T_parent = T_all.get(parent, np.eye(4)) if parent else np.eye(4)
|
||||
|
||||
mp = d.get("mountPosition", [0, 0, 0])
|
||||
mr = d.get("mountRotation", [0, 0, 0])
|
||||
T_m = make_T(_rot_xyz_euler(*mr), mp)
|
||||
|
||||
ji = d.get("jointToParent", {}) or {}
|
||||
jp = ji.get("origin", [0, 0, 0])
|
||||
jr = ji.get("rotation", [0, 0, 0])
|
||||
T_j = make_T(_rot_xyz_euler(*jr), jp)
|
||||
|
||||
R_to_pivot = (T_parent @ T_m @ T_j)[:3, :3]
|
||||
axis_local = np.asarray(ji.get("axis", [1, 0, 0]), dtype=float)
|
||||
world = R_to_pivot @ axis_local
|
||||
n = float(np.linalg.norm(world))
|
||||
return world / n if n > 1e-12 else world
|
||||
|
||||
# ── utility ───────────────────────────────────────────────
|
||||
|
||||
def chain(self, link_name: str) -> List[str]:
|
||||
"""Return chain from root to link_name (inclusive)."""
|
||||
out, cur = [], link_name
|
||||
while cur:
|
||||
out.append(cur)
|
||||
cur = self.links.get(cur, {}).get("parent")
|
||||
return list(reversed(out))
|
||||
|
||||
def board_marker_world_positions(self, length_scale: float = 1.0) -> Dict[int, np.ndarray]:
|
||||
"""
|
||||
Return known world positions for all Board markers (in mm).
|
||||
Board is the root, so its marker positions ARE world positions.
|
||||
length_scale: 1/1000 if robot.json uses mm.
|
||||
"""
|
||||
board = self.links.get("Board", {})
|
||||
result: Dict[int, np.ndarray] = {}
|
||||
for m in board.get("markers", []):
|
||||
mid = int(m.get("id", -1))
|
||||
if mid >= 0 and "position" in m:
|
||||
p = np.array(m["position"], dtype=float) * length_scale
|
||||
result[mid] = p
|
||||
return result
|
||||
Reference in New Issue
Block a user