Files
appRobotRender/pipeline/2_Multiview.py
2026-05-29 07:44:06 +02:00

1159 lines
46 KiB
Python

#!/usr/bin/env python3
"""
Phase 1 — robust multiview robot pose estimation from aruco_detection.json + robot.json
This version keeps the original kinematic model and optimizer structure, but changes:
- observation weighting to a saturating factor model: min(1, q + f)
- quality indicators are normalized to 0..1
- blur/sharpness is supported but disabled by default (f=1)
- homography skew quality is added
- summary is built from the final output, so values stay consistent
- duplicate marker ids are warned about instead of being silently overwritten
Input:
--robot robot.json
--detections render_1a_aruco_detection.json ...
--outDir output
Output:
multiview_pose.json
multiview_pose_summary.json (optional)
"""
import argparse
import datetime as _dt
import json
import math
import os
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Tuple, Optional
import cv2
import numpy as np
from scipy.optimize import least_squares
STATE_KEYS = ["x", "y", "z", "a", "b", "c", "e"]
# -----------------------------------------------------------------------------
# Small helpers
# -----------------------------------------------------------------------------
def clamp01(x: float) -> float:
return float(max(0.0, min(1.0, x)))
def load_json(path: str) -> Dict[str, Any]:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
def save_json(data: Dict[str, Any], path: Path) -> None:
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
def resolve_scalar(value: Any, default: float = 0.0) -> float:
if value is None:
return default
if isinstance(value, (int, float)):
return float(value)
try:
return float(str(value).strip())
except Exception:
return default
def resolve_vector(value: Any, default_len: int = 3) -> Tuple[float, ...]:
if value is None:
return tuple(0.0 for _ in range(default_len))
if isinstance(value, (int, float, str)):
return (resolve_scalar(value),) + tuple(0.0 for _ in range(default_len - 1))
if isinstance(value, (list, tuple)):
resolved = [resolve_scalar(v) for v in value]
if len(resolved) < default_len:
resolved.extend([0.0] * (default_len - len(resolved)))
return tuple(resolved[:default_len])
return tuple(0.0 for _ in range(default_len))
def parse_metric_scale(robot: Dict[str, Any]) -> float:
rendering_info = robot.get("renderingInfo", {}) or {}
metric = str(rendering_info.get("metric", "mm")).strip().lower()
return 0.001 if metric == "mm" else 1.0
def normalize_axis(axis: Any) -> np.ndarray:
vec = np.asarray(axis, dtype=np.float64).reshape(-1)[:3]
norm = np.linalg.norm(vec)
if norm < 1e-12:
return np.array([1.0, 0.0, 0.0], dtype=np.float64)
return vec / norm
def euler_deg_to_matrix(euler_deg: Any) -> np.ndarray:
x_deg, y_deg, z_deg = resolve_vector(euler_deg, 3)
x = math.radians(x_deg)
y = math.radians(y_deg)
z = math.radians(z_deg)
cx = math.cos(x)
sx = math.sin(x)
cy = math.cos(y)
sy = math.sin(y)
cz = math.cos(z)
sz = math.sin(z)
Rx = np.array([[1.0, 0.0, 0.0], [0.0, cx, -sx], [0.0, sx, cx]], dtype=np.float64)
Ry = np.array([[cy, 0.0, sy], [0.0, 1.0, 0.0], [-sy, 0.0, cy]], dtype=np.float64)
Rz = np.array([[cz, -sz, 0.0], [sz, cz, 0.0], [0.0, 0.0, 1.0]], dtype=np.float64)
return Rz @ Ry @ Rx
def transform_from_translation_rotation(translation: Any, rotation_deg: Any) -> np.ndarray:
T = np.eye(4, dtype=np.float64)
T[:3, 3] = np.asarray(resolve_vector(translation, 3), dtype=np.float64)
T[:3, :3] = euler_deg_to_matrix(rotation_deg)
return T
def axis_angle_matrix(axis: Any, angle_deg: float) -> np.ndarray:
axis_vec = normalize_axis(axis)
theta = math.radians(angle_deg)
kx, ky, kz = axis_vec
c = math.cos(theta)
s = math.sin(theta)
v = 1.0 - c
R = np.array([
[kx * kx * v + c, kx * ky * v - kz * s, kx * kz * v + ky * s],
[ky * kx * v + kz * s, ky * ky * v + c, ky * kz * v - kx * s],
[kz * kx * v - ky * s, kz * ky * v + kx * s, kz * kz * v + c],
], dtype=np.float64)
T = np.eye(4, dtype=np.float64)
T[:3, :3] = R
return T
# -----------------------------------------------------------------------------
# Optional quality configuration
# -----------------------------------------------------------------------------
@dataclass
class ObservationQualityConfig:
# q indicators are normalized to 0..1
size_ref_px: float = 50.0
border_ref_px: float = 120.0
center_ref_norm: float = 1.0
sharpness_ref: float = 2500.0
homography_ref: float = 0.18
# factor f scales the effect of each quality indicator:
# f = 1 -> indicator is fully active
# f = 0 -> indicator is ignored (neutral weight = 1)
size_factor: float = 1.0
aspect_factor: float = 1.0
border_factor: float = 1.0
center_factor: float = 1.0
sharpness_factor: float = 1.0
homography_factor: float = 1.0
# Placeholders for later phases
normal_visibility_factor: float = 1.0
spin_factor: float = 1.0
# Keep a tiny floor for weights so the optimizer remains numerically stable
weight_floor: float = 0.01
def _load_nested_quality_config(src: Dict[str, Any]) -> Dict[str, Any]:
if not isinstance(src, dict):
return {}
for key in ("multiview_quality","multiview_calculation", "multiviewQuality", "quality_config", "multiview"):
v = src.get(key)
if isinstance(v, dict):
return v
return {}
def load_quality_config(robot: Dict[str, Any]) -> ObservationQualityConfig:
cfg = ObservationQualityConfig()
candidates = []
candidates.append(_load_nested_quality_config(robot))
candidates.append(_load_nested_quality_config(robot.get("vision_config", {}) or {}))
for src in candidates:
if not src:
continue
for field_name in cfg.__dataclass_fields__.keys():
if field_name in src:
setattr(cfg, field_name, resolve_scalar(src.get(field_name), getattr(cfg, field_name)))
return cfg
# -----------------------------------------------------------------------------
# Marker extraction and kinematics
# -----------------------------------------------------------------------------
class ConstraintResult:
def __init__(self, name: str, enabled: bool, reason: str = ""):
self.name = name
self.enabled = enabled
self.reason = reason
self.residuals = []
def __str__(self) -> str:
status = "✓ ENABLED" if self.enabled else "✗ DISABLED"
return f"{self.name:40s} {status:12s} {self.reason}"
def build_link_chain(robot: Dict[str, Any]) -> List[str]:
links = robot.get("links", {}) or {}
ordered: List[str] = []
remaining = set(links.keys())
while remaining:
progress = False
for name in list(remaining):
parent = links[name].get("parent")
if not parent or parent in ordered:
ordered.append(name)
remaining.remove(name)
progress = True
if not progress:
raise RuntimeError("Cycle detected in robot link tree or missing parent link")
return ordered
def extract_markers(robot: Dict[str, Any], scale: float) -> Dict[int, Dict[str, Any]]:
markers: Dict[int, Dict[str, Any]] = {}
links = robot.get("links", {}) or {}
marker_defaults = (robot.get("renderingInfo", {}) or {}).get("markerDefaults", {}) or {}
default_size_mm = float(marker_defaults.get("size", 25.0))
for link_name, link_info in links.items():
for marker in link_info.get("markers", []) or []:
marker_id = int(marker.get("id", -1))
if marker_id < 0:
continue
if marker_id in markers:
# Duplicate ids exist in the provided robot.json. Keep the first entry and warn.
print(f"[WARN] Duplicate marker id {marker_id} on link '{link_name}'. Ignoring this duplicate entry.")
continue
pos = resolve_vector(marker.get("position", [0, 0, 0]), 3)
size_mm = float(marker.get("size", default_size_mm))
markers[marker_id] = {
"marker_id": marker_id,
"link_name": link_name,
"position_m": np.asarray([pos[0] * scale, pos[1] * scale, pos[2] * scale], dtype=np.float64),
"normal": normalize_axis(resolve_vector(marker.get("normal", [0, 0, 1]), 3)),
"spin_deg": float(marker.get("spin", 0.0)),
"size_m": size_mm * scale,
}
return markers
def compute_link_transforms(robot: Dict[str, Any], state: Dict[str, float], scale: float) -> Dict[str, np.ndarray]:
links = robot.get("links", {}) or {}
ordered_links = build_link_chain(robot)
transforms: Dict[str, np.ndarray] = {}
for link_name in ordered_links:
link_info = links[link_name] or {}
parent_name = link_info.get("parent")
parent_transform = transforms[parent_name] if parent_name else np.eye(4, dtype=np.float64)
mount_translation = np.asarray(resolve_vector(link_info.get("mountPosition", [0, 0, 0]), 3), dtype=np.float64) * scale
mount = transform_from_translation_rotation(mount_translation, link_info.get("mountRotation", [0, 0, 0]))
joint_info = link_info.get("jointToParent", {}) or {}
joint_origin = np.asarray(resolve_vector(joint_info.get("origin", [0, 0, 0]), 3), dtype=np.float64) * scale
joint = transform_from_translation_rotation(joint_origin, joint_info.get("rotation", [0, 0, 0]))
motion = np.eye(4, dtype=np.float64)
joint_type = str(joint_info.get("type", "fixed")).strip().lower()
control_var = str(joint_info.get("variable", joint_info.get("control", ""))).strip().lower()
axis = resolve_vector(joint_info.get("axis", [1, 0, 0]), 3)
if joint_type == "linear":
motion[:3, 3] = normalize_axis(axis) * state.get(control_var, 0.0) * scale
elif joint_type == "revolute":
motion = axis_angle_matrix(axis, state.get(control_var, 0.0))
transforms[link_name] = parent_transform @ mount @ joint @ motion
return transforms
def compute_marker_world_position(marker: Dict[str, Any], link_transforms: Dict[str, np.ndarray]) -> np.ndarray:
link_transform = link_transforms[marker["link_name"]]
local = np.ones(4, dtype=np.float64)
local[:3] = marker["position_m"]
world = link_transform @ local
return world[:3]
def marker_plane_axes(normal: np.ndarray, spin_deg: float) -> Tuple[np.ndarray, np.ndarray]:
n = normalize_axis(normal)
candidate = np.array((0.0, 0.0, 1.0), dtype=np.float64)
if abs(np.dot(n, candidate)) > 0.99:
candidate = np.array((1.0, 0.0, 0.0), dtype=np.float64)
x_dir = np.cross(candidate, n)
x_dir /= max(np.linalg.norm(x_dir), 1e-9)
y_dir = np.cross(n, x_dir)
if abs(spin_deg) > 1e-9:
theta = math.radians(spin_deg)
cos_t = math.cos(theta)
sin_t = math.sin(theta)
x_rot = x_dir * cos_t + np.cross(n, x_dir) * sin_t + n * np.dot(n, x_dir) * (1.0 - cos_t)
y_rot = y_dir * cos_t + np.cross(n, y_dir) * sin_t + n * np.dot(n, y_dir) * (1.0 - cos_t)
return x_rot, y_rot
return x_dir, y_dir
def marker_object_corners(marker: Dict[str, Any]) -> np.ndarray:
half = marker["size_m"] * 0.5
x_dir, y_dir = marker_plane_axes(marker["normal"], marker["spin_deg"])
corners = np.stack([
-x_dir * half + y_dir * half,
x_dir * half + y_dir * half,
x_dir * half - y_dir * half,
-x_dir * half - y_dir * half,
], axis=0)
return marker["position_m"].reshape(1, 3) + corners
def compute_marker_world_corners(marker: Dict[str, Any], link_transforms: Dict[str, np.ndarray]) -> np.ndarray:
link_transform = link_transforms[marker["link_name"]]
local = marker_object_corners(marker)
homogeneous = np.concatenate([local, np.ones((local.shape[0], 1), dtype=np.float64)], axis=1)
world = (link_transform @ homogeneous.T).T
return world[:, :3]
# -----------------------------------------------------------------------------
# Quality model
# -----------------------------------------------------------------------------
def quality_factor(q: float, f: float) -> float:
# Interpret f in [0, 1] as the per-indicator influence.
# f = 1.0 -> indicator is fully active; q is applied.
# f = 0.0 -> indicator is ignored and contributes a neutral weight of 1.0.
q = clamp01(q)
f = clamp01(f)
return 1.0 - f + f * q
def projective_homography_quality(image_points_px: np.ndarray, image_shape: Tuple[int, int], ref: float) -> float:
if image_points_px is None or len(image_points_px) != 4:
return 1.0
h, w = image_shape
if h <= 0 or w <= 0:
return 1.0
src = np.array([[0.0, 0.0], [1.0, 0.0], [1.0, 1.0], [0.0, 1.0]], dtype=np.float32)
dst = np.asarray(image_points_px, dtype=np.float32).copy()
dst[:, 0] /= float(w)
dst[:, 1] /= float(h)
try:
H = cv2.getPerspectiveTransform(src, dst).astype(np.float64)
if abs(H[2, 2]) > 1e-12:
H = H / H[2, 2]
proj_strength = float(abs(H[2, 0]) + abs(H[2, 1]))
q = 1.0 / (1.0 + proj_strength / max(ref, 1e-6))
return clamp01(q)
except Exception:
return 1.0
def compute_observation_quality(
det: Dict[str, Any],
image_shape: Tuple[int, int],
cfg: ObservationQualityConfig,
) -> Dict[str, Any]:
quality = det.get("quality", {}) or {}
geometry = quality.get("geometry", {}) or {}
sharpness = quality.get("sharpness", {}) or {}
edge_lengths = quality.get("edge_lengths_px", []) or []
edge_lengths = [float(x) for x in edge_lengths if x is not None]
mean_edge_px = float(np.mean(edge_lengths)) if len(edge_lengths) else math.sqrt(max(float(quality.get("area_px", 0.0)), 0.0))
edge_ratio = float(quality.get("edge_ratio", 1.0) or 1.0)
distance_to_border_px = float(geometry.get("distance_to_border_px", 0.0) or 0.0)
distance_to_center_norm = float(geometry.get("distance_to_center_norm", 1.0) or 1.0)
laplacian_var = float(sharpness.get("laplacian_var", 0.0) or 0.0)
# q in 0..1
q_size = clamp01(mean_edge_px / max(cfg.size_ref_px, 1e-6))
q_aspect = clamp01(2.0 / (1.0 + max(edge_ratio, 1e-6)))
q_border = clamp01(distance_to_border_px / max(cfg.border_ref_px, 1e-6))
q_center = clamp01(1.0 - (distance_to_center_norm / max(cfg.center_ref_norm, 1e-6)))
q_sharpness = clamp01(laplacian_var / max(cfg.sharpness_ref, 1e-6))
q_homography = projective_homography_quality(np.asarray(det.get("image_points_px", []), dtype=np.float64), image_shape, cfg.homography_ref)
factor_map = {
"size": quality_factor(q_size, cfg.size_factor),
"aspect": quality_factor(q_aspect, cfg.aspect_factor),
"border": quality_factor(q_border, cfg.border_factor),
"center": quality_factor(q_center, cfg.center_factor),
"sharpness": quality_factor(q_sharpness, cfg.sharpness_factor),
"homography": quality_factor(q_homography, cfg.homography_factor),
}
# Currently not active in phase 1, but already supported for later phases.
# They default to f=1, which makes them neutral.
q_normal_visibility = 1.0
q_spin = 1.0
factor_map["normal_visibility"] = quality_factor(q_normal_visibility, cfg.normal_visibility_factor)
factor_map["spin"] = quality_factor(q_spin, cfg.spin_factor)
weight_multiplier = 1.0
for v in factor_map.values():
weight_multiplier *= float(v)
# Conservative default: if no factor is activated in robot.json,
# this remains effectively neutral.
detector_confidence = clamp01(float(det.get("confidence", 1.0) or 1.0))
weighted_confidence = detector_confidence * weight_multiplier
weighted_confidence = max(cfg.weight_floor, min(1.0, weighted_confidence))
return {
"detector_confidence": detector_confidence,
"weighted_confidence": weighted_confidence,
"q": {
"size": q_size,
"aspect": q_aspect,
"border": q_border,
"center": q_center,
"sharpness": q_sharpness,
"homography": q_homography,
"normal_visibility": q_normal_visibility,
"spin": q_spin,
},
"factor": factor_map,
"weight_multiplier": weight_multiplier,
"raw": {
"mean_edge_px": mean_edge_px,
"edge_ratio": edge_ratio,
"distance_to_border_px": distance_to_border_px,
"distance_to_center_norm": distance_to_center_norm,
"laplacian_var": laplacian_var,
},
}
# -----------------------------------------------------------------------------
# Constraints (kept from the existing approach)
# -----------------------------------------------------------------------------
def validate_constraints(robot: Dict[str, Any], robot_markers: Dict[int, Dict[str, Any]]) -> Dict[str, ConstraintResult]:
results = {}
rigid_body_result = ConstraintResult("RigidBodyDistances", False)
try:
rigid_body_count = 0
for link_name in ["Arm1", "Ellbow", "Arm2"]:
link_markers = [m for m in robot_markers.values() if m["link_name"] == link_name]
if len(link_markers) >= 2:
rigid_body_count += 1
if rigid_body_count >= 2:
rigid_body_result.enabled = True
rigid_body_result.reason = f"Found {rigid_body_count} links with 2+ markers each"
else:
rigid_body_result.reason = "Not enough rigid links with multiple markers"
except Exception as e:
rigid_body_result.reason = f"Error: {str(e)}"
results["RigidBodyDistances"] = rigid_body_result
inter_link_x_result = ConstraintResult("InterLinkXDistances", False)
try:
links_with_markers = set(m["link_name"] for m in robot_markers.values())
x_rotated_links = []
for link_name in ["Arm1", "Ellbow"]:
if link_name in links_with_markers:
link_markers = [m for m in robot_markers.values() if m["link_name"] == link_name]
if len(link_markers) >= 1:
x_rotated_links.append(link_name)
if len(x_rotated_links) >= 2:
inter_link_x_result.enabled = True
inter_link_x_result.reason = f"Found {len(x_rotated_links)} X-rotation links: {', '.join(x_rotated_links)}"
else:
inter_link_x_result.reason = "Not enough X-rotation links"
except Exception as e:
inter_link_x_result.reason = f"Error: {str(e)}"
results["InterLinkXDistances"] = inter_link_x_result
arm2_sina_result = ConstraintResult("Arm2SinADependency", True, "Sanity check only (not enforced)")
try:
arm2_markers = [m for m in robot_markers.values() if m["link_name"] == "Arm2"]
if len(arm2_markers) >= 2:
z_values = set(float(m["position_m"][2]) for m in arm2_markers)
if len(z_values) > 1:
arm2_sina_result.enabled = True
arm2_sina_result.reason = "Multiple Z-values detected; sin(a) dependency confirmed"
else:
arm2_sina_result.enabled = False
arm2_sina_result.reason = "No Z-variation in Arm2 markers (cannot use sin(a) constraint)"
else:
arm2_sina_result.enabled = False
arm2_sina_result.reason = "Not enough Arm2 markers"
except Exception as e:
arm2_sina_result.reason = f"Error: {str(e)}"
results["Arm2SinADependency"] = arm2_sina_result
return results
def compute_soft_constraint_residuals(
robot_state: Dict[str, float],
robot_markers: Dict[int, Dict[str, Any]],
link_transforms: Dict[str, np.ndarray],
robot: Dict[str, Any],
enabled_constraints: Dict[str, ConstraintResult],
) -> List[float]:
residuals = []
weight_scale = 0.1
if enabled_constraints["RigidBodyDistances"].enabled:
for link_name in ["Arm1", "Ellbow", "Arm2"]:
link_markers = [m for m in robot_markers.values() if m["link_name"] == link_name]
if len(link_markers) < 2:
continue
for i in range(len(link_markers)):
for j in range(i + 1, len(link_markers)):
m_i = link_markers[i]
m_j = link_markers[j]
pos_i = compute_marker_world_position(m_i, link_transforms)
pos_j = compute_marker_world_position(m_j, link_transforms)
dist_world = np.linalg.norm(pos_i - pos_j)
dist_local = np.linalg.norm(m_i["position_m"] - m_j["position_m"])
error = dist_world - dist_local
residuals.append(error * weight_scale * 0.1)
if enabled_constraints["InterLinkXDistances"].enabled:
arm1_markers = [m for m in robot_markers.values() if m["link_name"] == "Arm1"]
ellbow_markers = [m for m in robot_markers.values() if m["link_name"] == "Ellbow"]
if len(arm1_markers) >= 1 and len(ellbow_markers) >= 1:
m_arm1 = arm1_markers[0]
m_ellbow = ellbow_markers[0]
pos_arm1 = compute_marker_world_position(m_arm1, link_transforms)
pos_ellbow = compute_marker_world_position(m_ellbow, link_transforms)
x_diff_world = pos_ellbow[0] - pos_arm1[0]
x_diff_ref = m_ellbow["position_m"][0] - m_arm1["position_m"][0]
residuals.append((x_diff_world - x_diff_ref) * weight_scale)
return residuals
# -----------------------------------------------------------------------------
# Camera / observation helpers
# -----------------------------------------------------------------------------
def load_intrinsics(detection_json: Dict[str, Any]) -> Tuple[np.ndarray, np.ndarray]:
cam = detection_json["camera"]
K = np.asarray(cam["camera_matrix"], dtype=np.float64)
D = np.asarray(cam.get("distortion_coefficients", [0, 0, 0, 0, 0]), dtype=np.float64).reshape(-1, 1)
return K, D
def detection_image_shape(detection_json: Dict[str, Any]) -> Tuple[int, int]:
image = detection_json.get("image", {}) or {}
h = int(image.get("height_px", detection_json.get("height_px", 720)) or 720)
w = int(image.get("width_px", detection_json.get("width_px", 1280)) or 1280)
return h, w
def collect_views_and_observations(
detection_files: List[str],
robot_markers: Dict[int, Dict[str, Any]],
quality_cfg: ObservationQualityConfig,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
views: List[Dict[str, Any]] = []
observations: List[Dict[str, Any]] = []
for idx, det_path in enumerate(detection_files):
detection_json = load_json(det_path)
K, D = load_intrinsics(detection_json)
image_shape = detection_image_shape(detection_json)
views.append({
"index": idx,
"source_file": os.path.abspath(det_path),
"camera_id": detection_json.get("camera", {}).get("camera_id", f"cam{idx+1}"),
"image_file": detection_json.get("image", {}).get("image_file"),
"image_shape": image_shape,
"K": K,
"D": D,
})
for det in detection_json.get("detections", []) or []:
if str(det.get("type", "aruco")).lower() != "aruco":
continue
marker_id = int(det.get("marker_id", -1))
if marker_id < 0 or marker_id not in robot_markers:
continue
image_points = det.get("image_points_px")
if not (isinstance(image_points, list) and len(image_points) == 4):
# Phase 1 uses full marker corners only.
continue
image_points = np.asarray(image_points, dtype=np.float64)
marker = robot_markers[marker_id]
obs_quality = compute_observation_quality(det, image_shape, quality_cfg)
observations.append({
"view_index": idx,
"marker_id": marker_id,
"marker_link_corners": marker_object_corners(marker),
"image_points_px": image_points,
"confidence_base": obs_quality["detector_confidence"],
"confidence": obs_quality["weighted_confidence"],
"quality": obs_quality,
"raw_detection": det,
})
if len(views) == 0:
raise RuntimeError("No valid detection views found")
if len(observations) == 0:
raise RuntimeError("No marker observations matched robot.json markers")
return views, observations
def initial_camera_guess(
view: Dict[str, Any],
observations: List[Dict[str, Any]],
robot_markers: Dict[int, Dict[str, Any]],
default_state: Dict[str, float],
scale: float,
robot: Dict[str, Any],
) -> Tuple[np.ndarray, np.ndarray]:
object_points = []
image_points = []
link_transforms = compute_link_transforms(robot, default_state, scale)
for obs in observations:
if obs["view_index"] != view["index"]:
continue
marker = robot_markers[obs["marker_id"]]
object_points.append(compute_marker_world_corners(marker, link_transforms))
image_points.append(obs["image_points_px"])
if len(object_points) == 0:
return np.zeros((3, 1), dtype=np.float64), np.array([[0.0], [0.0], [1.0]], dtype=np.float64)
object_points = np.vstack(object_points)
image_points = np.vstack(image_points)
if object_points.shape[0] < 4:
return np.zeros((3, 1), dtype=np.float64), np.array([[0.0], [0.0], [1.0]], dtype=np.float64)
success, rvec, tvec = cv2.solvePnP(
object_points,
image_points,
view["K"],
view["D"],
flags=cv2.SOLVEPNP_ITERATIVE,
)
if not success:
return np.zeros((3, 1), dtype=np.float64), np.array([[0.0], [0.0], [1.0]], dtype=np.float64)
return rvec, tvec
def project_points(points_3d: np.ndarray, rvec: np.ndarray, tvec: np.ndarray, K: np.ndarray, D: np.ndarray) -> np.ndarray:
projected, _ = cv2.projectPoints(points_3d, rvec, tvec, K, D)
return projected.reshape(-1, 2)
# -----------------------------------------------------------------------------
# Optimization
# -----------------------------------------------------------------------------
def pack_parameters(robot_state: Dict[str, float], camera_params: List[Tuple[np.ndarray, np.ndarray]]) -> np.ndarray:
state_vec = np.asarray([robot_state[k] for k in STATE_KEYS], dtype=np.float64)
cams = []
for rvec, tvec in camera_params:
cams.append(rvec.reshape(3))
cams.append(tvec.reshape(3))
return np.concatenate([state_vec] + cams)
def unpack_parameters(params: np.ndarray, n_views: int) -> Tuple[Dict[str, float], List[Tuple[np.ndarray, np.ndarray]]]:
robot_state = {STATE_KEYS[i]: float(params[i]) for i in range(len(STATE_KEYS))}
camera_params = []
offset = len(STATE_KEYS)
for _ in range(n_views):
rvec = params[offset:offset + 3].reshape(3, 1)
tvec = params[offset + 3:offset + 6].reshape(3, 1)
camera_params.append((rvec, tvec))
offset += 6
return robot_state, camera_params
def residuals_for_parameters(
params: np.ndarray,
views: List[Dict[str, Any]],
observations: List[Dict[str, Any]],
robot_markers: Dict[int, Dict[str, Any]],
robot: Dict[str, Any],
scale: float,
default_state: Dict[str, float],
enabled_constraints: Dict[str, ConstraintResult],
) -> np.ndarray:
robot_state, camera_params = unpack_parameters(params, len(views))
link_transforms = compute_link_transforms(robot, robot_state, scale)
residuals = []
for obs in observations:
marker = robot_markers[obs["marker_id"]]
world_corners = compute_marker_world_corners(marker, link_transforms)
rvec, tvec = camera_params[obs["view_index"]]
proj = project_points(world_corners, rvec, tvec, views[obs["view_index"]]["K"], views[obs["view_index"]]["D"])
diffs = proj - obs["image_points_px"]
weight = math.sqrt(max(float(obs["confidence"]), 1e-9))
residuals.extend((diffs * weight).reshape(-1))
for key in STATE_KEYS:
diff = robot_state[key] - default_state.get(key, 0.0)
w = 0.001 if key in ("x", "y", "z", "e") else 0.01
residuals.append(diff * w)
residuals.extend(compute_soft_constraint_residuals(robot_state, robot_markers, link_transforms, robot, enabled_constraints))
return np.asarray(residuals, dtype=np.float64)
def estimate_uncertainty(result: Any, n_params: int) -> np.ndarray:
if result.jac is None:
return np.full(n_params, float("nan"), dtype=np.float64)
J = result.jac
m, n = J.shape
JTJ = J.T @ J
try:
cov = np.linalg.pinv(JTJ)
except np.linalg.LinAlgError:
cov = np.linalg.pinv(JTJ + np.eye(n) * 1e-9)
residuals = result.fun
dof = max(1, m - n)
sigma2 = float(np.sum(residuals ** 2) / dof)
cov *= sigma2
return np.sqrt(np.diag(cov))
def camera_position_world(rvec: np.ndarray, tvec: np.ndarray) -> np.ndarray:
R, _ = cv2.Rodrigues(rvec)
return (-R.T @ tvec).reshape(3)
# -----------------------------------------------------------------------------
# Output building
# -----------------------------------------------------------------------------
def build_output(
robot_state: Dict[str, float],
state_uncertainty: np.ndarray,
views: List[Dict[str, Any]],
camera_params: List[Tuple[np.ndarray, np.ndarray]],
observations: List[Dict[str, Any]],
robot_markers: Dict[int, Dict[str, Any]],
scale: float,
robot: Dict[str, Any],
robot_json_path: str,
quality_cfg: ObservationQualityConfig,
final_cost: Optional[float] = None,
solver_status: Optional[int] = None,
solver_message: Optional[str] = None,
) -> Dict[str, Any]:
link_transforms = compute_link_transforms(robot, robot_state, scale)
marker_summary: Dict[int, Dict[str, Any]] = {}
for marker_id, marker in robot_markers.items():
marker_summary[marker_id] = {
"marker_id": marker_id,
"link_name": marker["link_name"],
"position_world_m": compute_marker_world_position(marker, link_transforms).tolist(),
"size_m": marker["size_m"],
"observation_count": 0,
"mean_confidence": None,
"mean_detector_confidence": None,
"mean_reprojection_error_px": None,
"observations": [],
}
per_marker_errors: Dict[int, List[float]] = {mid: [] for mid in marker_summary}
per_marker_confidences: Dict[int, List[float]] = {mid: [] for mid in marker_summary}
per_marker_detector_conf: Dict[int, List[float]] = {mid: [] for mid in marker_summary}
for obs in observations:
marker_id = obs["marker_id"]
marker = robot_markers[marker_id]
object_points_m = compute_marker_world_corners(marker, link_transforms)
rvec, tvec = camera_params[obs["view_index"]]
proj = project_points(object_points_m, rvec, tvec, views[obs["view_index"]]["K"], views[obs["view_index"]]["D"])
diffs = proj - obs["image_points_px"]
errors = np.linalg.norm(diffs, axis=1)
repro_error = float(np.mean(errors))
per_marker_errors[marker_id].extend(errors.tolist())
per_marker_confidences[marker_id].append(float(obs["confidence"]))
per_marker_detector_conf[marker_id].append(float(obs["confidence_base"]))
marker_summary[marker_id]["observation_count"] += 1
marker_summary[marker_id]["observations"].append({
"view_index": obs["view_index"],
"source_file": views[obs["view_index"]]["source_file"],
"image_file": views[obs["view_index"]]["image_file"],
"confidence_detector": float(obs["confidence_base"]),
"confidence_weighted": float(obs["confidence"]),
"quality": obs["quality"],
"mean_reprojection_error_px": repro_error,
"corner_reprojection_errors_px": errors.tolist(),
})
for marker_id, summary in marker_summary.items():
if summary["observation_count"] > 0:
summary["mean_confidence"] = float(np.mean(per_marker_confidences[marker_id]))
summary["mean_detector_confidence"] = float(np.mean(per_marker_detector_conf[marker_id]))
summary["mean_reprojection_error_px"] = float(np.mean(per_marker_errors[marker_id]))
camera_outputs = []
for idx, view in enumerate(views):
rvec, tvec = camera_params[idx]
cam_pos = camera_position_world(rvec, tvec)
observed_count = sum(1 for obs in observations if obs["view_index"] == idx)
camera_outputs.append({
"view_index": idx,
"source_file": view["source_file"],
"camera_id": view["camera_id"],
"camera_position_world_m": cam_pos.tolist(),
"rvec": rvec.reshape(-1).tolist(),
"tvec": tvec.reshape(-1).tolist(),
"intrinsics": {
"camera_matrix": view["K"].tolist(),
"distortion_coefficients": view["D"].reshape(-1).tolist(),
},
"observation_count": observed_count,
})
robot_pose_output = {
"state": {k: float(robot_state[k]) for k in STATE_KEYS},
"uncertainty": {
"x_mm": float(state_uncertainty[0]),
"y_mm": float(state_uncertainty[1]),
"z_mm": float(state_uncertainty[2]),
"a_deg": float(state_uncertainty[3]),
"b_deg": float(state_uncertainty[4]),
"c_deg": float(state_uncertainty[5]),
"e_mm": float(state_uncertainty[6]),
},
"confidence": {
"x": float(math.exp(-state_uncertainty[0] / 10.0)) if np.isfinite(state_uncertainty[0]) else 0.0,
"y": float(math.exp(-state_uncertainty[1] / 10.0)) if np.isfinite(state_uncertainty[1]) else 0.0,
"z": float(math.exp(-state_uncertainty[2] / 10.0)) if np.isfinite(state_uncertainty[2]) else 0.0,
"a": float(math.exp(-state_uncertainty[3] / 10.0)) if np.isfinite(state_uncertainty[3]) else 0.0,
"b": float(math.exp(-state_uncertainty[4] / 10.0)) if np.isfinite(state_uncertainty[4]) else 0.0,
"c": float(math.exp(-state_uncertainty[5] / 10.0)) if np.isfinite(state_uncertainty[5]) else 0.0,
"e": float(math.exp(-state_uncertainty[6] / max(1.0, state_uncertainty[6]))) if np.isfinite(state_uncertainty[6]) else 0.0,
},
}
all_conf = np.asarray([obs["confidence"] for obs in observations], dtype=np.float64)
all_det_conf = np.asarray([obs["confidence_base"] for obs in observations], dtype=np.float64)
all_q_size = np.asarray([obs["quality"]["q"]["size"] for obs in observations], dtype=np.float64)
all_q_aspect = np.asarray([obs["quality"]["q"]["aspect"] for obs in observations], dtype=np.float64)
all_q_border = np.asarray([obs["quality"]["q"]["border"] for obs in observations], dtype=np.float64)
all_q_homography = np.asarray([obs["quality"]["q"]["homography"] for obs in observations], dtype=np.float64)
all_errors = []
for marker in marker_summary.values():
if marker["mean_reprojection_error_px"] is not None:
all_errors.append(marker["mean_reprojection_error_px"])
statistics = {
"observation_count": len(observations),
"camera_count": len(views),
"marker_count": len(robot_markers),
"observed_marker_count": int(sum(1 for m in marker_summary.values() if m["observation_count"] > 0)),
"mean_detector_confidence": float(np.mean(all_det_conf)) if len(all_det_conf) else None,
"mean_weighted_confidence": float(np.mean(all_conf)) if len(all_conf) else None,
"mean_reprojection_error_px": float(np.mean(all_errors)) if len(all_errors) else None,
"quality_means": {
"size": float(np.mean(all_q_size)) if len(all_q_size) else None,
"aspect": float(np.mean(all_q_aspect)) if len(all_q_aspect) else None,
"border": float(np.mean(all_q_border)) if len(all_q_border) else None,
"homography": float(np.mean(all_q_homography)) if len(all_q_homography) else None,
},
"quality_config": {
"size_ref_px": quality_cfg.size_ref_px,
"border_ref_px": quality_cfg.border_ref_px,
"center_ref_norm": quality_cfg.center_ref_norm,
"sharpness_ref": quality_cfg.sharpness_ref,
"homography_ref": quality_cfg.homography_ref,
"size_factor": quality_cfg.size_factor,
"aspect_factor": quality_cfg.aspect_factor,
"border_factor": quality_cfg.border_factor,
"center_factor": quality_cfg.center_factor,
"sharpness_factor": quality_cfg.sharpness_factor,
"homography_factor": quality_cfg.homography_factor,
},
}
output = {
"schema_version": "1.0",
"created_utc": _dt.datetime.utcnow().isoformat() + "Z",
"source_robot_json": os.path.abspath(robot_json_path),
"source_detections": [view["source_file"] for view in views],
"robot_pose": robot_pose_output,
"camera_poses": camera_outputs,
"marker_positions": list(marker_summary.values()),
"statistics": statistics,
"solver": {
"final_cost": final_cost,
"status": solver_status,
"message": solver_message,
},
}
return output
def build_summary(output: Dict[str, Any]) -> Dict[str, Any]:
return {
"schema_version": output.get("schema_version"),
"created_utc": output.get("created_utc"),
"source_robot_json": output.get("source_robot_json"),
"source_detections": output.get("source_detections"),
"solver": output.get("solver", {}),
"robot_pose": output.get("robot_pose"),
"statistics": output.get("statistics", {}),
}
# -----------------------------------------------------------------------------
# Diagnostics
# -----------------------------------------------------------------------------
def print_constraint_sanity_check(
robot_state: Dict[str, float],
robot_markers: Dict[int, Dict[str, Any]],
link_transforms: Dict[str, np.ndarray],
robot: Dict[str, Any],
enabled_constraints: Dict[str, ConstraintResult],
scale: float,
) -> None:
print("\n" + "=" * 70)
print("CONSTRAINT SANITY CHECKS (after optimization)")
print("=" * 70)
if enabled_constraints["RigidBodyDistances"].enabled:
print("\n1. RIGID BODY DISTANCES")
for link_name in ["Arm1", "Ellbow", "Arm2"]:
link_markers = [m for m in robot_markers.values() if m["link_name"] == link_name]
if len(link_markers) < 2:
continue
max_error = 0.0
for i in range(len(link_markers)):
for j in range(i + 1, len(link_markers)):
m_i = link_markers[i]
m_j = link_markers[j]
pos_i = compute_marker_world_position(m_i, link_transforms)
pos_j = compute_marker_world_position(m_j, link_transforms)
dist_world = np.linalg.norm(pos_i - pos_j)
dist_local = np.linalg.norm(m_i["position_m"] - m_j["position_m"])
max_error = max(max_error, abs(dist_world - dist_local))
status = "" if max_error < 1.0 else "" if max_error < 5.0 else ""
print(f" {link_name:10s}: max_error = {max_error:.3f} mm {status}")
if enabled_constraints["InterLinkXDistances"].enabled:
print("\n2. INTER-LINK X-DISTANCES")
arm1_markers = [m for m in robot_markers.values() if m["link_name"] == "Arm1"]
ellbow_markers = [m for m in robot_markers.values() if m["link_name"] == "Ellbow"]
if len(arm1_markers) >= 1 and len(ellbow_markers) >= 1:
m_arm1 = arm1_markers[0]
m_ellbow = ellbow_markers[0]
pos_arm1 = compute_marker_world_position(m_arm1, link_transforms)
pos_ellbow = compute_marker_world_position(m_ellbow, link_transforms)
x_diff_world = pos_ellbow[0] - pos_arm1[0]
x_diff_ref = m_ellbow["position_m"][0] - m_arm1["position_m"][0]
error = abs(x_diff_world - x_diff_ref)
status = "" if error < 1.0 else "" if error < 5.0 else ""
print(f" Arm1 <-> Ellbow: error = {error:.3f} mm {status}")
if enabled_constraints["Arm2SinADependency"].enabled:
print("\n3. ARM2 sin(a) DEPENDENCY (sanity check)")
arm2_markers = [m for m in robot_markers.values() if m["link_name"] == "Arm2"]
if len(arm2_markers) >= 2:
a_rad = math.radians(robot_state["a"])
sin_a = math.sin(a_rad)
cos_a = math.cos(a_rad)
max_error = 0.0
# This remains only a qualitative check.
for m in arm2_markers:
pos_world = compute_marker_world_position(m, link_transforms)
x_world = pos_world[0]
x_local = m["position_m"][0]
z_local = m["position_m"][2]
x_expected = (90.0 * scale) + x_local * cos_a - z_local * sin_a
max_error = max(max_error, abs(x_world - x_expected))
status = "" if max_error < 5.0 else ""
print(f" X-consistency with sin(a): max_error = {max_error:.3f} mm {status}")
print(" (Note: this is a consistency check, not a hard constraint)")
print("=" * 70)
# -----------------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(description="Multiview optimization of robot pose and camera extrinsics")
parser.add_argument("--robot", required=True, help="Path to robot.json")
parser.add_argument("--detections", required=True, nargs="+", help="List of detection JSON files")
parser.add_argument("--outDir", required=True, help="Output directory")
parser.add_argument("--write-summary", action="store_true", help="Write summary file")
parser.add_argument("--max-iter", type=int, default=500, help="Maximum optimizer iterations")
args = parser.parse_args()
os.makedirs(args.outDir, exist_ok=True)
robot_json_path = os.path.abspath(args.robot)
robot = load_json(robot_json_path)
scale = parse_metric_scale(robot)
quality_cfg = load_quality_config(robot)
default_state = {k: float(robot.get("defaultPosition", {}).get(k, 0.0) or 0.0) for k in STATE_KEYS}
robot_markers = extract_markers(robot, scale)
print("\n" + "=" * 70)
print("CONSTRAINT VALIDATION")
print("=" * 70)
enabled_constraints = validate_constraints(robot, robot_markers)
for _, result in enabled_constraints.items():
print(result)
print("=" * 70)
views, observations = collect_views_and_observations(args.detections, robot_markers, quality_cfg)
print("\n" + "=" * 70)
print("OBSERVATION QUALITY SUMMARY")
print("=" * 70)
print(f"Total observations: {len(observations)}")
print()
quality_by_marker: Dict[int, List[Dict[str, Any]]] = {}
for obs in observations:
quality_by_marker.setdefault(obs["marker_id"], []).append(obs["quality"])
print(f"{'Marker':>8} {'Link':>12} {'Count':>6} {'Avg Size':>10} {'Avg Aspec':>10} {'Avg Hmg.':>10} {'Avg Conf.':>10}")
print("-" * 74)
for marker_id in sorted(quality_by_marker.keys()):
marker = robot_markers[marker_id]
qlist = quality_by_marker[marker_id]
avg_size = float(np.mean([q["q"]["size"] for q in qlist]))
avg_aspect = float(np.mean([q["q"]["aspect"] for q in qlist]))
avg_homog = float(np.mean([q["q"]["homography"] for q in qlist]))
obs_for_marker = [o for o in observations if o["marker_id"] == marker_id]
avg_conf = float(np.mean([o["confidence"] for o in obs_for_marker]))
print(f"{marker_id:8d} {marker['link_name']:>12} {len(qlist):6d} {avg_size:10.3f} {avg_aspect:10.3f} {avg_homog:10.3f} {avg_conf:10.3f}")
print("=" * 70)
camera_guesses = []
for view in views:
rvec, tvec = initial_camera_guess(view, observations, robot_markers, default_state, scale, robot)
camera_guesses.append((rvec, tvec))
x0 = pack_parameters(default_state, camera_guesses)
progress = {
"iter": 0,
"last_cost": None,
"last_print": time.time(),
"prev_x": x0.copy(),
}
def progress_callback(xk: np.ndarray) -> None:
progress["iter"] += 1
now = time.time()
if progress["iter"] == 1 or now - progress["last_print"] >= 1.0:
res = residuals_for_parameters(xk, views, observations, robot_markers, robot, scale, default_state, enabled_constraints)
cost = 0.5 * float(np.dot(res, res))
delta_cost = None
convergence = ""
if progress["last_cost"] is not None:
delta_cost = cost - progress["last_cost"]
if abs(delta_cost) < 1e-3:
convergence = " stable"
elif delta_cost < 0:
convergence = " improving"
else:
convergence = " worsening"
step_norm = float(np.linalg.norm(xk - progress["prev_x"]))
print(f'[Multiview] iter={progress["iter"]:4d} cost={cost:.4f}' + (f' delta={delta_cost:.4g}' if delta_cost is not None else "") + f' step={step_norm:.4g}' + convergence)
progress["last_cost"] = cost
progress["last_print"] = now
progress["prev_x"] = xk.copy()
result = least_squares(
residuals_for_parameters,
x0,
args=(views, observations, robot_markers, robot, scale, default_state, enabled_constraints),
jac="2-point",
method="trf",
loss="soft_l1",
f_scale=1.0,
max_nfev=args.max_iter,
callback=progress_callback,
)
robot_state, camera_params = unpack_parameters(result.x, len(views))
uncertainties = estimate_uncertainty(result, len(result.x))
link_transforms = compute_link_transforms(robot, robot_state, scale)
print_constraint_sanity_check(robot_state, robot_markers, link_transforms, robot, enabled_constraints, scale)
output = build_output(
robot_state,
uncertainties[:len(STATE_KEYS)],
views,
camera_params,
observations,
robot_markers,
scale,
robot,
robot_json_path,
quality_cfg,
final_cost=float(result.cost),
solver_status=int(result.status),
solver_message=str(result.message),
)
out_path = Path(args.outDir) / "multiview_pose.json"
save_json(output, out_path)
print(f"Saved: {out_path}")
if args.write_summary:
summary_path = Path(args.outDir) / "multiview_pose_summary.json"
summary = build_summary(output)
save_json(summary, summary_path)
print(f"Saved: {summary_path}")
if __name__ == "__main__":
main()