465 lines
16 KiB
Python
465 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
4d_direct_hand_pose.py
|
|
----------------------
|
|
Estimate Hand/Palm/Finger joint angles (b, c, e) directly from ArUco
|
|
marker observations — WITHOUT relying on step-3 triangulated positions.
|
|
|
|
Why this is needed
|
|
------------------
|
|
Step 3 triangulates marker centres from multiple cameras. For markers
|
|
far above the board (arm extended in Z), the depth estimation from
|
|
triangulation degrades: all reference Board markers lie in a flat plane,
|
|
so the cameras are poorly conditioned for Z estimation of out-of-plane points.
|
|
|
|
This script takes a completely different route:
|
|
|
|
1. For each camera that sees a finger or hand marker, run cv2.solvePnP
|
|
on the four detected corners. solvePnP uses the apparent size /
|
|
aspect ratio of the marker to recover depth — it is NOT affected
|
|
by the flat-board reference plane problem.
|
|
|
|
2. Use the known camera-in-world pose (from step 2) to convert the
|
|
per-camera marker pose to world frame.
|
|
|
|
3. Fuse all world-frame marker poses with quality weighting.
|
|
|
|
4. Given world-frame positions for all visible finger markers, and the
|
|
known local positions in the FingerA/FingerB frames, solve a small
|
|
nonlinear LS problem for (b, c, e) using scipy.
|
|
|
|
What you need
|
|
-------------
|
|
- robot.json
|
|
- Step-2 camera-pose JSONs for all cameras
|
|
- Step-1 aruco-detection JSONs for all cameras
|
|
- Already-estimated x, y, z, a from 4a/4b (pass via --state-json or CLI)
|
|
|
|
Usage
|
|
-----
|
|
python 4d_direct_hand_pose.py \\
|
|
--robot robot.json \\
|
|
--det cam1_aruco_detection.json cam2_aruco_detection.json ... \\
|
|
--pose cam1_camera_pose.json cam2_camera_pose.json ... \\
|
|
--state 4b_arm2_result.json \\
|
|
--output 4d_result.json
|
|
|
|
Output
|
|
------
|
|
{
|
|
"b_deg": ..., "c_deg": ..., "e_mm": ...,
|
|
"rms_mm": ...,
|
|
"accumulated_state": {"x":..., "y":..., "z":..., "a":..., "b":..., "c":..., "e":...}
|
|
}
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import math
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Sequence, Tuple
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
try:
|
|
from scipy.optimize import least_squares
|
|
_HAS_SCIPY = True
|
|
except ImportError:
|
|
_HAS_SCIPY = False
|
|
|
|
from robot_fk import RobotFK, STATE_KEYS
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────
|
|
# I/O
|
|
# ──────────────────────────────────────────────────────────────
|
|
|
|
def _load(path: Path) -> dict:
|
|
with path.open("r", encoding="utf-8") as f:
|
|
return json.load(f)
|
|
|
|
|
|
def _load_state(path: Path) -> Dict[str, float]:
|
|
raw = _load(path)
|
|
state = raw.get("accumulated_state", raw.get("state", {}))
|
|
return {k: float(v) for k, v in state.items() if k in STATE_KEYS}
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────
|
|
# ArUco / PnP helpers
|
|
# ──────────────────────────────────────────────────────────────
|
|
|
|
def _marker_local_corners(size_m: float) -> np.ndarray:
|
|
"""Four local corners of a marker (same order as OpenCV ArUco)."""
|
|
h = size_m / 2.0
|
|
return np.array([[-h, h, 0], [h, h, 0], [h, -h, 0], [-h, -h, 0]], dtype=np.float32)
|
|
|
|
|
|
def _camera_pose_from_json(pose_json: dict) -> Tuple[np.ndarray, np.ndarray]:
|
|
"""
|
|
Load world-to-camera rotation and translation from step-2 JSON.
|
|
Returns rvec (3,), tvec (3,) — the same convention used in step 2.
|
|
"""
|
|
wtc = pose_json["camera_pose"]["world_to_camera"]
|
|
R = np.array(wtc["rotation_matrix"], dtype=np.float32).reshape(3, 3)
|
|
t = np.array(wtc["translation_m"], dtype=np.float32).reshape(3)
|
|
rvec, _ = cv2.Rodrigues(R)
|
|
return rvec.reshape(3), t
|
|
|
|
|
|
def _intrinsics_from_detection(det: dict) -> Tuple[np.ndarray, np.ndarray]:
|
|
cam = det["camera"]
|
|
K = np.array(cam["camera_matrix"], dtype=np.float32).reshape(3, 3)
|
|
D = np.array(cam["distortion_coefficients"], dtype=np.float32).reshape(-1, 1)
|
|
return K, D
|
|
|
|
|
|
def _solve_marker_pose_in_camera(
|
|
corners_px: np.ndarray,
|
|
marker_size_m: float,
|
|
K: np.ndarray,
|
|
D: np.ndarray
|
|
) -> Optional[Tuple[np.ndarray, np.ndarray]]:
|
|
"""
|
|
Estimate marker-in-camera pose via solvePnP (IPPE_SQUARE).
|
|
Returns (rvec_cm, tvec_cm) or None on failure.
|
|
"""
|
|
obj = _marker_local_corners(marker_size_m)
|
|
img = corners_px.astype(np.float32)
|
|
ok, rvec, tvec = cv2.solvePnP(obj, img, K, D, flags=cv2.SOLVEPNP_IPPE_SQUARE)
|
|
if not ok:
|
|
ok, rvec, tvec = cv2.solvePnP(obj, img, K, D, flags=cv2.SOLVEPNP_ITERATIVE)
|
|
if not ok:
|
|
return None
|
|
return rvec.reshape(3), tvec.reshape(3)
|
|
|
|
|
|
def _marker_world_pose(
|
|
rvec_cm: np.ndarray,
|
|
tvec_cm: np.ndarray,
|
|
rvec_wc: np.ndarray,
|
|
tvec_wc: np.ndarray
|
|
) -> Tuple[np.ndarray, np.ndarray]:
|
|
"""
|
|
Convert marker-in-camera pose to marker-in-world pose.
|
|
|
|
Convention (same as step 2):
|
|
X_cam = R_wc @ X_world + t_wc
|
|
So:
|
|
R_wc = rvec→R(rvec_wc), camera origin in world = -R_wc^T @ t_wc
|
|
|
|
Marker pose in world:
|
|
R_mw = R_wc^T @ R_cm (world_to_marker rotation)
|
|
t_mw = R_wc^T @ (tvec_cm - tvec_wc)
|
|
→ marker centre in world = -R_mw^T @ t_mw = R_mw^T @ (-t_mw)
|
|
|
|
We return (pos_world_m, R_marker_in_world) where
|
|
R_marker_in_world maps local marker axes to world axes.
|
|
"""
|
|
R_wc, _ = cv2.Rodrigues(rvec_wc.reshape(3, 1))
|
|
R_cm, _ = cv2.Rodrigues(rvec_cm.reshape(3, 1))
|
|
|
|
R_cw = R_wc.T # camera-to-world rotation
|
|
|
|
# Marker centre in world frame
|
|
pos_w = R_cw @ tvec_cm.reshape(3) - R_cw @ tvec_wc.reshape(3)
|
|
|
|
# Marker orientation in world frame (columns = marker x,y,z axes in world)
|
|
R_mw = R_cw @ R_cm
|
|
|
|
return pos_w.astype(np.float64), R_mw.astype(np.float64)
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────
|
|
# Multi-camera marker pose fusion
|
|
# ──────────────────────────────────────────────────────────────
|
|
|
|
def _observation_weight(corners_px: np.ndarray, img_w: int, img_h: int) -> float:
|
|
"""
|
|
Simple quality weight: larger markers, more central = higher weight.
|
|
"""
|
|
area = float(cv2.contourArea(corners_px.astype(np.float32)))
|
|
cx, cy = corners_px.mean(axis=0)
|
|
dx = abs(cx - img_w / 2.0) / (img_w / 2.0)
|
|
dy = abs(cy - img_h / 2.0) / (img_h / 2.0)
|
|
centrality = 1.0 - 0.5 * (dx + dy)
|
|
return max(0.05, (area / 500.0) * centrality)
|
|
|
|
|
|
def collect_marker_world_poses(
|
|
detection_files: List[Path],
|
|
pose_files: List[Path],
|
|
target_marker_ids: set,
|
|
marker_size_m: float,
|
|
verbose: bool = True,
|
|
) -> Dict[int, List[Dict[str, Any]]]:
|
|
"""
|
|
For each target marker, collect world-frame pose estimates from all cameras.
|
|
|
|
Returns: marker_id → list of {pos_m, R, weight, camera_id}
|
|
"""
|
|
result: Dict[int, List[Dict[str, Any]]] = {}
|
|
|
|
for det_path, pose_path in zip(detection_files, pose_files):
|
|
det = _load(det_path)
|
|
pose_json = _load(pose_path)
|
|
|
|
K, D = _intrinsics_from_detection(det)
|
|
rvec_wc, tvec_wc = _camera_pose_from_json(pose_json)
|
|
|
|
cam_id = det.get("camera", {}).get("camera_id", str(det_path))
|
|
img_w = int(det.get("image", {}).get("width_px", 1280))
|
|
img_h = int(det.get("image", {}).get("height_px", 720))
|
|
|
|
for d in det.get("detections", []):
|
|
mid = int(d.get("marker_id", -1))
|
|
if mid not in target_marker_ids:
|
|
continue
|
|
corners = np.array(d.get("image_points_px", []), dtype=np.float32).reshape(4, 2)
|
|
if corners.shape != (4, 2):
|
|
continue
|
|
|
|
pnp = _solve_marker_pose_in_camera(corners, marker_size_m, K, D)
|
|
if pnp is None:
|
|
continue
|
|
rvec_cm, tvec_cm = pnp
|
|
|
|
pos_w, R_w = _marker_world_pose(rvec_cm, tvec_cm, rvec_wc, tvec_wc)
|
|
w = _observation_weight(corners, img_w, img_h)
|
|
|
|
result.setdefault(mid, []).append({
|
|
"pos_m": pos_w,
|
|
"R": R_w,
|
|
"weight": w,
|
|
"camera_id": cam_id,
|
|
})
|
|
|
|
if verbose:
|
|
for mid, obs in result.items():
|
|
print(f" M{mid}: {len(obs)} camera(s) — weights "
|
|
f"{[f'{o[\"weight\"]:.2f}' for o in obs]}")
|
|
|
|
return result
|
|
|
|
|
|
def fuse_marker_positions(
|
|
observations: Dict[int, List[Dict[str, Any]]]
|
|
) -> Dict[int, np.ndarray]:
|
|
"""
|
|
Weighted-mean fusion of world-frame marker centre positions (metres).
|
|
"""
|
|
result: Dict[int, np.ndarray] = {}
|
|
for mid, obs_list in observations.items():
|
|
total_w = sum(o["weight"] for o in obs_list)
|
|
if total_w < 1e-12:
|
|
continue
|
|
pos = sum(o["pos_m"] * o["weight"] for o in obs_list) / total_w
|
|
result[mid] = pos.astype(np.float64)
|
|
return result
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────
|
|
# FK-based optimizer for b, c, e
|
|
# ──────────────────────────────────────────────────────────────
|
|
|
|
def _fk_finger_positions(
|
|
fk: RobotFK,
|
|
base_state: Dict[str, float],
|
|
b: float,
|
|
c: float,
|
|
e: float
|
|
) -> Dict[int, np.ndarray]:
|
|
"""
|
|
Compute world positions (m) of all FingerA and FingerB markers.
|
|
base_state already contains x, y, z, a.
|
|
"""
|
|
state = dict(base_state)
|
|
state.update({"b": b, "c": c, "e": e})
|
|
transforms = fk.compute(state)
|
|
result = {}
|
|
for link_name in ("FingerA", "FingerB", "Hand", "Palm"):
|
|
if link_name not in fk.links:
|
|
continue
|
|
for m in fk.links[link_name].get("markers", []):
|
|
mid = int(m.get("id", -1))
|
|
if mid < 0 or "position" not in m:
|
|
continue
|
|
# position in METRES
|
|
local_mm = np.array(m["position"], dtype=float)
|
|
world_m = fk.marker_world(transforms, link_name, local_mm) / 1000.0
|
|
result[mid] = world_m
|
|
return result
|
|
|
|
|
|
def optimize_bce(
|
|
fk: RobotFK,
|
|
base_state: Dict[str, float],
|
|
observed_m: Dict[int, np.ndarray],
|
|
b_init: float = 0.0,
|
|
c_init: float = 0.0,
|
|
e_init: float = 0.0,
|
|
verbose: bool = True,
|
|
) -> dict:
|
|
"""
|
|
Find (b, c, e) minimising reprojection of FK finger-marker positions
|
|
to observed world positions.
|
|
|
|
Parameters
|
|
----------
|
|
fk : RobotFK
|
|
base_state : already-estimated {x, y, z, a} (all in mm/deg)
|
|
observed_m : {marker_id: world_position_m}
|
|
"""
|
|
|
|
if not _HAS_SCIPY:
|
|
return {"status": "failed", "reason": "scipy not available (pip install scipy)"}
|
|
|
|
def residuals(p):
|
|
b, c, e = float(p[0]), float(p[1]), float(p[2])
|
|
fk_pos = _fk_finger_positions(fk, base_state, b, c, e)
|
|
r = []
|
|
for mid, obs_p in observed_m.items():
|
|
if mid in fk_pos:
|
|
diff = fk_pos[mid] - obs_p
|
|
r.extend(diff.tolist())
|
|
return r
|
|
|
|
# Initial residual
|
|
r0 = residuals([b_init, c_init, e_init])
|
|
if len(r0) < 3:
|
|
return {
|
|
"status": "failed",
|
|
"reason": f"Only {len(r0)//3} finger markers matched. Need ≥1.",
|
|
}
|
|
|
|
res = least_squares(
|
|
residuals,
|
|
x0=[b_init, c_init, e_init],
|
|
method="lm",
|
|
max_nfev=2000,
|
|
)
|
|
|
|
b_est, c_est, e_est = float(res.x[0]), float(res.x[1]), float(res.x[2])
|
|
|
|
rms_m = float(np.sqrt(np.mean(np.array(res.fun) ** 2)))
|
|
rms_mm = rms_m * 1000.0
|
|
|
|
accum = dict(base_state)
|
|
accum.update({"b": b_est, "c": c_est, "e": e_est})
|
|
|
|
if verbose:
|
|
print(f"\n── 4d: Hand/Palm/Finger (b, c, e) ──────────────────────")
|
|
print(f" Markers used: {len(observed_m)} fused positions")
|
|
print(f" b = {b_est:+.2f} ° c = {c_est:+.2f} ° e = {e_est:+.2f} mm")
|
|
print(f" RMS position error: {rms_mm:.2f} mm")
|
|
if not res.success:
|
|
print(f" [WARN] Optimizer: {res.message}")
|
|
|
|
return {
|
|
"status": "ok",
|
|
"b_deg": b_est,
|
|
"c_deg": c_est,
|
|
"e_mm": e_est,
|
|
"rms_mm": rms_mm,
|
|
"optimizer_success": bool(res.success),
|
|
"num_markers": len(observed_m),
|
|
"accumulated_state": accum,
|
|
}
|
|
|
|
|
|
# ──────────────────────────────────────────────────────────────
|
|
# Main
|
|
# ──────────────────────────────────────────────────────────────
|
|
|
|
def main() -> int:
|
|
p = argparse.ArgumentParser(
|
|
description="4d: Direct hand/finger pose from per-camera solvePnP")
|
|
p.add_argument("--robot", type=Path, required=True)
|
|
p.add_argument("--det", "-d", type=Path, nargs="+", required=True,
|
|
help="*_aruco_detection.json files (one per camera)")
|
|
p.add_argument("--pose", "-p", type=Path, nargs="+", required=True,
|
|
help="*_camera_pose.json files (same order as --det)")
|
|
p.add_argument("--state", type=Path, default=None,
|
|
help="JSON from previous 4b step (accumulated_state)")
|
|
for k in ("x", "y", "z", "a"):
|
|
p.add_argument(f"--{k}", type=float, default=None,
|
|
help=f"Known {k} value (overrides --state)")
|
|
p.add_argument("--b-init", type=float, default=0.0)
|
|
p.add_argument("--c-init", type=float, default=0.0)
|
|
p.add_argument("--e-init", type=float, default=0.0)
|
|
p.add_argument("--output", type=Path, default=None)
|
|
args = p.parse_args()
|
|
|
|
if len(args.det) != len(args.pose):
|
|
print("[ERROR] --det and --pose must have the same number of files")
|
|
return 1
|
|
|
|
# Base state from file or CLI
|
|
base_state: Dict[str, float] = {}
|
|
if args.state:
|
|
base_state = _load_state(args.state)
|
|
print(f" Loaded state: {base_state}")
|
|
for k in ("x", "y", "z", "a"):
|
|
v = getattr(args, k, None)
|
|
if v is not None:
|
|
base_state[k] = float(v)
|
|
|
|
# Load FK
|
|
fk = RobotFK.from_file(args.robot)
|
|
|
|
# Which markers belong to Hand/Palm/FingerA/FingerB?
|
|
target_ids: set = set()
|
|
for link_name in ("Hand", "Palm", "FingerA", "FingerB"):
|
|
for m in fk.links.get(link_name, {}).get("markers", []):
|
|
mid = int(m.get("id", -1))
|
|
if mid >= 0:
|
|
target_ids.add(mid)
|
|
|
|
marker_size_m = float(
|
|
fk.robot.get("vision_config", {}).get("MarkerSize", 0.025)
|
|
)
|
|
|
|
print(f"\n Target marker IDs: {sorted(target_ids)}")
|
|
print(f" Marker size: {marker_size_m*1000:.1f} mm")
|
|
print(f" Cameras: {len(args.det)}")
|
|
|
|
# Collect world-frame poses from all cameras via solvePnP
|
|
print("\n Per-camera solvePnP:")
|
|
world_pose_obs = collect_marker_world_poses(
|
|
args.det, args.pose, target_ids, marker_size_m, verbose=True
|
|
)
|
|
|
|
if not world_pose_obs:
|
|
print("[ERROR] No target markers found in any detection file.")
|
|
return 1
|
|
|
|
# Fuse positions
|
|
observed_m = fuse_marker_positions(world_pose_obs)
|
|
print(f"\n Fused {len(observed_m)} marker world positions")
|
|
|
|
# Optimise b, c, e
|
|
result = optimize_bce(
|
|
fk = fk,
|
|
base_state = base_state,
|
|
observed_m = observed_m,
|
|
b_init = args.b_init,
|
|
c_init = args.c_init,
|
|
e_init = args.e_init,
|
|
verbose = True,
|
|
)
|
|
|
|
if args.output and result["status"] == "ok":
|
|
args.output.parent.mkdir(parents=True, exist_ok=True)
|
|
with args.output.open("w", encoding="utf-8") as f:
|
|
json.dump(result, f, indent=2)
|
|
print(f"\n Saved → {args.output}")
|
|
|
|
return 0 if result["status"] == "ok" else 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|