1136 lines
38 KiB
Python
1136 lines
38 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
3_fuse_markers_world.py
|
|
|
|
Phase 1:
|
|
- Liest 2 bis 5 JSON-Dateien ein, die jeweils eine Kamera-Pose und ArUco-Detections enthalten.
|
|
- Rekonstruiert für jede Marker-Beobachtung eine Marker-Pose in Weltkoordinaten.
|
|
- Fusiert alle Beobachtungen pro Marker gewichtet.
|
|
- Gibt eine CSV-Liste mit Kameras und Markern aus.
|
|
- Optional wird robots.json nur für Metadaten verwendet (nicht für die Schätzung).
|
|
|
|
Typische Eingabe-Dateien:
|
|
snapshot_video1_1779690911822_aruco_detection.camera_pose.json
|
|
snapshot_video2_1779690911822_aruco_detection.camera_pose.json
|
|
...
|
|
|
|
python 3_fuse_markers_world.py \
|
|
--json snapshot_video1_1779690911822_aruco_detection.camera_pose.json \
|
|
--json snapshot_video2_1779690911822_aruco_detection.camera_pose.json \
|
|
--json snapshot_video3_1779690911822_aruco_detection.camera_pose.json \
|
|
--robots robots.json
|
|
|
|
|
|
Benötigt:
|
|
pip install numpy opencv-python
|
|
|
|
Hinweis:
|
|
- Das Skript nutzt die Kameraposen als fest.
|
|
- Die Markerkoordinaten werden pro Detektion aus SolvePnP bestimmt.
|
|
- Danach werden alle Beobachtungen eines Markers robust gemittelt.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import csv
|
|
import json
|
|
import math
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Sequence, Tuple
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
|
|
# ============================================================
|
|
# Datenklassen
|
|
# ============================================================
|
|
|
|
@dataclass
|
|
class CameraPose:
|
|
camera_id: str
|
|
source_file: str
|
|
position_w: np.ndarray # (3,)
|
|
rotation_w_from_c: np.ndarray # (3,3)
|
|
orientation_rpy_deg: Tuple[float, float, float]
|
|
quality: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
@dataclass
|
|
class MarkerObservation:
|
|
marker_id: int
|
|
camera_id: str
|
|
source_file: str
|
|
image_points_px: np.ndarray # (4,2)
|
|
marker_size_m: float
|
|
confidence: float
|
|
quality_score: float
|
|
camera_weight: float
|
|
observation_weight: float
|
|
rvec_cm: np.ndarray # (3,)
|
|
tvec_cm: np.ndarray # (3,)
|
|
position_w: np.ndarray # (3,)
|
|
rotation_w_from_m: np.ndarray # (3,3)
|
|
reprojection_rms_px: float
|
|
reprojection_max_px: float
|
|
metadata: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
@dataclass
|
|
class FusedMarkerEstimate:
|
|
marker_id: int
|
|
position_w: np.ndarray
|
|
rotation_w_from_m: np.ndarray
|
|
orientation_rpy_deg: Tuple[float, float, float]
|
|
observations: List[MarkerObservation]
|
|
num_cameras: int
|
|
camera_ids: List[str]
|
|
weight_sum: float
|
|
position_std_m: float
|
|
angular_std_deg: float
|
|
confidence_mean: float
|
|
confidence_min: float
|
|
confidence_max: float
|
|
quality_mean: float
|
|
quality_min: float
|
|
quality_max: float
|
|
reproj_rms_mean_px: float
|
|
reproj_rms_max_px: float
|
|
reliability_0_1: float
|
|
metadata: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
# ============================================================
|
|
# Basisfunktionen: Rotationen / Euler / Quaternions
|
|
# ============================================================
|
|
|
|
|
|
def clamp(value: float, lo: float = 0.0, hi: float = 1.0) -> float:
|
|
return float(max(lo, min(hi, value)))
|
|
|
|
|
|
|
|
def normalize_vec(v: np.ndarray, eps: float = 1e-12) -> np.ndarray:
|
|
n = float(np.linalg.norm(v))
|
|
if n < eps:
|
|
return v.astype(np.float64, copy=True)
|
|
return (v / n).astype(np.float64)
|
|
|
|
|
|
|
|
def rotation_matrix_to_euler_zyx(R: np.ndarray) -> Tuple[float, float, float]:
|
|
"""Returns roll, pitch, yaw in degrees.
|
|
|
|
Convention:
|
|
R = Rz(yaw) @ Ry(pitch) @ Rx(roll)
|
|
"""
|
|
yaw = math.degrees(math.atan2(R[1, 0], R[0, 0]))
|
|
sp = math.sqrt(R[2, 1] ** 2 + R[2, 2] ** 2)
|
|
pitch = math.degrees(math.atan2(-R[2, 0], sp))
|
|
roll = math.degrees(math.atan2(R[2, 1], R[2, 2]))
|
|
return float(roll), float(pitch), float(yaw)
|
|
|
|
|
|
|
|
def euler_zyx_to_rotation_matrix(roll_deg: float, pitch_deg: float, yaw_deg: float) -> np.ndarray:
|
|
roll = math.radians(roll_deg)
|
|
pitch = math.radians(pitch_deg)
|
|
yaw = math.radians(yaw_deg)
|
|
|
|
cr, sr = math.cos(roll), math.sin(roll)
|
|
cp, sp = math.cos(pitch), math.sin(pitch)
|
|
cy, sy = math.cos(yaw), math.sin(yaw)
|
|
|
|
Rx = np.array([[1, 0, 0], [0, cr, -sr], [0, sr, cr]], dtype=np.float64)
|
|
Ry = np.array([[cp, 0, sp], [0, 1, 0], [-sp, 0, cp]], dtype=np.float64)
|
|
Rz = np.array([[cy, -sy, 0], [sy, cy, 0], [0, 0, 1]], dtype=np.float64)
|
|
|
|
return (Rz @ Ry @ Rx).astype(np.float64)
|
|
|
|
|
|
|
|
def rotation_matrix_to_quaternion(R: np.ndarray) -> np.ndarray:
|
|
"""Returns quaternion as [w, x, y, z]."""
|
|
m = R.astype(np.float64)
|
|
t = np.trace(m)
|
|
|
|
if t > 0.0:
|
|
s = math.sqrt(t + 1.0) * 2.0
|
|
w = 0.25 * s
|
|
x = (m[2, 1] - m[1, 2]) / s
|
|
y = (m[0, 2] - m[2, 0]) / s
|
|
z = (m[1, 0] - m[0, 1]) / s
|
|
elif m[0, 0] > m[1, 1] and m[0, 0] > m[2, 2]:
|
|
s = math.sqrt(1.0 + m[0, 0] - m[1, 1] - m[2, 2]) * 2.0
|
|
w = (m[2, 1] - m[1, 2]) / s
|
|
x = 0.25 * s
|
|
y = (m[0, 1] + m[1, 0]) / s
|
|
z = (m[0, 2] + m[2, 0]) / s
|
|
elif m[1, 1] > m[2, 2]:
|
|
s = math.sqrt(1.0 + m[1, 1] - m[0, 0] - m[2, 2]) * 2.0
|
|
w = (m[0, 2] - m[2, 0]) / s
|
|
x = (m[0, 1] + m[1, 0]) / s
|
|
y = 0.25 * s
|
|
z = (m[1, 2] + m[2, 1]) / s
|
|
else:
|
|
s = math.sqrt(1.0 + m[2, 2] - m[0, 0] - m[1, 1]) * 2.0
|
|
w = (m[1, 0] - m[0, 1]) / s
|
|
x = (m[0, 2] + m[2, 0]) / s
|
|
y = (m[1, 2] + m[2, 1]) / s
|
|
z = 0.25 * s
|
|
|
|
q = np.array([w, x, y, z], dtype=np.float64)
|
|
return normalize_vec(q)
|
|
|
|
|
|
|
|
def quaternion_to_rotation_matrix(q: np.ndarray) -> np.ndarray:
|
|
q = normalize_vec(q)
|
|
w, x, y, z = q
|
|
|
|
R = np.array([
|
|
[1 - 2 * (y * y + z * z), 2 * (x * y - z * w), 2 * (x * z + y * w)],
|
|
[2 * (x * y + z * w), 1 - 2 * (x * x + z * z), 2 * (y * z - x * w)],
|
|
[2 * (x * z - y * w), 2 * (y * z + x * w), 1 - 2 * (x * x + y * y)],
|
|
], dtype=np.float64)
|
|
return R
|
|
|
|
|
|
|
|
def average_rotations_weighted(rotations: Sequence[np.ndarray], weights: Sequence[float]) -> np.ndarray:
|
|
"""Averages rotations by weighted matrix sum + projection to SO(3)."""
|
|
if not rotations:
|
|
return np.eye(3, dtype=np.float64)
|
|
|
|
M = np.zeros((3, 3), dtype=np.float64)
|
|
for R, w in zip(rotations, weights):
|
|
M += float(w) * R.astype(np.float64)
|
|
|
|
U, _, Vt = np.linalg.svd(M)
|
|
R = U @ Vt
|
|
if np.linalg.det(R) < 0:
|
|
U[:, -1] *= -1
|
|
R = U @ Vt
|
|
return R.astype(np.float64)
|
|
|
|
|
|
|
|
def angular_distance_deg(R_a: np.ndarray, R_b: np.ndarray) -> float:
|
|
"""Smallest angle between two rotations."""
|
|
R = R_a.T @ R_b
|
|
tr = float(np.trace(R))
|
|
cos_theta = clamp((tr - 1.0) / 2.0, -1.0, 1.0)
|
|
return float(math.degrees(math.acos(cos_theta)))
|
|
|
|
|
|
# ============================================================
|
|
# JSON / Metadaten
|
|
# ============================================================
|
|
|
|
|
|
def load_json(path: str) -> Dict[str, Any]:
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
return json.load(f)
|
|
|
|
|
|
|
|
def detect_camera_id(data: Dict[str, Any], source_file: str) -> str:
|
|
camera = data.get("camera", {})
|
|
camera_pose = data.get("camera_pose", {})
|
|
|
|
for key in ("camera_id", "id", "name"):
|
|
if isinstance(camera, dict) and camera.get(key):
|
|
return str(camera[key])
|
|
if isinstance(camera_pose, dict) and camera_pose.get(key):
|
|
return str(camera_pose[key])
|
|
|
|
return Path(source_file).stem
|
|
|
|
|
|
|
|
def extract_camera_pose(data: Dict[str, Any], source_file: str) -> CameraPose:
|
|
camera_id = detect_camera_id(data, source_file)
|
|
|
|
# Camera pose may appear in different layouts.
|
|
pose_block = data.get("camera_pose")
|
|
if pose_block is None:
|
|
pose_block = data.get("camera", {})
|
|
|
|
if not isinstance(pose_block, dict):
|
|
raise ValueError(f"Camera pose block missing or invalid in: {source_file}")
|
|
|
|
quality = pose_block.get("quality", {}) if isinstance(pose_block.get("quality", {}), dict) else {}
|
|
|
|
# Position
|
|
pos = None
|
|
if isinstance(pose_block.get("position_m"), dict):
|
|
pm = pose_block["position_m"]
|
|
pos = np.array([pm.get("x", 0.0), pm.get("y", 0.0), pm.get("z", 0.0)], dtype=np.float64)
|
|
elif isinstance(pose_block.get("position_mm"), (list, tuple)) and len(pose_block["position_mm"]) == 3:
|
|
pos = np.array(pose_block["position_mm"], dtype=np.float64) / 1000.0
|
|
elif isinstance(pose_block.get("position"), (list, tuple)) and len(pose_block["position"]) == 3:
|
|
pos = np.array(pose_block["position"], dtype=np.float64)
|
|
|
|
if pos is None:
|
|
raise ValueError(f"Camera position missing in: {source_file}")
|
|
|
|
# Rotation
|
|
if isinstance(pose_block.get("rotation_matrix_world_from_camera"), (list, tuple)):
|
|
R_wc = np.array(pose_block["rotation_matrix_world_from_camera"], dtype=np.float64)
|
|
elif isinstance(pose_block.get("rotation_matrix"), (list, tuple)):
|
|
R_wc = np.array(pose_block["rotation_matrix"], dtype=np.float64)
|
|
elif isinstance(pose_block.get("orientation_deg"), dict):
|
|
od = pose_block["orientation_deg"]
|
|
roll = float(od.get("roll", 0.0))
|
|
pitch = float(od.get("pitch", 0.0))
|
|
yaw = float(od.get("yaw", 0.0))
|
|
R_wc = euler_zyx_to_rotation_matrix(roll, pitch, yaw)
|
|
else:
|
|
R_wc = np.eye(3, dtype=np.float64)
|
|
|
|
if R_wc.shape != (3, 3):
|
|
raise ValueError(f"Invalid camera rotation matrix in: {source_file}")
|
|
|
|
rpy = rotation_matrix_to_euler_zyx(R_wc)
|
|
|
|
return CameraPose(
|
|
camera_id=camera_id,
|
|
source_file=source_file,
|
|
position_w=pos,
|
|
rotation_w_from_c=R_wc,
|
|
orientation_rpy_deg=rpy,
|
|
quality=quality,
|
|
)
|
|
|
|
|
|
|
|
def load_robot_metadata(robot_path: Optional[str]) -> Dict[int, List[Dict[str, Any]]]:
|
|
"""Optional metadata only. Duplicate IDs are kept as a list."""
|
|
if not robot_path:
|
|
return {}
|
|
|
|
data = load_json(robot_path)
|
|
marker_meta: Dict[int, List[Dict[str, Any]]] = {}
|
|
|
|
for entry in data.get("Marker", []):
|
|
try:
|
|
mid = int(entry.get("id"))
|
|
except Exception:
|
|
continue
|
|
|
|
marker_meta.setdefault(mid, []).append({
|
|
"on": entry.get("on"),
|
|
"position": entry.get("position"),
|
|
"relPos": entry.get("relPos"),
|
|
"name": entry.get("name"),
|
|
"relPosSource": entry.get("relPosSource"),
|
|
})
|
|
|
|
return marker_meta
|
|
|
|
|
|
|
|
def summarize_robot_metadata(marker_id: int, robot_meta: Dict[int, List[Dict[str, Any]]]) -> Dict[str, Any]:
|
|
entries = robot_meta.get(marker_id, [])
|
|
if not entries:
|
|
return {
|
|
"robot_known": False,
|
|
"robot_on": None,
|
|
"robot_positions": None,
|
|
"robot_relpos_count": 0,
|
|
}
|
|
|
|
on_list = []
|
|
abs_pos_list = []
|
|
rel_pos_list = []
|
|
for e in entries:
|
|
if e.get("on") is not None:
|
|
on_list.append(str(e.get("on")))
|
|
if e.get("position") is not None:
|
|
abs_pos_list.append(e.get("position"))
|
|
if e.get("relPos") is not None:
|
|
rel_pos_list.append(e.get("relPos"))
|
|
|
|
return {
|
|
"robot_known": True,
|
|
"robot_on": "|".join(dict.fromkeys(on_list)) if on_list else None,
|
|
"robot_positions": abs_pos_list if abs_pos_list else None,
|
|
"robot_relpos_count": len(rel_pos_list),
|
|
}
|
|
|
|
|
|
# ============================================================
|
|
# Detection / Gewichtung
|
|
# ============================================================
|
|
|
|
|
|
def get_camera_quality_score(camera_pose: CameraPose) -> float:
|
|
q = camera_pose.quality or {}
|
|
|
|
if "pose_quality_score" in q:
|
|
try:
|
|
return clamp(float(q["pose_quality_score"]) / 100.0)
|
|
except Exception:
|
|
pass
|
|
|
|
if "reprojection_rms_px" in q:
|
|
try:
|
|
rms = float(q["reprojection_rms_px"])
|
|
# Heuristik: 0 px -> 1.0, 5 px -> 0.5, 10 px -> ~0.33
|
|
return clamp(1.0 / (1.0 + 0.2 * rms))
|
|
except Exception:
|
|
pass
|
|
|
|
return 1.0
|
|
|
|
|
|
|
|
def get_marker_size_from_detection(det: Dict[str, Any], default_size_m: float) -> float:
|
|
try:
|
|
return float(det.get("marker_size_m", default_size_m))
|
|
except Exception:
|
|
return float(default_size_m)
|
|
|
|
|
|
|
|
def score_from_range(value: Optional[float], low: float, high: float, invert: bool = False) -> float:
|
|
if value is None:
|
|
return 1.0
|
|
if high <= low:
|
|
return 1.0
|
|
t = (float(value) - low) / (high - low)
|
|
t = clamp(t)
|
|
return 1.0 - t if invert else t
|
|
|
|
|
|
|
|
def detection_quality_score(det: Dict[str, Any]) -> float:
|
|
"""Heuristischer Score in [0,1]. Falls Felder fehlen, wird 1.0 angenommen."""
|
|
confidence = float(det.get("confidence", 1.0))
|
|
quality = det.get("quality", {}) if isinstance(det.get("quality", {}), dict) else {}
|
|
|
|
# Subscores
|
|
area_px = quality.get("area_px")
|
|
sharpness = (((quality.get("sharpness") or {}).get("laplacian_var")) if isinstance(quality.get("sharpness"), dict) else None)
|
|
contrast = (((quality.get("contrast") or {}).get("dynamic_range")) if isinstance(quality.get("contrast"), dict) else None)
|
|
dist_border = (((quality.get("geometry") or {}).get("distance_to_border_px")) if isinstance(quality.get("geometry"), dict) else None)
|
|
dist_center = (((quality.get("geometry") or {}).get("distance_to_center_norm")) if isinstance(quality.get("geometry"), dict) else None)
|
|
edge_ratio = quality.get("edge_ratio")
|
|
|
|
area_score = score_from_range(area_px, 150.0, 3000.0)
|
|
sharp_score = score_from_range(sharpness, 500.0, 5000.0)
|
|
contrast_score = score_from_range(contrast, 40.0, 180.0)
|
|
border_score = score_from_range(dist_border, 20.0, 250.0)
|
|
center_score = score_from_range(dist_center, 0.75, 0.0, invert=True) # closer to center -> higher
|
|
|
|
edge_ratio_score = 1.0
|
|
if edge_ratio is not None:
|
|
try:
|
|
edge_ratio_score = clamp(1.0 - min(abs(float(edge_ratio) - 1.0) / 1.5, 1.0))
|
|
except Exception:
|
|
edge_ratio_score = 1.0
|
|
|
|
# Weighted sum, confidence strongest
|
|
score = (
|
|
0.35 * clamp(confidence)
|
|
+ 0.15 * area_score
|
|
+ 0.15 * sharp_score
|
|
+ 0.10 * contrast_score
|
|
+ 0.10 * border_score
|
|
+ 0.10 * center_score
|
|
+ 0.05 * edge_ratio_score
|
|
)
|
|
return clamp(score)
|
|
|
|
|
|
|
|
def marker_frontality_score(rotation_m_from_cam: np.ndarray) -> float:
|
|
"""Marker normal relative to camera optical axis (+Z camera). 1.0 is frontal."""
|
|
normal_cam = rotation_m_from_cam[:, 2]
|
|
# absolute because the sign depends on marker axis convention; we only want frontality
|
|
return clamp(abs(float(normal_cam[2])))
|
|
|
|
|
|
|
|
def compute_observation_weight(camera_pose: CameraPose, det: Dict[str, Any]) -> Tuple[float, float, float]:
|
|
det_score = detection_quality_score(det)
|
|
cam_score = get_camera_quality_score(camera_pose)
|
|
# frontality will be added after pose estimation; here only base score.
|
|
base = clamp(det_score * cam_score)
|
|
return base, det_score, cam_score
|
|
|
|
|
|
# ============================================================
|
|
# Marker Pose Schätzung pro Observation
|
|
# ============================================================
|
|
|
|
|
|
def build_marker_object_points(marker_size_m: float) -> np.ndarray:
|
|
half = marker_size_m / 2.0
|
|
# OpenCV ArUco corner order: TL, TR, BR, BL
|
|
return np.array([
|
|
[-half, half, 0.0],
|
|
[ half, half, 0.0],
|
|
[ half, -half, 0.0],
|
|
[-half, -half, 0.0],
|
|
], dtype=np.float32)
|
|
|
|
|
|
|
|
def solve_marker_pose_in_camera(img_pts: np.ndarray, marker_size_m: float, K: np.ndarray, D: np.ndarray) -> Tuple[np.ndarray, np.ndarray, float, float]:
|
|
obj_pts = build_marker_object_points(marker_size_m)
|
|
|
|
success, rvec, tvec = cv2.solvePnP(
|
|
obj_pts,
|
|
img_pts.astype(np.float32),
|
|
K,
|
|
D,
|
|
flags=cv2.SOLVEPNP_IPPE_SQUARE,
|
|
)
|
|
|
|
if not success:
|
|
success, rvec, tvec = cv2.solvePnP(
|
|
obj_pts,
|
|
img_pts.astype(np.float32),
|
|
K,
|
|
D,
|
|
flags=cv2.SOLVEPNP_ITERATIVE,
|
|
)
|
|
|
|
if not success:
|
|
raise RuntimeError("solvePnP failed for one marker observation")
|
|
|
|
projected, _ = cv2.projectPoints(obj_pts, rvec, tvec, K, D)
|
|
projected = projected.reshape(-1, 2)
|
|
err = np.linalg.norm(projected - img_pts, axis=1)
|
|
|
|
rms = float(np.sqrt(np.mean(err ** 2)))
|
|
mx = float(np.max(err))
|
|
return rvec.reshape(3), tvec.reshape(3), rms, mx
|
|
|
|
|
|
|
|
def observation_to_world(
|
|
camera_pose: CameraPose,
|
|
marker_id: int,
|
|
det: Dict[str, Any],
|
|
K: np.ndarray,
|
|
D: np.ndarray,
|
|
robot_meta: Dict[int, List[Dict[str, Any]]],
|
|
default_marker_size_m: float,
|
|
) -> Optional[MarkerObservation]:
|
|
if det.get("image_points_px") is None:
|
|
return None
|
|
|
|
img_pts = np.array(det["image_points_px"], dtype=np.float32)
|
|
if img_pts.shape != (4, 2):
|
|
return None
|
|
|
|
marker_size_m = get_marker_size_from_detection(det, default_marker_size_m)
|
|
|
|
try:
|
|
rvec_cm, tvec_cm, reproj_rms_px, reproj_max_px = solve_marker_pose_in_camera(img_pts, marker_size_m, K, D)
|
|
except Exception:
|
|
return None
|
|
|
|
R_cm, _ = cv2.Rodrigues(rvec_cm)
|
|
cam_weight_base, det_score, cam_score = compute_observation_weight(camera_pose, det)
|
|
frontality = marker_frontality_score(R_cm)
|
|
|
|
# Final observation weight: base quality * frontality
|
|
obs_weight = clamp(cam_weight_base * (0.25 + 0.75 * frontality))
|
|
if obs_weight <= 1e-9:
|
|
obs_weight = 1e-9
|
|
|
|
# World transform
|
|
R_wc = camera_pose.rotation_w_from_c
|
|
t_wc = camera_pose.position_w
|
|
|
|
position_w = R_wc @ tvec_cm + t_wc
|
|
rotation_w_from_m = R_wc @ R_cm
|
|
|
|
metadata = summarize_robot_metadata(marker_id, robot_meta)
|
|
|
|
return MarkerObservation(
|
|
marker_id=marker_id,
|
|
camera_id=camera_pose.camera_id,
|
|
source_file=camera_pose.source_file,
|
|
image_points_px=img_pts,
|
|
marker_size_m=float(marker_size_m),
|
|
confidence=float(det.get("confidence", 1.0)),
|
|
quality_score=float(det_score),
|
|
camera_weight=float(cam_score),
|
|
observation_weight=float(obs_weight),
|
|
rvec_cm=rvec_cm.astype(np.float64),
|
|
tvec_cm=tvec_cm.astype(np.float64),
|
|
position_w=position_w.astype(np.float64),
|
|
rotation_w_from_m=rotation_w_from_m.astype(np.float64),
|
|
reprojection_rms_px=reproj_rms_px,
|
|
reprojection_max_px=reproj_max_px,
|
|
metadata=metadata,
|
|
)
|
|
|
|
|
|
# ============================================================
|
|
# Fusion pro Marker
|
|
# ============================================================
|
|
|
|
|
|
def fuse_marker_observations(marker_id: int, observations: List[MarkerObservation], robot_meta: Dict[int, List[Dict[str, Any]]]) -> FusedMarkerEstimate:
|
|
if not observations:
|
|
raise ValueError("No observations to fuse")
|
|
|
|
# Optional outlier suppression: one soft reweighting pass based on position and angle residuals.
|
|
pos_stack = np.stack([o.position_w for o in observations], axis=0)
|
|
weights = np.array([o.observation_weight for o in observations], dtype=np.float64)
|
|
weights = np.maximum(weights, 1e-9)
|
|
|
|
# Initial estimate
|
|
pos0 = np.average(pos_stack, axis=0, weights=weights)
|
|
rot0 = average_rotations_weighted([o.rotation_w_from_m for o in observations], weights)
|
|
|
|
# Residual-based robustification
|
|
pos_err = np.linalg.norm(pos_stack - pos0.reshape(1, 3), axis=1)
|
|
ang_err = np.array([angular_distance_deg(rot0, o.rotation_w_from_m) for o in observations], dtype=np.float64)
|
|
|
|
# Huber-like soft downweighting
|
|
pos_scale = max(0.01, float(np.median(pos_err) + 1e-9))
|
|
ang_scale = max(2.0, float(np.median(ang_err) + 1e-9))
|
|
robust = 1.0 / (1.0 + (pos_err / pos_scale) ** 2 + (ang_err / ang_scale) ** 2)
|
|
|
|
final_w = weights * robust
|
|
final_w = np.maximum(final_w, 1e-9)
|
|
|
|
pos = np.average(pos_stack, axis=0, weights=final_w)
|
|
rot = average_rotations_weighted([o.rotation_w_from_m for o in observations], final_w)
|
|
rpy = rotation_matrix_to_euler_zyx(rot)
|
|
|
|
camera_ids = sorted({o.camera_id for o in observations})
|
|
source_files = sorted({Path(o.source_file).name for o in observations})
|
|
|
|
weight_sum = float(np.sum(final_w))
|
|
weight_mean = float(np.mean(final_w))
|
|
|
|
confidence_vals = np.array([o.confidence for o in observations], dtype=np.float64)
|
|
quality_vals = np.array([o.quality_score for o in observations], dtype=np.float64)
|
|
reproj_rms_vals = np.array([o.reprojection_rms_px for o in observations], dtype=np.float64)
|
|
|
|
pos_std_m = float(np.sqrt(np.average(np.sum((pos_stack - pos.reshape(1, 3)) ** 2, axis=1), weights=final_w)))
|
|
angular_std_deg = float(np.sqrt(np.average(np.array([angular_distance_deg(rot, o.rotation_w_from_m) ** 2 for o in observations]), weights=final_w)))
|
|
|
|
# Reliability heuristic in [0,1]
|
|
num_obs = len(observations)
|
|
num_cams = len(camera_ids)
|
|
obs_factor = clamp(num_obs / 5.0)
|
|
cam_factor = clamp(num_cams / 3.0)
|
|
conf_factor = clamp(float(np.mean(confidence_vals)))
|
|
qual_factor = clamp(float(np.mean(quality_vals)))
|
|
spread_factor = clamp(1.0 - min(pos_std_m / 0.08, 1.0))
|
|
reproj_factor = clamp(1.0 - min(float(np.mean(reproj_rms_vals)) / 5.0, 1.0))
|
|
reliability = clamp(
|
|
0.18 * obs_factor +
|
|
0.15 * cam_factor +
|
|
0.22 * conf_factor +
|
|
0.15 * qual_factor +
|
|
0.15 * spread_factor +
|
|
0.15 * reproj_factor
|
|
)
|
|
|
|
metadata = summarize_robot_metadata(marker_id, robot_meta)
|
|
|
|
return FusedMarkerEstimate(
|
|
marker_id=marker_id,
|
|
position_w=pos.astype(np.float64),
|
|
rotation_w_from_m=rot.astype(np.float64),
|
|
orientation_rpy_deg=rpy,
|
|
observations=observations,
|
|
num_cameras=num_cams,
|
|
camera_ids=camera_ids,
|
|
weight_sum=weight_sum,
|
|
position_std_m=pos_std_m,
|
|
angular_std_deg=angular_std_deg,
|
|
confidence_mean=float(np.mean(confidence_vals)),
|
|
confidence_min=float(np.min(confidence_vals)),
|
|
confidence_max=float(np.max(confidence_vals)),
|
|
quality_mean=float(np.mean(quality_vals)),
|
|
quality_min=float(np.min(quality_vals)),
|
|
quality_max=float(np.max(quality_vals)),
|
|
reproj_rms_mean_px=float(np.mean(reproj_rms_vals)),
|
|
reproj_rms_max_px=float(np.max([o.reprojection_max_px for o in observations])),
|
|
reliability_0_1=reliability,
|
|
metadata=metadata,
|
|
)
|
|
|
|
|
|
# ============================================================
|
|
# Input Handling
|
|
# ============================================================
|
|
|
|
|
|
def collect_input_files(args: argparse.Namespace) -> List[str]:
|
|
files: List[str] = []
|
|
|
|
if args.json:
|
|
files.extend(args.json)
|
|
|
|
if args.input_dir and args.timestamp:
|
|
base = Path(args.input_dir)
|
|
ts = str(args.timestamp)
|
|
patterns = [
|
|
f"*_{ts}_aruco_detection.camera_pose.json",
|
|
f"*_{ts}_aruco_detection.json",
|
|
f"*_{ts}.camera_pose.json",
|
|
f"*_{ts}.json",
|
|
]
|
|
for pattern in patterns:
|
|
files.extend(str(p) for p in sorted(base.glob(pattern)))
|
|
|
|
# de-duplicate, keep order
|
|
seen = set()
|
|
uniq = []
|
|
for f in files:
|
|
ff = str(Path(f))
|
|
if ff not in seen:
|
|
seen.add(ff)
|
|
uniq.append(ff)
|
|
|
|
return uniq
|
|
|
|
|
|
def find_detection_json(camera_pose_json_path):
|
|
"""
|
|
Findet automatisch die passende Detection-JSON.
|
|
|
|
Beispiel:
|
|
xxx_aruco_detection.camera_pose.json
|
|
->
|
|
xxx_aruco_detection.json
|
|
"""
|
|
|
|
pose_path = Path(camera_pose_json_path)
|
|
|
|
name = pose_path.name
|
|
|
|
if not name.endswith(".camera_pose.json"):
|
|
raise ValueError(
|
|
f"Not a .camera_pose.json file: {pose_path}"
|
|
)
|
|
|
|
detection_name = name.replace(
|
|
".camera_pose.json",
|
|
".json"
|
|
)
|
|
|
|
detection_path = pose_path.with_name(detection_name)
|
|
|
|
if not detection_path.exists():
|
|
raise FileNotFoundError(
|
|
f"Detection JSON not found: {detection_path}"
|
|
)
|
|
|
|
return detection_path
|
|
|
|
|
|
def load_detection_json(camera_pose_json_path):
|
|
"""
|
|
Lädt automatisch die passende Detection-Datei.
|
|
"""
|
|
|
|
detection_path = find_detection_json(
|
|
camera_pose_json_path
|
|
)
|
|
|
|
with open(detection_path, "r", encoding="utf-8") as f:
|
|
return json.load(f)
|
|
|
|
|
|
def extract_intrinsics_from_detection(detection_data):
|
|
|
|
if "camera" not in detection_data:
|
|
raise ValueError("camera section missing")
|
|
|
|
camera = detection_data["camera"]
|
|
|
|
if "camera_matrix" not in camera:
|
|
raise ValueError("camera_matrix missing")
|
|
|
|
if "distortion_coefficients" not in camera:
|
|
raise ValueError("distortion_coefficients missing")
|
|
|
|
K = np.array(
|
|
camera["camera_matrix"],
|
|
dtype=np.float32
|
|
)
|
|
|
|
D = np.array(
|
|
camera["distortion_coefficients"],
|
|
dtype=np.float32
|
|
).reshape(-1, 1)
|
|
|
|
return K, D
|
|
|
|
def extract_default_marker_size(data: Dict[str, Any], robot_data: Dict[str, Any]) -> float:
|
|
# Priority: detection JSON -> robots.json -> fallback 25 mm
|
|
try:
|
|
vc = data.get("vision_config", {})
|
|
if isinstance(vc, dict) and vc.get("MarkerSize") is not None:
|
|
return float(vc["MarkerSize"])
|
|
except Exception:
|
|
pass
|
|
|
|
try:
|
|
vc = robot_data.get("vision_config", {})
|
|
if isinstance(vc, dict) and vc.get("MarkerSize") is not None:
|
|
return float(vc["MarkerSize"])
|
|
except Exception:
|
|
pass
|
|
|
|
return 0.025
|
|
|
|
|
|
# ============================================================
|
|
# CSV / JSON Export
|
|
# ============================================================
|
|
|
|
|
|
def fmt_float(x: Optional[float], nd: int = 6) -> str:
|
|
if x is None:
|
|
return ""
|
|
try:
|
|
if isinstance(x, float) and (math.isnan(x) or math.isinf(x)):
|
|
return ""
|
|
return f"{float(x):.{nd}f}"
|
|
except Exception:
|
|
return ""
|
|
|
|
|
|
|
|
def marker_row_from_estimate(est: FusedMarkerEstimate) -> Dict[str, Any]:
|
|
robot_on = est.metadata.get("robot_on") if est.metadata else None
|
|
robot_known = est.metadata.get("robot_known") if est.metadata else None
|
|
robot_relpos_count = est.metadata.get("robot_relpos_count") if est.metadata else 0
|
|
|
|
return {
|
|
"kind": "marker",
|
|
"id": str(est.marker_id),
|
|
"source_file": "",
|
|
"camera_id": "",
|
|
"robot_on": robot_on or "",
|
|
"robot_known": int(bool(robot_known)),
|
|
"robot_relpos_count": int(robot_relpos_count or 0),
|
|
"x_m": est.position_w[0],
|
|
"y_m": est.position_w[1],
|
|
"z_m": est.position_w[2],
|
|
"roll_deg": est.orientation_rpy_deg[0],
|
|
"pitch_deg": est.orientation_rpy_deg[1],
|
|
"yaw_deg": est.orientation_rpy_deg[2],
|
|
"num_observations": len(est.observations),
|
|
"num_cameras": est.num_cameras,
|
|
"cameras_seen_by": "|".join(est.camera_ids),
|
|
"weight_sum": est.weight_sum,
|
|
"weight_mean": est.weight_sum / max(1, len(est.observations)),
|
|
"confidence_mean": est.confidence_mean,
|
|
"confidence_min": est.confidence_min,
|
|
"confidence_max": est.confidence_max,
|
|
"quality_mean": est.quality_mean,
|
|
"quality_min": est.quality_min,
|
|
"quality_max": est.quality_max,
|
|
"reproj_rms_mean_px": est.reproj_rms_mean_px,
|
|
"reproj_rms_max_px": est.reproj_rms_max_px,
|
|
"position_std_m": est.position_std_m,
|
|
"angular_std_deg": est.angular_std_deg,
|
|
"reliability_0_1": est.reliability_0_1,
|
|
"source_files": "|".join(est.metadata.get("source_files", []) if est.metadata else []),
|
|
"note": "",
|
|
}
|
|
|
|
|
|
|
|
def camera_row_from_pose(cam: CameraPose) -> Dict[str, Any]:
|
|
roll, pitch, yaw = cam.orientation_rpy_deg
|
|
return {
|
|
"kind": "camera",
|
|
"id": cam.camera_id,
|
|
"source_file": Path(cam.source_file).name,
|
|
"camera_id": cam.camera_id,
|
|
"robot_on": "",
|
|
"robot_known": 0,
|
|
"robot_relpos_count": 0,
|
|
"x_m": cam.position_w[0],
|
|
"y_m": cam.position_w[1],
|
|
"z_m": cam.position_w[2],
|
|
"roll_deg": roll,
|
|
"pitch_deg": pitch,
|
|
"yaw_deg": yaw,
|
|
"num_observations": 0,
|
|
"num_cameras": 0,
|
|
"cameras_seen_by": cam.camera_id,
|
|
"weight_sum": "",
|
|
"weight_mean": "",
|
|
"confidence_mean": "",
|
|
"confidence_min": "",
|
|
"confidence_max": "",
|
|
"quality_mean": cam.quality.get("pose_quality_score", ""),
|
|
"quality_min": "",
|
|
"quality_max": "",
|
|
"reproj_rms_mean_px": cam.quality.get("reprojection_rms_px", ""),
|
|
"reproj_rms_max_px": cam.quality.get("reprojection_max_px", ""),
|
|
"position_std_m": "",
|
|
"angular_std_deg": "",
|
|
"reliability_0_1": get_camera_quality_score(cam),
|
|
"source_files": Path(cam.source_file).name,
|
|
"note": "",
|
|
}
|
|
|
|
|
|
|
|
def write_csv(rows: List[Dict[str, Any]], out_csv: str) -> None:
|
|
if not rows:
|
|
return
|
|
|
|
fieldnames = [
|
|
"kind", "id", "source_file", "camera_id",
|
|
"robot_on", "robot_known", "robot_relpos_count",
|
|
"x_m", "y_m", "z_m",
|
|
"roll_deg", "pitch_deg", "yaw_deg",
|
|
"num_observations", "num_cameras", "cameras_seen_by",
|
|
"weight_sum", "weight_mean",
|
|
"confidence_mean", "confidence_min", "confidence_max",
|
|
"quality_mean", "quality_min", "quality_max",
|
|
"reproj_rms_mean_px", "reproj_rms_max_px",
|
|
"position_std_m", "angular_std_deg",
|
|
"reliability_0_1", "source_files", "note",
|
|
]
|
|
|
|
with open(out_csv, "w", newline="", encoding="utf-8") as f:
|
|
writer = csv.DictWriter(f, fieldnames=fieldnames)
|
|
writer.writeheader()
|
|
for row in rows:
|
|
cleaned = {}
|
|
for key in fieldnames:
|
|
value = row.get(key, "")
|
|
if isinstance(value, np.generic):
|
|
value = value.item()
|
|
cleaned[key] = value
|
|
writer.writerow(cleaned)
|
|
|
|
|
|
|
|
def write_summary_json(summary: Dict[str, Any], out_json: str) -> None:
|
|
with open(out_json, "w", encoding="utf-8") as f:
|
|
json.dump(summary, f, indent=2, ensure_ascii=False)
|
|
|
|
|
|
# ============================================================
|
|
# Main Pipeline
|
|
# ============================================================
|
|
|
|
|
|
def main() -> None:
|
|
parser = argparse.ArgumentParser(description="Phase 1: multi-camera world marker fusion from camera pose JSONs")
|
|
parser.add_argument("--json", action="append", help="Input JSON file (repeatable). Expected 2 to 5 files.")
|
|
parser.add_argument("--input-dir", type=str, default=None, help="Optional directory to scan")
|
|
parser.add_argument("--timestamp", type=str, default=None, help="Optional timestamp used with --input-dir")
|
|
parser.add_argument("--robots", type=str, default=None, help="Optional robots.json for metadata only")
|
|
parser.add_argument("--out-csv", type=str, default=None, help="Output CSV path")
|
|
parser.add_argument("--out-json", type=str, default=None, help="Optional summary JSON path")
|
|
parser.add_argument("--min-confidence", type=float, default=0.0, help="Filter observations below this confidence")
|
|
parser.add_argument("--verbose", action="store_true", help="Verbose output")
|
|
args = parser.parse_args()
|
|
|
|
files = collect_input_files(args)
|
|
if len(files) < 1:
|
|
raise SystemExit("No input JSON files provided. Use --json and/or --input-dir + --timestamp.")
|
|
if len(files) > 5:
|
|
print(f"[WARN] More than 5 input JSONs found ({len(files)}). All will be processed.")
|
|
elif len(files) < 2:
|
|
print(f"[WARN] Only {len(files)} input JSON file(s) found. The pipeline still runs.")
|
|
|
|
robot_meta = load_robot_metadata(args.robots)
|
|
|
|
# Parse all files first
|
|
cameras: List[CameraPose] = []
|
|
all_observations: List[MarkerObservation] = []
|
|
all_detections_count = 0
|
|
total_rejected = 0
|
|
|
|
for file_path in files:
|
|
data = load_json(file_path)
|
|
cam = extract_camera_pose(data, file_path)
|
|
cameras.append(cam)
|
|
|
|
|
|
detection_data = load_detection_json(json_file)
|
|
|
|
K, D = extract_intrinsics_from_detection(detection_data)
|
|
|
|
|
|
default_marker_size_m = extract_default_marker_size(data, load_json(args.robots) if args.robots else {})
|
|
|
|
detections = data.get("detections", []) if isinstance(data.get("detections", []), list) else []
|
|
all_detections_count += len(detections)
|
|
|
|
if args.verbose:
|
|
print(f"[INFO] {Path(file_path).name}: camera={cam.camera_id}, detections={len(detections)}")
|
|
|
|
for det in detections:
|
|
marker_id = det.get("marker_id")
|
|
if marker_id is None:
|
|
total_rejected += 1
|
|
continue
|
|
|
|
try:
|
|
marker_id_int = int(marker_id)
|
|
except Exception:
|
|
total_rejected += 1
|
|
continue
|
|
|
|
confidence = float(det.get("confidence", 1.0))
|
|
if confidence < args.min_confidence:
|
|
total_rejected += 1
|
|
continue
|
|
|
|
obs = observation_to_world(
|
|
camera_pose=cam,
|
|
marker_id=marker_id_int,
|
|
det=det,
|
|
K=K,
|
|
D=D,
|
|
robot_meta=robot_meta,
|
|
default_marker_size_m=default_marker_size_m,
|
|
)
|
|
if obs is None:
|
|
total_rejected += 1
|
|
continue
|
|
|
|
all_observations.append(obs)
|
|
|
|
# Group observations by marker id
|
|
obs_by_marker: Dict[int, List[MarkerObservation]] = {}
|
|
for obs in all_observations:
|
|
obs_by_marker.setdefault(obs.marker_id, []).append(obs)
|
|
|
|
# Build fused estimates
|
|
fused_markers: List[FusedMarkerEstimate] = []
|
|
for mid in sorted(obs_by_marker.keys()):
|
|
fused = fuse_marker_observations(mid, obs_by_marker[mid], robot_meta)
|
|
# attach source files to metadata for CSV convenience
|
|
fused.metadata = dict(fused.metadata)
|
|
fused.metadata["source_files"] = sorted({Path(o.source_file).name for o in fused.observations})
|
|
fused_markers.append(fused)
|
|
|
|
# CSV rows: cameras + markers
|
|
rows: List[Dict[str, Any]] = []
|
|
for cam in cameras:
|
|
rows.append(camera_row_from_pose(cam))
|
|
for est in fused_markers:
|
|
rows.append(marker_row_from_estimate(est))
|
|
|
|
# Sort: cameras first, then markers by numeric id if possible
|
|
def sort_key(row: Dict[str, Any]):
|
|
kind = row.get("kind", "")
|
|
if kind == "camera":
|
|
return (0, str(row.get("id", "")))
|
|
try:
|
|
mid = int(row.get("id", 10**9))
|
|
except Exception:
|
|
mid = 10**9
|
|
return (1, mid)
|
|
|
|
rows.sort(key=sort_key)
|
|
|
|
# Output paths
|
|
if args.out_csv:
|
|
out_csv = args.out_csv
|
|
else:
|
|
stem = Path(files[0]).stem
|
|
if "camera_pose" in stem:
|
|
stem = stem.replace(".camera_pose", "")
|
|
out_csv = str(Path(files[0]).with_name(f"{stem}_world_markers.csv"))
|
|
|
|
if args.out_json:
|
|
out_json = args.out_json
|
|
else:
|
|
out_json = str(Path(out_csv).with_suffix(".json"))
|
|
|
|
write_csv(rows, out_csv)
|
|
|
|
# Summary JSON: contains the fused marker estimates plus cameras and stats.
|
|
summary = {
|
|
"summary": {
|
|
"input_files": [Path(f).name for f in files],
|
|
"num_files": len(files),
|
|
"num_cameras": len(cameras),
|
|
"num_detections_total": all_detections_count,
|
|
"num_valid_observations": len(all_observations),
|
|
"num_markers_fused": len(fused_markers),
|
|
"num_rejected_detections": total_rejected,
|
|
},
|
|
"cameras": [
|
|
{
|
|
"camera_id": cam.camera_id,
|
|
"source_file": cam.source_file,
|
|
"position_m": cam.position_w.tolist(),
|
|
"rotation_matrix_world_from_camera": cam.rotation_w_from_c.tolist(),
|
|
"orientation_deg": {
|
|
"roll": cam.orientation_rpy_deg[0],
|
|
"pitch": cam.orientation_rpy_deg[1],
|
|
"yaw": cam.orientation_rpy_deg[2],
|
|
},
|
|
"quality": cam.quality,
|
|
}
|
|
for cam in cameras
|
|
],
|
|
"markers": [
|
|
{
|
|
"marker_id": est.marker_id,
|
|
"position_m": est.position_w.tolist(),
|
|
"rotation_matrix_world_from_marker": est.rotation_w_from_m.tolist(),
|
|
"orientation_deg": {
|
|
"roll": est.orientation_rpy_deg[0],
|
|
"pitch": est.orientation_rpy_deg[1],
|
|
"yaw": est.orientation_rpy_deg[2],
|
|
},
|
|
"num_observations": len(est.observations),
|
|
"num_cameras": est.num_cameras,
|
|
"camera_ids": est.camera_ids,
|
|
"weight_sum": est.weight_sum,
|
|
"position_std_m": est.position_std_m,
|
|
"angular_std_deg": est.angular_std_deg,
|
|
"confidence_mean": est.confidence_mean,
|
|
"quality_mean": est.quality_mean,
|
|
"reprojection_rms_mean_px": est.reproj_rms_mean_px,
|
|
"reliability_0_1": est.reliability_0_1,
|
|
"metadata": est.metadata,
|
|
"observations": [
|
|
{
|
|
"camera_id": o.camera_id,
|
|
"source_file": o.source_file,
|
|
"confidence": o.confidence,
|
|
"quality_score": o.quality_score,
|
|
"camera_weight": o.camera_weight,
|
|
"observation_weight": o.observation_weight,
|
|
"position_m": o.position_w.tolist(),
|
|
"rotation_matrix_world_from_marker": o.rotation_w_from_m.tolist(),
|
|
"reprojection_rms_px": o.reprojection_rms_px,
|
|
"reprojection_max_px": o.reprojection_max_px,
|
|
"metadata": o.metadata,
|
|
}
|
|
for o in est.observations
|
|
],
|
|
}
|
|
for est in fused_markers
|
|
],
|
|
}
|
|
write_summary_json(summary, out_json)
|
|
|
|
print(f"[OK] CSV written: {out_csv}")
|
|
print(f"[OK] JSON written: {out_json}")
|
|
print(f"[OK] Cameras: {len(cameras)}, markers fused: {len(fused_markers)}, observations: {len(all_observations)}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|