Files
appRobotRender/pipeline/3_multiview_bundle_adjustment_v5.py
2026-05-31 17:39:25 +02:00

1463 lines
53 KiB
Python

#!/usr/bin/env python3
"""
3_multiview_bundle_adjustment_v4.py
Multi-view ArUco marker position optimization with explicit, switchable
degrees-of-freedom constraints.
Mathematical model
------------------
We estimate 3D marker positions X_i ∈ R^3 by minimizing
E(X) =
Σ_{i,c} w_ic || π_c(X_i) - u_ic ||²
+ λ_r Σ_j w_j^r || ||X_a - X_b|| - d_j ||²
+ λ_rev Σ_k w_k^rev || (X_b - X_a)·a_k - t_k ||²
+ λ_pri Σ_m w_m^pri ( ||(X_b - X_a)·u_m - t_u||²
+ ||(X_b - X_a)·v_m - t_v||² )
where:
- u_ic are observed normalized image coordinates for marker i in camera c
- π_c(.) is the normalized reprojection model
- w_ic are observation weights from detection quality / marker priors / range
- rigid-link constraints preserve internal marker geometry of a link
- revolute joints keep the projection along the joint axis constant
- prismatic joints keep the two orthogonal projection components constant
Important design choices
------------------------
- robot.json is used as a kinematic description, not as a direct source of
world-space marker positions.
- constraint families are explicit, switchable, and easy to compare across
versions.
- legacy chain-propagation constraints are retained only as an optional family
and are OFF by default.
- observation weighting remains separate from constraint weighting so both can
be tested independently.
Dependencies:
numpy, opencv-python, scipy (optional for optimization)
Example:
python 3_multiview_bundle_adjustment_v4.py ^
-det cam1_aruco_detection.json cam2_aruco_detection.json cam3_aruco_detection.json ^
-pose cam1_camera_pose.json cam2_camera_pose.json cam3_camera_pose.json ^
-robot robot.json ^
-lambdaWeight 100.0
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
from dataclasses import dataclass
from itertools import combinations
from typing import Any, Dict, List, Optional, Tuple
import cv2
import numpy as np
# ===================================================================
# Path / JSON helpers
# ===================================================================
def resolve_path(path: str) -> str:
path = os.path.expanduser(path)
if os.path.isabs(path):
return path
return os.path.abspath(path)
def load_json(path: str) -> Dict[str, Any]:
with open(resolve_path(path), "r", encoding="utf-8") as f:
return json.load(f)
def save_json(path: str, data: Dict[str, Any]) -> None:
with open(resolve_path(path), "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
# ===================================================================
# Units
# ===================================================================
def get_length_scale(robot_data: Dict[str, Any]) -> float:
units = robot_data.get("units", {}) or {}
length_unit = str(units.get("length", "")).strip().lower()
if length_unit in ("mm", "millimeter", "millimeters"):
return 1.0 / 1000.0
if length_unit in ("cm", "centimeter", "centimeters"):
return 1.0 / 100.0
return 1.0
# ===================================================================
# Small geometry helpers
# ===================================================================
def safe_norm(v: np.ndarray, eps: float = 1e-12) -> float:
return float(np.linalg.norm(v) + eps)
def normalize_vector(v: np.ndarray, eps: float = 1e-12) -> np.ndarray:
return np.asarray(v, dtype=np.float64) / safe_norm(v, eps)
def clamp(v: float, lo: float, hi: float) -> float:
return float(max(lo, min(hi, v)))
def principal_axis_id(axis: np.ndarray, threshold: float = 0.95) -> Optional[int]:
"""Return 0,1,2 for x,y,z if axis is close enough to a principal axis."""
a = normalize_vector(np.asarray(axis, dtype=np.float64))
idx = int(np.argmax(np.abs(a)))
if abs(a[idx]) >= threshold:
return idx
return None
def camera_center_from_world_to_cam(R_wc: np.ndarray, t_wc: np.ndarray) -> np.ndarray:
"""world_to_camera: X_cam = R_wc * X_world + t_wc; camera center is -R^T t."""
return -R_wc.T @ t_wc
def principal_axis_vector(axis: np.ndarray) -> np.ndarray:
"""Convert a near-principal axis to an exact signed principal axis vector."""
a = normalize_vector(axis)
idx = int(np.argmax(np.abs(a)))
out = np.zeros(3, dtype=np.float64)
out[idx] = 1.0 if a[idx] >= 0 else -1.0
return normalize_vector(out)
def orthonormal_basis_from_axis(axis: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Build two unit vectors orthogonal to axis, with a deterministic orientation.
"""
a = normalize_vector(axis)
ref = np.array([1.0, 0.0, 0.0], dtype=np.float64)
if abs(float(np.dot(a, ref))) > 0.90:
ref = np.array([0.0, 1.0, 0.0], dtype=np.float64)
u = np.cross(a, ref)
if np.linalg.norm(u) < 1e-12:
ref = np.array([0.0, 0.0, 1.0], dtype=np.float64)
u = np.cross(a, ref)
u = normalize_vector(u)
v = normalize_vector(np.cross(a, u))
return u, v
# ===================================================================
# Configuration
# ===================================================================
@dataclass
class ConstraintRuleConfig:
rigid_distance_enabled: bool = True
rigid_distance_mode: str = "mst" # mst | star | full
rigid_distance_weight: float = 1.0
# Revolute joints: keep the projection along the axis constant.
revolute_axis_enabled: bool = True
revolute_axis_max_pairs: int = 2
revolute_axis_weight: float = 0.5
# Prismatic joints: keep the two orthogonal projection components constant.
prismatic_orthogonal_enabled: bool = True
prismatic_orthogonal_max_pairs: int = 2
prismatic_orthogonal_weight: float = 0.35
# Legacy / optional chain propagation, disabled by default.
chain_axis_enabled: bool = False
chain_axis_max_depth: int = 3
chain_axis_max_pairs: int = 2
chain_axis_weight: float = 0.3
axis_alignment_threshold: float = 0.95
strict_unique_marker_ids: bool = False
show_skipped_constraints: bool = True
enable_observation_weights: bool = True
weight_floor: float = 0.30
weight_ceiling: float = 3.00
ref_distance_m: float = 0.75
ref_marker_size_px: float = 50.0
use_detection_confidence: bool = True
use_detection_size_px: bool = True
use_initial_range: bool = True
use_marker_size_prior: bool = True
def _bool_or_default(value: Any, default: bool) -> bool:
if value is None:
return default
return bool(value)
def _float_or_default(value: Any, default: float) -> float:
if value is None:
return default
return float(value)
def _int_or_default(value: Any, default: int) -> int:
if value is None:
return default
return int(value)
def load_constraint_rule_config(robot_data: Dict[str, Any], args: argparse.Namespace) -> ConstraintRuleConfig:
"""
Merge built-in defaults with optional robot.json constraint_rules and CLI flags.
Backward compatibility:
- joint_axis_projection -> revolute_axis
"""
rules = robot_data.get("constraint_rules", {}) or {}
cfg = ConstraintRuleConfig()
rigid = rules.get("rigid_distance", {}) or {}
revolute = rules.get("joint_revolute_axis", {}) or rules.get("joint_axis_projection", {}) or {}
prismatic = rules.get("joint_prismatic_orthogonal", {}) or {}
chain = rules.get("chain_axis_projection", {}) or {}
obs = rules.get("observation_weights", {}) or {}
cfg.rigid_distance_enabled = _bool_or_default(rigid.get("enabled"), cfg.rigid_distance_enabled)
cfg.rigid_distance_mode = str(rigid.get("mode", cfg.rigid_distance_mode)).strip().lower()
cfg.rigid_distance_weight = _float_or_default(rigid.get("weight"), cfg.rigid_distance_weight)
cfg.revolute_axis_enabled = _bool_or_default(revolute.get("enabled"), cfg.revolute_axis_enabled)
cfg.revolute_axis_max_pairs = _int_or_default(revolute.get("max_pairs"), cfg.revolute_axis_max_pairs)
cfg.revolute_axis_weight = _float_or_default(revolute.get("weight"), cfg.revolute_axis_weight)
cfg.prismatic_orthogonal_enabled = _bool_or_default(prismatic.get("enabled"), cfg.prismatic_orthogonal_enabled)
cfg.prismatic_orthogonal_max_pairs = _int_or_default(prismatic.get("max_pairs"), cfg.prismatic_orthogonal_max_pairs)
cfg.prismatic_orthogonal_weight = _float_or_default(prismatic.get("weight"), cfg.prismatic_orthogonal_weight)
cfg.chain_axis_enabled = _bool_or_default(chain.get("enabled"), cfg.chain_axis_enabled)
cfg.chain_axis_max_depth = _int_or_default(chain.get("max_depth"), cfg.chain_axis_max_depth)
cfg.chain_axis_max_pairs = _int_or_default(chain.get("max_pairs"), cfg.chain_axis_max_pairs)
cfg.chain_axis_weight = _float_or_default(chain.get("weight"), cfg.chain_axis_weight)
cfg.axis_alignment_threshold = _float_or_default(
rules.get("axis_alignment_threshold"), cfg.axis_alignment_threshold
)
cfg.enable_observation_weights = _bool_or_default(obs.get("enabled"), cfg.enable_observation_weights)
cfg.weight_floor = _float_or_default(obs.get("weight_floor"), cfg.weight_floor)
cfg.weight_ceiling = _float_or_default(obs.get("weight_ceiling"), cfg.weight_ceiling)
cfg.ref_distance_m = _float_or_default(obs.get("ref_distance_m"), cfg.ref_distance_m)
cfg.ref_marker_size_px = _float_or_default(obs.get("ref_marker_size_px"), cfg.ref_marker_size_px)
cfg.use_detection_confidence = _bool_or_default(obs.get("use_detection_confidence"), cfg.use_detection_confidence)
cfg.use_detection_size_px = _bool_or_default(obs.get("use_detection_size_px"), cfg.use_detection_size_px)
cfg.use_initial_range = _bool_or_default(obs.get("use_initial_range"), cfg.use_initial_range)
cfg.use_marker_size_prior = _bool_or_default(obs.get("use_marker_size_prior"), cfg.use_marker_size_prior)
if getattr(args, "strictUniqueMarkerIds", False):
cfg.strict_unique_marker_ids = True
if getattr(args, "showSkippedConstraints", False):
cfg.show_skipped_constraints = True
if getattr(args, "noShowSkippedConstraints", False):
cfg.show_skipped_constraints = False
return cfg
# ===================================================================
# Observation / constraint definitions
# ===================================================================
@dataclass
class Observation:
cam_idx: int
center_norm: np.ndarray
corners_norm: np.ndarray | None
marker_size_m: float | None
meta: Dict[str, Any]
@dataclass
class MarkerDistanceConstraint:
marker_id_a: int
marker_id_b: int
link_name: str
target_distance_m: float
weight: float = 1.0
enabled: bool = True
source: str = "rigid_distance"
@dataclass
class JointAxisConstraint:
marker_id_parent: int
marker_id_child: int
parent_link: str
child_link: str
joint_axis: np.ndarray
target_delta_along_axis_m: float
weight: float = 1.0
enabled: bool = True
source: str = "joint_axis_projection"
Constraint = MarkerDistanceConstraint | JointAxisConstraint
# ===================================================================
# Robot parsing
# ===================================================================
def parse_robot_markers(
robot_data: Dict[str, Any],
length_scale: float,
strict_unique_marker_ids: bool = False
) -> Tuple[Dict[int, str], Dict[str, List[Dict[str, Any]]], List[str], Dict[int, Dict[str, Any]]]:
links = robot_data.get("links", {}) or {}
marker_to_link: Dict[int, str] = {}
link_markers: Dict[str, List[Dict[str, Any]]] = {}
issues: List[str] = []
marker_meta: Dict[int, Dict[str, Any]] = {}
seen_global: Dict[int, str] = {}
for link_name, link_data in links.items():
markers = link_data.get("markers", []) or []
collected: List[Dict[str, Any]] = []
seen_local: set[int] = set()
for idx, marker in enumerate(markers):
marker_id = int(marker.get("id", -1))
pos = marker.get("position", None)
if marker_id < 0 or pos is None or len(pos) != 3:
issues.append(f"[WARN] link={link_name}: skipped invalid marker entry at index {idx}")
continue
if marker_id in seen_local:
msg = f"[WARN] duplicate marker id {marker_id} inside link '{link_name}'"
if strict_unique_marker_ids:
raise ValueError(msg)
issues.append(msg + " -> skipped duplicate inside same link")
continue
if marker_id in seen_global:
msg = (
f"[WARN] duplicate marker id {marker_id} appears in link '{link_name}' "
f"and already in link '{seen_global[marker_id]}'"
)
if strict_unique_marker_ids:
raise ValueError(msg)
issues.append(msg + " -> skipped duplicate occurrence")
continue
seen_local.add(marker_id)
seen_global[marker_id] = link_name
pos_raw = np.array(pos, dtype=np.float64)
pos_m = pos_raw * float(length_scale)
item = {
"id": marker_id,
"name": marker.get("name", f"marker_{marker_id}"),
"position_raw": pos_raw,
"position_m": pos_m,
"normal": np.array(marker.get("normal", [0, 0, 1]), dtype=np.float64),
"size": marker.get("size", None),
"spin": marker.get("spin", None),
}
collected.append(item)
marker_to_link[marker_id] = link_name
marker_meta[marker_id] = {
"link_name": link_name,
"name": item["name"],
"position_m": pos_m,
"normal": item["normal"],
"size": item["size"],
"spin": item["spin"],
}
link_markers[link_name] = collected
return marker_to_link, link_markers, issues, marker_meta
def get_link_parent_map(robot_data: Dict[str, Any]) -> Dict[str, Optional[str]]:
links = robot_data.get("links", {}) or {}
return {link_name: (link_data.get("parent", None)) for link_name, link_data in links.items()}
def get_joint_info(robot_data: Dict[str, Any], child_link: str) -> Dict[str, Any]:
links = robot_data.get("links", {}) or {}
return (links.get(child_link, {}) or {}).get("jointToParent", {}) or {}
def get_joint_axis(robot_data: Dict[str, Any], child_link: str) -> Optional[np.ndarray]:
joint = get_joint_info(robot_data, child_link)
axis = joint.get("axis", None)
if axis is None:
return None
axis = np.asarray(axis, dtype=np.float64)
if safe_norm(axis) < 1e-12:
return None
return normalize_vector(axis)
def get_vision_marker_size_default(robot_data: Dict[str, Any]) -> float:
vision = robot_data.get("vision_config", {}) or {}
ms = vision.get("MarkerSize", None)
if ms is None:
return 0.025
return float(ms)
# ===================================================================
# Constraint compilation helpers
# ===================================================================
def get_enabled_link_rule(
robot_data: Dict[str, Any],
link_name: str,
rule_name: str,
default_enabled: bool = True
) -> bool:
overrides = robot_data.get("constraint_overrides", {}) or {}
link_override = overrides.get(link_name, {}) or {}
rule_override = link_override.get(rule_name, {}) or {}
if "enabled" in rule_override:
return bool(rule_override["enabled"])
return default_enabled
def select_anchor_marker_ids(
markers: List[Dict[str, Any]],
axis: Optional[np.ndarray] = None,
max_count: int = 2
) -> List[int]:
if not markers:
return []
if len(markers) == 1:
return [int(markers[0]["id"])]
ids = [int(m["id"]) for m in markers]
pos = np.stack([np.asarray(m["position_m"], dtype=np.float64) for m in markers], axis=0)
selected: List[int] = []
if axis is not None and safe_norm(axis) > 1e-12:
a = normalize_vector(axis)
proj = pos @ a
min_idx = int(np.argmin(proj))
max_idx = int(np.argmax(proj))
selected = [ids[min_idx], ids[max_idx]]
else:
centroid = np.mean(pos, axis=0)
d = np.linalg.norm(pos - centroid, axis=1)
min_idx = int(np.argmin(d))
max_idx = int(np.argmax(d))
selected = [ids[min_idx], ids[max_idx]]
if len(selected) < max_count:
for mid in ids:
if mid not in selected:
selected.append(mid)
if len(selected) >= max_count:
break
out: List[int] = []
seen: set[int] = set()
for mid in selected:
if mid not in seen:
seen.add(mid)
out.append(mid)
if len(out) >= max_count:
break
return out
def mst_edges_for_link(markers: List[Dict[str, Any]]) -> List[Tuple[int, int]]:
n = len(markers)
if n < 2:
return []
ids = [int(m["id"]) for m in markers]
pos = np.stack([np.asarray(m["position_m"], dtype=np.float64) for m in markers], axis=0)
in_tree = np.zeros(n, dtype=bool)
in_tree[0] = True
edges: List[Tuple[int, int]] = []
dist = np.linalg.norm(pos[:, None, :] - pos[None, :, :], axis=2)
for _ in range(n - 1):
best = None
best_d = float("inf")
for i in range(n):
if not in_tree[i]:
continue
for j in range(n):
if in_tree[j]:
continue
d = float(dist[i, j])
if d < best_d:
best_d = d
best = (i, j)
if best is None:
break
i, j = best
in_tree[j] = True
edges.append((ids[i], ids[j]))
return edges
def compile_rigid_distance_constraints(
robot_data: Dict[str, Any],
link_markers: Dict[str, List[Dict[str, Any]]],
cfg: ConstraintRuleConfig
) -> List[MarkerDistanceConstraint]:
constraints: List[MarkerDistanceConstraint] = []
for link_name, markers in link_markers.items():
if not get_enabled_link_rule(robot_data, link_name, "rigid_distance", cfg.rigid_distance_enabled):
continue
if len(markers) < 2:
continue
mode = cfg.rigid_distance_mode
if mode == "full":
pairs = [(int(a["id"]), int(b["id"])) for a, b in combinations(markers, 2)]
elif mode == "star":
anchor_ids = select_anchor_marker_ids(markers, axis=None, max_count=1)
anchor_id = anchor_ids[0]
pairs = []
for m in markers:
mid = int(m["id"])
if mid != anchor_id:
pairs.append((anchor_id, mid))
elif mode == "mst":
pairs = mst_edges_for_link(markers)
else:
raise ValueError(f"Unknown rigid_distance_mode='{mode}'. Use mst|star|full.")
pos_map = {int(m["id"]): np.asarray(m["position_m"], dtype=np.float64) for m in markers}
seen_pairs: set[Tuple[int, int]] = set()
for mid_a, mid_b in pairs:
if mid_a == mid_b:
continue
key = tuple(sorted((int(mid_a), int(mid_b))))
if key in seen_pairs:
continue
seen_pairs.add(key)
pos_a = pos_map[mid_a]
pos_b = pos_map[mid_b]
target = float(np.linalg.norm(pos_b - pos_a))
constraints.append(
MarkerDistanceConstraint(
marker_id_a=mid_a,
marker_id_b=mid_b,
link_name=link_name,
target_distance_m=target,
weight=cfg.rigid_distance_weight,
enabled=True,
source=f"rigid_distance:{mode}",
)
)
return constraints
def compile_joint_dof_constraints(
robot_data: Dict[str, Any],
link_markers: Dict[str, List[Dict[str, Any]]],
cfg: ConstraintRuleConfig
) -> List[JointAxisConstraint]:
"""
Compile local joint constraints from robot.json.
Revolute joints: one scalar constraint per anchor pair
(projection along the joint axis stays constant)
Prismatic joints: two scalar constraints per anchor pair
(the orthogonal projections stay constant)
Both are emitted as JointAxisConstraint objects so the rest of the
optimization pipeline remains unchanged.
"""
constraints: List[JointAxisConstraint] = []
links = robot_data.get("links", {}) or {}
for child_link, child_data in links.items():
parent_link = child_data.get("parent", None)
if not parent_link:
continue
joint_info = child_data.get("jointToParent", {}) or {}
joint_type = str(joint_info.get("type", "")).strip().lower()
joint_axis = get_joint_axis(robot_data, child_link)
if joint_axis is None:
continue
axis_vec = principal_axis_vector(joint_axis)
parent_markers = link_markers.get(parent_link, [])
child_markers = link_markers.get(child_link, [])
if len(parent_markers) == 0 or len(child_markers) == 0:
continue
parent_pos = {int(m["id"]): np.asarray(m["position_m"], dtype=np.float64) for m in parent_markers}
child_pos = {int(m["id"]): np.asarray(m["position_m"], dtype=np.float64) for m in child_markers}
seen: set[Tuple[int, int]] = set()
if joint_type == "revolute":
if not get_enabled_link_rule(
robot_data, child_link, "joint_revolute_axis", cfg.revolute_axis_enabled
):
continue
max_pairs = max(1, int(cfg.revolute_axis_max_pairs))
parent_anchor_ids = select_anchor_marker_ids(parent_markers, axis=axis_vec, max_count=max_pairs)
child_anchor_ids = select_anchor_marker_ids(child_markers, axis=axis_vec, max_count=max_pairs)
for mid_p in parent_anchor_ids:
for mid_c in child_anchor_ids:
if mid_p == mid_c:
continue
key = (mid_p, mid_c)
if key in seen:
continue
seen.add(key)
delta = child_pos[mid_c] - parent_pos[mid_p]
target = float(np.dot(delta, axis_vec))
constraints.append(
JointAxisConstraint(
marker_id_parent=mid_p,
marker_id_child=mid_c,
parent_link=parent_link,
child_link=child_link,
joint_axis=axis_vec,
target_delta_along_axis_m=target,
weight=cfg.revolute_axis_weight,
enabled=True,
source="revolute_axis_projection",
)
)
elif joint_type == "linear":
if not get_enabled_link_rule(
robot_data, child_link, "joint_prismatic_orthogonal", cfg.prismatic_orthogonal_enabled
):
continue
max_pairs = max(1, int(cfg.prismatic_orthogonal_max_pairs))
parent_anchor_ids = select_anchor_marker_ids(parent_markers, axis=axis_vec, max_count=max_pairs)
child_anchor_ids = select_anchor_marker_ids(child_markers, axis=axis_vec, max_count=max_pairs)
basis_u, basis_v = orthonormal_basis_from_axis(axis_vec)
for mid_p in parent_anchor_ids:
for mid_c in child_anchor_ids:
if mid_p == mid_c:
continue
key = (mid_p, mid_c)
if key in seen:
continue
seen.add(key)
delta = child_pos[mid_c] - parent_pos[mid_p]
constraints.append(
JointAxisConstraint(
marker_id_parent=mid_p,
marker_id_child=mid_c,
parent_link=parent_link,
child_link=child_link,
joint_axis=basis_u,
target_delta_along_axis_m=float(np.dot(delta, basis_u)),
weight=cfg.prismatic_orthogonal_weight,
enabled=True,
source="prismatic_orthogonal_projection:u",
)
)
constraints.append(
JointAxisConstraint(
marker_id_parent=mid_p,
marker_id_child=mid_c,
parent_link=parent_link,
child_link=child_link,
joint_axis=basis_v,
target_delta_along_axis_m=float(np.dot(delta, basis_v)),
weight=cfg.prismatic_orthogonal_weight,
enabled=True,
source="prismatic_orthogonal_projection:v",
)
)
else:
continue
return constraints
def compile_constraints(
robot_data: Dict[str, Any],
length_scale: float,
cfg: ConstraintRuleConfig
) -> Tuple[Dict[int, str], Dict[str, List[Dict[str, Any]]], List[Constraint], List[str], Dict[int, Dict[str, Any]]]:
marker_to_link, link_markers, issues, marker_meta = parse_robot_markers(
robot_data,
length_scale=length_scale,
strict_unique_marker_ids=cfg.strict_unique_marker_ids,
)
constraints: List[Constraint] = []
constraints.extend(compile_rigid_distance_constraints(robot_data, link_markers, cfg))
constraints.extend(compile_joint_dof_constraints(robot_data, link_markers, cfg))
# Legacy optional family, OFF by default.
if cfg.chain_axis_enabled:
constraints.extend(compile_chain_axis_constraints(robot_data, link_markers, cfg))
unique_constraints: List[Constraint] = []
seen_keys: set[Tuple[Any, ...]] = set()
for c in constraints:
if isinstance(c, MarkerDistanceConstraint):
key = (
"d",
min(c.marker_id_a, c.marker_id_b),
max(c.marker_id_a, c.marker_id_b),
c.link_name,
round(c.target_distance_m, 9),
)
else:
key = (
"j",
c.parent_link,
c.child_link,
c.marker_id_parent,
c.marker_id_child,
tuple(np.round(c.joint_axis, 9).tolist()),
round(c.target_delta_along_axis_m, 9),
)
if key in seen_keys:
continue
seen_keys.add(key)
unique_constraints.append(c)
return marker_to_link, link_markers, unique_constraints, issues, marker_meta
# ===================================================================
# Observation quality / weighting
# ===================================================================
def _optional_float(meta: Dict[str, Any], keys: List[str]) -> Optional[float]:
for k in keys:
if k in meta and meta[k] is not None:
try:
return float(meta[k])
except Exception:
pass
return None
def detection_quality_from_metadata(det_obj: Dict[str, Any], cfg: ConstraintRuleConfig) -> float:
q = 1.0
if cfg.use_detection_confidence:
conf = _optional_float(det_obj, ["confidence", "score", "quality", "det_confidence"])
if conf is not None:
q *= clamp(conf, 0.1, 1.0)
if cfg.use_detection_size_px:
size_px = _optional_float(det_obj, ["size_px", "marker_size_px", "side_px", "side_length_px"])
if size_px is None and "corners_px" in det_obj and isinstance(det_obj["corners_px"], list):
try:
corners = np.asarray(det_obj["corners_px"], dtype=np.float64).reshape(-1, 2)
if len(corners) >= 4:
edges = []
for i in range(len(corners)):
p = corners[i]
q2 = corners[(i + 1) % len(corners)]
edges.append(float(np.linalg.norm(q2 - p)))
size_px = float(np.mean(edges))
except Exception:
size_px = None
if size_px is not None:
q *= clamp(size_px / max(cfg.ref_marker_size_px, 1e-6), 0.25, 3.0)
sharpness = _optional_float(det_obj, ["sharpness", "corner_sharpness"])
if sharpness is not None:
q *= clamp(sharpness / 2500.0, 0.5, 1.5)
normal_alignment = _optional_float(det_obj, ["normal_alignment", "view_cosine", "cos_to_camera"])
if normal_alignment is not None:
q *= clamp(normal_alignment, 0.3, 1.0)
return float(q)
def marker_size_prior_factor(marker_meta: Dict[str, Any], default_marker_size_m: float) -> float:
size_val = marker_meta.get("size", None)
if size_val is None:
return 1.0
try:
size_val = float(size_val)
except Exception:
return 1.0
size_m = size_val / 1000.0 if size_val > 1.0 else size_val
ref = max(default_marker_size_m, 1e-6)
return clamp(size_m / ref, 0.7, 1.3)
def compute_observation_weights(
marker_observations: Dict[int, List[Observation]],
cameras: List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]],
initial_positions: Dict[int, np.ndarray],
marker_meta: Dict[int, Dict[str, Any]],
cfg: ConstraintRuleConfig,
robot_data: Dict[str, Any]
) -> Dict[Tuple[int, int], float]:
weights: Dict[Tuple[int, int], float] = {}
default_marker_size_m = get_vision_marker_size_default(robot_data)
for marker_id, obs_list in marker_observations.items():
X = initial_positions.get(marker_id, None)
size_prior = marker_size_prior_factor(marker_meta.get(marker_id, {}), default_marker_size_m)
for obs_idx, obs in enumerate(obs_list):
w = 1.0
q = detection_quality_from_metadata(obs.meta, cfg)
w *= q
if cfg.use_marker_size_prior:
w *= size_prior
if cfg.use_initial_range and X is not None:
_, _, R_wc, t_wc = cameras[obs.cam_idx]
C = camera_center_from_world_to_cam(R_wc, t_wc)
dist = float(np.linalg.norm(X - C))
if np.isfinite(dist):
w *= clamp(cfg.ref_distance_m / max(dist, 1e-6), 0.4, 2.0)
weights[(marker_id, obs_idx)] = clamp(w, cfg.weight_floor, cfg.weight_ceiling)
return weights
# ===================================================================
# Multi-view loading
# ===================================================================
def load_observations_and_poses(
detection_files: List[str],
pose_files: List[str]
) -> Tuple[
Dict[int, List[Observation]],
List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]],
List[Dict[str, Any]]
]:
if len(detection_files) != len(pose_files):
raise ValueError(f"Mismatch: {len(detection_files)} detections vs {len(pose_files)} poses")
marker_observations: Dict[int, List[Observation]] = {}
cameras: List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]] = []
obs_metadata: List[Dict[str, Any]] = []
for cam_idx, (det_file, pose_file) in enumerate(zip(detection_files, pose_files)):
det = load_json(det_file)
pose_data = load_json(pose_file)
cam_section = det.get("camera", {}) or {}
K = np.array(cam_section.get("camera_matrix", []), dtype=np.float64).reshape(3, 3)
D = np.array(cam_section.get("distortion_coefficients", []), dtype=np.float64).reshape(-1, 1)
pose_section = pose_data.get("camera_pose", {}) or {}
world_to_cam = pose_section.get("world_to_camera", {}) or {}
R_wc = np.array(world_to_cam.get("rotation_matrix", []), dtype=np.float64).reshape(3, 3)
t_wc = np.array(world_to_cam.get("translation_m", []), dtype=np.float64).reshape(3)
cameras.append((K, D, R_wc, t_wc))
detections = det.get("detections", []) or []
for det_obj in detections:
marker_id = int(det_obj.get("marker_id", -1))
if marker_id < 0:
continue
center_px = np.array(det_obj.get("center_px", []), dtype=np.float64)
if center_px.shape != (2,):
continue
pts = center_px.reshape(1, 1, 2).astype(np.float32)
und = cv2.undistortPoints(pts, K.astype(np.float32), D.astype(np.float32), P=None)
center_norm = und.reshape(2).astype(np.float64)
obs = Observation(
cam_idx=cam_idx,
center_norm=center_norm,
meta=dict(det_obj),
)
marker_observations.setdefault(marker_id, []).append(obs)
obs_metadata.append(
{
"detection_file": det_file,
"pose_file": pose_file,
"num_detections": len(detections),
}
)
return marker_observations, cameras, obs_metadata
# ===================================================================
# Initial triangulation
# ===================================================================
def triangulate_marker_initial(
marker_id: int,
observations: List[Observation],
cameras: List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]]
) -> Optional[np.ndarray]:
"""
Multi-view linear triangulation from all available normalized observations.
Uses all cameras instead of only the best pair.
"""
if len(observations) < 2:
return None
A = []
for obs in observations:
cam_idx = obs.cam_idx
_, _, R_wc, t_wc = cameras[cam_idx]
# world -> camera projection matrix without intrinsics,
# because obs.center_norm is already undistorted / normalized
P = np.hstack([R_wc, t_wc.reshape(3, 1)]) # 3x4
x = float(obs.center_norm[0])
y = float(obs.center_norm[1])
A.append(x * P[2, :] - P[0, :])
A.append(y * P[2, :] - P[1, :])
A = np.asarray(A, dtype=np.float64)
if A.shape[0] < 4:
return None
try:
_, _, Vt = np.linalg.svd(A)
X_h = Vt[-1, :]
if abs(X_h[3]) < 1e-12:
return None
X = X_h[:3] / X_h[3]
if not np.all(np.isfinite(X)):
return None
return X.astype(np.float64)
except Exception:
return None
def initial_triangulation(
marker_observations: Dict[int, List[Observation]],
cameras: List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]]
) -> Dict[int, np.ndarray]:
triangulated: Dict[int, np.ndarray] = {}
for marker_id, observations in marker_observations.items():
X = triangulate_marker_initial(marker_id, observations, cameras)
if X is not None:
triangulated[marker_id] = X
return triangulated
# ===================================================================
# Weighted residuals / optimization
# ===================================================================
def bundle_adjustment_residuals(
marker_positions_flat: np.ndarray,
marker_ids: List[int],
marker_observations: Dict[int, List[Observation]],
cameras: List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]],
constraints: List[Constraint],
obs_weights: Dict[Tuple[int, int], float],
lambda_constraint: float = 100.0
) -> np.ndarray:
marker_dict: Dict[int, np.ndarray] = {}
for i, marker_id in enumerate(marker_ids):
marker_dict[marker_id] = marker_positions_flat[i * 3:(i + 1) * 3]
residuals: List[float] = []
for marker_id, observations in marker_observations.items():
if marker_id not in marker_dict:
continue
X_world = marker_dict[marker_id]
for obs_idx, obs in enumerate(observations):
cam_idx, norm_coords_obs = obs.cam_idx, obs.norm_coords
K, D, R_wc, t_wc = cameras[cam_idx]
X_cam = R_wc @ X_world + t_wc
if X_cam[2] > 1e-6:
proj_norm = X_cam[:2] / X_cam[2]
r = proj_norm - norm_coords_obs
w = float(np.sqrt(obs_weights.get((marker_id, obs_idx), 1.0)))
residuals.append(w * float(r[0]))
residuals.append(w * float(r[1]))
for constraint in constraints:
if isinstance(constraint, MarkerDistanceConstraint):
if constraint.marker_id_a in marker_dict and constraint.marker_id_b in marker_dict:
pos_a = marker_dict[constraint.marker_id_a]
pos_b = marker_dict[constraint.marker_id_b]
actual_dist = float(np.linalg.norm(pos_b - pos_a))
residuals.append((actual_dist - constraint.target_distance_m) * constraint.weight * lambda_constraint)
elif isinstance(constraint, JointAxisConstraint):
if constraint.marker_id_parent in marker_dict and constraint.marker_id_child in marker_dict:
pos_parent = marker_dict[constraint.marker_id_parent]
pos_child = marker_dict[constraint.marker_id_child]
delta = pos_child - pos_parent
actual_delta = float(np.dot(delta, constraint.joint_axis))
residuals.append((actual_delta - constraint.target_delta_along_axis_m) * constraint.weight * lambda_constraint)
return np.asarray(residuals, dtype=np.float64)
def optimize_with_constraints(
initial_positions: Dict[int, np.ndarray],
marker_observations: Dict[int, List[Observation]],
cameras: List[Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]],
constraints: List[Constraint],
obs_weights: Dict[Tuple[int, int], float],
lambda_constraint: float = 100.0,
max_iterations: int = 50
) -> Dict[int, np.ndarray]:
try:
from scipy.optimize import least_squares
except ImportError:
print("[WARN] scipy not available, skipping optimization.")
return initial_positions
marker_ids = sorted(initial_positions.keys())
if not marker_ids:
return {}
x0 = np.concatenate([initial_positions[mid] for mid in marker_ids])
def residuals_fn(x: np.ndarray) -> np.ndarray:
return bundle_adjustment_residuals(
x, marker_ids, marker_observations, cameras, constraints, obs_weights, lambda_constraint
)
print(f"[INFO] Starting optimization with {len(x0)} variables and {len(constraints)} constraints...")
result = least_squares(
residuals_fn,
x0,
max_nfev=max_iterations * max(1, len(marker_ids)),
verbose=1,
)
optimized = {}
for i, marker_id in enumerate(marker_ids):
optimized[marker_id] = result.x[i * 3:(i + 1) * 3]
print(f"[INFO] Optimization complete. Final cost: {float(np.sum(result.fun ** 2)):.6f}")
return optimized
# ===================================================================
# Reporting helpers
# ===================================================================
def print_constraint_summary(constraints: List[Constraint]) -> None:
num_dist = sum(isinstance(c, MarkerDistanceConstraint) for c in constraints)
num_joint = sum(isinstance(c, JointAxisConstraint) for c in constraints)
print(f"[INFO] Constraint summary: total={len(constraints)} distance={num_dist} joint/chain={num_joint}")
def print_constraint_list(constraints: List[Constraint]) -> None:
print("\n[INFO] Constraint list:")
for idx, constraint in enumerate(constraints):
if isinstance(constraint, MarkerDistanceConstraint):
print(
f" [{idx:03d}] DISTANCE | "
f"Link='{constraint.link_name}' | "
f"M{constraint.marker_id_a} <-> M{constraint.marker_id_b} | "
f"Target={constraint.target_distance_m:.6f} m | "
f"Weight={constraint.weight} | "
f"Source={constraint.source}"
)
else:
axis_str = np.array2string(constraint.joint_axis, precision=3, suppress_small=True)
print(
f" [{idx:03d}] JOINT_AXIS | "
f"{constraint.parent_link}(M{constraint.marker_id_parent}) -> "
f"{constraint.child_link}(M{constraint.marker_id_child}) | "
f"Axis={axis_str} | "
f"TargetDelta={constraint.target_delta_along_axis_m:.6f} m | "
f"Weight={constraint.weight} | "
f"Source={constraint.source}"
)
def print_constraints_with_errors(
title: str,
constraints: List[Constraint],
positions: Dict[int, np.ndarray],
show_skipped: bool = True
) -> None:
print(f"\n[INFO] {title}")
active = 0
skipped = 0
for idx, constraint in enumerate(constraints):
if isinstance(constraint, MarkerDistanceConstraint):
if constraint.marker_id_a not in positions or constraint.marker_id_b not in positions:
skipped += 1
if show_skipped:
print(
f" [{idx:03d}] DISTANCE | "
f"M{constraint.marker_id_a} <-> M{constraint.marker_id_b} | SKIPPED (missing marker)"
)
continue
pos_a = positions[constraint.marker_id_a]
pos_b = positions[constraint.marker_id_b]
actual = float(np.linalg.norm(pos_b - pos_a))
error = actual - constraint.target_distance_m
active += 1
print(
f" [{idx:03d}] DISTANCE | "
f"Link='{constraint.link_name}' | "
f"M{constraint.marker_id_a} <-> M{constraint.marker_id_b} | "
f"target={constraint.target_distance_m*1000:.2f} mm | "
f"actual={actual*1000:.2f} mm | "
f"error={error*1000:+.2f} mm"
)
elif isinstance(constraint, JointAxisConstraint):
if constraint.marker_id_parent not in positions or constraint.marker_id_child not in positions:
skipped += 1
if show_skipped:
print(
f" [{idx:03d}] JOINT_AXIS | "
f"M{constraint.marker_id_parent} -> M{constraint.marker_id_child} | SKIPPED (missing marker)"
)
continue
pos_parent = positions[constraint.marker_id_parent]
pos_child = positions[constraint.marker_id_child]
delta = pos_child - pos_parent
actual = float(np.dot(delta, constraint.joint_axis))
error = actual - constraint.target_delta_along_axis_m
active += 1
axis_str = np.array2string(constraint.joint_axis, precision=2, suppress_small=True)
print(
f" [{idx:03d}] JOINT_AXIS | "
f"{constraint.parent_link}(M{constraint.marker_id_parent}) -> "
f"{constraint.child_link}(M{constraint.marker_id_child}) | "
f"axis={axis_str} | "
f"target={constraint.target_delta_along_axis_m*1000:.2f} mm | "
f"actual={actual*1000:.2f} mm | "
f"error={error*1000:+.2f} mm"
)
print(f"[INFO] Active constraints: {active} | Skipped: {skipped}")
def print_observation_weight_summary(obs_weights: Dict[Tuple[int, int], float]) -> None:
if not obs_weights:
print("[INFO] Observation weighting: disabled or empty")
return
values = np.array(list(obs_weights.values()), dtype=np.float64)
print(
"[INFO] Observation weights: "
f"min={values.min():.3f} mean={values.mean():.3f} "
f"median={np.median(values):.3f} max={values.max():.3f}"
)
# ===================================================================
# Main
# ===================================================================
def main() -> None:
parser = argparse.ArgumentParser(
description="Multi-view bundle adjustment with rule-based geometric constraints"
)
parser.add_argument(
"-det", "--detections",
action="append",
required=True,
help="*_aruco_detection.json files"
)
parser.add_argument(
"-pose", "--poses",
action="append",
required=True,
help="*_camera_pose.json files"
)
parser.add_argument(
"-robot", "--robot",
required=True,
help="robot.json"
)
parser.add_argument(
"-outDir", "--outDir",
default=None,
help="Output directory"
)
parser.add_argument(
"-lambdaWeight", "--lambdaWeight",
type=float,
default=100.0,
help="Constraint weight multiplier"
)
parser.add_argument(
"--strictUniqueMarkerIds",
action="store_true",
help="Fail if a marker ID appears more than once in robot.json"
)
parser.add_argument(
"--showSkippedConstraints",
action="store_true",
help="Print skipped constraints in the report"
)
parser.add_argument(
"--noShowSkippedConstraints",
action="store_true",
help="Hide skipped constraints in the report"
)
parser.add_argument(
"--saveConstraintReport",
action="store_true",
help="Save constraint report JSON files"
)
parser.add_argument(
"--saveObservationWeightReport",
action="store_true",
help="Save observation-weight report JSON file"
)
args = parser.parse_args()
if args.showSkippedConstraints and args.noShowSkippedConstraints:
print("[ERROR] Choose only one of --showSkippedConstraints or --noShowSkippedConstraints")
sys.exit(1)
if len(args.detections) != len(args.poses):
print(f"[ERROR] Mismatch: {len(args.detections)} detection files vs {len(args.poses)} pose files")
sys.exit(1)
robot_data = load_json(args.robot)
length_scale = get_length_scale(robot_data)
cfg = load_constraint_rule_config(robot_data, args)
print("[STEP 1] Compile constraints from robot.json structure...")
print(
"[INFO] Constraint families: "
f"rigid_distance={'on' if cfg.rigid_distance_enabled else 'off'}, "
f"revolute={'on' if cfg.revolute_axis_enabled else 'off'}, "
f"prismatic={'on' if cfg.prismatic_orthogonal_enabled else 'off'}, "
f"chain_legacy={'on' if cfg.chain_axis_enabled else 'off'}, "
f"observation_weights={'on' if cfg.enable_observation_weights else 'off'}"
)
marker_to_link, link_markers, constraints, issues, marker_meta = compile_constraints(robot_data, length_scale, cfg)
for issue in issues:
print(issue)
print(f"[INFO] Links with markers: {sum(1 for v in link_markers.values() if len(v) > 0)}")
print(f"[INFO] Unique marker IDs: {len(marker_to_link)}")
print_constraint_summary(constraints)
print_constraint_list(constraints)
print("\n[STEP 2] Load observations and camera poses...")
marker_observations, cameras, obs_metadata = load_observations_and_poses(args.detections, args.poses)
print(f"[INFO] {len(cameras)} cameras, {len(marker_observations)} observed markers")
print(f"[INFO] Detection files loaded: {len(obs_metadata)}")
print("\n[STEP 3] Initial triangulation...")
initial_pos = initial_triangulation(marker_observations, cameras)
print(f"[INFO] Triangulated {len(initial_pos)} markers")
out_dir = args.outDir or os.path.dirname(args.detections[0]) or "."
os.makedirs(resolve_path(out_dir), exist_ok=True)
initial_output_markers = []
for marker_id, position in sorted(initial_pos.items()):
initial_output_markers.append(
{
"marker_id": int(marker_id),
"position_m": [float(x) for x in position],
"position_mm": [float(x * 1000.0) for x in position],
"link": marker_to_link.get(marker_id, "unknown"),
}
)
initial_output = {
"schema_version": "1.2",
"stage": "initial_triangulation",
"created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"summary": {
"num_cameras": len(cameras),
"num_markers": len(initial_pos),
"num_constraints": len(constraints),
},
"markers": initial_output_markers,
}
initial_out_file = os.path.join(out_dir, "aruco_positions_initial.json")
save_json(initial_out_file, initial_output)
print(f"[INFO] Initial triangulation saved to {initial_out_file}")
obs_weights = compute_observation_weights(
marker_observations=marker_observations,
cameras=cameras,
initial_positions=initial_pos,
marker_meta=marker_meta,
cfg=cfg,
robot_data=robot_data,
)
print_observation_weight_summary(obs_weights)
print_constraints_with_errors(
"Constraint list BEFORE optimization",
constraints,
initial_pos,
show_skipped=cfg.show_skipped_constraints,
)
print("\n[STEP 4] Bundle adjustment with constraints...")
optimized_pos = optimize_with_constraints(
initial_pos,
marker_observations,
cameras,
constraints,
obs_weights,
lambda_constraint=args.lambdaWeight,
)
print_constraints_with_errors(
"Constraint list AFTER optimization",
constraints,
optimized_pos,
show_skipped=cfg.show_skipped_constraints,
)
output_markers = []
for marker_id, position in sorted(optimized_pos.items()):
output_markers.append(
{
"marker_id": int(marker_id),
"position_m": [float(x) for x in position],
"position_mm": [float(x * 1000.0) for x in position],
"link": marker_to_link.get(marker_id, "unknown"),
}
)
output = {
"schema_version": "1.2",
"created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"summary": {
"num_cameras": len(cameras),
"num_markers": len(optimized_pos),
"num_constraints": len(constraints),
},
"markers": output_markers,
}
out_file = os.path.join(out_dir, "aruco_positions_optimized.json")
save_json(out_file, output)
print(f"\n[INFO] Saved to {out_file}")
if args.saveConstraintReport:
report = {
"schema_version": "1.0",
"created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"summary": {
"num_constraints": len(constraints),
"num_links_with_markers": sum(1 for v in link_markers.values() if len(v) > 0),
"num_observed_markers": len(marker_observations),
"num_triangulated_markers": len(initial_pos),
"num_optimized_markers": len(optimized_pos),
},
"constraints": [],
}
for c in constraints:
if isinstance(c, MarkerDistanceConstraint):
report["constraints"].append(
{
"kind": "distance",
"link_name": c.link_name,
"marker_id_a": c.marker_id_a,
"marker_id_b": c.marker_id_b,
"target_distance_m": c.target_distance_m,
"weight": c.weight,
"source": c.source,
}
)
else:
report["constraints"].append(
{
"kind": "joint_axis",
"parent_link": c.parent_link,
"child_link": c.child_link,
"marker_id_parent": c.marker_id_parent,
"marker_id_child": c.marker_id_child,
"joint_axis": [float(x) for x in c.joint_axis],
"target_delta_along_axis_m": c.target_delta_along_axis_m,
"weight": c.weight,
"source": c.source,
}
)
report_file = os.path.join(out_dir, "constraint_report.json")
save_json(report_file, report)
print(f"[INFO] Constraint report saved to {report_file}")
if args.saveObservationWeightReport:
obs_report = {
"schema_version": "1.0",
"created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"summary": {
"num_weighted_observations": len(obs_weights),
},
"observation_weights": [
{
"marker_id": int(mid),
"observation_index": int(obs_idx),
"weight": float(w),
}
for (mid, obs_idx), w in sorted(obs_weights.items())
],
}
obs_file = os.path.join(out_dir, "observation_weight_report.json")
save_json(obs_file, obs_report)
print(f"[INFO] Observation-weight report saved to {obs_file}")
if __name__ == "__main__":
main()