1080 lines
41 KiB
Python
1080 lines
41 KiB
Python
#!/usr/bin/env python3
|
|
|
|
"""
|
|
============================================================
|
|
STEP 2b — Simultane Multiview-Optimierung für Roboterpose
|
|
============================================================
|
|
|
|
Ziel:
|
|
Aus mehreren ArUco-Detektionsdateien die gemeinsame
|
|
Roboterpose (x,y,z,a,b,c,e) schätzen und jede Kamera-Pose
|
|
sowie Marker-Weltpositionen ausgeben.
|
|
|
|
Eingabe:
|
|
--robot ../robot.json
|
|
--detections render_1a_aruco_detection.json render_1b_aruco_detection.json ...
|
|
--outDir .
|
|
|
|
Ausgabe:
|
|
multiview_pose.json
|
|
|
|
Hinweis:
|
|
Dieses Skript verwendet die Markerpositionen aus robot.json
|
|
als kinematische Constraints und optimiert gleichzeitig:
|
|
- Roboterzustand (x,y,z,a,b,c,e)
|
|
- Kameraextrinsische Parameter pro Bild
|
|
|
|
"""
|
|
|
|
import argparse
|
|
import datetime
|
|
import json
|
|
import math
|
|
import os
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Tuple
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from scipy.optimize import least_squares
|
|
|
|
STATE_KEYS = ["x", "y", "z", "a", "b", "c", "e"]
|
|
|
|
# ------------------------------------------------------------------
|
|
# Constraint definitions and validation
|
|
# ------------------------------------------------------------------
|
|
|
|
class ConstraintResult:
|
|
"""Result of validating/applying a single constraint"""
|
|
def __init__(self, name: str, enabled: bool, reason: str = ""):
|
|
self.name = name
|
|
self.enabled = enabled
|
|
self.reason = reason
|
|
self.residuals = []
|
|
|
|
def __str__(self) -> str:
|
|
status = "✓ ENABLED" if self.enabled else "✗ DISABLED"
|
|
return f"{self.name:40s} {status:12s} {self.reason}"
|
|
|
|
|
|
def validate_constraints(robot: Dict[str, Any], robot_markers: Dict[int, Dict[str, Any]]) -> Dict[str, ConstraintResult]:
|
|
"""
|
|
Validate which constraints can be applied based on robot geometry.
|
|
Returns a dict of constraint_name -> ConstraintResult
|
|
"""
|
|
results = {}
|
|
|
|
# --- Constraint 1: Rigid body distances within each link ---
|
|
rigid_body_result = ConstraintResult("RigidBodyDistances", False)
|
|
try:
|
|
rigid_body_count = 0
|
|
for link_name in ['Arm1', 'Ellbow', 'Arm2']:
|
|
link_markers = [m for m in robot_markers.values() if m['link_name'] == link_name]
|
|
if len(link_markers) >= 2:
|
|
rigid_body_count += 1
|
|
if rigid_body_count >= 2:
|
|
rigid_body_result.enabled = True
|
|
rigid_body_result.reason = f"Found {rigid_body_count} links with 2+ markers each"
|
|
else:
|
|
rigid_body_result.reason = "Not enough rigid links with multiple markers"
|
|
except Exception as e:
|
|
rigid_body_result.reason = f"Error: {str(e)}"
|
|
results['RigidBodyDistances'] = rigid_body_result
|
|
|
|
# --- Constraint 2: Fixed X-distances between links (rotation around X-axis) ---
|
|
inter_link_x_result = ConstraintResult("InterLinkXDistances", False)
|
|
try:
|
|
links_with_markers = set(m['link_name'] for m in robot_markers.values())
|
|
x_rotated_links = []
|
|
for link_name in ['Arm1', 'Ellbow']:
|
|
if link_name in links_with_markers:
|
|
link_markers = [m for m in robot_markers.values() if m['link_name'] == link_name]
|
|
if len(link_markers) >= 1:
|
|
x_rotated_links.append(link_name)
|
|
if len(x_rotated_links) >= 2:
|
|
inter_link_x_result.enabled = True
|
|
inter_link_x_result.reason = f"Found {len(x_rotated_links)} X-rotation links: {', '.join(x_rotated_links)}"
|
|
else:
|
|
inter_link_x_result.reason = "Not enough X-rotation links"
|
|
except Exception as e:
|
|
inter_link_x_result.reason = f"Error: {str(e)}"
|
|
results['InterLinkXDistances'] = inter_link_x_result
|
|
|
|
# --- Sanity check (not a hard constraint): Arm2 sin(a) dependency ---
|
|
arm2_sina_result = ConstraintResult("Arm2SinADependency", True, "Sanity check only (not enforced)")
|
|
try:
|
|
arm2_markers = [m for m in robot_markers.values() if m['link_name'] == 'Arm2']
|
|
if len(arm2_markers) >= 2:
|
|
z_values = set(m['position_m'][2] for m in arm2_markers)
|
|
if len(z_values) > 1:
|
|
arm2_sina_result.enabled = True
|
|
arm2_sina_result.reason = "Multiple Z-values detected; sin(a) dependency confirmed"
|
|
else:
|
|
arm2_sina_result.enabled = False
|
|
arm2_sina_result.reason = "No Z-variation in Arm2 markers (cannot use sin(a) constraint)"
|
|
else:
|
|
arm2_sina_result.enabled = False
|
|
arm2_sina_result.reason = "Not enough Arm2 markers"
|
|
except Exception as e:
|
|
arm2_sina_result.reason = f"Error: {str(e)}"
|
|
results['Arm2SinADependency'] = arm2_sina_result
|
|
|
|
return results
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# JSON helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
def load_json(path: str) -> Dict[str, Any]:
|
|
with open(path, 'r', encoding='utf-8') as f:
|
|
return json.load(f)
|
|
|
|
|
|
def save_json(data: Dict[str, Any], path: Path) -> None:
|
|
with open(path, 'w', encoding='utf-8') as f:
|
|
json.dump(data, f, indent=2)
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# robot.json helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
def resolve_scalar(value: Any, default: float = 0.0) -> float:
|
|
if value is None:
|
|
return default
|
|
if isinstance(value, (int, float)):
|
|
return float(value)
|
|
try:
|
|
return float(str(value).strip())
|
|
except ValueError:
|
|
return default
|
|
|
|
|
|
def resolve_vector(value: Any, default_len: int = 3) -> Tuple[float, ...]:
|
|
if value is None:
|
|
return tuple(0.0 for _ in range(default_len))
|
|
if isinstance(value, (int, float, str)):
|
|
return (resolve_scalar(value),) + tuple(0.0 for _ in range(default_len - 1))
|
|
if isinstance(value, (list, tuple)):
|
|
resolved = [resolve_scalar(v) for v in value]
|
|
if len(resolved) < default_len:
|
|
resolved.extend([0.0] * (default_len - len(resolved)))
|
|
return tuple(resolved[:default_len])
|
|
return tuple(0.0 for _ in range(default_len))
|
|
|
|
|
|
def parse_metric_scale(robot: Dict[str, Any]) -> float:
|
|
rendering_info = robot.get('renderingInfo', {}) or {}
|
|
metric = rendering_info.get('metric', 'mm')
|
|
return 0.001 if str(metric).strip().lower() == 'mm' else 1.0
|
|
|
|
|
|
def normalize_axis(axis: Any) -> np.ndarray:
|
|
vec = np.asarray(axis, dtype=np.float64)
|
|
if vec.shape != (3,):
|
|
vec = vec.reshape(-1)[:3]
|
|
norm = np.linalg.norm(vec)
|
|
return vec / max(norm, 1e-9)
|
|
|
|
|
|
def euler_deg_to_matrix(euler_deg: Any) -> np.ndarray:
|
|
x_deg, y_deg, z_deg = resolve_vector(euler_deg, 3)
|
|
x = math.radians(x_deg)
|
|
y = math.radians(y_deg)
|
|
z = math.radians(z_deg)
|
|
|
|
cx = math.cos(x)
|
|
sx = math.sin(x)
|
|
cy = math.cos(y)
|
|
sy = math.sin(y)
|
|
cz = math.cos(z)
|
|
sz = math.sin(z)
|
|
|
|
Rx = np.array([
|
|
[1.0, 0.0, 0.0],
|
|
[0.0, cx, -sx],
|
|
[0.0, sx, cx]
|
|
], dtype=np.float64)
|
|
|
|
Ry = np.array([
|
|
[cy, 0.0, sy],
|
|
[0.0, 1.0, 0.0],
|
|
[-sy, 0.0, cy]
|
|
], dtype=np.float64)
|
|
|
|
Rz = np.array([
|
|
[cz, -sz, 0.0],
|
|
[sz, cz, 0.0],
|
|
[0.0, 0.0, 1.0]
|
|
], dtype=np.float64)
|
|
|
|
return Rz @ Ry @ Rx
|
|
|
|
|
|
def transform_from_translation_rotation(translation: Any, rotation_deg: Any) -> np.ndarray:
|
|
T = np.eye(4, dtype=np.float64)
|
|
pos = np.asarray(resolve_vector(translation, 3), dtype=np.float64)
|
|
T[:3, 3] = pos
|
|
T[:3, :3] = euler_deg_to_matrix(rotation_deg)
|
|
return T
|
|
|
|
|
|
def axis_angle_matrix(axis: Any, angle_deg: float) -> np.ndarray:
|
|
axis_vec = normalize_axis(axis)
|
|
theta = math.radians(angle_deg)
|
|
kx, ky, kz = axis_vec
|
|
c = math.cos(theta)
|
|
s = math.sin(theta)
|
|
v = 1.0 - c
|
|
R = np.array([
|
|
[kx * kx * v + c, kx * ky * v - kz * s, kx * kz * v + ky * s],
|
|
[ky * kx * v + kz * s, ky * ky * v + c, ky * kz * v - kx * s],
|
|
[kz * kx * v - ky * s, kz * ky * v + kx * s, kz * kz * v + c]
|
|
], dtype=np.float64)
|
|
T = np.eye(4, dtype=np.float64)
|
|
T[:3, :3] = R
|
|
return T
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Kinematics and marker extraction
|
|
# ------------------------------------------------------------------
|
|
|
|
def extract_markers(robot: Dict[str, Any], scale: float) -> Dict[int, Dict[str, Any]]:
|
|
markers = {}
|
|
links = robot.get('links', {}) or {}
|
|
marker_defaults = (robot.get('renderingInfo', {}) or {}).get('markerDefaults', {}) or {}
|
|
default_size_mm = float(marker_defaults.get('size', 25.0))
|
|
|
|
for link_name, link_info in links.items():
|
|
for marker in link_info.get('markers', []) or []:
|
|
marker_id = int(marker.get('id', -1))
|
|
if marker_id < 0:
|
|
continue
|
|
|
|
pos = resolve_vector(marker.get('position', [0, 0, 0]), 3)
|
|
size_mm = float(marker.get('size', default_size_mm))
|
|
markers[marker_id] = {
|
|
'marker_id': marker_id,
|
|
'link_name': link_name,
|
|
'position_m': np.asarray([pos[0] * scale, pos[1] * scale, pos[2] * scale], dtype=np.float64),
|
|
'normal': normalize_axis(resolve_vector(marker.get('normal', [0, 0, 1]), 3)),
|
|
'spin_deg': float(marker.get('spin', 0.0)),
|
|
'size_m': size_mm * scale,
|
|
}
|
|
|
|
return markers
|
|
|
|
|
|
def compute_marker_observation_quality(
|
|
corners_px: np.ndarray,
|
|
image_shape: Tuple[int, int]
|
|
) -> Dict[str, float]:
|
|
"""
|
|
Compute quality metrics for a marker observation based on:
|
|
- Orientation/distortion in the image
|
|
- Size (distance) to camera
|
|
|
|
Returns dict with:
|
|
- 'orientation_quality': 0..1 (1 = frontal, 0 = very skewed)
|
|
- 'size_quality': 0..1 (1 = reasonable size, 0 = too small or too large)
|
|
- 'combined_quality': product of both
|
|
"""
|
|
if corners_px.shape[0] != 4:
|
|
return {
|
|
'orientation_quality': 0.5,
|
|
'size_quality': 0.5,
|
|
'combined_quality': 0.25
|
|
}
|
|
|
|
# Compute edge lengths
|
|
corners = corners_px.astype(np.float64)
|
|
edges = [
|
|
np.linalg.norm(corners[(i+1) % 4] - corners[i])
|
|
for i in range(4)
|
|
]
|
|
|
|
# Orientation quality: low aspect ratio means frontal (1.0 = square)
|
|
aspect_ratio = max(edges) / (min(edges) + 1e-6)
|
|
# For aspect ratio 1.0->1, 1.5->0.8, 3.0->0.4, 5.0->0.2
|
|
orientation_quality = max(0.1, 2.0 / (1.0 + aspect_ratio))
|
|
|
|
# Size quality: area-based
|
|
area_px = cv2.contourArea(corners.astype(np.int32))
|
|
image_area = image_shape[0] * image_shape[1]
|
|
|
|
# Ideal: marker should be 5-20% of image area
|
|
# Too small: <1%, too large: >40%
|
|
area_ratio = area_px / image_area
|
|
|
|
if area_ratio < 0.01:
|
|
size_quality = 0.3 + area_ratio * 20 # linear growth 0-0.3
|
|
elif area_ratio > 0.4:
|
|
size_quality = 0.3 + (1.0 - area_ratio) * 2.33 # linear decay from 1.0
|
|
else:
|
|
# 1% to 40%: bell curve, max at ~5-20%
|
|
ideal_ratio = 0.10
|
|
size_quality = 1.0 - (area_ratio - ideal_ratio) ** 2 / (0.15 ** 2)
|
|
size_quality = max(0.3, min(1.0, size_quality))
|
|
|
|
combined_quality = orientation_quality * size_quality
|
|
|
|
return {
|
|
'orientation_quality': float(orientation_quality),
|
|
'size_quality': float(size_quality),
|
|
'combined_quality': float(combined_quality)
|
|
}
|
|
|
|
|
|
def marker_plane_axes(normal: np.ndarray, spin_deg: float) -> Tuple[np.ndarray, np.ndarray]:
|
|
n = normalize_axis(normal)
|
|
candidate = np.array((0.0, 0.0, 1.0), dtype=np.float64)
|
|
if abs(np.dot(n, candidate)) > 0.99:
|
|
candidate = np.array((1.0, 0.0, 0.0), dtype=np.float64)
|
|
|
|
x_dir = np.cross(candidate, n)
|
|
x_dir /= max(np.linalg.norm(x_dir), 1e-9)
|
|
y_dir = np.cross(n, x_dir)
|
|
|
|
if abs(spin_deg) > 1e-6:
|
|
theta = math.radians(spin_deg)
|
|
cos_t = math.cos(theta)
|
|
sin_t = math.sin(theta)
|
|
x_rot = x_dir * cos_t + np.cross(n, x_dir) * sin_t + n * np.dot(n, x_dir) * (1.0 - cos_t)
|
|
y_rot = y_dir * cos_t + np.cross(n, y_dir) * sin_t + n * np.dot(n, y_dir) * (1.0 - cos_t)
|
|
return x_rot, y_rot
|
|
|
|
return x_dir, y_dir
|
|
|
|
|
|
def marker_object_corners(marker: Dict[str, Any]) -> np.ndarray:
|
|
half = marker['size_m'] * 0.5
|
|
x_dir, y_dir = marker_plane_axes(marker['normal'], marker['spin_deg'])
|
|
corners = np.stack([
|
|
-x_dir * half + y_dir * half,
|
|
x_dir * half + y_dir * half,
|
|
x_dir * half - y_dir * half,
|
|
-x_dir * half - y_dir * half
|
|
], axis=0)
|
|
return marker['position_m'].reshape(1, 3) + corners
|
|
|
|
|
|
def build_link_chain(robot: Dict[str, Any]) -> List[str]:
|
|
links = robot.get('links', {}) or {}
|
|
ordered: List[str] = []
|
|
remaining = set(links.keys())
|
|
|
|
while remaining:
|
|
progress = False
|
|
for name in list(remaining):
|
|
parent = links[name].get('parent')
|
|
if not parent or parent in ordered:
|
|
ordered.append(name)
|
|
remaining.remove(name)
|
|
progress = True
|
|
if not progress:
|
|
raise RuntimeError('Cycle detected in robot link tree or missing parent link')
|
|
return ordered
|
|
|
|
|
|
def compute_link_transforms(robot: Dict[str, Any], state: Dict[str, float], scale: float) -> Dict[str, np.ndarray]:
|
|
links = robot.get('links', {}) or {}
|
|
ordered_links = build_link_chain(robot)
|
|
transforms: Dict[str, np.ndarray] = {}
|
|
|
|
for link_name in ordered_links:
|
|
link_info = links[link_name] or {}
|
|
parent_name = link_info.get('parent')
|
|
parent_transform = transforms[parent_name] if parent_name else np.eye(4, dtype=np.float64)
|
|
|
|
mount_translation = np.asarray(resolve_vector(link_info.get('mountPosition', [0, 0, 0]), 3), dtype=np.float64) * scale
|
|
mount = transform_from_translation_rotation(
|
|
mount_translation,
|
|
link_info.get('mountRotation', [0, 0, 0])
|
|
)
|
|
|
|
joint_info = link_info.get('jointToParent', {}) or {}
|
|
joint_origin = np.asarray(resolve_vector(joint_info.get('origin', [0, 0, 0]), 3), dtype=np.float64) * scale
|
|
joint = transform_from_translation_rotation(
|
|
joint_origin,
|
|
joint_info.get('rotation', [0, 0, 0])
|
|
)
|
|
|
|
motion = np.eye(4, dtype=np.float64)
|
|
joint_type = str(joint_info.get('type', 'fixed')).strip().lower()
|
|
control_var = str(joint_info.get('variable', joint_info.get('control', ''))).strip().lower()
|
|
axis = resolve_vector(joint_info.get('axis', [1, 0, 0]), 3)
|
|
|
|
if joint_type == 'linear':
|
|
motion[:3, 3] = normalize_axis(axis) * state.get(control_var, 0.0) * scale
|
|
elif joint_type == 'revolute':
|
|
motion = axis_angle_matrix(axis, state.get(control_var, 0.0))
|
|
|
|
transforms[link_name] = parent_transform @ mount @ joint @ motion
|
|
|
|
return transforms
|
|
|
|
|
|
def compute_marker_world_position(marker: Dict[str, Any], link_transforms: Dict[str, np.ndarray]) -> np.ndarray:
|
|
link_transform = link_transforms[marker['link_name']]
|
|
local = np.ones(4, dtype=np.float64)
|
|
local[:3] = marker['position_m']
|
|
world = link_transform @ local
|
|
return world[:3]
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Camera / observation helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
def load_intrinsics(detection_json: Dict[str, Any]) -> Tuple[np.ndarray, np.ndarray]:
|
|
cam = detection_json['camera']
|
|
K = np.asarray(cam['camera_matrix'], dtype=np.float64)
|
|
D = np.asarray(cam.get('distortion_coefficients', [0, 0, 0, 0, 0]), dtype=np.float64).reshape(-1, 1)
|
|
return K, D
|
|
|
|
|
|
def collect_views_and_observations(
|
|
detection_files: List[str],
|
|
robot_markers: Dict[int, Dict[str, Any]]
|
|
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
|
|
views: List[Dict[str, Any]] = []
|
|
observations: List[Dict[str, Any]] = []
|
|
|
|
for idx, det_path in enumerate(detection_files):
|
|
detection_json = load_json(det_path)
|
|
K, D = load_intrinsics(detection_json)
|
|
image_shape = detection_json.get('image', {}).get('image_shape')
|
|
if not image_shape:
|
|
image_shape = (720, 1280) # fallback
|
|
|
|
views.append({
|
|
'index': idx,
|
|
'source_file': os.path.abspath(det_path),
|
|
'camera_id': detection_json.get('camera', {}).get('camera_id', f'cam{idx+1}'),
|
|
'image_file': detection_json.get('image', {}).get('image_file'),
|
|
'image_shape': image_shape,
|
|
'K': K,
|
|
'D': D
|
|
})
|
|
|
|
for det in detection_json.get('detections', []) or []:
|
|
marker_id = int(det.get('marker_id', -1))
|
|
if marker_id < 0 or marker_id not in robot_markers:
|
|
continue
|
|
|
|
image_points = det.get('image_points_px')
|
|
if isinstance(image_points, list) and len(image_points) == 4:
|
|
image_points = np.asarray(image_points, dtype=np.float64)
|
|
else:
|
|
center = resolve_vector(det.get('center_px', [0, 0]), 2)
|
|
image_points = np.asarray([center], dtype=np.float64)
|
|
|
|
confidence = float(det.get('confidence', 1.0))
|
|
|
|
# Compute observation quality metrics
|
|
quality_metrics = compute_marker_observation_quality(image_points, tuple(image_shape))
|
|
|
|
# Blend base confidence and observation quality instead of multiplying them.
|
|
# Reduce the distance/size-based quality influence by roughly 40% relative to the previous blend.
|
|
base_confidence = max(0.01, min(1.0, confidence))
|
|
combined_quality = quality_metrics['combined_quality']
|
|
combined_confidence = base_confidence * 0.76 + combined_quality * 0.24
|
|
|
|
marker = robot_markers[marker_id]
|
|
observations.append({
|
|
'view_index': idx,
|
|
'marker_id': marker_id,
|
|
'marker_link_corners': marker_object_corners(marker),
|
|
'image_points_px': image_points,
|
|
'confidence_base': base_confidence,
|
|
'quality_metrics': quality_metrics,
|
|
'confidence': max(0.01, min(1.0, combined_confidence))
|
|
})
|
|
|
|
if len(views) == 0:
|
|
raise RuntimeError('No valid detection views found')
|
|
|
|
if len(observations) == 0:
|
|
raise RuntimeError('No marker observations matched robot.json markers')
|
|
|
|
return views, observations
|
|
|
|
|
|
def compute_marker_world_corners(marker: Dict[str, Any], link_transforms: Dict[str, np.ndarray]) -> np.ndarray:
|
|
link_transform = link_transforms[marker['link_name']]
|
|
local = marker_object_corners(marker)
|
|
homogeneous = np.concatenate([local, np.ones((local.shape[0], 1), dtype=np.float64)], axis=1)
|
|
world = (link_transform @ homogeneous.T).T
|
|
return world[:, :3]
|
|
|
|
|
|
def initial_camera_guess(
|
|
view: Dict[str, Any],
|
|
observations: List[Dict[str, Any]],
|
|
robot_markers: Dict[int, Dict[str, Any]],
|
|
default_state: Dict[str, float],
|
|
scale: float,
|
|
robot: Dict[str, Any]
|
|
) -> Tuple[np.ndarray, np.ndarray]:
|
|
object_points = []
|
|
image_points = []
|
|
|
|
link_transforms = compute_link_transforms(robot, default_state, scale)
|
|
|
|
for obs in observations:
|
|
if obs['view_index'] != view['index']:
|
|
continue
|
|
marker = robot_markers[obs['marker_id']]
|
|
object_points.append(compute_marker_world_corners(marker, link_transforms))
|
|
image_points.append(obs['image_points_px'])
|
|
|
|
if len(object_points) == 0:
|
|
return np.zeros((3, 1), dtype=np.float64), np.array([[0.0], [0.0], [1.0]], dtype=np.float64)
|
|
|
|
object_points = np.vstack(object_points)
|
|
image_points = np.vstack(image_points)
|
|
|
|
if object_points.shape[0] < 4:
|
|
return np.zeros((3, 1), dtype=np.float64), np.array([[0.0], [0.0], [1.0]], dtype=np.float64)
|
|
|
|
success, rvec, tvec = cv2.solvePnP(
|
|
object_points,
|
|
image_points,
|
|
view['K'],
|
|
view['D'],
|
|
flags=cv2.SOLVEPNP_ITERATIVE
|
|
)
|
|
|
|
if not success:
|
|
return np.zeros((3, 1), dtype=np.float64), np.array([[0.0], [0.0], [1.0]], dtype=np.float64)
|
|
|
|
return rvec, tvec
|
|
|
|
|
|
def project_points(
|
|
points_3d: np.ndarray,
|
|
rvec: np.ndarray,
|
|
tvec: np.ndarray,
|
|
K: np.ndarray,
|
|
D: np.ndarray
|
|
) -> np.ndarray:
|
|
projected, _ = cv2.projectPoints(points_3d, rvec, tvec, K, D)
|
|
return projected.reshape(-1, 2)
|
|
|
|
|
|
def compute_soft_constraint_residuals(
|
|
robot_state: Dict[str, float],
|
|
robot_markers: Dict[int, Dict[str, Any]],
|
|
link_transforms: Dict[str, np.ndarray],
|
|
robot: Dict[str, Any],
|
|
enabled_constraints: Dict[str, ConstraintResult]
|
|
) -> List[float]:
|
|
"""
|
|
Compute residuals from soft constraints (kinematic consistency, rigid body distances).
|
|
Returns a list of constraint residuals to append to the total residual vector.
|
|
"""
|
|
residuals = []
|
|
weight_scale = 0.1 # Weight for soft constraints relative to reprojection errors
|
|
|
|
# Constraint 1: Rigid body distances within each link
|
|
if enabled_constraints['RigidBodyDistances'].enabled:
|
|
for link_name in ['Arm1', 'Ellbow', 'Arm2']:
|
|
link_markers = [m for m in robot_markers.values() if m['link_name'] == link_name]
|
|
if len(link_markers) < 2:
|
|
continue
|
|
|
|
# Compute all pairwise distances in world coords
|
|
for i in range(len(link_markers)):
|
|
for j in range(i + 1, len(link_markers)):
|
|
m_i = link_markers[i]
|
|
m_j = link_markers[j]
|
|
|
|
pos_i = compute_marker_world_position(m_i, link_transforms)
|
|
pos_j = compute_marker_world_position(m_j, link_transforms)
|
|
|
|
dist_world = np.linalg.norm(pos_i - pos_j)
|
|
|
|
# Reference distance in local coords
|
|
dist_local = np.linalg.norm(m_i['position_m'] - m_j['position_m'])
|
|
|
|
# Residual: difference should be zero (rigid body)
|
|
error = dist_world - dist_local
|
|
residuals.append(error * weight_scale * 0.1) # Very soft weight
|
|
|
|
# Constraint 2: Fixed X-distances between links (Arm1 <-> Ellbow)
|
|
if enabled_constraints['InterLinkXDistances'].enabled:
|
|
arm1_markers = [m for m in robot_markers.values() if m['link_name'] == 'Arm1']
|
|
ellbow_markers = [m for m in robot_markers.values() if m['link_name'] == 'Ellbow']
|
|
|
|
if len(arm1_markers) >= 1 and len(ellbow_markers) >= 1:
|
|
# Get first marker from each link
|
|
m_arm1 = arm1_markers[0]
|
|
m_ellbow = ellbow_markers[0]
|
|
|
|
pos_arm1 = compute_marker_world_position(m_arm1, link_transforms)
|
|
pos_ellbow = compute_marker_world_position(m_ellbow, link_transforms)
|
|
|
|
# X-distance in world should match reference (relative position)
|
|
# Since both rotate around X-axis at different points, we check consistency
|
|
x_diff_world = pos_ellbow[0] - pos_arm1[0]
|
|
x_diff_ref = m_ellbow['position_m'][0] - m_arm1['position_m'][0]
|
|
|
|
error = x_diff_world - x_diff_ref
|
|
residuals.append(error * weight_scale)
|
|
|
|
return residuals
|
|
|
|
|
|
def compute_marker_world_position(marker: Dict[str, Any], link_transforms: Dict[str, np.ndarray]) -> np.ndarray:
|
|
"""Compute the world position of a marker given current link transforms."""
|
|
link_transform = link_transforms[marker['link_name']]
|
|
local_pos = np.concatenate([marker['position_m'], [1.0]])
|
|
world_pos = (link_transform @ local_pos)[:3]
|
|
return world_pos
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Optimization
|
|
# ------------------------------------------------------------------
|
|
|
|
def pack_parameters(robot_state: Dict[str, float], camera_params: List[Tuple[np.ndarray, np.ndarray]]) -> np.ndarray:
|
|
state_vec = np.asarray([robot_state[k] for k in STATE_KEYS], dtype=np.float64)
|
|
cams = []
|
|
for rvec, tvec in camera_params:
|
|
cams.append(rvec.reshape(3))
|
|
cams.append(tvec.reshape(3))
|
|
return np.concatenate([state_vec] + cams)
|
|
|
|
|
|
def unpack_parameters(params: np.ndarray, n_views: int) -> Tuple[Dict[str, float], List[Tuple[np.ndarray, np.ndarray]]]:
|
|
robot_state = {STATE_KEYS[i]: float(params[i]) for i in range(len(STATE_KEYS))}
|
|
camera_params = []
|
|
offset = len(STATE_KEYS)
|
|
for _ in range(n_views):
|
|
rvec = params[offset:offset + 3].reshape(3, 1)
|
|
tvec = params[offset + 3:offset + 6].reshape(3, 1)
|
|
camera_params.append((rvec, tvec))
|
|
offset += 6
|
|
return robot_state, camera_params
|
|
|
|
|
|
def residuals_for_parameters(
|
|
params: np.ndarray,
|
|
views: List[Dict[str, Any]],
|
|
observations: List[Dict[str, Any]],
|
|
robot_markers: Dict[int, Dict[str, Any]],
|
|
robot: Dict[str, Any],
|
|
scale: float,
|
|
default_state: Dict[str, float],
|
|
enabled_constraints: Dict[str, ConstraintResult]
|
|
) -> np.ndarray:
|
|
robot_state, camera_params = unpack_parameters(params, len(views))
|
|
link_transforms = compute_link_transforms(robot, robot_state, scale)
|
|
|
|
residuals = []
|
|
|
|
# Reprojection residuals (primary observation)
|
|
for obs in observations:
|
|
marker = robot_markers[obs['marker_id']]
|
|
world_corners = compute_marker_world_corners(marker, link_transforms)
|
|
rvec, tvec = camera_params[obs['view_index']]
|
|
proj = project_points(world_corners, rvec, tvec, views[obs['view_index']]['K'], views[obs['view_index']]['D'])
|
|
diffs = proj - obs['image_points_px']
|
|
weight = math.sqrt(obs['confidence'])
|
|
residuals.extend((diffs * weight).reshape(-1))
|
|
|
|
# Weak priors on robot state
|
|
for key in STATE_KEYS:
|
|
diff = robot_state[key] - default_state.get(key, 0.0)
|
|
if key in ('x', 'y', 'z', 'e'):
|
|
w = 0.001
|
|
else:
|
|
w = 0.01
|
|
residuals.append(diff * w)
|
|
|
|
# Soft constraints (kinematic consistency, rigid body constraints)
|
|
soft_constraint_residuals = compute_soft_constraint_residuals(
|
|
robot_state, robot_markers, link_transforms, robot, enabled_constraints
|
|
)
|
|
residuals.extend(soft_constraint_residuals)
|
|
|
|
return np.asarray(residuals, dtype=np.float64)
|
|
|
|
|
|
|
|
def estimate_uncertainty(result: Any, n_params: int) -> np.ndarray:
|
|
if result.jac is None:
|
|
return np.full(n_params, float('nan'), dtype=np.float64)
|
|
J = result.jac
|
|
m, n = J.shape
|
|
JTJ = J.T @ J
|
|
try:
|
|
cov = np.linalg.pinv(JTJ)
|
|
except np.linalg.LinAlgError:
|
|
cov = np.linalg.pinv(JTJ + np.eye(n) * 1e-9)
|
|
residuals = result.fun
|
|
dof = max(1, m - n)
|
|
sigma2 = float(np.sum(residuals ** 2) / dof)
|
|
cov *= sigma2
|
|
return np.sqrt(np.diag(cov))
|
|
|
|
|
|
def print_constraint_sanity_check(
|
|
robot_state: Dict[str, float],
|
|
robot_markers: Dict[int, Dict[str, Any]],
|
|
link_transforms: Dict[str, np.ndarray],
|
|
robot: Dict[str, Any],
|
|
enabled_constraints: Dict[str, ConstraintResult]
|
|
) -> None:
|
|
"""
|
|
Print sanity checks for all constraints to verify the optimization result.
|
|
"""
|
|
print("\n" + "=" * 70)
|
|
print("CONSTRAINT SANITY CHECKS (after optimization)")
|
|
print("=" * 70)
|
|
|
|
# Check 1: Rigid body distances
|
|
if enabled_constraints['RigidBodyDistances'].enabled:
|
|
print("\n1. RIGID BODY DISTANCES")
|
|
for link_name in ['Arm1', 'Ellbow', 'Arm2']:
|
|
link_markers = [m for m in robot_markers.values() if m['link_name'] == link_name]
|
|
if len(link_markers) < 2:
|
|
continue
|
|
|
|
max_error = 0.0
|
|
for i in range(len(link_markers)):
|
|
for j in range(i + 1, len(link_markers)):
|
|
m_i = link_markers[i]
|
|
m_j = link_markers[j]
|
|
|
|
pos_i = compute_marker_world_position(m_i, link_transforms)
|
|
pos_j = compute_marker_world_position(m_j, link_transforms)
|
|
|
|
dist_world = np.linalg.norm(pos_i - pos_j)
|
|
dist_local = np.linalg.norm(m_i['position_m'] - m_j['position_m'])
|
|
error = abs(dist_world - dist_local)
|
|
max_error = max(max_error, error)
|
|
|
|
status = "✓" if max_error < 1.0 else "⚠" if max_error < 5.0 else "✗"
|
|
print(f" {link_name:10s}: max_error = {max_error:.3f} mm {status}")
|
|
|
|
# Check 2: Inter-link X distances
|
|
if enabled_constraints['InterLinkXDistances'].enabled:
|
|
print("\n2. INTER-LINK X-DISTANCES")
|
|
arm1_markers = [m for m in robot_markers.values() if m['link_name'] == 'Arm1']
|
|
ellbow_markers = [m for m in robot_markers.values() if m['link_name'] == 'Ellbow']
|
|
|
|
if len(arm1_markers) >= 1 and len(ellbow_markers) >= 1:
|
|
m_arm1 = arm1_markers[0]
|
|
m_ellbow = ellbow_markers[0]
|
|
|
|
pos_arm1 = compute_marker_world_position(m_arm1, link_transforms)
|
|
pos_ellbow = compute_marker_world_position(m_ellbow, link_transforms)
|
|
|
|
x_diff_world = pos_ellbow[0] - pos_arm1[0]
|
|
x_diff_ref = m_ellbow['position_m'][0] - m_arm1['position_m'][0]
|
|
error = abs(x_diff_world - x_diff_ref)
|
|
|
|
status = "✓" if error < 1.0 else "⚠" if error < 5.0 else "✗"
|
|
print(f" Arm1 <-> Ellbow: error = {error:.3f} mm {status}")
|
|
|
|
# Check 3: Arm2 sin(a) dependency
|
|
if enabled_constraints['Arm2SinADependency'].enabled:
|
|
print("\n3. ARM2 sin(a) DEPENDENCY (sanity check)")
|
|
arm2_markers = [m for m in robot_markers.values() if m['link_name'] == 'Arm2']
|
|
if len(arm2_markers) >= 2:
|
|
# Check that markers with different Z values have different X spreads
|
|
a_rad = math.radians(robot_state['a'])
|
|
sin_a = math.sin(a_rad)
|
|
cos_a = math.cos(a_rad)
|
|
|
|
z_variations = {}
|
|
for m in arm2_markers:
|
|
z_local = m['position_m'][2]
|
|
x_local = m['position_m'][0]
|
|
pos_world = compute_marker_world_position(m, link_transforms)
|
|
x_world = pos_world[0]
|
|
|
|
# Expected: x_world = 90 + x_local * cos(a) - z_local * sin(a)
|
|
x_expected = 90 * (robot.get('renderingInfo', {}).get('metric', 'mm') == 'mm' and 0.09 or 0.09) + x_local * cos_a - z_local * sin_a
|
|
x_error = abs(x_world - x_expected)
|
|
|
|
if z_local not in z_variations:
|
|
z_variations[z_local] = []
|
|
z_variations[z_local].append(x_error)
|
|
|
|
max_error = max(max(errors) for errors in z_variations.values()) if z_variations else 0.0
|
|
status = "✓" if max_error < 5.0 else "⚠" if max_error < 10.0 else "⚠"
|
|
print(f" X-consistency with sin(a): max_error = {max_error:.3f} mm {status}")
|
|
print(f" (Note: this is a consistency check, not a hard constraint)")
|
|
|
|
print("=" * 70)
|
|
|
|
|
|
def camera_position_world(rvec: np.ndarray, tvec: np.ndarray) -> np.ndarray:
|
|
R, _ = cv2.Rodrigues(rvec)
|
|
return (-R.T @ tvec).reshape(3)
|
|
|
|
|
|
def build_output(
|
|
robot_state: Dict[str, float],
|
|
state_uncertainty: np.ndarray,
|
|
views: List[Dict[str, Any]],
|
|
camera_params: List[Tuple[np.ndarray, np.ndarray]],
|
|
observations: List[Dict[str, Any]],
|
|
robot_markers: Dict[int, Dict[str, Any]],
|
|
scale: float,
|
|
robot: Dict[str, Any],
|
|
robot_json_path: str
|
|
) -> Dict[str, Any]:
|
|
link_transforms = compute_link_transforms(robot, robot_state, scale)
|
|
|
|
marker_summary: Dict[int, Dict[str, Any]] = {}
|
|
for marker_id, marker in robot_markers.items():
|
|
marker_summary[marker_id] = {
|
|
'marker_id': marker_id,
|
|
'link_name': marker['link_name'],
|
|
'position_world_m': compute_marker_world_position(marker, link_transforms).tolist(),
|
|
'size_m': marker['size_m'],
|
|
'observation_count': 0,
|
|
'mean_confidence': None,
|
|
'mean_reprojection_error_px': None,
|
|
'observations': []
|
|
}
|
|
|
|
per_marker_errors: Dict[int, List[float]] = {mid: [] for mid in marker_summary}
|
|
per_marker_confidences: Dict[int, List[float]] = {mid: [] for mid in marker_summary}
|
|
|
|
link_transforms = compute_link_transforms(robot, robot_state, scale)
|
|
for obs in observations:
|
|
marker_id = obs['marker_id']
|
|
marker = robot_markers[marker_id]
|
|
object_points_m = compute_marker_world_corners(marker, link_transforms)
|
|
rvec, tvec = camera_params[obs['view_index']]
|
|
proj = project_points(object_points_m, rvec, tvec, views[obs['view_index']]['K'], views[obs['view_index']]['D'])
|
|
diffs = proj - obs['image_points_px']
|
|
errors = np.linalg.norm(diffs, axis=1)
|
|
repro_error = float(np.mean(errors))
|
|
per_marker_errors[marker_id].extend(errors.tolist())
|
|
per_marker_confidences[marker_id].append(obs['confidence'])
|
|
marker_summary[marker_id]['observation_count'] += 1
|
|
marker_summary[marker_id]['observations'].append({
|
|
'view_index': obs['view_index'],
|
|
'source_file': views[obs['view_index']]['source_file'],
|
|
'image_file': views[obs['view_index']]['image_file'],
|
|
'confidence': obs['confidence'],
|
|
'mean_reprojection_error_px': repro_error,
|
|
'corner_reprojection_errors_px': errors.tolist()
|
|
})
|
|
|
|
for marker_id, summary in marker_summary.items():
|
|
if summary['observation_count'] > 0:
|
|
summary['mean_confidence'] = float(np.mean(per_marker_confidences[marker_id]))
|
|
summary['mean_reprojection_error_px'] = float(np.mean(per_marker_errors[marker_id]))
|
|
|
|
camera_outputs = []
|
|
for idx, view in enumerate(views):
|
|
rvec, tvec = camera_params[idx]
|
|
cam_pos = camera_position_world(rvec, tvec)
|
|
observed_count = sum(1 for obs in observations if obs['view_index'] == idx)
|
|
camera_outputs.append({
|
|
'view_index': idx,
|
|
'source_file': view['source_file'],
|
|
'camera_id': view['camera_id'],
|
|
'camera_position_world_m': cam_pos.tolist(),
|
|
'rvec': rvec.reshape(-1).tolist(),
|
|
'tvec': tvec.reshape(-1).tolist(),
|
|
'intrinsics': {
|
|
'camera_matrix': view['K'].tolist(),
|
|
'distortion_coefficients': view['D'].reshape(-1).tolist()
|
|
},
|
|
'observation_count': observed_count
|
|
})
|
|
|
|
robot_pose_output = {
|
|
'state': {k: float(robot_state[k]) for k in STATE_KEYS},
|
|
'uncertainty': {
|
|
'x_mm': float(state_uncertainty[0]),
|
|
'y_mm': float(state_uncertainty[1]),
|
|
'z_mm': float(state_uncertainty[2]),
|
|
'a_deg': float(state_uncertainty[3]),
|
|
'b_deg': float(state_uncertainty[4]),
|
|
'c_deg': float(state_uncertainty[5]),
|
|
'e_mm': float(state_uncertainty[6])
|
|
},
|
|
'confidence': {
|
|
'x': float(math.exp(-state_uncertainty[0] / 10.0)),
|
|
'y': float(math.exp(-state_uncertainty[1] / 10.0)),
|
|
'z': float(math.exp(-state_uncertainty[2] / 10.0)),
|
|
'a': float(math.exp(-state_uncertainty[3] / 10.0)),
|
|
'b': float(math.exp(-state_uncertainty[4] / 10.0)),
|
|
'c': float(math.exp(-state_uncertainty[5] / 10.0)),
|
|
'e': float(math.exp(-state_uncertainty[6] / max(1.0, state_uncertainty[6])))
|
|
}
|
|
}
|
|
|
|
return {
|
|
'schema_version': '1.0',
|
|
'created_utc': datetime.datetime.utcnow().isoformat() + 'Z',
|
|
'source_robot_json': os.path.abspath(robot_json_path),
|
|
'source_detections': [view['source_file'] for view in views],
|
|
'robot_pose': robot_pose_output,
|
|
'camera_poses': camera_outputs,
|
|
'marker_positions': list(marker_summary.values())
|
|
}
|
|
|
|
|
|
# ------------------------------------------------------------------
|
|
# Main
|
|
# ------------------------------------------------------------------
|
|
|
|
def main() -> None:
|
|
parser = argparse.ArgumentParser(description='Multiview optimization of robot pose and camera extrinsics')
|
|
parser.add_argument('--robot', required=True, help='Path to robot.json')
|
|
parser.add_argument('--detections', required=True, nargs='+', help='List of detection JSON files')
|
|
parser.add_argument('--outDir', required=True, help='Output directory')
|
|
parser.add_argument('--write-summary', action='store_true', help='Write summary file')
|
|
parser.add_argument('--max-iter', type=int, default=500, help='Maximum optimizer iterations')
|
|
args = parser.parse_args()
|
|
|
|
os.makedirs(args.outDir, exist_ok=True)
|
|
|
|
robot_json_path = os.path.abspath(args.robot)
|
|
robot = load_json(robot_json_path)
|
|
scale = parse_metric_scale(robot)
|
|
|
|
default_state = {
|
|
k: float(robot.get('defaultPosition', {}).get(k, 0.0) or 0.0)
|
|
for k in STATE_KEYS
|
|
}
|
|
|
|
robot_markers = extract_markers(robot, scale)
|
|
|
|
# Validate constraints
|
|
print("\n" + "=" * 70)
|
|
print("CONSTRAINT VALIDATION")
|
|
print("=" * 70)
|
|
enabled_constraints = validate_constraints(robot, robot_markers)
|
|
for constraint_name, result in enabled_constraints.items():
|
|
print(result)
|
|
print("=" * 70)
|
|
|
|
views, observations = collect_views_and_observations(args.detections, robot_markers)
|
|
|
|
# Print observation quality summary
|
|
print("\n" + "=" * 70)
|
|
print("OBSERVATION QUALITY SUMMARY")
|
|
print("=" * 70)
|
|
print(f"Total observations: {len(observations)}")
|
|
print()
|
|
|
|
# Aggregate quality metrics
|
|
quality_by_marker = {}
|
|
for obs in observations:
|
|
mid = obs['marker_id']
|
|
if mid not in quality_by_marker:
|
|
quality_by_marker[mid] = []
|
|
quality_by_marker[mid].append(obs['quality_metrics'])
|
|
|
|
print(f"{'Marker':>8} {'Link':>12} {'Count':>6} {'Avg Orient.':>12} {'Avg Size':>12} {'Avg Conf.':>12}")
|
|
print("-" * 70)
|
|
for marker_id in sorted(quality_by_marker.keys()):
|
|
marker = robot_markers[marker_id]
|
|
quality_list = quality_by_marker[marker_id]
|
|
avg_orient = np.mean([q['orientation_quality'] for q in quality_list])
|
|
avg_size = np.mean([q['size_quality'] for q in quality_list])
|
|
|
|
obs_for_marker = [o for o in observations if o['marker_id'] == marker_id]
|
|
avg_conf = np.mean([o['confidence'] for o in obs_for_marker])
|
|
|
|
print(f"{marker_id:8d} {marker['link_name']:>12} {len(quality_list):6d} "
|
|
f"{avg_orient:12.3f} {avg_size:12.3f} {avg_conf:12.3f}")
|
|
print("=" * 70)
|
|
|
|
camera_guesses = []
|
|
for view in views:
|
|
rvec, tvec = initial_camera_guess(view, observations, robot_markers, default_state, scale, robot)
|
|
camera_guesses.append((rvec, tvec))
|
|
|
|
|
|
x0 = pack_parameters(default_state, camera_guesses)
|
|
|
|
progress = {
|
|
'iter': 0,
|
|
'last_cost': None,
|
|
'last_print': time.time(),
|
|
'prev_x': x0.copy()
|
|
}
|
|
|
|
def progress_callback(xk: np.ndarray) -> None:
|
|
progress['iter'] += 1
|
|
now = time.time()
|
|
if progress['iter'] == 1 or now - progress['last_print'] >= 1.0:
|
|
res = residuals_for_parameters(xk, views, observations, robot_markers, robot, scale, default_state, enabled_constraints)
|
|
cost = 0.5 * float(np.dot(res, res))
|
|
delta_cost = None
|
|
convergence = ''
|
|
if progress['last_cost'] is not None:
|
|
delta_cost = cost - progress['last_cost']
|
|
if abs(delta_cost) < 1e-3:
|
|
convergence = ' stable'
|
|
elif delta_cost < 0:
|
|
convergence = ' improving'
|
|
else:
|
|
convergence = ' worsening'
|
|
step_norm = float(np.linalg.norm(xk - progress['prev_x']))
|
|
print(
|
|
f'[Multiview] iter={progress["iter"]:4d} cost={cost:.4f}'
|
|
+ (f' delta={delta_cost:.4g}' if delta_cost is not None else '')
|
|
+ f' step={step_norm:.4g}'
|
|
+ convergence
|
|
)
|
|
progress['last_cost'] = cost
|
|
progress['last_print'] = now
|
|
progress['prev_x'] = xk.copy()
|
|
|
|
result = least_squares(
|
|
residuals_for_parameters,
|
|
x0,
|
|
args=(views, observations, robot_markers, robot, scale, default_state, enabled_constraints),
|
|
jac='2-point',
|
|
method='trf',
|
|
loss='soft_l1',
|
|
f_scale=1.0,
|
|
max_nfev=args.max_iter,
|
|
callback=progress_callback
|
|
)
|
|
|
|
robot_state, camera_params = unpack_parameters(result.x, len(views))
|
|
uncertainties = estimate_uncertainty(result, len(result.x))
|
|
|
|
# Print constraint sanity checks
|
|
link_transforms = compute_link_transforms(robot, robot_state, scale)
|
|
print_constraint_sanity_check(robot_state, robot_markers, link_transforms, robot, enabled_constraints)
|
|
|
|
output = build_output(robot_state, uncertainties[:len(STATE_KEYS)], views, camera_params, observations, robot_markers, scale, robot, robot_json_path)
|
|
|
|
out_path = Path(args.outDir) / 'multiview_pose.json'
|
|
save_json(output, out_path)
|
|
|
|
print(f'Saved: {out_path}')
|
|
if args.write_summary:
|
|
summary_path = Path(args.outDir) / 'multiview_pose_summary.json'
|
|
summary = {
|
|
'final_cost': float(result.cost),
|
|
'status': int(result.status),
|
|
'message': result.message,
|
|
'robot_state': output['robot_pose'],
|
|
'camera_count': len(views),
|
|
'marker_count': len(robot_markers)
|
|
}
|
|
save_json(summary, summary_path)
|
|
print(f'Saved: {summary_path}')
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|