846 lines
29 KiB
Python
846 lines
29 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
2_estimate_camera_from_observations.py
|
|
|
|
Estimate a single camera pose from ArUco observations stored in
|
|
*_aruco_detection.json, using marker world positions from robot.json.
|
|
|
|
This follows the same mathematical idea as readTwoImages.py:
|
|
1) use detected marker observations,
|
|
2) get an initial pose from a rigid transform,
|
|
3) refine with Levenberg-Marquardt on normalized reprojection residuals.
|
|
|
|
Difference to readTwoImages.py:
|
|
- No image processing here.
|
|
- Input is the observation JSON created by 1_detect_aruco_observations.py.
|
|
- Output is xxx_camera_pose.json.
|
|
- Unknown marker reconstruction is intentionally omitted.
|
|
|
|
Assumptions:
|
|
- robot.json contains a marker list for the board/world frame.
|
|
- At minimum, marker positions are present for the reference markers.
|
|
- The detection JSON contains camera intrinsics and marker corners.
|
|
|
|
Typical usage:
|
|
python3 2_estimate_camera_from_observations.py \
|
|
-i frame_0001_aruco_detection.json \
|
|
-robot robot.json \
|
|
-outDir results/
|
|
|
|
Output:
|
|
frame_0001_camera_pose.json
|
|
|
|
Notes on uncertainty:
|
|
- The script computes an approximate 6x6 covariance for the pose parameters
|
|
[rvec_x, rvec_y, rvec_z, t_x, t_y, t_z].
|
|
- It also propagates that covariance to camera center uncertainty in world
|
|
coordinates and to approximate roll/pitch/yaw uncertainty.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# Path / JSON helpers
|
|
# ---------------------------------------------------------------------
|
|
|
|
def resolve_path(path: str) -> str:
|
|
path = os.path.expanduser(path)
|
|
if os.path.isabs(path):
|
|
return path
|
|
return os.path.abspath(path)
|
|
|
|
|
|
def load_json(path: str) -> Dict[str, Any]:
|
|
with open(resolve_path(path), "r", encoding="utf-8") as f:
|
|
return json.load(f)
|
|
|
|
|
|
def save_json(path: str, data: Dict[str, Any]) -> None:
|
|
with open(resolve_path(path), "w", encoding="utf-8") as f:
|
|
json.dump(data, f, indent=2)
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# Intrinsics
|
|
# ---------------------------------------------------------------------
|
|
|
|
def load_intrinsics_from_detection(detection: Dict[str, Any]) -> Tuple[np.ndarray, np.ndarray]:
|
|
"""
|
|
Primary source: the embedded camera intrinsics in the detection JSON.
|
|
"""
|
|
camera = detection.get("camera", {})
|
|
K = camera.get("camera_matrix", None)
|
|
D = camera.get("distortion_coefficients", None)
|
|
|
|
if K is None:
|
|
raise KeyError("camera_matrix missing in detection JSON.")
|
|
if D is None:
|
|
D = [0, 0, 0, 0, 0]
|
|
|
|
K = np.array(K, dtype=np.float32).reshape(3, 3)
|
|
D = np.array(D, dtype=np.float32).reshape(-1, 1)
|
|
return K, D
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# Robot JSON parsing
|
|
# ---------------------------------------------------------------------
|
|
|
|
def _rotation_matrix_from_any(rotation: Any) -> np.ndarray:
|
|
"""
|
|
Best-effort parser for marker rotation.
|
|
|
|
Supported inputs:
|
|
- 3x3 matrix as nested list
|
|
- flat 9 list
|
|
- dict with keys:
|
|
* rotation_matrix / matrix
|
|
* rvec / rodriques / rodrigues
|
|
* euler_deg / rpy_deg / roll_pitch_yaw_deg
|
|
* euler_rad / rpy_rad / roll_pitch_yaw_rad
|
|
* quaternion / quat (best-effort, expects [x,y,z,w] unless specified)
|
|
- None => identity
|
|
|
|
The pose estimator below only needs marker positions, but we keep
|
|
this parser for completeness and future extension.
|
|
"""
|
|
if rotation is None:
|
|
return np.eye(3, dtype=np.float32)
|
|
|
|
# Direct matrix
|
|
if isinstance(rotation, (list, tuple, np.ndarray)):
|
|
arr = np.array(rotation, dtype=np.float32)
|
|
if arr.shape == (3, 3):
|
|
return arr
|
|
if arr.size == 9:
|
|
return arr.reshape(3, 3).astype(np.float32)
|
|
if arr.size == 3:
|
|
# Treat as Rodrigues vector
|
|
R, _ = cv2.Rodrigues(arr.reshape(3, 1))
|
|
return R.astype(np.float32)
|
|
return np.eye(3, dtype=np.float32)
|
|
|
|
if isinstance(rotation, dict):
|
|
for key in ("rotation_matrix", "matrix"):
|
|
if key in rotation:
|
|
return _rotation_matrix_from_any(rotation[key])
|
|
|
|
for key in ("rvec", "rodrigues", "rodriques"):
|
|
if key in rotation:
|
|
v = np.array(rotation[key], dtype=np.float32).reshape(3, 1)
|
|
R, _ = cv2.Rodrigues(v)
|
|
return R.astype(np.float32)
|
|
|
|
def euler_to_R(roll: float, pitch: float, yaw: float, degrees: bool = True) -> np.ndarray:
|
|
if degrees:
|
|
roll = np.deg2rad(roll)
|
|
pitch = np.deg2rad(pitch)
|
|
yaw = np.deg2rad(yaw)
|
|
cr, sr = np.cos(roll), np.sin(roll)
|
|
cp, sp = np.cos(pitch), np.sin(pitch)
|
|
cy, sy = np.cos(yaw), np.sin(yaw)
|
|
|
|
Rx = np.array([[1, 0, 0],
|
|
[0, cr, -sr],
|
|
[0, sr, cr]], dtype=np.float32)
|
|
Ry = np.array([[cp, 0, sp],
|
|
[0, 1, 0],
|
|
[-sp, 0, cp]], dtype=np.float32)
|
|
Rz = np.array([[cy, -sy, 0],
|
|
[sy, cy, 0],
|
|
[0, 0, 1]], dtype=np.float32)
|
|
# ZYX convention
|
|
return (Rz @ Ry @ Rx).astype(np.float32)
|
|
|
|
for key in ("euler_deg", "rpy_deg", "roll_pitch_yaw_deg"):
|
|
if key in rotation:
|
|
vals = np.array(rotation[key], dtype=np.float32).reshape(-1)
|
|
if vals.size == 3:
|
|
return euler_to_R(float(vals[0]), float(vals[1]), float(vals[2]), degrees=True)
|
|
|
|
for key in ("euler_rad", "rpy_rad", "roll_pitch_yaw_rad"):
|
|
if key in rotation:
|
|
vals = np.array(rotation[key], dtype=np.float32).reshape(-1)
|
|
if vals.size == 3:
|
|
return euler_to_R(float(vals[0]), float(vals[1]), float(vals[2]), degrees=False)
|
|
|
|
for key in ("quaternion", "quat"):
|
|
if key in rotation:
|
|
q = np.array(rotation[key], dtype=np.float32).reshape(-1)
|
|
if q.size == 4:
|
|
# Best-effort: try [x,y,z,w]
|
|
x, y, z, w = [float(v) for v in q]
|
|
R = np.array([
|
|
[1 - 2*y*y - 2*z*z, 2*x*y - 2*z*w, 2*x*z + 2*y*w],
|
|
[2*x*y + 2*z*w, 1 - 2*x*x - 2*z*z, 2*y*z - 2*x*w],
|
|
[2*x*z - 2*y*w, 2*y*z + 2*x*w, 1 - 2*x*x - 2*y*y]
|
|
], dtype=np.float32)
|
|
return R
|
|
|
|
return np.eye(3, dtype=np.float32)
|
|
|
|
|
|
def get_marker_rotation(marker: Dict[str, Any]) -> np.ndarray:
|
|
"""
|
|
Flexible rotation extraction. Falls back to identity if absent.
|
|
"""
|
|
for key in ("rotation", "rotation_matrix", "matrix", "pose_rotation", "orientation"):
|
|
if key in marker:
|
|
return _rotation_matrix_from_any(marker[key])
|
|
|
|
# Also allow flat pose-style fields
|
|
if "rvec" in marker or "rodrigues" in marker:
|
|
return _rotation_matrix_from_any({"rvec": marker.get("rvec", marker.get("rodrigues"))})
|
|
if "euler_deg" in marker:
|
|
return _rotation_matrix_from_any({"euler_deg": marker["euler_deg"]})
|
|
if "rpy_deg" in marker:
|
|
return _rotation_matrix_from_any({"rpy_deg": marker["rpy_deg"]})
|
|
if "quaternion" in marker:
|
|
return _rotation_matrix_from_any({"quaternion": marker["quaternion"]})
|
|
|
|
return np.eye(3, dtype=np.float32)
|
|
|
|
|
|
def load_marker_lookup(robot_json_path: str, ref_set: Optional[str] = None) -> Dict[int, Dict[str, Any]]:
|
|
"""
|
|
Supports the new format:
|
|
robot_data["links"]["Board"]["markers"]
|
|
|
|
Fallback:
|
|
robot_data["Marker"]
|
|
|
|
ref_set: wenn angegeben, werden nur Marker mit passendem "set"-Feld als Referenz verwendet.
|
|
"""
|
|
robot_json_path = resolve_path(robot_json_path)
|
|
with open(robot_json_path, "r", encoding="utf-8") as f:
|
|
robot_data = json.load(f)
|
|
|
|
length_units = str(robot_data.get("units", {}).get("length", "")).strip().lower()
|
|
length_scale = 1.0
|
|
if length_units in ("mm", "millimeter", "millimeters"):
|
|
length_scale = 1.0 / 1000.0
|
|
elif length_units in ("cm", "centimeter", "centimeters"):
|
|
length_scale = 1.0 / 100.0
|
|
|
|
marker_lookup: Dict[int, Dict[str, Any]] = {}
|
|
|
|
links = robot_data.get("links", {})
|
|
board = links.get("Board")
|
|
|
|
markers = None
|
|
if board and "markers" in board:
|
|
markers = board["markers"]
|
|
|
|
if not markers:
|
|
markers = robot_data.get("Marker", [])
|
|
|
|
for marker in markers:
|
|
marker_id = int(marker.get("id", -1))
|
|
if marker_id < 0:
|
|
continue
|
|
|
|
# Referenz-Set-Filter: nur Marker mit passendem set-Wert verwenden
|
|
if ref_set and str(marker.get("set", "")) != ref_set:
|
|
continue
|
|
|
|
if "position" not in marker:
|
|
continue
|
|
|
|
pos = marker.get("position")
|
|
if pos is None:
|
|
continue
|
|
|
|
if len(pos) != 3:
|
|
continue
|
|
|
|
rotation = get_marker_rotation(marker)
|
|
|
|
marker_lookup[marker_id] = {
|
|
"position": np.array(pos, dtype=np.float32) * np.float32(length_scale),
|
|
"rotation": rotation,
|
|
"on": marker.get("on", "unknown"),
|
|
}
|
|
|
|
return marker_lookup
|
|
|
|
|
|
def load_robot_marker_size(robot_json_path: str) -> Optional[float]:
|
|
"""
|
|
Best-effort marker size reader from robot.json.
|
|
Returns meters if found, otherwise None.
|
|
"""
|
|
robot_json_path = resolve_path(robot_json_path)
|
|
with open(robot_json_path, "r", encoding="utf-8") as f:
|
|
robot_data = json.load(f)
|
|
|
|
vision_config = robot_data.get("vision_config", {})
|
|
size = vision_config.get("MarkerSize", None)
|
|
if size is None:
|
|
return None
|
|
try:
|
|
return float(size)
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# Geometry / pose helpers
|
|
# ---------------------------------------------------------------------
|
|
|
|
def marker_local_corners(marker_size_m: float) -> np.ndarray:
|
|
half = marker_size_m / 2.0
|
|
# Same corner order as the readTwoImages.py example
|
|
return np.array([
|
|
[-half, half, 0.0],
|
|
[ half, half, 0.0],
|
|
[ half, -half, 0.0],
|
|
[-half, -half, 0.0],
|
|
], dtype=np.float32)
|
|
|
|
|
|
def rigid_transform_no_scale(A: np.ndarray, B: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
|
|
"""
|
|
Find R, t such that B ≈ R A + t.
|
|
A, B: Nx3
|
|
"""
|
|
assert A.shape == B.shape and A.shape[1] == 3, "A and B must be Nx3"
|
|
N = A.shape[0]
|
|
if N < 2:
|
|
raise ValueError("Need at least 2 points; 3+ recommended.")
|
|
|
|
centroid_A = A.mean(axis=0)
|
|
centroid_B = B.mean(axis=0)
|
|
|
|
AA = A - centroid_A
|
|
BB = B - centroid_B
|
|
|
|
H = AA.T @ BB
|
|
U, S, Vt = np.linalg.svd(H)
|
|
R = Vt.T @ U.T
|
|
|
|
if np.linalg.det(R) < 0:
|
|
Vt[-1, :] *= -1
|
|
R = Vt.T @ U.T
|
|
|
|
t = centroid_B - R @ centroid_A
|
|
return R.astype(np.float32), t.astype(np.float32)
|
|
|
|
|
|
def undistort_to_normalized(points_px: np.ndarray, K: np.ndarray, D: np.ndarray) -> np.ndarray:
|
|
pts = points_px.reshape(-1, 1, 2).astype(np.float32)
|
|
und = cv2.undistortPoints(pts, K, D, P=None)
|
|
return und.reshape(-1, 2).astype(np.float32)
|
|
|
|
|
|
def rvec_to_R(rvec: np.ndarray) -> np.ndarray:
|
|
R, _ = cv2.Rodrigues(rvec.reshape(3, 1))
|
|
return R.astype(np.float32)
|
|
|
|
|
|
def R_to_euler_zyx(R: np.ndarray) -> Tuple[float, float, float]:
|
|
"""
|
|
Return roll, pitch, yaw in degrees using ZYX convention.
|
|
"""
|
|
yaw = float(np.degrees(np.arctan2(R[1, 0], R[0, 0])))
|
|
sp = np.sqrt(R[2, 1] ** 2 + R[2, 2] ** 2)
|
|
pitch = float(np.degrees(np.arctan2(-R[2, 0], sp)))
|
|
roll = float(np.degrees(np.arctan2(R[2, 1], R[2, 2])))
|
|
return roll, pitch, yaw
|
|
|
|
|
|
def theta_to_camera_pose(theta: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
|
|
"""
|
|
theta = [omega_x, omega_y, omega_z, t_x, t_y, t_z]
|
|
Returns:
|
|
R_wc, t_wc, camera_center_world
|
|
"""
|
|
omega = theta[0:3]
|
|
t_wc = theta[3:6].reshape(3, 1).astype(np.float32)
|
|
R_wc, _ = cv2.Rodrigues(omega.reshape(3, 1))
|
|
R_wc = R_wc.astype(np.float32)
|
|
R_cw = R_wc.T
|
|
camera_center_world = (-R_cw @ t_wc).reshape(3)
|
|
return R_wc, t_wc.reshape(3), camera_center_world
|
|
|
|
|
|
def build_projection_matrix(K: np.ndarray, R: np.ndarray, t: np.ndarray) -> np.ndarray:
|
|
return K @ np.hstack([R, t.reshape(3, 1)])
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# LM on normalized residuals (same style as readTwoImages.py)
|
|
# ---------------------------------------------------------------------
|
|
|
|
def pack_params(omega: np.ndarray, t: np.ndarray) -> np.ndarray:
|
|
return np.hstack([omega.reshape(3), t.reshape(3)]).astype(np.float64)
|
|
|
|
|
|
def unpack_params(theta: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
|
|
omega = theta[0:3]
|
|
t = theta[3:6]
|
|
return omega, t
|
|
|
|
|
|
def residuals_centers_normalized(theta: np.ndarray,
|
|
X_world: np.ndarray,
|
|
obs_norm: np.ndarray) -> np.ndarray:
|
|
"""
|
|
Residuals in normalized coordinates:
|
|
obs_norm - project(R X_world + t)
|
|
"""
|
|
omega, t = unpack_params(theta)
|
|
R_wc = cv2.Rodrigues(omega.reshape(3, 1))[0].astype(np.float64)
|
|
X_cam = (R_wc @ X_world.T + t.reshape(3, 1)).T
|
|
uv = X_cam[:, :2] / X_cam[:, 2:3]
|
|
r = (obs_norm - uv).reshape(-1)
|
|
return r
|
|
|
|
|
|
def numerical_jacobian(f, theta: np.ndarray, eps: float, *args) -> Tuple[np.ndarray, np.ndarray]:
|
|
r0 = f(theta, *args)
|
|
m = r0.size
|
|
n = theta.size
|
|
J = np.zeros((m, n), dtype=np.float64)
|
|
for k in range(n):
|
|
th = theta.copy()
|
|
th[k] += eps
|
|
rk = f(th, *args)
|
|
J[:, k] = (rk - r0) / eps
|
|
return J, r0
|
|
|
|
|
|
def lm_solve(theta0: np.ndarray,
|
|
X_world: np.ndarray,
|
|
obs_norm: np.ndarray,
|
|
max_iter: int = 60,
|
|
eps_jac: float = 1e-6,
|
|
lambda_init: float = 1e-3) -> Tuple[np.ndarray, Dict[str, List[float]]]:
|
|
lam = lambda_init
|
|
theta = theta0.copy().astype(np.float64)
|
|
history = {"iters": [], "rms": [], "lambda": []}
|
|
|
|
for it in range(max_iter):
|
|
J, r = numerical_jacobian(residuals_centers_normalized, theta, eps_jac, X_world, obs_norm)
|
|
rms = float(np.sqrt(np.mean(r * r))) if r.size else 0.0
|
|
history["iters"].append(it)
|
|
history["rms"].append(rms)
|
|
history["lambda"].append(lam)
|
|
|
|
JTJ = J.T @ J
|
|
g = J.T @ r
|
|
H = JTJ + lam * np.eye(JTJ.shape[0], dtype=np.float64)
|
|
|
|
try:
|
|
delta = -np.linalg.solve(H, g)
|
|
except np.linalg.LinAlgError:
|
|
delta, *_ = np.linalg.lstsq(H, -g, rcond=None)
|
|
|
|
theta_trial = theta + delta
|
|
r_trial = residuals_centers_normalized(theta_trial, X_world, obs_norm)
|
|
rms_trial = float(np.sqrt(np.mean(r_trial * r_trial))) if r_trial.size else rms
|
|
|
|
if rms_trial < rms:
|
|
theta = theta_trial
|
|
lam *= 0.5
|
|
else:
|
|
lam *= 2.0
|
|
|
|
if np.linalg.norm(delta) < 1e-10:
|
|
break
|
|
if abs(rms - rms_trial) < 1e-12:
|
|
break
|
|
|
|
return theta, history
|
|
|
|
|
|
def pose_covariance(theta: np.ndarray,
|
|
X_world: np.ndarray,
|
|
obs_norm: np.ndarray,
|
|
eps_jac: float = 1e-6) -> Tuple[np.ndarray, float, np.ndarray]:
|
|
"""
|
|
Returns:
|
|
cov_theta_6x6, sigma2, residual_vector
|
|
"""
|
|
J, r = numerical_jacobian(residuals_centers_normalized, theta, eps_jac, X_world, obs_norm)
|
|
m = r.size
|
|
n = theta.size
|
|
dof = max(1, m - n)
|
|
sigma2 = float((r @ r) / dof)
|
|
|
|
JTJ = J.T @ J
|
|
cov = sigma2 * np.linalg.pinv(JTJ)
|
|
return cov.astype(np.float64), sigma2, r
|
|
|
|
|
|
def propagate_covariance(theta: np.ndarray,
|
|
cov_theta: np.ndarray) -> Dict[str, Any]:
|
|
"""
|
|
Propagate pose covariance to camera center and Euler angle uncertainties.
|
|
"""
|
|
def camera_center_fn(th: np.ndarray) -> np.ndarray:
|
|
_, _, c = theta_to_camera_pose(th)
|
|
return c.astype(np.float64)
|
|
|
|
def euler_fn(th: np.ndarray) -> np.ndarray:
|
|
R_wc, _, _ = theta_to_camera_pose(th)
|
|
return np.array(R_to_euler_zyx(R_wc), dtype=np.float64) # deg
|
|
|
|
Jc, _ = numerical_jacobian(lambda th, *_: camera_center_fn(th), theta, 1e-6)
|
|
cov_center = Jc @ cov_theta @ Jc.T
|
|
|
|
Je, _ = numerical_jacobian(lambda th, *_: euler_fn(th), theta, 1e-6)
|
|
cov_euler = Je @ cov_theta @ Je.T
|
|
|
|
center_std_m = np.sqrt(np.maximum(0.0, np.diag(cov_center)))
|
|
euler_std_deg = np.sqrt(np.maximum(0.0, np.diag(cov_euler)))
|
|
|
|
# Parameter std directly from covariance
|
|
param_std = np.sqrt(np.maximum(0.0, np.diag(cov_theta)))
|
|
rvec_std_deg = np.degrees(param_std[0:3])
|
|
tvec_std_m = param_std[3:6]
|
|
|
|
return {
|
|
"pose_covariance_6x6": cov_theta.tolist(),
|
|
"parameter_std": {
|
|
"rvec_std_deg": [float(x) for x in rvec_std_deg],
|
|
"tvec_std_m": [float(x) for x in tvec_std_m],
|
|
},
|
|
"camera_center_std_m": [float(x) for x in center_std_m],
|
|
"camera_center_std_mm": [float(x * 1000.0) for x in center_std_m],
|
|
"orientation_std_deg": {
|
|
"roll": float(euler_std_deg[0]),
|
|
"pitch": float(euler_std_deg[1]),
|
|
"yaw": float(euler_std_deg[2]),
|
|
},
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# Marker processing
|
|
# ---------------------------------------------------------------------
|
|
|
|
def build_object_corners_from_world_position(position_m: np.ndarray,
|
|
marker_size_m: float) -> np.ndarray:
|
|
"""
|
|
Marker corners in world coordinates, assuming the marker frame is aligned
|
|
with the world frame and only translated to 'position_m'.
|
|
|
|
This is the direct analogue of readTwoImages.py using marker center positions.
|
|
"""
|
|
h = marker_size_m / 2.0
|
|
local = np.array([
|
|
[-h, h, 0.0],
|
|
[ h, h, 0.0],
|
|
[ h, -h, 0.0],
|
|
[-h, -h, 0.0],
|
|
], dtype=np.float32)
|
|
return local + position_m.reshape(1, 3)
|
|
|
|
|
|
def solve_single_marker_pose(corners_px: np.ndarray,
|
|
K: np.ndarray,
|
|
D: np.ndarray,
|
|
marker_size_m: float) -> Optional[Tuple[np.ndarray, np.ndarray]]:
|
|
obj = marker_local_corners(marker_size_m)
|
|
success, rvec, tvec = cv2.solvePnP(
|
|
obj,
|
|
corners_px.astype(np.float32),
|
|
K,
|
|
D,
|
|
flags=cv2.SOLVEPNP_IPPE_SQUARE
|
|
)
|
|
if not success:
|
|
success, rvec, tvec = cv2.solvePnP(
|
|
obj,
|
|
corners_px.astype(np.float32),
|
|
K,
|
|
D,
|
|
flags=cv2.SOLVEPNP_ITERATIVE
|
|
)
|
|
if not success:
|
|
return None
|
|
return rvec.reshape(3), tvec.reshape(3)
|
|
|
|
|
|
# ---------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------
|
|
|
|
def main() -> None:
|
|
parser = argparse.ArgumentParser(description="Estimate camera pose from ArUco observation JSON")
|
|
parser.add_argument("-i", "--input", required=True, help="*_aruco_detection.json")
|
|
parser.add_argument("-robot", "--robot", required=True, help="robot.json with board markers")
|
|
parser.add_argument("-outDir", "--outDir", default=None, help="Optional output directory")
|
|
parser.add_argument("--minConfidence", type=float, default=0.0,
|
|
help="Skip detections below this confidence")
|
|
parser.add_argument("--minCommonMarkers", type=int, default=3,
|
|
help="Minimum number of world markers required")
|
|
parser.add_argument("--maxRmsPx", type=float, default=None,
|
|
help="Optional soft warning threshold for final reprojection RMS in pixels")
|
|
parser.add_argument("--epsJac", type=float, default=1e-6, help="Finite-difference epsilon")
|
|
parser.add_argument("--refSet", default=None,
|
|
help="Nur Marker dieses Sets als Referenz verwenden (z.B. 'A0', 'rail'). "
|
|
"Leer = alle Marker aus links.Board.")
|
|
args = parser.parse_args()
|
|
|
|
detection_path = resolve_path(args.input)
|
|
robot_path = resolve_path(args.robot)
|
|
|
|
detection = load_json(detection_path)
|
|
marker_lookup = load_marker_lookup(robot_path, ref_set=args.refSet)
|
|
if args.refSet:
|
|
print(f"[INFO] Referenz-Set: '{args.refSet}' → {len(marker_lookup)} Referenz-Marker")
|
|
|
|
K, D = load_intrinsics_from_detection(detection)
|
|
|
|
robot_marker_size = load_robot_marker_size(robot_path)
|
|
det_marker_size = detection.get("vision_config", {}).get("MarkerSize", None)
|
|
if det_marker_size is not None:
|
|
marker_size_m = float(det_marker_size)
|
|
elif robot_marker_size is not None:
|
|
marker_size_m = float(robot_marker_size)
|
|
else:
|
|
marker_size_m = 0.025
|
|
|
|
detections = detection.get("detections", [])
|
|
if not isinstance(detections, list):
|
|
raise TypeError("detection['detections'] must be a list")
|
|
|
|
used_ids: List[int] = []
|
|
used_world_positions: List[np.ndarray] = []
|
|
used_obs_centers_px: List[np.ndarray] = []
|
|
used_obs_centers_norm: List[np.ndarray] = []
|
|
used_marker_cam_centers: List[np.ndarray] = []
|
|
used_marker_meta: List[Dict[str, Any]] = []
|
|
|
|
sanity_notes: List[str] = []
|
|
|
|
for det in detections:
|
|
if det.get("type", "aruco") != "aruco":
|
|
continue
|
|
|
|
marker_id = int(det.get("marker_id", -1))
|
|
if marker_id < 0:
|
|
continue
|
|
|
|
if marker_id not in marker_lookup:
|
|
continue
|
|
|
|
confidence = float(det.get("confidence", 1.0))
|
|
if confidence < args.minConfidence:
|
|
continue
|
|
|
|
corners = det.get("image_points_px", None)
|
|
if corners is None:
|
|
continue
|
|
|
|
corners_px = np.array(corners, dtype=np.float32).reshape(4, 2)
|
|
center_from_corners = corners_px.mean(axis=0)
|
|
|
|
center_px = np.array(det.get("center_px", center_from_corners), dtype=np.float32).reshape(2)
|
|
center_delta = float(np.linalg.norm(center_from_corners - center_px))
|
|
if center_delta > 0.75:
|
|
sanity_notes.append(
|
|
f"marker {marker_id}: center_px differs from corner-mean by {center_delta:.2f}px"
|
|
)
|
|
|
|
pnp = solve_single_marker_pose(corners_px, K, D, marker_size_m)
|
|
if pnp is None:
|
|
continue
|
|
|
|
rvec_m, tvec_m = pnp
|
|
world_pos = marker_lookup[marker_id]["position"].astype(np.float32)
|
|
|
|
used_ids.append(marker_id)
|
|
used_world_positions.append(world_pos)
|
|
used_obs_centers_px.append(center_from_corners.astype(np.float32))
|
|
used_obs_centers_norm.append(undistort_to_normalized(center_from_corners.reshape(1, 2), K, D)[0])
|
|
used_marker_cam_centers.append(tvec_m.astype(np.float32))
|
|
used_marker_meta.append({
|
|
"marker_id": marker_id,
|
|
"confidence": confidence,
|
|
"center_px": [float(center_from_corners[0]), float(center_from_corners[1])],
|
|
"marker_size_m": marker_size_m,
|
|
})
|
|
|
|
# Unique / deduplicate by marker_id while preserving order
|
|
dedup: Dict[int, int] = {}
|
|
uniq_ids: List[int] = []
|
|
uniq_world_positions: List[np.ndarray] = []
|
|
uniq_obs_px: List[np.ndarray] = []
|
|
uniq_obs_norm: List[np.ndarray] = []
|
|
uniq_cam_centers: List[np.ndarray] = []
|
|
uniq_meta: List[Dict[str, Any]] = []
|
|
|
|
for idx, mid in enumerate(used_ids):
|
|
if mid in dedup:
|
|
continue
|
|
dedup[mid] = idx
|
|
uniq_ids.append(mid)
|
|
uniq_world_positions.append(used_world_positions[idx])
|
|
uniq_obs_px.append(used_obs_centers_px[idx])
|
|
uniq_obs_norm.append(used_obs_centers_norm[idx])
|
|
uniq_cam_centers.append(used_marker_cam_centers[idx])
|
|
uniq_meta.append(used_marker_meta[idx])
|
|
|
|
if len(uniq_ids) < args.minCommonMarkers:
|
|
raise RuntimeError(
|
|
f"Need at least {args.minCommonMarkers} common markers; found {len(uniq_ids)}: {uniq_ids}"
|
|
)
|
|
|
|
X_world = np.stack(uniq_world_positions, axis=0).astype(np.float64)
|
|
obs_px = np.stack(uniq_obs_px, axis=0).astype(np.float64)
|
|
obs_norm = np.stack(uniq_obs_norm, axis=0).astype(np.float64)
|
|
marker_cam_centers = np.stack(uniq_cam_centers, axis=0).astype(np.float64)
|
|
|
|
# Initial pose from rigid transform of per-marker camera-frame centers to world positions
|
|
# B ≈ R A + t -> world = R * camera + t
|
|
R_cw_init, t_cw_init = rigid_transform_no_scale(marker_cam_centers, X_world)
|
|
R_wc_init = R_cw_init.T
|
|
t_wc_init = -(R_wc_init @ t_cw_init).reshape(3)
|
|
|
|
omega_init = cv2.Rodrigues(R_wc_init)[0].reshape(3)
|
|
theta0 = pack_params(omega_init, t_wc_init)
|
|
|
|
theta_opt, hist = lm_solve(
|
|
theta0=theta0,
|
|
X_world=X_world,
|
|
obs_norm=obs_norm,
|
|
max_iter=60,
|
|
eps_jac=args.epsJac,
|
|
lambda_init=1e-3,
|
|
)
|
|
|
|
R_wc, t_wc, camera_center_world = theta_to_camera_pose(theta_opt)
|
|
|
|
cov_theta, sigma2, residual_vec = pose_covariance(
|
|
theta_opt, X_world, obs_norm, eps_jac=args.epsJac
|
|
)
|
|
propagated = propagate_covariance(theta_opt, cov_theta)
|
|
|
|
# Exact pixel-space reprojection statistics
|
|
proj_pts, _ = cv2.projectPoints(
|
|
X_world.reshape(-1, 1, 3).astype(np.float32),
|
|
theta_opt[0:3].reshape(3, 1).astype(np.float32),
|
|
theta_opt[3:6].reshape(3, 1).astype(np.float32),
|
|
K,
|
|
D,
|
|
)
|
|
proj_pts = proj_pts.reshape(-1, 2)
|
|
reproj_err_px = np.linalg.norm(proj_pts - obs_px, axis=1)
|
|
rms_px = float(np.sqrt(np.mean(reproj_err_px ** 2))) if reproj_err_px.size else 0.0
|
|
median_px = float(np.median(reproj_err_px)) if reproj_err_px.size else 0.0
|
|
max_px = float(np.max(reproj_err_px)) if reproj_err_px.size else 0.0
|
|
|
|
if args.maxRmsPx is not None and rms_px > args.maxRmsPx:
|
|
print(f"[WARN] Final reprojection RMS is {rms_px:.3f}px (threshold {args.maxRmsPx:.3f}px).")
|
|
|
|
# Convert outputs
|
|
roll, pitch, yaw = R_to_euler_zyx(R_wc)
|
|
position_mm = (camera_center_world * 1000.0).astype(float).tolist()
|
|
|
|
# Reproject each used marker center for QA
|
|
per_marker_results = []
|
|
proj_pts_exact, _ = cv2.projectPoints(
|
|
X_world.reshape(-1, 1, 3).astype(np.float32),
|
|
theta_opt[0:3].reshape(3, 1).astype(np.float32),
|
|
theta_opt[3:6].reshape(3, 1).astype(np.float32),
|
|
K,
|
|
D,
|
|
)
|
|
proj_pts_exact = proj_pts_exact.reshape(-1, 2)
|
|
|
|
for idx, mid in enumerate(uniq_ids):
|
|
x = proj_pts_exact[idx]
|
|
err = float(np.linalg.norm(x - obs_px[idx]))
|
|
per_marker_results.append({
|
|
"marker_id": int(mid),
|
|
"observed_center_px": [float(obs_px[idx, 0]), float(obs_px[idx, 1])],
|
|
"projected_center_px": [float(x[0]), float(x[1])],
|
|
"reprojection_error_px": err,
|
|
"confidence": float(uniq_meta[idx]["confidence"]),
|
|
})
|
|
|
|
# Output directory
|
|
in_base = os.path.splitext(os.path.basename(detection_path))[0]
|
|
out_name = in_base.replace("_aruco_detection", "_camera_pose") + ".json"
|
|
|
|
if args.outDir is not None:
|
|
out_dir = resolve_path(args.outDir)
|
|
else:
|
|
out_dir = os.path.dirname(detection_path) or "."
|
|
|
|
os.makedirs(out_dir, exist_ok=True)
|
|
out_json = os.path.join(out_dir, out_name)
|
|
|
|
output = {
|
|
"schema_version": "1.0",
|
|
"created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
|
|
"source": {
|
|
"detection_json": detection_path,
|
|
"robot_json": robot_path,
|
|
},
|
|
"camera": {
|
|
"camera_id": detection.get("camera", {}).get("camera_id", "unknown"),
|
|
"camera_matrix": K.tolist(),
|
|
"distortion_coefficients": D.reshape(-1).tolist(),
|
|
},
|
|
"estimation": {
|
|
"method": "single_camera_marker_center_lm",
|
|
"description": "Rigid init from per-marker pose estimates, followed by LM on normalized marker-center reprojection residuals.",
|
|
"marker_size_m": float(marker_size_m),
|
|
"num_used_markers": int(len(uniq_ids)),
|
|
"used_marker_ids": [int(x) for x in uniq_ids],
|
|
"history": hist,
|
|
"residual_rms_px": float(rms_px),
|
|
"residual_median_px": float(median_px),
|
|
"residual_max_px": float(max_px),
|
|
"sigma2_normalized": float(sigma2),
|
|
},
|
|
"camera_pose": {
|
|
"world_to_camera": {
|
|
"rotation_matrix": R_wc.tolist(),
|
|
"translation_m": [float(x) for x in t_wc.tolist()],
|
|
"rvec_rad": [float(x) for x in theta_opt[0:3].tolist()],
|
|
},
|
|
"camera_in_world": {
|
|
"position_m": [float(x) for x in camera_center_world.tolist()],
|
|
"position_mm": [float(x) for x in position_mm],
|
|
"orientation_deg": {
|
|
"roll": float(roll),
|
|
"pitch": float(pitch),
|
|
"yaw": float(yaw),
|
|
},
|
|
},
|
|
"uncertainty": propagated,
|
|
},
|
|
"observations": {
|
|
"markers": per_marker_results,
|
|
},
|
|
"qa": {
|
|
"sanity_notes": sanity_notes,
|
|
},
|
|
}
|
|
|
|
save_json(out_json, output)
|
|
print(f"[INFO] Saved camera pose JSON: {out_json}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
main()
|
|
except Exception as exc:
|
|
print(f"[ERROR] {exc}", file=sys.stderr)
|
|
sys.exit(1)
|