1159 lines
46 KiB
Python
1159 lines
46 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Phase 1 — robust multiview robot pose estimation from aruco_detection.json + robot.json
|
|
|
|
This version keeps the original kinematic model and optimizer structure, but changes:
|
|
- observation weighting to a saturating factor model: min(1, q + f)
|
|
- quality indicators are normalized to 0..1
|
|
- blur/sharpness is supported but disabled by default (f=1)
|
|
- homography skew quality is added
|
|
- summary is built from the final output, so values stay consistent
|
|
- duplicate marker ids are warned about instead of being silently overwritten
|
|
|
|
Input:
|
|
--robot robot.json
|
|
--detections render_1a_aruco_detection.json ...
|
|
--outDir output
|
|
|
|
Output:
|
|
multiview_pose.json
|
|
multiview_pose_summary.json (optional)
|
|
"""
|
|
|
|
import argparse
|
|
import datetime as _dt
|
|
import json
|
|
import math
|
|
import os
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Tuple, Optional
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from scipy.optimize import least_squares
|
|
|
|
STATE_KEYS = ["x", "y", "z", "a", "b", "c", "e"]
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Small helpers
|
|
# -----------------------------------------------------------------------------
|
|
|
|
def clamp01(x: float) -> float:
|
|
return float(max(0.0, min(1.0, x)))
|
|
|
|
|
|
def load_json(path: str) -> Dict[str, Any]:
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
return json.load(f)
|
|
|
|
|
|
def save_json(data: Dict[str, Any], path: Path) -> None:
|
|
with open(path, "w", encoding="utf-8") as f:
|
|
json.dump(data, f, indent=2)
|
|
|
|
|
|
def resolve_scalar(value: Any, default: float = 0.0) -> float:
|
|
if value is None:
|
|
return default
|
|
if isinstance(value, (int, float)):
|
|
return float(value)
|
|
try:
|
|
return float(str(value).strip())
|
|
except Exception:
|
|
return default
|
|
|
|
|
|
def resolve_vector(value: Any, default_len: int = 3) -> Tuple[float, ...]:
|
|
if value is None:
|
|
return tuple(0.0 for _ in range(default_len))
|
|
if isinstance(value, (int, float, str)):
|
|
return (resolve_scalar(value),) + tuple(0.0 for _ in range(default_len - 1))
|
|
if isinstance(value, (list, tuple)):
|
|
resolved = [resolve_scalar(v) for v in value]
|
|
if len(resolved) < default_len:
|
|
resolved.extend([0.0] * (default_len - len(resolved)))
|
|
return tuple(resolved[:default_len])
|
|
return tuple(0.0 for _ in range(default_len))
|
|
|
|
|
|
def parse_metric_scale(robot: Dict[str, Any]) -> float:
|
|
rendering_info = robot.get("renderingInfo", {}) or {}
|
|
metric = str(rendering_info.get("metric", "mm")).strip().lower()
|
|
return 0.001 if metric == "mm" else 1.0
|
|
|
|
|
|
def normalize_axis(axis: Any) -> np.ndarray:
|
|
vec = np.asarray(axis, dtype=np.float64).reshape(-1)[:3]
|
|
norm = np.linalg.norm(vec)
|
|
if norm < 1e-12:
|
|
return np.array([1.0, 0.0, 0.0], dtype=np.float64)
|
|
return vec / norm
|
|
|
|
|
|
def euler_deg_to_matrix(euler_deg: Any) -> np.ndarray:
|
|
x_deg, y_deg, z_deg = resolve_vector(euler_deg, 3)
|
|
x = math.radians(x_deg)
|
|
y = math.radians(y_deg)
|
|
z = math.radians(z_deg)
|
|
|
|
cx = math.cos(x)
|
|
sx = math.sin(x)
|
|
cy = math.cos(y)
|
|
sy = math.sin(y)
|
|
cz = math.cos(z)
|
|
sz = math.sin(z)
|
|
|
|
Rx = np.array([[1.0, 0.0, 0.0], [0.0, cx, -sx], [0.0, sx, cx]], dtype=np.float64)
|
|
Ry = np.array([[cy, 0.0, sy], [0.0, 1.0, 0.0], [-sy, 0.0, cy]], dtype=np.float64)
|
|
Rz = np.array([[cz, -sz, 0.0], [sz, cz, 0.0], [0.0, 0.0, 1.0]], dtype=np.float64)
|
|
return Rz @ Ry @ Rx
|
|
|
|
|
|
def transform_from_translation_rotation(translation: Any, rotation_deg: Any) -> np.ndarray:
|
|
T = np.eye(4, dtype=np.float64)
|
|
T[:3, 3] = np.asarray(resolve_vector(translation, 3), dtype=np.float64)
|
|
T[:3, :3] = euler_deg_to_matrix(rotation_deg)
|
|
return T
|
|
|
|
|
|
def axis_angle_matrix(axis: Any, angle_deg: float) -> np.ndarray:
|
|
axis_vec = normalize_axis(axis)
|
|
theta = math.radians(angle_deg)
|
|
kx, ky, kz = axis_vec
|
|
c = math.cos(theta)
|
|
s = math.sin(theta)
|
|
v = 1.0 - c
|
|
R = np.array([
|
|
[kx * kx * v + c, kx * ky * v - kz * s, kx * kz * v + ky * s],
|
|
[ky * kx * v + kz * s, ky * ky * v + c, ky * kz * v - kx * s],
|
|
[kz * kx * v - ky * s, kz * ky * v + kx * s, kz * kz * v + c],
|
|
], dtype=np.float64)
|
|
T = np.eye(4, dtype=np.float64)
|
|
T[:3, :3] = R
|
|
return T
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Optional quality configuration
|
|
# -----------------------------------------------------------------------------
|
|
|
|
@dataclass
|
|
class ObservationQualityConfig:
|
|
# q indicators are normalized to 0..1
|
|
size_ref_px: float = 50.0
|
|
border_ref_px: float = 120.0
|
|
center_ref_norm: float = 1.0
|
|
sharpness_ref: float = 2500.0
|
|
homography_ref: float = 0.18
|
|
|
|
# factor f scales the effect of each quality indicator:
|
|
# f = 1 -> indicator is fully active
|
|
# f = 0 -> indicator is ignored (neutral weight = 1)
|
|
size_factor: float = 1.0
|
|
aspect_factor: float = 1.0
|
|
border_factor: float = 1.0
|
|
center_factor: float = 1.0
|
|
sharpness_factor: float = 1.0
|
|
homography_factor: float = 1.0
|
|
|
|
# Placeholders for later phases
|
|
normal_visibility_factor: float = 1.0
|
|
spin_factor: float = 1.0
|
|
|
|
# Keep a tiny floor for weights so the optimizer remains numerically stable
|
|
weight_floor: float = 0.01
|
|
|
|
|
|
def _load_nested_quality_config(src: Dict[str, Any]) -> Dict[str, Any]:
|
|
if not isinstance(src, dict):
|
|
return {}
|
|
for key in ("multiview_quality", "multiviewQuality", "quality_config", "multiview"):
|
|
v = src.get(key)
|
|
if isinstance(v, dict):
|
|
return v
|
|
return {}
|
|
|
|
|
|
def load_quality_config(robot: Dict[str, Any]) -> ObservationQualityConfig:
|
|
cfg = ObservationQualityConfig()
|
|
candidates = []
|
|
candidates.append(_load_nested_quality_config(robot))
|
|
candidates.append(_load_nested_quality_config(robot.get("vision_config", {}) or {}))
|
|
|
|
for src in candidates:
|
|
if not src:
|
|
continue
|
|
for field_name in cfg.__dataclass_fields__.keys():
|
|
if field_name in src:
|
|
setattr(cfg, field_name, resolve_scalar(src.get(field_name), getattr(cfg, field_name)))
|
|
return cfg
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Marker extraction and kinematics
|
|
# -----------------------------------------------------------------------------
|
|
|
|
class ConstraintResult:
|
|
def __init__(self, name: str, enabled: bool, reason: str = ""):
|
|
self.name = name
|
|
self.enabled = enabled
|
|
self.reason = reason
|
|
self.residuals = []
|
|
|
|
def __str__(self) -> str:
|
|
status = "✓ ENABLED" if self.enabled else "✗ DISABLED"
|
|
return f"{self.name:40s} {status:12s} {self.reason}"
|
|
|
|
|
|
def build_link_chain(robot: Dict[str, Any]) -> List[str]:
|
|
links = robot.get("links", {}) or {}
|
|
ordered: List[str] = []
|
|
remaining = set(links.keys())
|
|
while remaining:
|
|
progress = False
|
|
for name in list(remaining):
|
|
parent = links[name].get("parent")
|
|
if not parent or parent in ordered:
|
|
ordered.append(name)
|
|
remaining.remove(name)
|
|
progress = True
|
|
if not progress:
|
|
raise RuntimeError("Cycle detected in robot link tree or missing parent link")
|
|
return ordered
|
|
|
|
|
|
def extract_markers(robot: Dict[str, Any], scale: float) -> Dict[int, Dict[str, Any]]:
|
|
markers: Dict[int, Dict[str, Any]] = {}
|
|
links = robot.get("links", {}) or {}
|
|
marker_defaults = (robot.get("renderingInfo", {}) or {}).get("markerDefaults", {}) or {}
|
|
default_size_mm = float(marker_defaults.get("size", 25.0))
|
|
|
|
for link_name, link_info in links.items():
|
|
for marker in link_info.get("markers", []) or []:
|
|
marker_id = int(marker.get("id", -1))
|
|
if marker_id < 0:
|
|
continue
|
|
if marker_id in markers:
|
|
# Duplicate ids exist in the provided robot.json. Keep the first entry and warn.
|
|
print(f"[WARN] Duplicate marker id {marker_id} on link '{link_name}'. Ignoring this duplicate entry.")
|
|
continue
|
|
|
|
pos = resolve_vector(marker.get("position", [0, 0, 0]), 3)
|
|
size_mm = float(marker.get("size", default_size_mm))
|
|
markers[marker_id] = {
|
|
"marker_id": marker_id,
|
|
"link_name": link_name,
|
|
"position_m": np.asarray([pos[0] * scale, pos[1] * scale, pos[2] * scale], dtype=np.float64),
|
|
"normal": normalize_axis(resolve_vector(marker.get("normal", [0, 0, 1]), 3)),
|
|
"spin_deg": float(marker.get("spin", 0.0)),
|
|
"size_m": size_mm * scale,
|
|
}
|
|
|
|
return markers
|
|
|
|
|
|
def compute_link_transforms(robot: Dict[str, Any], state: Dict[str, float], scale: float) -> Dict[str, np.ndarray]:
|
|
links = robot.get("links", {}) or {}
|
|
ordered_links = build_link_chain(robot)
|
|
transforms: Dict[str, np.ndarray] = {}
|
|
|
|
for link_name in ordered_links:
|
|
link_info = links[link_name] or {}
|
|
parent_name = link_info.get("parent")
|
|
parent_transform = transforms[parent_name] if parent_name else np.eye(4, dtype=np.float64)
|
|
|
|
mount_translation = np.asarray(resolve_vector(link_info.get("mountPosition", [0, 0, 0]), 3), dtype=np.float64) * scale
|
|
mount = transform_from_translation_rotation(mount_translation, link_info.get("mountRotation", [0, 0, 0]))
|
|
|
|
joint_info = link_info.get("jointToParent", {}) or {}
|
|
joint_origin = np.asarray(resolve_vector(joint_info.get("origin", [0, 0, 0]), 3), dtype=np.float64) * scale
|
|
joint = transform_from_translation_rotation(joint_origin, joint_info.get("rotation", [0, 0, 0]))
|
|
|
|
motion = np.eye(4, dtype=np.float64)
|
|
joint_type = str(joint_info.get("type", "fixed")).strip().lower()
|
|
control_var = str(joint_info.get("variable", joint_info.get("control", ""))).strip().lower()
|
|
axis = resolve_vector(joint_info.get("axis", [1, 0, 0]), 3)
|
|
|
|
if joint_type == "linear":
|
|
motion[:3, 3] = normalize_axis(axis) * state.get(control_var, 0.0) * scale
|
|
elif joint_type == "revolute":
|
|
motion = axis_angle_matrix(axis, state.get(control_var, 0.0))
|
|
|
|
transforms[link_name] = parent_transform @ mount @ joint @ motion
|
|
|
|
return transforms
|
|
|
|
|
|
def compute_marker_world_position(marker: Dict[str, Any], link_transforms: Dict[str, np.ndarray]) -> np.ndarray:
|
|
link_transform = link_transforms[marker["link_name"]]
|
|
local = np.ones(4, dtype=np.float64)
|
|
local[:3] = marker["position_m"]
|
|
world = link_transform @ local
|
|
return world[:3]
|
|
|
|
|
|
def marker_plane_axes(normal: np.ndarray, spin_deg: float) -> Tuple[np.ndarray, np.ndarray]:
|
|
n = normalize_axis(normal)
|
|
candidate = np.array((0.0, 0.0, 1.0), dtype=np.float64)
|
|
if abs(np.dot(n, candidate)) > 0.99:
|
|
candidate = np.array((1.0, 0.0, 0.0), dtype=np.float64)
|
|
|
|
x_dir = np.cross(candidate, n)
|
|
x_dir /= max(np.linalg.norm(x_dir), 1e-9)
|
|
y_dir = np.cross(n, x_dir)
|
|
|
|
if abs(spin_deg) > 1e-9:
|
|
theta = math.radians(spin_deg)
|
|
cos_t = math.cos(theta)
|
|
sin_t = math.sin(theta)
|
|
x_rot = x_dir * cos_t + np.cross(n, x_dir) * sin_t + n * np.dot(n, x_dir) * (1.0 - cos_t)
|
|
y_rot = y_dir * cos_t + np.cross(n, y_dir) * sin_t + n * np.dot(n, y_dir) * (1.0 - cos_t)
|
|
return x_rot, y_rot
|
|
|
|
return x_dir, y_dir
|
|
|
|
|
|
def marker_object_corners(marker: Dict[str, Any]) -> np.ndarray:
|
|
half = marker["size_m"] * 0.5
|
|
x_dir, y_dir = marker_plane_axes(marker["normal"], marker["spin_deg"])
|
|
corners = np.stack([
|
|
-x_dir * half + y_dir * half,
|
|
x_dir * half + y_dir * half,
|
|
x_dir * half - y_dir * half,
|
|
-x_dir * half - y_dir * half,
|
|
], axis=0)
|
|
return marker["position_m"].reshape(1, 3) + corners
|
|
|
|
|
|
def compute_marker_world_corners(marker: Dict[str, Any], link_transforms: Dict[str, np.ndarray]) -> np.ndarray:
|
|
link_transform = link_transforms[marker["link_name"]]
|
|
local = marker_object_corners(marker)
|
|
homogeneous = np.concatenate([local, np.ones((local.shape[0], 1), dtype=np.float64)], axis=1)
|
|
world = (link_transform @ homogeneous.T).T
|
|
return world[:, :3]
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Quality model
|
|
# -----------------------------------------------------------------------------
|
|
|
|
def quality_factor(q: float, f: float) -> float:
|
|
# Interpret f in [0, 1] as the per-indicator influence.
|
|
# f = 1.0 -> indicator is fully active; q is applied.
|
|
# f = 0.0 -> indicator is ignored and contributes a neutral weight of 1.0.
|
|
q = clamp01(q)
|
|
f = clamp01(f)
|
|
return 1.0 - f + f * q
|
|
|
|
|
|
def projective_homography_quality(image_points_px: np.ndarray, image_shape: Tuple[int, int], ref: float) -> float:
|
|
if image_points_px is None or len(image_points_px) != 4:
|
|
return 1.0
|
|
|
|
h, w = image_shape
|
|
if h <= 0 or w <= 0:
|
|
return 1.0
|
|
|
|
src = np.array([[0.0, 0.0], [1.0, 0.0], [1.0, 1.0], [0.0, 1.0]], dtype=np.float32)
|
|
dst = np.asarray(image_points_px, dtype=np.float32).copy()
|
|
dst[:, 0] /= float(w)
|
|
dst[:, 1] /= float(h)
|
|
|
|
try:
|
|
H = cv2.getPerspectiveTransform(src, dst).astype(np.float64)
|
|
if abs(H[2, 2]) > 1e-12:
|
|
H = H / H[2, 2]
|
|
proj_strength = float(abs(H[2, 0]) + abs(H[2, 1]))
|
|
q = 1.0 / (1.0 + proj_strength / max(ref, 1e-6))
|
|
return clamp01(q)
|
|
except Exception:
|
|
return 1.0
|
|
|
|
|
|
def compute_observation_quality(
|
|
det: Dict[str, Any],
|
|
image_shape: Tuple[int, int],
|
|
cfg: ObservationQualityConfig,
|
|
) -> Dict[str, Any]:
|
|
quality = det.get("quality", {}) or {}
|
|
geometry = quality.get("geometry", {}) or {}
|
|
sharpness = quality.get("sharpness", {}) or {}
|
|
|
|
edge_lengths = quality.get("edge_lengths_px", []) or []
|
|
edge_lengths = [float(x) for x in edge_lengths if x is not None]
|
|
mean_edge_px = float(np.mean(edge_lengths)) if len(edge_lengths) else math.sqrt(max(float(quality.get("area_px", 0.0)), 0.0))
|
|
|
|
edge_ratio = float(quality.get("edge_ratio", 1.0) or 1.0)
|
|
distance_to_border_px = float(geometry.get("distance_to_border_px", 0.0) or 0.0)
|
|
distance_to_center_norm = float(geometry.get("distance_to_center_norm", 1.0) or 1.0)
|
|
laplacian_var = float(sharpness.get("laplacian_var", 0.0) or 0.0)
|
|
|
|
# q in 0..1
|
|
q_size = clamp01(mean_edge_px / max(cfg.size_ref_px, 1e-6))
|
|
q_aspect = clamp01(2.0 / (1.0 + max(edge_ratio, 1e-6)))
|
|
q_border = clamp01(distance_to_border_px / max(cfg.border_ref_px, 1e-6))
|
|
q_center = clamp01(1.0 - (distance_to_center_norm / max(cfg.center_ref_norm, 1e-6)))
|
|
q_sharpness = clamp01(laplacian_var / max(cfg.sharpness_ref, 1e-6))
|
|
q_homography = projective_homography_quality(np.asarray(det.get("image_points_px", []), dtype=np.float64), image_shape, cfg.homography_ref)
|
|
|
|
factor_map = {
|
|
"size": quality_factor(q_size, cfg.size_factor),
|
|
"aspect": quality_factor(q_aspect, cfg.aspect_factor),
|
|
"border": quality_factor(q_border, cfg.border_factor),
|
|
"center": quality_factor(q_center, cfg.center_factor),
|
|
"sharpness": quality_factor(q_sharpness, cfg.sharpness_factor),
|
|
"homography": quality_factor(q_homography, cfg.homography_factor),
|
|
}
|
|
|
|
# Currently not active in phase 1, but already supported for later phases.
|
|
# They default to f=1, which makes them neutral.
|
|
q_normal_visibility = 1.0
|
|
q_spin = 1.0
|
|
factor_map["normal_visibility"] = quality_factor(q_normal_visibility, cfg.normal_visibility_factor)
|
|
factor_map["spin"] = quality_factor(q_spin, cfg.spin_factor)
|
|
|
|
weight_multiplier = 1.0
|
|
for v in factor_map.values():
|
|
weight_multiplier *= float(v)
|
|
|
|
# Conservative default: if no factor is activated in robot.json,
|
|
# this remains effectively neutral.
|
|
|
|
detector_confidence = clamp01(float(det.get("confidence", 1.0) or 1.0))
|
|
weighted_confidence = detector_confidence * weight_multiplier
|
|
weighted_confidence = max(cfg.weight_floor, min(1.0, weighted_confidence))
|
|
|
|
return {
|
|
"detector_confidence": detector_confidence,
|
|
"weighted_confidence": weighted_confidence,
|
|
"q": {
|
|
"size": q_size,
|
|
"aspect": q_aspect,
|
|
"border": q_border,
|
|
"center": q_center,
|
|
"sharpness": q_sharpness,
|
|
"homography": q_homography,
|
|
"normal_visibility": q_normal_visibility,
|
|
"spin": q_spin,
|
|
},
|
|
"factor": factor_map,
|
|
"weight_multiplier": weight_multiplier,
|
|
"raw": {
|
|
"mean_edge_px": mean_edge_px,
|
|
"edge_ratio": edge_ratio,
|
|
"distance_to_border_px": distance_to_border_px,
|
|
"distance_to_center_norm": distance_to_center_norm,
|
|
"laplacian_var": laplacian_var,
|
|
},
|
|
}
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Constraints (kept from the existing approach)
|
|
# -----------------------------------------------------------------------------
|
|
|
|
|
|
def validate_constraints(robot: Dict[str, Any], robot_markers: Dict[int, Dict[str, Any]]) -> Dict[str, ConstraintResult]:
|
|
results = {}
|
|
|
|
rigid_body_result = ConstraintResult("RigidBodyDistances", False)
|
|
try:
|
|
rigid_body_count = 0
|
|
for link_name in ["Arm1", "Ellbow", "Arm2"]:
|
|
link_markers = [m for m in robot_markers.values() if m["link_name"] == link_name]
|
|
if len(link_markers) >= 2:
|
|
rigid_body_count += 1
|
|
if rigid_body_count >= 2:
|
|
rigid_body_result.enabled = True
|
|
rigid_body_result.reason = f"Found {rigid_body_count} links with 2+ markers each"
|
|
else:
|
|
rigid_body_result.reason = "Not enough rigid links with multiple markers"
|
|
except Exception as e:
|
|
rigid_body_result.reason = f"Error: {str(e)}"
|
|
results["RigidBodyDistances"] = rigid_body_result
|
|
|
|
inter_link_x_result = ConstraintResult("InterLinkXDistances", False)
|
|
try:
|
|
links_with_markers = set(m["link_name"] for m in robot_markers.values())
|
|
x_rotated_links = []
|
|
for link_name in ["Arm1", "Ellbow"]:
|
|
if link_name in links_with_markers:
|
|
link_markers = [m for m in robot_markers.values() if m["link_name"] == link_name]
|
|
if len(link_markers) >= 1:
|
|
x_rotated_links.append(link_name)
|
|
if len(x_rotated_links) >= 2:
|
|
inter_link_x_result.enabled = True
|
|
inter_link_x_result.reason = f"Found {len(x_rotated_links)} X-rotation links: {', '.join(x_rotated_links)}"
|
|
else:
|
|
inter_link_x_result.reason = "Not enough X-rotation links"
|
|
except Exception as e:
|
|
inter_link_x_result.reason = f"Error: {str(e)}"
|
|
results["InterLinkXDistances"] = inter_link_x_result
|
|
|
|
arm2_sina_result = ConstraintResult("Arm2SinADependency", True, "Sanity check only (not enforced)")
|
|
try:
|
|
arm2_markers = [m for m in robot_markers.values() if m["link_name"] == "Arm2"]
|
|
if len(arm2_markers) >= 2:
|
|
z_values = set(float(m["position_m"][2]) for m in arm2_markers)
|
|
if len(z_values) > 1:
|
|
arm2_sina_result.enabled = True
|
|
arm2_sina_result.reason = "Multiple Z-values detected; sin(a) dependency confirmed"
|
|
else:
|
|
arm2_sina_result.enabled = False
|
|
arm2_sina_result.reason = "No Z-variation in Arm2 markers (cannot use sin(a) constraint)"
|
|
else:
|
|
arm2_sina_result.enabled = False
|
|
arm2_sina_result.reason = "Not enough Arm2 markers"
|
|
except Exception as e:
|
|
arm2_sina_result.reason = f"Error: {str(e)}"
|
|
results["Arm2SinADependency"] = arm2_sina_result
|
|
|
|
return results
|
|
|
|
|
|
def compute_soft_constraint_residuals(
|
|
robot_state: Dict[str, float],
|
|
robot_markers: Dict[int, Dict[str, Any]],
|
|
link_transforms: Dict[str, np.ndarray],
|
|
robot: Dict[str, Any],
|
|
enabled_constraints: Dict[str, ConstraintResult],
|
|
) -> List[float]:
|
|
residuals = []
|
|
weight_scale = 0.1
|
|
|
|
if enabled_constraints["RigidBodyDistances"].enabled:
|
|
for link_name in ["Arm1", "Ellbow", "Arm2"]:
|
|
link_markers = [m for m in robot_markers.values() if m["link_name"] == link_name]
|
|
if len(link_markers) < 2:
|
|
continue
|
|
for i in range(len(link_markers)):
|
|
for j in range(i + 1, len(link_markers)):
|
|
m_i = link_markers[i]
|
|
m_j = link_markers[j]
|
|
pos_i = compute_marker_world_position(m_i, link_transforms)
|
|
pos_j = compute_marker_world_position(m_j, link_transforms)
|
|
dist_world = np.linalg.norm(pos_i - pos_j)
|
|
dist_local = np.linalg.norm(m_i["position_m"] - m_j["position_m"])
|
|
error = dist_world - dist_local
|
|
residuals.append(error * weight_scale * 0.1)
|
|
|
|
if enabled_constraints["InterLinkXDistances"].enabled:
|
|
arm1_markers = [m for m in robot_markers.values() if m["link_name"] == "Arm1"]
|
|
ellbow_markers = [m for m in robot_markers.values() if m["link_name"] == "Ellbow"]
|
|
if len(arm1_markers) >= 1 and len(ellbow_markers) >= 1:
|
|
m_arm1 = arm1_markers[0]
|
|
m_ellbow = ellbow_markers[0]
|
|
pos_arm1 = compute_marker_world_position(m_arm1, link_transforms)
|
|
pos_ellbow = compute_marker_world_position(m_ellbow, link_transforms)
|
|
x_diff_world = pos_ellbow[0] - pos_arm1[0]
|
|
x_diff_ref = m_ellbow["position_m"][0] - m_arm1["position_m"][0]
|
|
residuals.append((x_diff_world - x_diff_ref) * weight_scale)
|
|
|
|
return residuals
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Camera / observation helpers
|
|
# -----------------------------------------------------------------------------
|
|
|
|
|
|
def load_intrinsics(detection_json: Dict[str, Any]) -> Tuple[np.ndarray, np.ndarray]:
|
|
cam = detection_json["camera"]
|
|
K = np.asarray(cam["camera_matrix"], dtype=np.float64)
|
|
D = np.asarray(cam.get("distortion_coefficients", [0, 0, 0, 0, 0]), dtype=np.float64).reshape(-1, 1)
|
|
return K, D
|
|
|
|
|
|
def detection_image_shape(detection_json: Dict[str, Any]) -> Tuple[int, int]:
|
|
image = detection_json.get("image", {}) or {}
|
|
h = int(image.get("height_px", detection_json.get("height_px", 720)) or 720)
|
|
w = int(image.get("width_px", detection_json.get("width_px", 1280)) or 1280)
|
|
return h, w
|
|
|
|
|
|
def collect_views_and_observations(
|
|
detection_files: List[str],
|
|
robot_markers: Dict[int, Dict[str, Any]],
|
|
quality_cfg: ObservationQualityConfig,
|
|
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
|
|
views: List[Dict[str, Any]] = []
|
|
observations: List[Dict[str, Any]] = []
|
|
|
|
for idx, det_path in enumerate(detection_files):
|
|
detection_json = load_json(det_path)
|
|
K, D = load_intrinsics(detection_json)
|
|
image_shape = detection_image_shape(detection_json)
|
|
|
|
views.append({
|
|
"index": idx,
|
|
"source_file": os.path.abspath(det_path),
|
|
"camera_id": detection_json.get("camera", {}).get("camera_id", f"cam{idx+1}"),
|
|
"image_file": detection_json.get("image", {}).get("image_file"),
|
|
"image_shape": image_shape,
|
|
"K": K,
|
|
"D": D,
|
|
})
|
|
|
|
for det in detection_json.get("detections", []) or []:
|
|
if str(det.get("type", "aruco")).lower() != "aruco":
|
|
continue
|
|
marker_id = int(det.get("marker_id", -1))
|
|
if marker_id < 0 or marker_id not in robot_markers:
|
|
continue
|
|
|
|
image_points = det.get("image_points_px")
|
|
if not (isinstance(image_points, list) and len(image_points) == 4):
|
|
# Phase 1 uses full marker corners only.
|
|
continue
|
|
|
|
image_points = np.asarray(image_points, dtype=np.float64)
|
|
marker = robot_markers[marker_id]
|
|
obs_quality = compute_observation_quality(det, image_shape, quality_cfg)
|
|
|
|
observations.append({
|
|
"view_index": idx,
|
|
"marker_id": marker_id,
|
|
"marker_link_corners": marker_object_corners(marker),
|
|
"image_points_px": image_points,
|
|
"confidence_base": obs_quality["detector_confidence"],
|
|
"confidence": obs_quality["weighted_confidence"],
|
|
"quality": obs_quality,
|
|
"raw_detection": det,
|
|
})
|
|
|
|
if len(views) == 0:
|
|
raise RuntimeError("No valid detection views found")
|
|
if len(observations) == 0:
|
|
raise RuntimeError("No marker observations matched robot.json markers")
|
|
return views, observations
|
|
|
|
|
|
def initial_camera_guess(
|
|
view: Dict[str, Any],
|
|
observations: List[Dict[str, Any]],
|
|
robot_markers: Dict[int, Dict[str, Any]],
|
|
default_state: Dict[str, float],
|
|
scale: float,
|
|
robot: Dict[str, Any],
|
|
) -> Tuple[np.ndarray, np.ndarray]:
|
|
object_points = []
|
|
image_points = []
|
|
link_transforms = compute_link_transforms(robot, default_state, scale)
|
|
|
|
for obs in observations:
|
|
if obs["view_index"] != view["index"]:
|
|
continue
|
|
marker = robot_markers[obs["marker_id"]]
|
|
object_points.append(compute_marker_world_corners(marker, link_transforms))
|
|
image_points.append(obs["image_points_px"])
|
|
|
|
if len(object_points) == 0:
|
|
return np.zeros((3, 1), dtype=np.float64), np.array([[0.0], [0.0], [1.0]], dtype=np.float64)
|
|
|
|
object_points = np.vstack(object_points)
|
|
image_points = np.vstack(image_points)
|
|
|
|
if object_points.shape[0] < 4:
|
|
return np.zeros((3, 1), dtype=np.float64), np.array([[0.0], [0.0], [1.0]], dtype=np.float64)
|
|
|
|
success, rvec, tvec = cv2.solvePnP(
|
|
object_points,
|
|
image_points,
|
|
view["K"],
|
|
view["D"],
|
|
flags=cv2.SOLVEPNP_ITERATIVE,
|
|
)
|
|
|
|
if not success:
|
|
return np.zeros((3, 1), dtype=np.float64), np.array([[0.0], [0.0], [1.0]], dtype=np.float64)
|
|
|
|
return rvec, tvec
|
|
|
|
|
|
def project_points(points_3d: np.ndarray, rvec: np.ndarray, tvec: np.ndarray, K: np.ndarray, D: np.ndarray) -> np.ndarray:
|
|
projected, _ = cv2.projectPoints(points_3d, rvec, tvec, K, D)
|
|
return projected.reshape(-1, 2)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Optimization
|
|
# -----------------------------------------------------------------------------
|
|
|
|
|
|
def pack_parameters(robot_state: Dict[str, float], camera_params: List[Tuple[np.ndarray, np.ndarray]]) -> np.ndarray:
|
|
state_vec = np.asarray([robot_state[k] for k in STATE_KEYS], dtype=np.float64)
|
|
cams = []
|
|
for rvec, tvec in camera_params:
|
|
cams.append(rvec.reshape(3))
|
|
cams.append(tvec.reshape(3))
|
|
return np.concatenate([state_vec] + cams)
|
|
|
|
|
|
def unpack_parameters(params: np.ndarray, n_views: int) -> Tuple[Dict[str, float], List[Tuple[np.ndarray, np.ndarray]]]:
|
|
robot_state = {STATE_KEYS[i]: float(params[i]) for i in range(len(STATE_KEYS))}
|
|
camera_params = []
|
|
offset = len(STATE_KEYS)
|
|
for _ in range(n_views):
|
|
rvec = params[offset:offset + 3].reshape(3, 1)
|
|
tvec = params[offset + 3:offset + 6].reshape(3, 1)
|
|
camera_params.append((rvec, tvec))
|
|
offset += 6
|
|
return robot_state, camera_params
|
|
|
|
|
|
def residuals_for_parameters(
|
|
params: np.ndarray,
|
|
views: List[Dict[str, Any]],
|
|
observations: List[Dict[str, Any]],
|
|
robot_markers: Dict[int, Dict[str, Any]],
|
|
robot: Dict[str, Any],
|
|
scale: float,
|
|
default_state: Dict[str, float],
|
|
enabled_constraints: Dict[str, ConstraintResult],
|
|
) -> np.ndarray:
|
|
robot_state, camera_params = unpack_parameters(params, len(views))
|
|
link_transforms = compute_link_transforms(robot, robot_state, scale)
|
|
|
|
residuals = []
|
|
|
|
for obs in observations:
|
|
marker = robot_markers[obs["marker_id"]]
|
|
world_corners = compute_marker_world_corners(marker, link_transforms)
|
|
rvec, tvec = camera_params[obs["view_index"]]
|
|
proj = project_points(world_corners, rvec, tvec, views[obs["view_index"]]["K"], views[obs["view_index"]]["D"])
|
|
diffs = proj - obs["image_points_px"]
|
|
weight = math.sqrt(max(float(obs["confidence"]), 1e-9))
|
|
residuals.extend((diffs * weight).reshape(-1))
|
|
|
|
for key in STATE_KEYS:
|
|
diff = robot_state[key] - default_state.get(key, 0.0)
|
|
w = 0.001 if key in ("x", "y", "z", "e") else 0.01
|
|
residuals.append(diff * w)
|
|
|
|
residuals.extend(compute_soft_constraint_residuals(robot_state, robot_markers, link_transforms, robot, enabled_constraints))
|
|
|
|
return np.asarray(residuals, dtype=np.float64)
|
|
|
|
|
|
def estimate_uncertainty(result: Any, n_params: int) -> np.ndarray:
|
|
if result.jac is None:
|
|
return np.full(n_params, float("nan"), dtype=np.float64)
|
|
J = result.jac
|
|
m, n = J.shape
|
|
JTJ = J.T @ J
|
|
try:
|
|
cov = np.linalg.pinv(JTJ)
|
|
except np.linalg.LinAlgError:
|
|
cov = np.linalg.pinv(JTJ + np.eye(n) * 1e-9)
|
|
residuals = result.fun
|
|
dof = max(1, m - n)
|
|
sigma2 = float(np.sum(residuals ** 2) / dof)
|
|
cov *= sigma2
|
|
return np.sqrt(np.diag(cov))
|
|
|
|
|
|
def camera_position_world(rvec: np.ndarray, tvec: np.ndarray) -> np.ndarray:
|
|
R, _ = cv2.Rodrigues(rvec)
|
|
return (-R.T @ tvec).reshape(3)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Output building
|
|
# -----------------------------------------------------------------------------
|
|
|
|
|
|
def build_output(
|
|
robot_state: Dict[str, float],
|
|
state_uncertainty: np.ndarray,
|
|
views: List[Dict[str, Any]],
|
|
camera_params: List[Tuple[np.ndarray, np.ndarray]],
|
|
observations: List[Dict[str, Any]],
|
|
robot_markers: Dict[int, Dict[str, Any]],
|
|
scale: float,
|
|
robot: Dict[str, Any],
|
|
robot_json_path: str,
|
|
quality_cfg: ObservationQualityConfig,
|
|
final_cost: Optional[float] = None,
|
|
solver_status: Optional[int] = None,
|
|
solver_message: Optional[str] = None,
|
|
) -> Dict[str, Any]:
|
|
link_transforms = compute_link_transforms(robot, robot_state, scale)
|
|
|
|
marker_summary: Dict[int, Dict[str, Any]] = {}
|
|
for marker_id, marker in robot_markers.items():
|
|
marker_summary[marker_id] = {
|
|
"marker_id": marker_id,
|
|
"link_name": marker["link_name"],
|
|
"position_world_m": compute_marker_world_position(marker, link_transforms).tolist(),
|
|
"size_m": marker["size_m"],
|
|
"observation_count": 0,
|
|
"mean_confidence": None,
|
|
"mean_detector_confidence": None,
|
|
"mean_reprojection_error_px": None,
|
|
"observations": [],
|
|
}
|
|
|
|
per_marker_errors: Dict[int, List[float]] = {mid: [] for mid in marker_summary}
|
|
per_marker_confidences: Dict[int, List[float]] = {mid: [] for mid in marker_summary}
|
|
per_marker_detector_conf: Dict[int, List[float]] = {mid: [] for mid in marker_summary}
|
|
|
|
for obs in observations:
|
|
marker_id = obs["marker_id"]
|
|
marker = robot_markers[marker_id]
|
|
object_points_m = compute_marker_world_corners(marker, link_transforms)
|
|
rvec, tvec = camera_params[obs["view_index"]]
|
|
proj = project_points(object_points_m, rvec, tvec, views[obs["view_index"]]["K"], views[obs["view_index"]]["D"])
|
|
diffs = proj - obs["image_points_px"]
|
|
errors = np.linalg.norm(diffs, axis=1)
|
|
repro_error = float(np.mean(errors))
|
|
per_marker_errors[marker_id].extend(errors.tolist())
|
|
per_marker_confidences[marker_id].append(float(obs["confidence"]))
|
|
per_marker_detector_conf[marker_id].append(float(obs["confidence_base"]))
|
|
marker_summary[marker_id]["observation_count"] += 1
|
|
marker_summary[marker_id]["observations"].append({
|
|
"view_index": obs["view_index"],
|
|
"source_file": views[obs["view_index"]]["source_file"],
|
|
"image_file": views[obs["view_index"]]["image_file"],
|
|
"confidence_detector": float(obs["confidence_base"]),
|
|
"confidence_weighted": float(obs["confidence"]),
|
|
"quality": obs["quality"],
|
|
"mean_reprojection_error_px": repro_error,
|
|
"corner_reprojection_errors_px": errors.tolist(),
|
|
})
|
|
|
|
for marker_id, summary in marker_summary.items():
|
|
if summary["observation_count"] > 0:
|
|
summary["mean_confidence"] = float(np.mean(per_marker_confidences[marker_id]))
|
|
summary["mean_detector_confidence"] = float(np.mean(per_marker_detector_conf[marker_id]))
|
|
summary["mean_reprojection_error_px"] = float(np.mean(per_marker_errors[marker_id]))
|
|
|
|
camera_outputs = []
|
|
for idx, view in enumerate(views):
|
|
rvec, tvec = camera_params[idx]
|
|
cam_pos = camera_position_world(rvec, tvec)
|
|
observed_count = sum(1 for obs in observations if obs["view_index"] == idx)
|
|
camera_outputs.append({
|
|
"view_index": idx,
|
|
"source_file": view["source_file"],
|
|
"camera_id": view["camera_id"],
|
|
"camera_position_world_m": cam_pos.tolist(),
|
|
"rvec": rvec.reshape(-1).tolist(),
|
|
"tvec": tvec.reshape(-1).tolist(),
|
|
"intrinsics": {
|
|
"camera_matrix": view["K"].tolist(),
|
|
"distortion_coefficients": view["D"].reshape(-1).tolist(),
|
|
},
|
|
"observation_count": observed_count,
|
|
})
|
|
|
|
robot_pose_output = {
|
|
"state": {k: float(robot_state[k]) for k in STATE_KEYS},
|
|
"uncertainty": {
|
|
"x_mm": float(state_uncertainty[0]),
|
|
"y_mm": float(state_uncertainty[1]),
|
|
"z_mm": float(state_uncertainty[2]),
|
|
"a_deg": float(state_uncertainty[3]),
|
|
"b_deg": float(state_uncertainty[4]),
|
|
"c_deg": float(state_uncertainty[5]),
|
|
"e_mm": float(state_uncertainty[6]),
|
|
},
|
|
"confidence": {
|
|
"x": float(math.exp(-state_uncertainty[0] / 10.0)) if np.isfinite(state_uncertainty[0]) else 0.0,
|
|
"y": float(math.exp(-state_uncertainty[1] / 10.0)) if np.isfinite(state_uncertainty[1]) else 0.0,
|
|
"z": float(math.exp(-state_uncertainty[2] / 10.0)) if np.isfinite(state_uncertainty[2]) else 0.0,
|
|
"a": float(math.exp(-state_uncertainty[3] / 10.0)) if np.isfinite(state_uncertainty[3]) else 0.0,
|
|
"b": float(math.exp(-state_uncertainty[4] / 10.0)) if np.isfinite(state_uncertainty[4]) else 0.0,
|
|
"c": float(math.exp(-state_uncertainty[5] / 10.0)) if np.isfinite(state_uncertainty[5]) else 0.0,
|
|
"e": float(math.exp(-state_uncertainty[6] / max(1.0, state_uncertainty[6]))) if np.isfinite(state_uncertainty[6]) else 0.0,
|
|
},
|
|
}
|
|
|
|
all_conf = np.asarray([obs["confidence"] for obs in observations], dtype=np.float64)
|
|
all_det_conf = np.asarray([obs["confidence_base"] for obs in observations], dtype=np.float64)
|
|
all_q_size = np.asarray([obs["quality"]["q"]["size"] for obs in observations], dtype=np.float64)
|
|
all_q_aspect = np.asarray([obs["quality"]["q"]["aspect"] for obs in observations], dtype=np.float64)
|
|
all_q_border = np.asarray([obs["quality"]["q"]["border"] for obs in observations], dtype=np.float64)
|
|
all_q_homography = np.asarray([obs["quality"]["q"]["homography"] for obs in observations], dtype=np.float64)
|
|
|
|
all_errors = []
|
|
for marker in marker_summary.values():
|
|
if marker["mean_reprojection_error_px"] is not None:
|
|
all_errors.append(marker["mean_reprojection_error_px"])
|
|
|
|
statistics = {
|
|
"observation_count": len(observations),
|
|
"camera_count": len(views),
|
|
"marker_count": len(robot_markers),
|
|
"observed_marker_count": int(sum(1 for m in marker_summary.values() if m["observation_count"] > 0)),
|
|
"mean_detector_confidence": float(np.mean(all_det_conf)) if len(all_det_conf) else None,
|
|
"mean_weighted_confidence": float(np.mean(all_conf)) if len(all_conf) else None,
|
|
"mean_reprojection_error_px": float(np.mean(all_errors)) if len(all_errors) else None,
|
|
"quality_means": {
|
|
"size": float(np.mean(all_q_size)) if len(all_q_size) else None,
|
|
"aspect": float(np.mean(all_q_aspect)) if len(all_q_aspect) else None,
|
|
"border": float(np.mean(all_q_border)) if len(all_q_border) else None,
|
|
"homography": float(np.mean(all_q_homography)) if len(all_q_homography) else None,
|
|
},
|
|
"quality_config": {
|
|
"size_ref_px": quality_cfg.size_ref_px,
|
|
"border_ref_px": quality_cfg.border_ref_px,
|
|
"center_ref_norm": quality_cfg.center_ref_norm,
|
|
"sharpness_ref": quality_cfg.sharpness_ref,
|
|
"homography_ref": quality_cfg.homography_ref,
|
|
"size_factor": quality_cfg.size_factor,
|
|
"aspect_factor": quality_cfg.aspect_factor,
|
|
"border_factor": quality_cfg.border_factor,
|
|
"center_factor": quality_cfg.center_factor,
|
|
"sharpness_factor": quality_cfg.sharpness_factor,
|
|
"homography_factor": quality_cfg.homography_factor,
|
|
},
|
|
}
|
|
|
|
output = {
|
|
"schema_version": "1.0",
|
|
"created_utc": _dt.datetime.utcnow().isoformat() + "Z",
|
|
"source_robot_json": os.path.abspath(robot_json_path),
|
|
"source_detections": [view["source_file"] for view in views],
|
|
"robot_pose": robot_pose_output,
|
|
"camera_poses": camera_outputs,
|
|
"marker_positions": list(marker_summary.values()),
|
|
"statistics": statistics,
|
|
"solver": {
|
|
"final_cost": final_cost,
|
|
"status": solver_status,
|
|
"message": solver_message,
|
|
},
|
|
}
|
|
return output
|
|
|
|
|
|
def build_summary(output: Dict[str, Any]) -> Dict[str, Any]:
|
|
return {
|
|
"schema_version": output.get("schema_version"),
|
|
"created_utc": output.get("created_utc"),
|
|
"source_robot_json": output.get("source_robot_json"),
|
|
"source_detections": output.get("source_detections"),
|
|
"solver": output.get("solver", {}),
|
|
"robot_pose": output.get("robot_pose"),
|
|
"statistics": output.get("statistics", {}),
|
|
}
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Diagnostics
|
|
# -----------------------------------------------------------------------------
|
|
|
|
|
|
def print_constraint_sanity_check(
|
|
robot_state: Dict[str, float],
|
|
robot_markers: Dict[int, Dict[str, Any]],
|
|
link_transforms: Dict[str, np.ndarray],
|
|
robot: Dict[str, Any],
|
|
enabled_constraints: Dict[str, ConstraintResult],
|
|
scale: float,
|
|
) -> None:
|
|
print("\n" + "=" * 70)
|
|
print("CONSTRAINT SANITY CHECKS (after optimization)")
|
|
print("=" * 70)
|
|
|
|
if enabled_constraints["RigidBodyDistances"].enabled:
|
|
print("\n1. RIGID BODY DISTANCES")
|
|
for link_name in ["Arm1", "Ellbow", "Arm2"]:
|
|
link_markers = [m for m in robot_markers.values() if m["link_name"] == link_name]
|
|
if len(link_markers) < 2:
|
|
continue
|
|
max_error = 0.0
|
|
for i in range(len(link_markers)):
|
|
for j in range(i + 1, len(link_markers)):
|
|
m_i = link_markers[i]
|
|
m_j = link_markers[j]
|
|
pos_i = compute_marker_world_position(m_i, link_transforms)
|
|
pos_j = compute_marker_world_position(m_j, link_transforms)
|
|
dist_world = np.linalg.norm(pos_i - pos_j)
|
|
dist_local = np.linalg.norm(m_i["position_m"] - m_j["position_m"])
|
|
max_error = max(max_error, abs(dist_world - dist_local))
|
|
status = "✓" if max_error < 1.0 else "⚠" if max_error < 5.0 else "✗"
|
|
print(f" {link_name:10s}: max_error = {max_error:.3f} mm {status}")
|
|
|
|
if enabled_constraints["InterLinkXDistances"].enabled:
|
|
print("\n2. INTER-LINK X-DISTANCES")
|
|
arm1_markers = [m for m in robot_markers.values() if m["link_name"] == "Arm1"]
|
|
ellbow_markers = [m for m in robot_markers.values() if m["link_name"] == "Ellbow"]
|
|
if len(arm1_markers) >= 1 and len(ellbow_markers) >= 1:
|
|
m_arm1 = arm1_markers[0]
|
|
m_ellbow = ellbow_markers[0]
|
|
pos_arm1 = compute_marker_world_position(m_arm1, link_transforms)
|
|
pos_ellbow = compute_marker_world_position(m_ellbow, link_transforms)
|
|
x_diff_world = pos_ellbow[0] - pos_arm1[0]
|
|
x_diff_ref = m_ellbow["position_m"][0] - m_arm1["position_m"][0]
|
|
error = abs(x_diff_world - x_diff_ref)
|
|
status = "✓" if error < 1.0 else "⚠" if error < 5.0 else "✗"
|
|
print(f" Arm1 <-> Ellbow: error = {error:.3f} mm {status}")
|
|
|
|
if enabled_constraints["Arm2SinADependency"].enabled:
|
|
print("\n3. ARM2 sin(a) DEPENDENCY (sanity check)")
|
|
arm2_markers = [m for m in robot_markers.values() if m["link_name"] == "Arm2"]
|
|
if len(arm2_markers) >= 2:
|
|
a_rad = math.radians(robot_state["a"])
|
|
sin_a = math.sin(a_rad)
|
|
cos_a = math.cos(a_rad)
|
|
max_error = 0.0
|
|
# This remains only a qualitative check.
|
|
for m in arm2_markers:
|
|
pos_world = compute_marker_world_position(m, link_transforms)
|
|
x_world = pos_world[0]
|
|
x_local = m["position_m"][0]
|
|
z_local = m["position_m"][2]
|
|
x_expected = (90.0 * scale) + x_local * cos_a - z_local * sin_a
|
|
max_error = max(max_error, abs(x_world - x_expected))
|
|
status = "✓" if max_error < 5.0 else "⚠"
|
|
print(f" X-consistency with sin(a): max_error = {max_error:.3f} mm {status}")
|
|
print(" (Note: this is a consistency check, not a hard constraint)")
|
|
|
|
print("=" * 70)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Main
|
|
# -----------------------------------------------------------------------------
|
|
|
|
|
|
def main() -> None:
|
|
parser = argparse.ArgumentParser(description="Multiview optimization of robot pose and camera extrinsics")
|
|
parser.add_argument("--robot", required=True, help="Path to robot.json")
|
|
parser.add_argument("--detections", required=True, nargs="+", help="List of detection JSON files")
|
|
parser.add_argument("--outDir", required=True, help="Output directory")
|
|
parser.add_argument("--write-summary", action="store_true", help="Write summary file")
|
|
parser.add_argument("--max-iter", type=int, default=500, help="Maximum optimizer iterations")
|
|
args = parser.parse_args()
|
|
|
|
os.makedirs(args.outDir, exist_ok=True)
|
|
|
|
robot_json_path = os.path.abspath(args.robot)
|
|
robot = load_json(robot_json_path)
|
|
scale = parse_metric_scale(robot)
|
|
quality_cfg = load_quality_config(robot)
|
|
|
|
default_state = {k: float(robot.get("defaultPosition", {}).get(k, 0.0) or 0.0) for k in STATE_KEYS}
|
|
robot_markers = extract_markers(robot, scale)
|
|
|
|
print("\n" + "=" * 70)
|
|
print("CONSTRAINT VALIDATION")
|
|
print("=" * 70)
|
|
enabled_constraints = validate_constraints(robot, robot_markers)
|
|
for _, result in enabled_constraints.items():
|
|
print(result)
|
|
print("=" * 70)
|
|
|
|
views, observations = collect_views_and_observations(args.detections, robot_markers, quality_cfg)
|
|
|
|
print("\n" + "=" * 70)
|
|
print("OBSERVATION QUALITY SUMMARY")
|
|
print("=" * 70)
|
|
print(f"Total observations: {len(observations)}")
|
|
print()
|
|
|
|
quality_by_marker: Dict[int, List[Dict[str, Any]]] = {}
|
|
for obs in observations:
|
|
quality_by_marker.setdefault(obs["marker_id"], []).append(obs["quality"])
|
|
|
|
print(f"{'Marker':>8} {'Link':>12} {'Count':>6} {'Avg Size':>10} {'Avg Aspec':>10} {'Avg Hmg.':>10} {'Avg Conf.':>10}")
|
|
print("-" * 74)
|
|
for marker_id in sorted(quality_by_marker.keys()):
|
|
marker = robot_markers[marker_id]
|
|
qlist = quality_by_marker[marker_id]
|
|
avg_size = float(np.mean([q["q"]["size"] for q in qlist]))
|
|
avg_aspect = float(np.mean([q["q"]["aspect"] for q in qlist]))
|
|
avg_homog = float(np.mean([q["q"]["homography"] for q in qlist]))
|
|
obs_for_marker = [o for o in observations if o["marker_id"] == marker_id]
|
|
avg_conf = float(np.mean([o["confidence"] for o in obs_for_marker]))
|
|
print(f"{marker_id:8d} {marker['link_name']:>12} {len(qlist):6d} {avg_size:10.3f} {avg_aspect:10.3f} {avg_homog:10.3f} {avg_conf:10.3f}")
|
|
print("=" * 70)
|
|
|
|
camera_guesses = []
|
|
for view in views:
|
|
rvec, tvec = initial_camera_guess(view, observations, robot_markers, default_state, scale, robot)
|
|
camera_guesses.append((rvec, tvec))
|
|
|
|
x0 = pack_parameters(default_state, camera_guesses)
|
|
|
|
progress = {
|
|
"iter": 0,
|
|
"last_cost": None,
|
|
"last_print": time.time(),
|
|
"prev_x": x0.copy(),
|
|
}
|
|
|
|
def progress_callback(xk: np.ndarray) -> None:
|
|
progress["iter"] += 1
|
|
now = time.time()
|
|
if progress["iter"] == 1 or now - progress["last_print"] >= 1.0:
|
|
res = residuals_for_parameters(xk, views, observations, robot_markers, robot, scale, default_state, enabled_constraints)
|
|
cost = 0.5 * float(np.dot(res, res))
|
|
delta_cost = None
|
|
convergence = ""
|
|
if progress["last_cost"] is not None:
|
|
delta_cost = cost - progress["last_cost"]
|
|
if abs(delta_cost) < 1e-3:
|
|
convergence = " stable"
|
|
elif delta_cost < 0:
|
|
convergence = " improving"
|
|
else:
|
|
convergence = " worsening"
|
|
step_norm = float(np.linalg.norm(xk - progress["prev_x"]))
|
|
print(f'[Multiview] iter={progress["iter"]:4d} cost={cost:.4f}' + (f' delta={delta_cost:.4g}' if delta_cost is not None else "") + f' step={step_norm:.4g}' + convergence)
|
|
progress["last_cost"] = cost
|
|
progress["last_print"] = now
|
|
progress["prev_x"] = xk.copy()
|
|
|
|
result = least_squares(
|
|
residuals_for_parameters,
|
|
x0,
|
|
args=(views, observations, robot_markers, robot, scale, default_state, enabled_constraints),
|
|
jac="2-point",
|
|
method="trf",
|
|
loss="soft_l1",
|
|
f_scale=1.0,
|
|
max_nfev=args.max_iter,
|
|
callback=progress_callback,
|
|
)
|
|
|
|
robot_state, camera_params = unpack_parameters(result.x, len(views))
|
|
uncertainties = estimate_uncertainty(result, len(result.x))
|
|
|
|
link_transforms = compute_link_transforms(robot, robot_state, scale)
|
|
print_constraint_sanity_check(robot_state, robot_markers, link_transforms, robot, enabled_constraints, scale)
|
|
|
|
output = build_output(
|
|
robot_state,
|
|
uncertainties[:len(STATE_KEYS)],
|
|
views,
|
|
camera_params,
|
|
observations,
|
|
robot_markers,
|
|
scale,
|
|
robot,
|
|
robot_json_path,
|
|
quality_cfg,
|
|
final_cost=float(result.cost),
|
|
solver_status=int(result.status),
|
|
solver_message=str(result.message),
|
|
)
|
|
|
|
out_path = Path(args.outDir) / "multiview_pose.json"
|
|
save_json(output, out_path)
|
|
print(f"Saved: {out_path}")
|
|
|
|
if args.write_summary:
|
|
summary_path = Path(args.outDir) / "multiview_pose_summary.json"
|
|
summary = build_summary(output)
|
|
save_json(summary, summary_path)
|
|
print(f"Saved: {summary_path}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|