Claude: Aufräumen

This commit is contained in:
chk
2026-06-01 21:15:05 +02:00
parent 4b5108aaa8
commit 7b32a50889
146 changed files with 33544 additions and 12331 deletions

View File

@@ -1,674 +0,0 @@
#!/usr/bin/env python3
"""
3_multiview_bundle_adjustment.py
Multi-view ArUco marker position optimization with geometric constraints.
Strategy:
1. Triangulate markers from multi-view observations (rough initial estimate)
2. Optimize marker positions with constraints:
- Same-link distance constraints: markers on same link keep fixed relative distance
- Joint-axis constraints: markers on linked arms via joint keep distance along joint axis
3. Use bundle adjustment: minimize reprojection error + constraint violations
Key insight: We do NOT use robot.json marker positions (they're in different joint config).
We only use robot.json hierarchy to DEFINE constraints.
Dependencies: numpy, opencv-python, scipy
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
from typing import Any, Dict, List, Optional, Tuple
import cv2
import numpy as np
# ===================================================================
# Path / JSON helpers
# ===================================================================
def resolve_path(path: str) -> str:
path = os.path.expanduser(path)
if os.path.isabs(path):
return path
return os.path.abspath(path)
def load_json(path: str) -> Dict[str, Any]:
with open(resolve_path(path), "r", encoding="utf-8") as f:
return json.load(f)
def save_json(path: str, data: Dict[str, Any]) -> None:
with open(resolve_path(path), "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
# ===================================================================
# Units
# ===================================================================
def get_length_scale(robot_data: Dict[str, Any]) -> float:
units = robot_data.get("units", {})
length_unit = str(units.get("length", "")).strip().lower()
if length_unit in ("mm", "millimeter", "millimeters"):
return 1.0 / 1000.0
elif length_unit in ("cm", "centimeter", "centimeters"):
return 1.0 / 100.0
return 1.0
# ===================================================================
# Constraint definitions (from robot.json structure ONLY)
# ===================================================================
class MarkerDistanceConstraint:
"""
Two markers on the same link must maintain a fixed distance.
We compute this distance from their relative positions in robot.json,
but scaled to meters. The ACTUAL 3D positions will be optimized,
but their distance must stay constant.
"""
def __init__(self, marker_id_a: int, marker_id_b: int,
link_name: str, target_distance: float, weight: float = 1.0):
self.marker_id_a = marker_id_a
self.marker_id_b = marker_id_b
self.link_name = link_name
self.target_distance = target_distance
self.weight = weight
class JointAxisConstraint:
"""
Two markers on linked arms (connected by joint around axis).
If joint axis is [1,0,0], then delta_x component must stay constant.
"""
def __init__(self, marker_id_parent: int, marker_id_child: int,
parent_link: str, child_link: str,
joint_axis: np.ndarray, # unit vector [x,y,z]
target_delta_along_axis: float,
weight: float = 1.0):
self.marker_id_parent = marker_id_parent
self.marker_id_child = marker_id_child
self.parent_link = parent_link
self.child_link = child_link
self.joint_axis = joint_axis / (np.linalg.norm(joint_axis) + 1e-9)
self.target_delta_along_axis = target_delta_along_axis
self.weight = weight
def extract_constraints_from_hierarchy(robot_data: Dict[str, Any],
length_scale: float) -> Tuple[Dict, List]:
"""
Extract constraint definitions from robot.json.
NOTE: We only extract CONSTRAINT TYPES, not actual marker positions.
Marker positions will come from triangulation.
Returns:
marker_to_link: {marker_id -> link_name}
constraints: List of constraint objects
"""
constraints = []
marker_to_link = {}
links = robot_data.get("links", {})
# Build map: link_name -> markers on that link
link_markers = {}
for link_name, link_data in links.items():
markers = link_data.get("markers", [])
marker_ids = []
marker_positions = {}
for marker in markers:
marker_id = int(marker.get("id", -1))
pos = marker.get("position", None)
if marker_id >= 0 and pos is not None and len(pos) == 3:
marker_ids.append(marker_id)
marker_positions[marker_id] = np.array(pos, dtype=np.float32) * np.float32(length_scale)
marker_to_link[marker_id] = link_name
link_markers[link_name] = (marker_ids, marker_positions)
# Extract same-link distance constraints
for link_name, (marker_ids, positions) in link_markers.items():
if len(marker_ids) >= 2:
# For each pair, compute their distance
for i in range(len(marker_ids)):
for j in range(i + 1, len(marker_ids)):
mid_a = marker_ids[i]
mid_b = marker_ids[j]
pos_a = positions[mid_a]
pos_b = positions[mid_b]
target_dist = float(np.linalg.norm(pos_b - pos_a))
constraint = MarkerDistanceConstraint(
mid_a, mid_b, link_name, target_dist, weight=1.0
)
constraints.append(constraint)
# Extract joint-axis constraints (if we have markers on parent and child links)
for link_name, link_data in links.items():
parent_link = link_data.get("parent", None)
if parent_link is None:
continue
joint_info = link_data.get("jointToParent", {})
if not joint_info:
continue
joint_axis = joint_info.get("axis", [1, 0, 0])
joint_axis = np.array(joint_axis, dtype=np.float32)
# Get markers on this link and parent link
child_marker_ids, child_positions = link_markers.get(link_name, ([], {}))
parent_marker_ids, parent_positions = link_markers.get(parent_link, ([], {}))
if len(child_marker_ids) > 0 and len(parent_marker_ids) > 0:
# Create constraint between first marker of parent and first marker of child
for mid_parent in parent_marker_ids:
for mid_child in child_marker_ids:
if mid_parent == mid_child:
continue
pos_parent = parent_positions[mid_parent]
pos_child = child_positions[mid_child]
delta = pos_child - pos_parent
target_delta_along_axis = float(
np.dot(delta, joint_axis)
)
constraint = JointAxisConstraint(
mid_parent,
mid_child,
parent_link,
link_name,
joint_axis,
target_delta_along_axis,
weight=0.5
)
constraints.append(constraint)
return marker_to_link, constraints
# ===
# Helper Function
#
def print_constraints_with_errors(
title: str,
constraints: List,
positions: Dict[int, np.ndarray]
) -> None:
print(f"\n[INFO] {title}")
for idx, constraint in enumerate(constraints):
# ----------------------------------------------------------
# Distance constraint
# ----------------------------------------------------------
if isinstance(constraint, MarkerDistanceConstraint):
if (
constraint.marker_id_a not in positions or
constraint.marker_id_b not in positions
):
continue
pos_a = positions[constraint.marker_id_a]
pos_b = positions[constraint.marker_id_b]
actual_dist = np.linalg.norm(pos_b - pos_a)
error_m = actual_dist - constraint.target_distance
print(
f" [{idx:03d}] DISTANCE | "
f"Link='{constraint.link_name}' | "
f"M{constraint.marker_id_a} <-> M{constraint.marker_id_b} | "
f"target={constraint.target_distance*1000:.2f} mm | "
f"actual={actual_dist*1000:.2f} mm | "
f"error={error_m*1000:+.2f} mm"
)
# ----------------------------------------------------------
# Joint-axis constraint
# ----------------------------------------------------------
elif isinstance(constraint, JointAxisConstraint):
if (
constraint.marker_id_parent not in positions or
constraint.marker_id_child not in positions
):
continue
pos_parent = positions[constraint.marker_id_parent]
pos_child = positions[constraint.marker_id_child]
delta = pos_child - pos_parent
actual = np.dot(delta, constraint.joint_axis)
error_m = actual - constraint.target_delta_along_axis
axis_str = np.array2string(
constraint.joint_axis,
precision=2,
suppress_small=True
)
print(
f" [{idx:03d}] JOINT_AXIS | "
f"{constraint.parent_link}(M{constraint.marker_id_parent}) -> "
f"{constraint.child_link}(M{constraint.marker_id_child}) | "
f"axis={axis_str} | "
f"target={constraint.target_delta_along_axis*1000:.2f} mm | "
f"actual={actual*1000:.2f} mm | "
f"error={error_m*1000:+.2f} mm"
)
# ===================================================================
# Multi-view loading
# ===================================================================
def load_observations_and_poses(
detection_files: List[str],
pose_files: List[str]
) -> Tuple[Dict, List, List]:
"""Load observations and camera poses."""
if len(detection_files) != len(pose_files):
raise ValueError(f"Mismatch: {len(detection_files)} detections vs {len(pose_files)} poses")
marker_observations = {}
cameras = []
obs_metadata = []
for cam_idx, (det_file, pose_file) in enumerate(zip(detection_files, pose_files)):
det = load_json(det_file)
pose_data = load_json(pose_file)
# Camera intrinsics
cam_section = det.get("camera", {})
K = np.array(cam_section.get("camera_matrix", []), dtype=np.float32).reshape(3, 3)
D = np.array(cam_section.get("distortion_coefficients", []), dtype=np.float32).reshape(-1, 1)
# Camera pose
pose_section = pose_data.get("camera_pose", {})
world_to_cam = pose_section.get("world_to_camera", {})
R_wc = np.array(world_to_cam.get("rotation_matrix", []), dtype=np.float32).reshape(3, 3)
t_wc = np.array(world_to_cam.get("translation_m", []), dtype=np.float32).reshape(3)
cameras.append((K, D, R_wc, t_wc))
# Extract observations
detections = det.get("detections", [])
for det_obj in detections:
marker_id = int(det_obj.get("marker_id", -1))
if marker_id < 0:
continue
center_px = np.array(det_obj.get("center_px", []), dtype=np.float32)
# Undistort to normalized coordinates
pts = center_px.reshape(1, 1, 2).astype(np.float32)
und = cv2.undistortPoints(pts, K, D, P=None)
norm_coords = und.reshape(2).astype(np.float32)
if marker_id not in marker_observations:
marker_observations[marker_id] = []
marker_observations[marker_id].append((cam_idx, norm_coords))
return marker_observations, cameras, obs_metadata
# ===================================================================
# Initial triangulation
# ===================================================================
def triangulate_marker_initial(
marker_id: int,
observations: List[Tuple[int, np.ndarray]],
cameras: List[Tuple]
) -> Optional[np.ndarray]:
"""Rough triangulation using first two views."""
if len(observations) < 2:
return None
K1, D1, R_wc_1, t_wc_1 = cameras[observations[0][0]]
K2, D2, R_wc_2, t_wc_2 = cameras[observations[1][0]]
norm_coords_1 = observations[0][1]
norm_coords_2 = observations[1][1]
# Convert to pixel coordinates
x1_px = K1[0, 0] * norm_coords_1[0] + K1[0, 2]
y1_px = K1[1, 1] * norm_coords_1[1] + K1[1, 2]
x2_px = K2[0, 0] * norm_coords_2[0] + K2[0, 2]
y2_px = K2[1, 1] * norm_coords_2[1] + K2[1, 2]
P1 = K1 @ np.hstack([R_wc_1, t_wc_1.reshape(3, 1)])
P2 = K2 @ np.hstack([R_wc_2, t_wc_2.reshape(3, 1)])
try:
X_h = cv2.triangulatePoints(P1, P2,
np.array([[x1_px], [y1_px]]).astype(np.float32),
np.array([[x2_px], [y2_px]]).astype(np.float32))
X = (X_h[:3] / X_h[3]).reshape(3).astype(np.float32)
return X
except Exception:
return None
def initial_triangulation(
marker_observations: Dict[int, List[Tuple[int, np.ndarray]]],
cameras: List[Tuple]
) -> Dict[int, np.ndarray]:
"""Compute initial marker positions via triangulation."""
triangulated = {}
for marker_id, observations in marker_observations.items():
X = triangulate_marker_initial(marker_id, observations, cameras)
if X is not None:
triangulated[marker_id] = X
return triangulated
# ===================================================================
# Bundle adjustment with constraints
# ===================================================================
def bundle_adjustment_residuals(
marker_positions_flat: np.ndarray,
marker_ids: List[int],
marker_observations: Dict[int, List[Tuple[int, np.ndarray]]],
cameras: List[Tuple],
constraints: List,
lambda_constraint: float = 100.0
) -> np.ndarray:
"""
Compute residuals for bundle adjustment.
marker_positions_flat: [x0, y0, z0, x1, y1, z1, ...]
Returns: [reprojection_residuals, constraint_residuals]
"""
marker_dict = {}
for i, marker_id in enumerate(marker_ids):
marker_dict[marker_id] = marker_positions_flat[i*3:(i+1)*3]
residuals = []
# Reprojection residuals
for marker_id, observations in marker_observations.items():
if marker_id not in marker_dict:
continue
X_world = marker_dict[marker_id]
for cam_idx, norm_coords_obs in observations:
K, D, R_wc, t_wc = cameras[cam_idx]
# Project
X_cam = R_wc @ X_world + t_wc
if X_cam[2] > 0.001: # in front of camera
proj_norm = X_cam[:2] / X_cam[2]
residual = proj_norm - norm_coords_obs
residuals.append(residual[0])
residuals.append(residual[1])
# Constraint residuals
for constraint in constraints:
if isinstance(constraint, MarkerDistanceConstraint):
if constraint.marker_id_a in marker_dict and constraint.marker_id_b in marker_dict:
pos_a = marker_dict[constraint.marker_id_a]
pos_b = marker_dict[constraint.marker_id_b]
actual_dist = np.linalg.norm(pos_b - pos_a)
residual = (actual_dist - constraint.target_distance) * constraint.weight * lambda_constraint
residuals.append(residual)
elif isinstance(constraint, JointAxisConstraint):
if constraint.marker_id_parent in marker_dict and constraint.marker_id_child in marker_dict:
pos_parent = marker_dict[constraint.marker_id_parent]
pos_child = marker_dict[constraint.marker_id_child]
delta = pos_child - pos_parent
actual_delta_along_axis = np.dot(delta, constraint.joint_axis)
residual = (actual_delta_along_axis - constraint.target_delta_along_axis) * constraint.weight * lambda_constraint
residuals.append(residual)
return np.array(residuals, dtype=np.float64)
def optimize_with_constraints(
initial_positions: Dict[int, np.ndarray],
marker_observations: Dict[int, List[Tuple[int, np.ndarray]]],
cameras: List[Tuple],
constraints: List,
lambda_constraint: float = 100.0,
max_iterations: int = 50
) -> Dict[int, np.ndarray]:
"""
Optimize marker positions using scipy least squares.
"""
try:
from scipy.optimize import least_squares
except ImportError:
print("[WARN] scipy not available, skipping optimization. Using initial triangulation.")
return initial_positions
marker_ids = sorted(initial_positions.keys())
x0 = np.concatenate([initial_positions[mid] for mid in marker_ids])
def residuals_fn(x):
return bundle_adjustment_residuals(x, marker_ids, marker_observations, cameras,
constraints, lambda_constraint)
print(f"[INFO] Starting optimization with {len(x0)} variables and {len(constraints)} constraints...")
result = least_squares(
residuals_fn,
x0,
max_nfev=max_iterations * len(marker_ids),
verbose=1
)
optimized_flat = result.x
optimized = {}
for i, marker_id in enumerate(marker_ids):
optimized[marker_id] = optimized_flat[i*3:(i+1)*3]
print(f"[INFO] Optimization complete. Final cost: {np.sum(result.fun**2):.6f}")
return optimized
# ===================================================================
# Main
# ===================================================================
def main() -> None:
parser = argparse.ArgumentParser(
description="Multi-view bundle adjustment with geometric constraints"
)
parser.add_argument(
"-det", "--detections",
action="append",
required=True,
help="*_aruco_detection.json files"
)
parser.add_argument(
"-pose", "--poses",
action="append",
required=True,
help="*_camera_pose.json files"
)
parser.add_argument(
"-robot", "--robot",
required=True,
help="robot.json"
)
parser.add_argument(
"-outDir", "--outDir",
default=None,
help="Output directory"
)
parser.add_argument(
"-lambdaWeight", "--lambdaWeight",
type=float,
default=100.0,
help="Constraint weight"
)
args = parser.parse_args()
if len(args.detections) != len(args.poses):
print(f"[ERROR] Mismatch: {len(args.detections)} vs {len(args.poses)}")
sys.exit(1)
# Load robot
robot_data = load_json(args.robot)
length_scale = get_length_scale(robot_data)
print("[STEP 1] Extract constraint definitions from robot.json hierarchy...")
marker_to_link, constraints = extract_constraints_from_hierarchy(robot_data, length_scale)
print(f"[INFO] Extracted {len(constraints)} constraints")
print("\n[INFO] Constraint list:")
for idx, constraint in enumerate(constraints):
if isinstance(constraint, MarkerDistanceConstraint):
print(
f" [{idx:03d}] DISTANCE | "
f"Link='{constraint.link_name}' | "
f"Marker {constraint.marker_id_a} <-> {constraint.marker_id_b} | "
f"TargetDistance={constraint.target_distance:.6f} m | "
f"Weight={constraint.weight}"
)
elif isinstance(constraint, JointAxisConstraint):
axis_str = np.array2string(
constraint.joint_axis,
precision=3,
suppress_small=True
)
print(
f" [{idx:03d}] JOINT_AXIS | "
f"{constraint.parent_link}({constraint.marker_id_parent}) -> "
f"{constraint.child_link}({constraint.marker_id_child}) | "
f"Axis={axis_str} | "
f"TargetDelta={constraint.target_delta_along_axis:.6f} m | "
f"Weight={constraint.weight}"
)
print("\n[STEP 2] Load observations and camera poses...")
marker_observations, cameras, _ = load_observations_and_poses(args.detections, args.poses)
print(f"[INFO] {len(cameras)} cameras, {len(marker_observations)} observed markers")
print("\n[STEP 3] Initial triangulation...")
initial_pos = initial_triangulation(marker_observations, cameras)
print(f"[INFO] Triangulated {len(initial_pos)} markers")
# ------------------------------------------------------------
# Save initial triangulated positions (before optimization)
# ------------------------------------------------------------
initial_output_markers = []
for marker_id, position in sorted(initial_pos.items()):
initial_output_markers.append({
"marker_id": int(marker_id),
"position_m": [float(x) for x in position],
"position_mm": [float(x * 1000.0) for x in position],
"link": marker_to_link.get(marker_id, "unknown")
})
initial_output = {
"schema_version": "1.0",
"created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"summary": {
"stage": "initial_triangulation",
"num_cameras": len(cameras),
"num_markers": len(initial_pos)
},
"markers": initial_output_markers
}
out_dir = args.outDir or os.path.dirname(args.detections[0]) or "."
os.makedirs(resolve_path(out_dir), exist_ok=True)
initial_out_file = os.path.join(
out_dir,
"aruco_positions_initial.json"
)
save_json(initial_out_file, initial_output)
print(f"[INFO] Initial triangulation saved to {initial_out_file}")
print("\n[STEP 4] Bundle adjustment with constraints...")
optimized_pos = optimize_with_constraints(
initial_pos, marker_observations, cameras, constraints,
lambda_constraint=args.lambdaWeight
)
print_constraints_with_errors(
"Constraint list AFTER optimization",
constraints,
optimized_pos
)
# Output
output_markers = []
for marker_id, position in sorted(optimized_pos.items()):
output_markers.append({
"marker_id": int(marker_id),
"position_m": [float(x) for x in position],
"position_mm": [float(x * 1000.0) for x in position],
"link": marker_to_link.get(marker_id, "unknown")
})
output = {
"schema_version": "1.0",
"created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"summary": {
"num_cameras": len(cameras),
"num_markers": len(optimized_pos),
"num_constraints": len(constraints)
},
"markers": output_markers
}
out_dir = args.outDir or os.path.dirname(args.detections[0]) or "."
os.makedirs(resolve_path(out_dir), exist_ok=True)
out_file = os.path.join(out_dir, "aruco_positions_optimized.json")
save_json(out_file, output)
print(f"\n[INFO] Saved to {out_file}")
if __name__ == "__main__":
main()

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,484 +0,0 @@
#!/usr/bin/env python3
"""
4_robotState_estimation_v7.py
Estimate the Arm1 joint angle from ArUco marker positions.
What this script does
---------------------
1. Loads a robot configuration JSON and extracts the local marker positions of Arm1.
2. Loads an observation JSON with marker positions (e.g. aruco_positions_optimized.json).
3. Matches observed markers to Arm1 markers by marker_id.
4. Estimates the Arm1 angle in two ways:
Option 1 (single marker, optional):
----------------------------------
If a world-space joint origin is known, estimate angle from one marker by comparing
the vector from joint origin -> marker in the model and in the observation.
Option 2 (marker pairs, default and robust):
------------------------------------------
For every pair of Arm1 markers, estimate the rotation angle from the relative vector
between the two markers. This is translation-invariant and usually the preferred method.
5. Aggregates all valid estimates using a weighted circular mean and reports:
mean angle, circular variance, circular std, linear std on wrapped samples, and per-estimate details.
Coordinate conventions
----------------------
The provided robot config says:
- right-handed coordinate system
- x right, y backward, z up
Arm1 rotates around the x-axis (joint axis [-1, 0, 0] in the supplied config).
The script therefore works in 3D and automatically respects the joint axis sign.
Usage
-----
python 4_robotState_estimation_v7.py \
--robot-json robot_running_2026_05_30.json \
--aruco-json aruco_positions_optimized.json
Optional:
--joint-origin-world X Y Z # enables single-marker estimates
--output-json result.json
--arm-link Arm1
The script prints a concise report and can also write a JSON summary.
Notes
-----
- This script is intentionally defensive: it accepts positions in meters or millimeters.
- It does not require any additional robot state beyond the Arm1 marker geometry.
- If only a subset of Arm1 markers is observed, the pairwise estimator still works.
"""
from __future__ import annotations
import argparse
import json
import math
from dataclasses import dataclass, asdict
from itertools import combinations
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence, Tuple
import numpy as np
# ----------------------------
# Data structures
# ----------------------------
@dataclass
class MarkerSpec:
marker_id: int
local_position_mm: np.ndarray
@dataclass
class MarkerObservation:
marker_id: int
position_mm: np.ndarray
link: Optional[str] = None
@dataclass
class AngleEstimate:
source: str
angle_rad: float
angle_deg: float
weight: float
marker_ids: Tuple[int, ...]
# ----------------------------
# Helpers
# ----------------------------
def _as_np3(value: Sequence[float]) -> np.ndarray:
arr = np.asarray(value, dtype=float).reshape(3)
return arr
def load_json(path: Path) -> dict:
with path.open("r", encoding="utf-8") as f:
return json.load(f)
def unit_scale_to_mm(units: Optional[str]) -> float:
if not units:
return 1.0
u = str(units).strip().lower()
if u in {"mm", "millimeter", "millimeters"}:
return 1.0
if u in {"m", "meter", "meters"}:
return 1000.0
if u in {"cm", "centimeter", "centimeters"}:
return 10.0
# conservative default
return 1.0
def normalize_angle_rad(angle: float) -> float:
"""Wrap to (-pi, pi]."""
return (angle + math.pi) % (2 * math.pi) - math.pi
def angle_between_vectors(v_from: np.ndarray, v_to: np.ndarray, axis: np.ndarray) -> float:
"""
Signed angle rotating v_from onto v_to around axis.
Uses atan2( axis_hat · (v_from × v_to), v_from · v_to ).
"""
a = np.asarray(axis, dtype=float).reshape(3)
an = np.linalg.norm(a)
if an == 0:
raise ValueError("Rotation axis must be non-zero.")
a = a / an
v1 = np.asarray(v_from, dtype=float).reshape(3)
v2 = np.asarray(v_to, dtype=float).reshape(3)
n1 = np.linalg.norm(v1)
n2 = np.linalg.norm(v2)
if n1 == 0 or n2 == 0:
raise ValueError("Cannot compute angle for zero-length vector.")
cross_term = float(np.dot(a, np.cross(v1, v2)))
dot_term = float(np.dot(v1, v2))
return math.atan2(cross_term, dot_term)
def weighted_circular_mean(angles_rad: Sequence[float], weights: Sequence[float]) -> Tuple[float, float, float]:
"""
Returns:
mean_angle_rad, circular_variance, circular_std_rad
circular_variance = 1 - R, where R is the normalized resultant length.
circular_std_rad = sqrt(-2 ln R) if R>0 else pi.
"""
if len(angles_rad) == 0:
raise ValueError("No angles provided.")
ang = np.asarray(angles_rad, dtype=float)
w = np.asarray(weights, dtype=float)
w = np.clip(w, 0.0, None)
if np.sum(w) <= 0:
w = np.ones_like(w)
s = np.sum(w * np.sin(ang))
c = np.sum(w * np.cos(ang))
mean = math.atan2(s, c)
R = math.sqrt(s * s + c * c) / float(np.sum(w))
R = min(max(R, 0.0), 1.0)
circular_variance = 1.0 - R
circular_std = math.sqrt(max(0.0, -2.0 * math.log(max(R, 1e-15)))) if R > 0 else math.pi
return mean, circular_variance, circular_std
def weighted_linear_std_rad(angles_rad: Sequence[float], mean_rad: float, weights: Sequence[float]) -> float:
if len(angles_rad) == 0:
return float("nan")
ang = np.asarray([normalize_angle_rad(a - mean_rad) for a in angles_rad], dtype=float)
w = np.asarray(weights, dtype=float)
w = np.clip(w, 0.0, None)
if np.sum(w) <= 0:
w = np.ones_like(w)
mu = np.sum(w * ang) / np.sum(w)
var = np.sum(w * (ang - mu) ** 2) / np.sum(w)
return float(math.sqrt(max(0.0, var)))
def extract_arm_markers(robot: dict, arm_link_name: str) -> Tuple[np.ndarray, List[MarkerSpec], np.ndarray]:
"""
Returns:
joint_origin_mm, markers, joint_axis
"""
links = robot.get("links", {})
if arm_link_name not in links:
# case-insensitive fallback
lookup = {k.lower(): k for k in links.keys()}
if arm_link_name.lower() not in lookup:
raise KeyError(f"Link '{arm_link_name}' not found in robot.json.")
arm_link_name = lookup[arm_link_name.lower()]
arm = links[arm_link_name]
jt = arm.get("jointToParent", {})
joint_origin_mm = _as_np3(jt.get("origin", [0, 0, 0]))
joint_axis = _as_np3(jt.get("axis", [1, 0, 0]))
markers = []
for m in arm.get("markers", []):
if "id" not in m or "position" not in m:
continue
markers.append(MarkerSpec(marker_id=int(m["id"]), local_position_mm=_as_np3(m["position"])))
if not markers:
raise ValueError(f"No markers found on link '{arm_link_name}'.")
return joint_origin_mm, markers, joint_axis
def load_observations(observations_json: dict) -> List[MarkerObservation]:
markers = observations_json.get("markers", [])
units = observations_json.get("units", {})
scale = 1.0
if isinstance(units, dict):
scale = unit_scale_to_mm(units.get("length"))
else:
scale = unit_scale_to_mm(units)
obs: List[MarkerObservation] = []
for m in markers:
if "marker_id" not in m and "id" not in m:
continue
marker_id = int(m.get("marker_id", m.get("id")))
if "position_mm" in m:
pos = _as_np3(m["position_mm"])
elif "position_m" in m:
pos = _as_np3(m["position_m"]) * 1000.0
elif "position" in m:
pos = _as_np3(m["position"]) * scale
else:
continue
obs.append(MarkerObservation(marker_id=marker_id, position_mm=pos, link=m.get("link")))
return obs
def match_markers(specs: Sequence[MarkerSpec], observations: Sequence[MarkerObservation]) -> Dict[int, Tuple[np.ndarray, np.ndarray]]:
"""
Returns marker_id -> (local_position_mm, observed_position_mm)
"""
spec_map = {m.marker_id: m.local_position_mm for m in specs}
out = {}
for o in observations:
if o.marker_id in spec_map:
out[o.marker_id] = (spec_map[o.marker_id], o.position_mm)
return out
def estimate_pairwise_angles(
matched: Dict[int, Tuple[np.ndarray, np.ndarray]],
axis: np.ndarray,
min_baseline_mm: float = 1.0,
) -> List[AngleEstimate]:
estimates: List[AngleEstimate] = []
ids = sorted(matched.keys())
for id1, id2 in combinations(ids, 2):
p1_local, p1_obs = matched[id1]
p2_local, p2_obs = matched[id2]
v_local = p2_local - p1_local
v_obs = p2_obs - p1_obs
baseline_local = float(np.linalg.norm(v_local))
baseline_obs = float(np.linalg.norm(v_obs))
if baseline_local < min_baseline_mm or baseline_obs < min_baseline_mm:
continue
try:
angle = angle_between_vectors(v_local, v_obs, axis)
except ValueError:
continue
# Weight long baselines a little more strongly; if a pair is very short, it becomes noisy.
weight = max(1e-6, baseline_local * baseline_obs)
estimates.append(
AngleEstimate(
source="pair",
angle_rad=normalize_angle_rad(angle),
angle_deg=math.degrees(normalize_angle_rad(angle)),
weight=weight,
marker_ids=(id1, id2),
)
)
return estimates
def estimate_single_marker_angles(
matched: Dict[int, Tuple[np.ndarray, np.ndarray]],
joint_origin_world_mm: np.ndarray,
axis: np.ndarray,
) -> List[AngleEstimate]:
"""
Single-marker estimates require a known world-space joint origin.
For each marker, compare:
v_local = marker_local - joint_origin_local
v_obs = marker_obs - joint_origin_world
In the provided Arm1 config the joint origin is the local origin of Arm1, so
joint_origin_local is taken as [0,0,0].
"""
estimates: List[AngleEstimate] = []
joint_origin_world_mm = _as_np3(joint_origin_world_mm)
for marker_id, (p_local, p_obs) in matched.items():
v_local = p_local # Arm1 local origin is at the joint
v_obs = p_obs - joint_origin_world_mm
if np.linalg.norm(v_local) < 1e-9 or np.linalg.norm(v_obs) < 1e-9:
continue
try:
angle = angle_between_vectors(v_local, v_obs, axis)
except ValueError:
continue
weight = float(np.linalg.norm(v_local))
estimates.append(
AngleEstimate(
source="single",
angle_rad=normalize_angle_rad(angle),
angle_deg=math.degrees(normalize_angle_rad(angle)),
weight=weight,
marker_ids=(marker_id,),
)
)
return estimates
def build_report(
arm_link_name: str,
joint_origin_mm: np.ndarray,
joint_axis: np.ndarray,
matched: Dict[int, Tuple[np.ndarray, np.ndarray]],
estimates: List[AngleEstimate],
) -> dict:
if not estimates:
raise ValueError("No valid angle estimates could be computed.")
mean_rad, circ_var, circ_std_rad = weighted_circular_mean(
[e.angle_rad for e in estimates],
[e.weight for e in estimates],
)
lin_std_rad = weighted_linear_std_rad(
[e.angle_rad for e in estimates],
mean_rad,
[e.weight for e in estimates],
)
# unwrap around mean for a stable arithmetic mean/std reporting
unwrapped_deg = []
for e in estimates:
d = math.degrees(normalize_angle_rad(e.angle_rad - mean_rad))
unwrapped_deg.append(d)
report = {
"link": arm_link_name,
"joint_origin_mm": joint_origin_mm.tolist(),
"joint_axis": joint_axis.tolist(),
"num_matched_markers": len(matched),
"num_estimates": len(estimates),
"mean_angle_rad": mean_rad,
"mean_angle_deg": math.degrees(mean_rad),
"circular_variance": circ_var,
"circular_std_rad": circ_std_rad,
"circular_std_deg": math.degrees(circ_std_rad),
"linear_std_rad": lin_std_rad,
"linear_std_deg": math.degrees(lin_std_rad),
"estimate_spread_deg_unwrapped": {
"min": float(np.min(unwrapped_deg)) if unwrapped_deg else None,
"max": float(np.max(unwrapped_deg)) if unwrapped_deg else None,
"mean_abs": float(np.mean(np.abs(unwrapped_deg))) if unwrapped_deg else None,
},
"estimates": [asdict(e) for e in estimates],
}
return report
def pretty_print_report(report: dict) -> None:
print("\nArm angle estimation report")
print("=" * 32)
print(f"Link : {report['link']}")
print(f"Matched markers : {report['num_matched_markers']}")
print(f"Valid estimates : {report['num_estimates']}")
print(f"Mean angle : {report['mean_angle_deg']:.3f} deg")
print(f"Circular variance : {report['circular_variance']:.6f}")
print(f"Circular std : {report['circular_std_deg']:.3f} deg")
print(f"Linear std (wrapped): {report['linear_std_deg']:.3f} deg")
print("\nIndividual estimates")
print("-" * 32)
for e in report["estimates"]:
mids = ",".join(str(x) for x in e["marker_ids"])
print(f"{e['source']:>6s} markers [{mids:<8s}] angle={e['angle_deg']:>8.3f} deg weight={e['weight']:.3f}")
def estimate_arm1_state(
robot_json: dict,
observations_json: dict,
arm_link_name: str = "Arm1",
joint_origin_world_mm: Optional[Sequence[float]] = None,
) -> dict:
joint_origin_mm, arm_markers, joint_axis = extract_arm_markers(robot_json, arm_link_name)
observations = load_observations(observations_json)
matched = match_markers(arm_markers, observations)
if not matched:
available = sorted(m.marker_id for m in arm_markers)
observed = sorted(o.marker_id for o in observations)
raise ValueError(
f"No Arm1 marker matches found. Arm markers: {available}. Observed IDs present in file: {observed[:25]}"
+ ("..." if len(observed) > 25 else "")
)
estimates = estimate_pairwise_angles(matched, axis=joint_axis)
# Optional single-marker estimates only if a world-space joint origin is supplied.
if joint_origin_world_mm is not None:
estimates.extend(estimate_single_marker_angles(matched, _as_np3(joint_origin_world_mm), axis=joint_axis))
report = build_report(
arm_link_name=arm_link_name,
joint_origin_mm=joint_origin_mm,
joint_axis=joint_axis,
matched=matched,
estimates=estimates,
)
return report
def main() -> int:
parser = argparse.ArgumentParser(description="Estimate Arm1 angle from ArUco marker positions.")
parser.add_argument("--robot-json", type=Path, default=Path("robot.json"), help="Path to robot config JSON.")
parser.add_argument("--aruco-json", type=Path, default=Path("aruco_positions_optimized.json"), help="Path to marker observation JSON.")
parser.add_argument("--arm-link", type=str, default="Arm1", help="Name of the link to estimate.")
parser.add_argument(
"--joint-origin-world",
type=float,
nargs=3,
default=None,
metavar=("X", "Y", "Z"),
help="Optional world-space joint origin for single-marker estimates.",
)
parser.add_argument("--output-json", type=Path, default=None, help="Optional path to write the result JSON.")
args = parser.parse_args()
robot_json = load_json(args.robot_json)
observations_json = load_json(args.aruco_json)
report = estimate_arm1_state(
robot_json=robot_json,
observations_json=observations_json,
arm_link_name=args.arm_link,
joint_origin_world_mm=args.joint_origin_world,
)
pretty_print_report(report)
if args.output_json:
args.output_json.parent.mkdir(parents=True, exist_ok=True)
with args.output_json.open("w", encoding="utf-8") as f:
json.dump(report, f, indent=2, ensure_ascii=False)
print(f"\nWrote JSON summary to: {args.output_json}")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,207 +0,0 @@
#!/usr/bin/env python3
"""
4a_base_slider.py
-----------------
Estimate the Base linear-slider position (variable 'x', in mm) from
observed world-space marker positions produced by step 3.
Mathematical principle
----------------------
The Base slides along the world x-axis.
Every joint between Base and the observable arm markers rotates around
the x-axis (axis ≈ [-1,0,0] for Arm1, Ellbow, Arm2).
Rotation around the x-axis NEVER changes the x-coordinate of any point.
Therefore, for every marker on Arm1, Ellbow, or Arm2:
world_x = x_slider + link_frame_x_at_zero + local_x
where link_frame_x_at_zero is the x of the link-frame origin in world
when all joint variables are zero (constant, readable from robot.json).
Rearranging:
x_slider = world_x - link_frame_x_at_zero - local_x
This holds for ANY rotation angle y, z, a of the arm links.
This gives one independent estimate per observed marker.
We report their mean (and statistics) as the final estimate.
Usage
-----
python 4a_base_slider.py --robot robot.json --aruco aruco_positions_optimized.json
python 4a_base_slider.py --robot robot.json --aruco aruco_positions_optimized.json \\
--links Arm1 Ellbow Arm2 --output 4a_result.json
"""
from __future__ import annotations
import argparse
import json
import math
from pathlib import Path
from typing import Dict, List, Optional
import numpy as np
from robot_fk import RobotFK
# ──────────────────────────────────────────────────────────────
# I/O helpers
# ──────────────────────────────────────────────────────────────
def _load_json(path: Path) -> dict:
with path.open("r", encoding="utf-8") as f:
return json.load(f)
def _load_observed_mm(aruco_json: dict) -> Dict[int, np.ndarray]:
"""marker_id → world position in mm, from aruco_positions_optimized.json."""
result: Dict[int, np.ndarray] = {}
for m in aruco_json.get("markers", []):
mid = int(m.get("marker_id", m.get("id", -1)))
if mid < 0:
continue
if "position_mm" in m:
result[mid] = np.array(m["position_mm"], dtype=float)
elif "position_m" in m:
result[mid] = np.array(m["position_m"], dtype=float) * 1000.0
return result
# ──────────────────────────────────────────────────────────────
# Core estimator
# ──────────────────────────────────────────────────────────────
def estimate_base_slider(
fk: RobotFK,
observed_mm: Dict[int, np.ndarray],
use_links: Optional[List[str]] = None,
verbose: bool = True,
) -> dict:
"""
Parameters
----------
fk : RobotFK instance
observed_mm : dict marker_id -> world position [mm]
use_links : links whose markers are used (default: Arm1, Ellbow, Arm2)
All must be connected to Base via only x-axis rotations.
verbose : print breakdown
Returns
-------
dict with keys:
status, x_mean_mm, x_median_mm, x_std_mm,
num_markers, per_marker
"""
if use_links is None:
use_links = ["Arm1", "Ellbow", "Arm2"]
# Pre-compute link_frame_x at zero-state for each target link
link_x0: Dict[str, float] = {}
for lname in use_links:
if lname in fk.links:
link_x0[lname] = fk.link_x_at_zero_state(lname)
estimates: List[float] = []
per_marker: List[dict] = []
for lname in use_links:
if lname not in fk.links or lname not in link_x0:
continue
x0 = link_x0[lname]
for m in fk.links[lname].get("markers", []):
mid = int(m.get("id", -1))
if mid < 0 or "position" not in m:
continue
if mid not in observed_mm:
continue # marker not seen in this pose
local_x = float(m["position"][0])
world_x = float(observed_mm[mid][0])
x_est = world_x - x0 - local_x
estimates.append(x_est)
per_marker.append({
"marker_id": mid,
"link": lname,
"local_x_mm": local_x,
"link_x0_mm": x0,
"world_x_mm": world_x,
"x_estimate_mm": x_est,
})
if not estimates:
msg = (f"No observed markers found on links {use_links}. "
"Check that aruco_positions_optimized.json contains "
"markers from those links.")
if verbose:
print(f"[4a FAILED] {msg}")
return {"status": "failed", "reason": msg, "use_links": use_links}
arr = np.array(estimates)
x_mean = float(np.mean(arr))
x_median = float(np.median(arr))
x_std = float(np.std(arr, ddof=0))
if verbose:
print("\n── 4a: Base slider (x) ──────────────────────────────────")
print(f" Links used: {use_links}")
print(f" Markers used: {len(estimates)}")
print(f" x = {x_mean:+.2f} mm (median {x_median:+.2f}, σ {x_std:.2f})")
if x_std > 5.0:
print(f" [WARN] high spread ({x_std:.1f} mm) step-3 errors or wrong link list")
print(f"\n Per-marker:")
print(f" {'ID':>5} {'Link':8} {'local_x':>8} {'world_x':>8} {'x_est':>8}")
print(f" " + "-"*46)
for pm in per_marker:
print(f" {pm['marker_id']:>5} {pm['link']:8} "
f"{pm['local_x_mm']:>8.2f} {pm['world_x_mm']:>8.2f} {pm['x_estimate_mm']:>8.2f}")
return {
"status": "ok",
"joint": "x",
"description": "Base linear slider",
"x_mean_mm": x_mean,
"x_median_mm": x_median,
"x_std_mm": x_std,
"num_markers": len(estimates),
"use_links": use_links,
"per_marker": per_marker,
}
# ──────────────────────────────────────────────────────────────
# CLI
# ──────────────────────────────────────────────────────────────
def main() -> int:
p = argparse.ArgumentParser(description="4a: estimate Base slider (x)")
p.add_argument("--robot", type=Path, default=Path("robot.json"))
p.add_argument("--aruco", type=Path, default=Path("aruco_positions_optimized.json"))
p.add_argument("--links", nargs="+", default=["Arm1", "Ellbow", "Arm2"],
dest="links",
help="Links whose markers to use (must be x-axis-rotation chain)")
p.add_argument("--output", type=Path, default=None,
help="Optional JSON output path (e.g. 4a_result.json)")
args = p.parse_args()
fk = RobotFK.from_file(args.robot)
aruco_json = _load_json(args.aruco)
observed_mm = _load_observed_mm(aruco_json)
result = estimate_base_slider(fk, observed_mm, use_links=args.links)
if args.output and result["status"] == "ok":
args.output.parent.mkdir(parents=True, exist_ok=True)
with args.output.open("w", encoding="utf-8") as f:
json.dump(result, f, indent=2)
print(f"\n Saved → {args.output}")
return 0 if result["status"] == "ok" else 1
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,464 +0,0 @@
#!/usr/bin/env python3
"""
4d_direct_hand_pose.py
----------------------
Estimate Hand/Palm/Finger joint angles (b, c, e) directly from ArUco
marker observations — WITHOUT relying on step-3 triangulated positions.
Why this is needed
------------------
Step 3 triangulates marker centres from multiple cameras. For markers
far above the board (arm extended in Z), the depth estimation from
triangulation degrades: all reference Board markers lie in a flat plane,
so the cameras are poorly conditioned for Z estimation of out-of-plane points.
This script takes a completely different route:
1. For each camera that sees a finger or hand marker, run cv2.solvePnP
on the four detected corners. solvePnP uses the apparent size /
aspect ratio of the marker to recover depth — it is NOT affected
by the flat-board reference plane problem.
2. Use the known camera-in-world pose (from step 2) to convert the
per-camera marker pose to world frame.
3. Fuse all world-frame marker poses with quality weighting.
4. Given world-frame positions for all visible finger markers, and the
known local positions in the FingerA/FingerB frames, solve a small
nonlinear LS problem for (b, c, e) using scipy.
What you need
-------------
- robot.json
- Step-2 camera-pose JSONs for all cameras
- Step-1 aruco-detection JSONs for all cameras
- Already-estimated x, y, z, a from 4a/4b (pass via --state-json or CLI)
Usage
-----
python 4d_direct_hand_pose.py \\
--robot robot.json \\
--det cam1_aruco_detection.json cam2_aruco_detection.json ... \\
--pose cam1_camera_pose.json cam2_camera_pose.json ... \\
--state 4b_arm2_result.json \\
--output 4d_result.json
Output
------
{
"b_deg": ..., "c_deg": ..., "e_mm": ...,
"rms_mm": ...,
"accumulated_state": {"x":..., "y":..., "z":..., "a":..., "b":..., "c":..., "e":...}
}
"""
from __future__ import annotations
import argparse
import json
import math
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple
import cv2
import numpy as np
try:
from scipy.optimize import least_squares
_HAS_SCIPY = True
except ImportError:
_HAS_SCIPY = False
from robot_fk import RobotFK, STATE_KEYS
# ──────────────────────────────────────────────────────────────
# I/O
# ──────────────────────────────────────────────────────────────
def _load(path: Path) -> dict:
with path.open("r", encoding="utf-8") as f:
return json.load(f)
def _load_state(path: Path) -> Dict[str, float]:
raw = _load(path)
state = raw.get("accumulated_state", raw.get("state", {}))
return {k: float(v) for k, v in state.items() if k in STATE_KEYS}
# ──────────────────────────────────────────────────────────────
# ArUco / PnP helpers
# ──────────────────────────────────────────────────────────────
def _marker_local_corners(size_m: float) -> np.ndarray:
"""Four local corners of a marker (same order as OpenCV ArUco)."""
h = size_m / 2.0
return np.array([[-h, h, 0], [h, h, 0], [h, -h, 0], [-h, -h, 0]], dtype=np.float32)
def _camera_pose_from_json(pose_json: dict) -> Tuple[np.ndarray, np.ndarray]:
"""
Load world-to-camera rotation and translation from step-2 JSON.
Returns rvec (3,), tvec (3,) — the same convention used in step 2.
"""
wtc = pose_json["camera_pose"]["world_to_camera"]
R = np.array(wtc["rotation_matrix"], dtype=np.float32).reshape(3, 3)
t = np.array(wtc["translation_m"], dtype=np.float32).reshape(3)
rvec, _ = cv2.Rodrigues(R)
return rvec.reshape(3), t
def _intrinsics_from_detection(det: dict) -> Tuple[np.ndarray, np.ndarray]:
cam = det["camera"]
K = np.array(cam["camera_matrix"], dtype=np.float32).reshape(3, 3)
D = np.array(cam["distortion_coefficients"], dtype=np.float32).reshape(-1, 1)
return K, D
def _solve_marker_pose_in_camera(
corners_px: np.ndarray,
marker_size_m: float,
K: np.ndarray,
D: np.ndarray
) -> Optional[Tuple[np.ndarray, np.ndarray]]:
"""
Estimate marker-in-camera pose via solvePnP (IPPE_SQUARE).
Returns (rvec_cm, tvec_cm) or None on failure.
"""
obj = _marker_local_corners(marker_size_m)
img = corners_px.astype(np.float32)
ok, rvec, tvec = cv2.solvePnP(obj, img, K, D, flags=cv2.SOLVEPNP_IPPE_SQUARE)
if not ok:
ok, rvec, tvec = cv2.solvePnP(obj, img, K, D, flags=cv2.SOLVEPNP_ITERATIVE)
if not ok:
return None
return rvec.reshape(3), tvec.reshape(3)
def _marker_world_pose(
rvec_cm: np.ndarray,
tvec_cm: np.ndarray,
rvec_wc: np.ndarray,
tvec_wc: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
"""
Convert marker-in-camera pose to marker-in-world pose.
Convention (same as step 2):
X_cam = R_wc @ X_world + t_wc
So:
R_wc = rvec→R(rvec_wc), camera origin in world = -R_wc^T @ t_wc
Marker pose in world:
R_mw = R_wc^T @ R_cm (world_to_marker rotation)
t_mw = R_wc^T @ (tvec_cm - tvec_wc)
→ marker centre in world = -R_mw^T @ t_mw = R_mw^T @ (-t_mw)
We return (pos_world_m, R_marker_in_world) where
R_marker_in_world maps local marker axes to world axes.
"""
R_wc, _ = cv2.Rodrigues(rvec_wc.reshape(3, 1))
R_cm, _ = cv2.Rodrigues(rvec_cm.reshape(3, 1))
R_cw = R_wc.T # camera-to-world rotation
# Marker centre in world frame
pos_w = R_cw @ tvec_cm.reshape(3) - R_cw @ tvec_wc.reshape(3)
# Marker orientation in world frame (columns = marker x,y,z axes in world)
R_mw = R_cw @ R_cm
return pos_w.astype(np.float64), R_mw.astype(np.float64)
# ──────────────────────────────────────────────────────────────
# Multi-camera marker pose fusion
# ──────────────────────────────────────────────────────────────
def _observation_weight(corners_px: np.ndarray, img_w: int, img_h: int) -> float:
"""
Simple quality weight: larger markers, more central = higher weight.
"""
area = float(cv2.contourArea(corners_px.astype(np.float32)))
cx, cy = corners_px.mean(axis=0)
dx = abs(cx - img_w / 2.0) / (img_w / 2.0)
dy = abs(cy - img_h / 2.0) / (img_h / 2.0)
centrality = 1.0 - 0.5 * (dx + dy)
return max(0.05, (area / 500.0) * centrality)
def collect_marker_world_poses(
detection_files: List[Path],
pose_files: List[Path],
target_marker_ids: set,
marker_size_m: float,
verbose: bool = True,
) -> Dict[int, List[Dict[str, Any]]]:
"""
For each target marker, collect world-frame pose estimates from all cameras.
Returns: marker_id → list of {pos_m, R, weight, camera_id}
"""
result: Dict[int, List[Dict[str, Any]]] = {}
for det_path, pose_path in zip(detection_files, pose_files):
det = _load(det_path)
pose_json = _load(pose_path)
K, D = _intrinsics_from_detection(det)
rvec_wc, tvec_wc = _camera_pose_from_json(pose_json)
cam_id = det.get("camera", {}).get("camera_id", str(det_path))
img_w = int(det.get("image", {}).get("width_px", 1280))
img_h = int(det.get("image", {}).get("height_px", 720))
for d in det.get("detections", []):
mid = int(d.get("marker_id", -1))
if mid not in target_marker_ids:
continue
corners = np.array(d.get("image_points_px", []), dtype=np.float32).reshape(4, 2)
if corners.shape != (4, 2):
continue
pnp = _solve_marker_pose_in_camera(corners, marker_size_m, K, D)
if pnp is None:
continue
rvec_cm, tvec_cm = pnp
pos_w, R_w = _marker_world_pose(rvec_cm, tvec_cm, rvec_wc, tvec_wc)
w = _observation_weight(corners, img_w, img_h)
result.setdefault(mid, []).append({
"pos_m": pos_w,
"R": R_w,
"weight": w,
"camera_id": cam_id,
})
if verbose:
for mid, obs in result.items():
print(f" M{mid}: {len(obs)} camera(s) — weights "
f"{[f'{o[\"weight\"]:.2f}' for o in obs]}")
return result
def fuse_marker_positions(
observations: Dict[int, List[Dict[str, Any]]]
) -> Dict[int, np.ndarray]:
"""
Weighted-mean fusion of world-frame marker centre positions (metres).
"""
result: Dict[int, np.ndarray] = {}
for mid, obs_list in observations.items():
total_w = sum(o["weight"] for o in obs_list)
if total_w < 1e-12:
continue
pos = sum(o["pos_m"] * o["weight"] for o in obs_list) / total_w
result[mid] = pos.astype(np.float64)
return result
# ──────────────────────────────────────────────────────────────
# FK-based optimizer for b, c, e
# ──────────────────────────────────────────────────────────────
def _fk_finger_positions(
fk: RobotFK,
base_state: Dict[str, float],
b: float,
c: float,
e: float
) -> Dict[int, np.ndarray]:
"""
Compute world positions (m) of all FingerA and FingerB markers.
base_state already contains x, y, z, a.
"""
state = dict(base_state)
state.update({"b": b, "c": c, "e": e})
transforms = fk.compute(state)
result = {}
for link_name in ("FingerA", "FingerB", "Hand", "Palm"):
if link_name not in fk.links:
continue
for m in fk.links[link_name].get("markers", []):
mid = int(m.get("id", -1))
if mid < 0 or "position" not in m:
continue
# position in METRES
local_mm = np.array(m["position"], dtype=float)
world_m = fk.marker_world(transforms, link_name, local_mm) / 1000.0
result[mid] = world_m
return result
def optimize_bce(
fk: RobotFK,
base_state: Dict[str, float],
observed_m: Dict[int, np.ndarray],
b_init: float = 0.0,
c_init: float = 0.0,
e_init: float = 0.0,
verbose: bool = True,
) -> dict:
"""
Find (b, c, e) minimising reprojection of FK finger-marker positions
to observed world positions.
Parameters
----------
fk : RobotFK
base_state : already-estimated {x, y, z, a} (all in mm/deg)
observed_m : {marker_id: world_position_m}
"""
if not _HAS_SCIPY:
return {"status": "failed", "reason": "scipy not available (pip install scipy)"}
def residuals(p):
b, c, e = float(p[0]), float(p[1]), float(p[2])
fk_pos = _fk_finger_positions(fk, base_state, b, c, e)
r = []
for mid, obs_p in observed_m.items():
if mid in fk_pos:
diff = fk_pos[mid] - obs_p
r.extend(diff.tolist())
return r
# Initial residual
r0 = residuals([b_init, c_init, e_init])
if len(r0) < 3:
return {
"status": "failed",
"reason": f"Only {len(r0)//3} finger markers matched. Need ≥1.",
}
res = least_squares(
residuals,
x0=[b_init, c_init, e_init],
method="lm",
max_nfev=2000,
)
b_est, c_est, e_est = float(res.x[0]), float(res.x[1]), float(res.x[2])
rms_m = float(np.sqrt(np.mean(np.array(res.fun) ** 2)))
rms_mm = rms_m * 1000.0
accum = dict(base_state)
accum.update({"b": b_est, "c": c_est, "e": e_est})
if verbose:
print(f"\n── 4d: Hand/Palm/Finger (b, c, e) ──────────────────────")
print(f" Markers used: {len(observed_m)} fused positions")
print(f" b = {b_est:+.2f} ° c = {c_est:+.2f} ° e = {e_est:+.2f} mm")
print(f" RMS position error: {rms_mm:.2f} mm")
if not res.success:
print(f" [WARN] Optimizer: {res.message}")
return {
"status": "ok",
"b_deg": b_est,
"c_deg": c_est,
"e_mm": e_est,
"rms_mm": rms_mm,
"optimizer_success": bool(res.success),
"num_markers": len(observed_m),
"accumulated_state": accum,
}
# ──────────────────────────────────────────────────────────────
# Main
# ──────────────────────────────────────────────────────────────
def main() -> int:
p = argparse.ArgumentParser(
description="4d: Direct hand/finger pose from per-camera solvePnP")
p.add_argument("--robot", type=Path, required=True)
p.add_argument("--det", "-d", type=Path, nargs="+", required=True,
help="*_aruco_detection.json files (one per camera)")
p.add_argument("--pose", "-p", type=Path, nargs="+", required=True,
help="*_camera_pose.json files (same order as --det)")
p.add_argument("--state", type=Path, default=None,
help="JSON from previous 4b step (accumulated_state)")
for k in ("x", "y", "z", "a"):
p.add_argument(f"--{k}", type=float, default=None,
help=f"Known {k} value (overrides --state)")
p.add_argument("--b-init", type=float, default=0.0)
p.add_argument("--c-init", type=float, default=0.0)
p.add_argument("--e-init", type=float, default=0.0)
p.add_argument("--output", type=Path, default=None)
args = p.parse_args()
if len(args.det) != len(args.pose):
print("[ERROR] --det and --pose must have the same number of files")
return 1
# Base state from file or CLI
base_state: Dict[str, float] = {}
if args.state:
base_state = _load_state(args.state)
print(f" Loaded state: {base_state}")
for k in ("x", "y", "z", "a"):
v = getattr(args, k, None)
if v is not None:
base_state[k] = float(v)
# Load FK
fk = RobotFK.from_file(args.robot)
# Which markers belong to Hand/Palm/FingerA/FingerB?
target_ids: set = set()
for link_name in ("Hand", "Palm", "FingerA", "FingerB"):
for m in fk.links.get(link_name, {}).get("markers", []):
mid = int(m.get("id", -1))
if mid >= 0:
target_ids.add(mid)
marker_size_m = float(
fk.robot.get("vision_config", {}).get("MarkerSize", 0.025)
)
print(f"\n Target marker IDs: {sorted(target_ids)}")
print(f" Marker size: {marker_size_m*1000:.1f} mm")
print(f" Cameras: {len(args.det)}")
# Collect world-frame poses from all cameras via solvePnP
print("\n Per-camera solvePnP:")
world_pose_obs = collect_marker_world_poses(
args.det, args.pose, target_ids, marker_size_m, verbose=True
)
if not world_pose_obs:
print("[ERROR] No target markers found in any detection file.")
return 1
# Fuse positions
observed_m = fuse_marker_positions(world_pose_obs)
print(f"\n Fused {len(observed_m)} marker world positions")
# Optimise b, c, e
result = optimize_bce(
fk = fk,
base_state = base_state,
observed_m = observed_m,
b_init = args.b_init,
c_init = args.c_init,
e_init = args.e_init,
verbose = True,
)
if args.output and result["status"] == "ok":
args.output.parent.mkdir(parents=True, exist_ok=True)
with args.output.open("w", encoding="utf-8") as f:
json.dump(result, f, indent=2)
print(f"\n Saved → {args.output}")
return 0 if result["status"] == "ok" else 1
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python3
"""
5_v8_camera_z_refine.py
5_camera_z_refine.py
-----------------------
Refines camera z-positions using the FK-predicted z of the Ellbow link.
@@ -10,7 +10,7 @@ Camera poses from step 2 are estimated from board markers alone. Board
markers are roughly coplanar (z 0), so the camera's z-height above the
board is poorly constrained and can be 2040 % off.
Once we have the elbow angle (step 3b, from 4_v8_4b_revolute_angle.py),
Once we have the elbow angle (step 3b, from 4b_revolute_angle.py),
FK predicts where the Ellbow markers *should* be in world space. The
difference between FK-predicted z and triangulated z gives a global
z-offset for the cameras.
@@ -30,7 +30,7 @@ Algorithm
Usage
-----
python 5_v8_camera_z_refine.py \\
python 5_camera_z_refine.py \\
--angle path/to/v8_ellbow_angle.json \\
--robot path/to/robot.json \\
--aruco path/to/aruco_positions_optimized.json \\
@@ -96,7 +96,7 @@ def main() -> None:
description="5v8: refine camera z-positions using FK-predicted Ellbow marker z"
)
parser.add_argument("--angle", required=True,
help="v8_ellbow_angle.json from 4_v8_4b_revolute_angle.py")
help="v8_ellbow_angle.json from 4b_revolute_angle.py")
parser.add_argument("--robot", required=True, help="robot.json")
parser.add_argument("--aruco", required=True,
help="aruco_positions_optimized.json (first-pass triangulation)")