766 lines
16 KiB
Python
766 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
3_fuse_markers_world.py
|
|
|
|
PHASE 1B
|
|
---------
|
|
|
|
Fusioniert Marker-Weltkoordinaten aus mehreren Kameras.
|
|
|
|
EINGABE:
|
|
--json *.camera_pose.json (mehrfach möglich)
|
|
--robots robot.json
|
|
|
|
Das Script findet automatisch:
|
|
*.aruco_detection.json
|
|
|
|
Beispiel:
|
|
snapshot_video0_1779690911822_aruco_detection.camera_pose.json
|
|
|
|
->
|
|
|
|
snapshot_video0_1779690911822_aruco_detection.json
|
|
|
|
FEATURES:
|
|
- mehrere Kameras (2..5)
|
|
- automatische Detection-Datei-Erkennung
|
|
- bekannte Marker aus robot.json
|
|
- unbekannte Marker triangulieren
|
|
- gewichtete Marker-Fusion
|
|
- Qualitätsmetriken
|
|
- CSV Export
|
|
- JSON Export
|
|
- Kamera Export
|
|
- robuste Fehlerbehandlung
|
|
- vorbereitet für spätere Rigid-Body Constraints
|
|
|
|
OUTPUT:
|
|
fused_markers.csv
|
|
fused_markers.json
|
|
|
|
Benötigt:
|
|
pip install opencv-python numpy
|
|
"""
|
|
|
|
import argparse
|
|
import csv
|
|
import json
|
|
import math
|
|
from collections import defaultdict
|
|
from pathlib import Path
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
|
|
# ============================================================
|
|
# JSON HELPERS
|
|
# ============================================================
|
|
|
|
|
|
def load_json(path):
|
|
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
return json.load(f)
|
|
|
|
|
|
|
|
def save_json(path, data):
|
|
|
|
with open(path, "w", encoding="utf-8") as f:
|
|
json.dump(data, f, indent=2)
|
|
|
|
|
|
# ============================================================
|
|
# FILE MATCHING
|
|
# ============================================================
|
|
|
|
|
|
def find_detection_json(camera_pose_json_path):
|
|
"""
|
|
Findet automatisch die passende
|
|
*_aruco_detection.json Datei.
|
|
|
|
Beispiel:
|
|
|
|
INPUT:
|
|
snapshot_video0_123_aruco_detection.camera_pose.json
|
|
|
|
OUTPUT:
|
|
snapshot_video0_123_aruco_detection.json
|
|
"""
|
|
|
|
pose_path = Path(camera_pose_json_path)
|
|
|
|
name = pose_path.name
|
|
|
|
if not name.endswith(".camera_pose.json"):
|
|
raise ValueError(
|
|
f"Expected .camera_pose.json: {pose_path}"
|
|
)
|
|
|
|
detection_name = name.replace(
|
|
".camera_pose.json",
|
|
".json"
|
|
)
|
|
|
|
detection_path = pose_path.with_name(
|
|
detection_name
|
|
)
|
|
|
|
if not detection_path.exists():
|
|
|
|
raise FileNotFoundError(
|
|
"Matching detection JSON not found:\n"
|
|
f"{detection_path}"
|
|
)
|
|
|
|
return detection_path
|
|
|
|
|
|
# ============================================================
|
|
# CAMERA POSE
|
|
# ============================================================
|
|
|
|
|
|
def extract_camera_pose(camera_pose_data):
|
|
|
|
if "camera_pose" not in camera_pose_data:
|
|
raise ValueError("camera_pose missing")
|
|
|
|
pose = camera_pose_data["camera_pose"]
|
|
|
|
pos = pose["position_m"]
|
|
|
|
cam_pos = np.array([
|
|
pos["x"],
|
|
pos["y"],
|
|
pos["z"]
|
|
], dtype=np.float32)
|
|
|
|
if "rotation_matrix_world_from_camera" in pose:
|
|
|
|
R_wc = np.array(
|
|
pose["rotation_matrix_world_from_camera"],
|
|
dtype=np.float32
|
|
)
|
|
|
|
else:
|
|
|
|
ori = pose["orientation_deg"]
|
|
|
|
R_wc = euler_to_rotation_matrix(
|
|
ori["roll"],
|
|
ori["pitch"],
|
|
ori["yaw"]
|
|
)
|
|
|
|
return R_wc, cam_pos
|
|
|
|
|
|
# ============================================================
|
|
# INTRINSICS
|
|
# ============================================================
|
|
|
|
|
|
def extract_intrinsics(detection_data):
|
|
|
|
if "camera" not in detection_data:
|
|
raise ValueError("camera section missing")
|
|
|
|
camera = detection_data["camera"]
|
|
|
|
if "camera_matrix" not in camera:
|
|
raise ValueError("camera_matrix missing")
|
|
|
|
if "distortion_coefficients" not in camera:
|
|
raise ValueError("distortion_coefficients missing")
|
|
|
|
K = np.array(
|
|
camera["camera_matrix"],
|
|
dtype=np.float32
|
|
)
|
|
|
|
D = np.array(
|
|
camera["distortion_coefficients"],
|
|
dtype=np.float32
|
|
).reshape(-1, 1)
|
|
|
|
return K, D
|
|
|
|
|
|
# ============================================================
|
|
# ROTATION HELPERS
|
|
# ============================================================
|
|
|
|
|
|
def euler_to_rotation_matrix(
|
|
roll_deg,
|
|
pitch_deg,
|
|
yaw_deg
|
|
):
|
|
|
|
r = math.radians(roll_deg)
|
|
p = math.radians(pitch_deg)
|
|
y = math.radians(yaw_deg)
|
|
|
|
Rx = np.array([
|
|
[1, 0, 0],
|
|
[0, math.cos(r), -math.sin(r)],
|
|
[0, math.sin(r), math.cos(r)]
|
|
])
|
|
|
|
Ry = np.array([
|
|
[math.cos(p), 0, math.sin(p)],
|
|
[0, 1, 0],
|
|
[-math.sin(p), 0, math.cos(p)]
|
|
])
|
|
|
|
Rz = np.array([
|
|
[math.cos(y), -math.sin(y), 0],
|
|
[math.sin(y), math.cos(y), 0],
|
|
[0, 0, 1]
|
|
])
|
|
|
|
return Rz @ Ry @ Rx
|
|
|
|
|
|
# ============================================================
|
|
# ROBOT MARKERS
|
|
# ============================================================
|
|
|
|
|
|
def build_known_marker_lookup(robot_data):
|
|
"""
|
|
Nur Marker mit ABSOLUTER Weltposition.
|
|
|
|
relPos wird in Phase 2 verwendet.
|
|
"""
|
|
|
|
lookup = {}
|
|
|
|
for marker in robot_data.get("Marker", []):
|
|
|
|
marker_id = int(marker.get("id", -1))
|
|
|
|
if marker_id < 0:
|
|
continue
|
|
|
|
if "position" not in marker:
|
|
continue
|
|
|
|
pos = marker["position"]
|
|
|
|
if pos is None:
|
|
continue
|
|
|
|
if len(pos) != 3:
|
|
continue
|
|
|
|
lookup[marker_id] = np.array(
|
|
pos,
|
|
dtype=np.float32
|
|
)
|
|
|
|
return lookup
|
|
|
|
|
|
# ============================================================
|
|
# MARKER POSE REL CAMERA
|
|
# ============================================================
|
|
|
|
|
|
def estimate_marker_pose_camera(
|
|
image_points,
|
|
marker_size,
|
|
K,
|
|
D
|
|
):
|
|
|
|
half = marker_size / 2.0
|
|
|
|
object_points = np.array([
|
|
[-half, half, 0],
|
|
[half, half, 0],
|
|
[half, -half, 0],
|
|
[-half, -half, 0]
|
|
], dtype=np.float32)
|
|
|
|
image_points = np.array(
|
|
image_points,
|
|
dtype=np.float32
|
|
)
|
|
|
|
success, rvec, tvec = cv2.solvePnP(
|
|
object_points,
|
|
image_points,
|
|
K,
|
|
D,
|
|
flags=cv2.SOLVEPNP_IPPE_SQUARE
|
|
)
|
|
|
|
if not success:
|
|
return None
|
|
|
|
R_mc, _ = cv2.Rodrigues(rvec)
|
|
|
|
return {
|
|
"rvec": rvec,
|
|
"tvec": tvec.reshape(3),
|
|
"R_mc": R_mc
|
|
}
|
|
|
|
|
|
# ============================================================
|
|
# MARKER WORLD TRANSFORM
|
|
# ============================================================
|
|
|
|
|
|
def marker_world_position(
|
|
cam_world_pos,
|
|
R_wc,
|
|
t_mc
|
|
):
|
|
"""
|
|
Marker Mittelpunkt in Weltkoordinaten.
|
|
|
|
X_world = R_wc * X_cam + C
|
|
"""
|
|
|
|
return (
|
|
R_wc @ t_mc.reshape(3)
|
|
) + cam_world_pos
|
|
|
|
|
|
# ============================================================
|
|
# WEIGHTING
|
|
# ============================================================
|
|
|
|
|
|
def compute_marker_weight(
|
|
detection,
|
|
camera_pose_data
|
|
):
|
|
"""
|
|
Qualitätsgewicht.
|
|
|
|
Verwendet:
|
|
- confidence
|
|
- reprojection RMS
|
|
- Bildzentrum
|
|
- Markerfläche
|
|
- Sharpness
|
|
"""
|
|
|
|
confidence = float(
|
|
detection.get("confidence", 0.5)
|
|
)
|
|
|
|
quality = detection.get("quality", {})
|
|
|
|
area_px = float(
|
|
quality.get("area_px", 1000)
|
|
)
|
|
|
|
sharpness = quality.get(
|
|
"sharpness", {}
|
|
)
|
|
|
|
lap_var = float(
|
|
sharpness.get(
|
|
"laplacian_var",
|
|
500
|
|
)
|
|
)
|
|
|
|
geometry = quality.get(
|
|
"geometry", {}
|
|
)
|
|
|
|
dist_center = float(
|
|
geometry.get(
|
|
"distance_to_center_norm",
|
|
0.5
|
|
)
|
|
)
|
|
|
|
pose_quality = camera_pose_data.get(
|
|
"quality",
|
|
{}
|
|
)
|
|
|
|
reproj = float(
|
|
pose_quality.get(
|
|
"reprojection_rms_px",
|
|
10.0
|
|
)
|
|
)
|
|
|
|
reproj_weight = 1.0 / (1.0 + reproj)
|
|
|
|
area_weight = min(
|
|
area_px / 2000.0,
|
|
1.0
|
|
)
|
|
|
|
sharpness_weight = min(
|
|
lap_var / 5000.0,
|
|
1.0
|
|
)
|
|
|
|
center_weight = 1.0 - dist_center
|
|
|
|
weight = (
|
|
confidence *
|
|
reproj_weight *
|
|
area_weight *
|
|
sharpness_weight *
|
|
center_weight
|
|
)
|
|
|
|
return max(weight, 1e-6)
|
|
|
|
|
|
# ============================================================
|
|
# FUSION
|
|
# ============================================================
|
|
|
|
|
|
def weighted_average(points, weights):
|
|
|
|
points = np.array(points)
|
|
weights = np.array(weights)
|
|
|
|
if len(points) == 1:
|
|
return points[0]
|
|
|
|
total_weight = np.sum(weights)
|
|
|
|
if total_weight < 1e-9:
|
|
return np.mean(points, axis=0)
|
|
|
|
return np.sum(
|
|
points * weights[:, None],
|
|
axis=0
|
|
) / total_weight
|
|
|
|
|
|
# ============================================================
|
|
# MAIN
|
|
# ============================================================
|
|
|
|
|
|
def main():
|
|
|
|
parser = argparse.ArgumentParser()
|
|
|
|
parser.add_argument(
|
|
"--json",
|
|
action="append",
|
|
required=True,
|
|
help="*.camera_pose.json"
|
|
)
|
|
|
|
parser.add_argument(
|
|
"--robots",
|
|
required=True,
|
|
help="robot.json"
|
|
)
|
|
|
|
parser.add_argument(
|
|
"--outdir",
|
|
default="."
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
outdir = Path(args.outdir)
|
|
|
|
outdir.mkdir(
|
|
parents=True,
|
|
exist_ok=True
|
|
)
|
|
|
|
# ========================================================
|
|
# robot.json
|
|
# ========================================================
|
|
|
|
robot_data = load_json(
|
|
args.robots
|
|
)
|
|
|
|
known_markers = build_known_marker_lookup(
|
|
robot_data
|
|
)
|
|
|
|
# ========================================================
|
|
# globale Observationen
|
|
# ========================================================
|
|
|
|
observations = defaultdict(list)
|
|
|
|
camera_exports = []
|
|
|
|
# ========================================================
|
|
# alle Kameras
|
|
# ========================================================
|
|
|
|
for json_file in args.json:
|
|
|
|
print()
|
|
print("================================================")
|
|
print("LOAD CAMERA")
|
|
print("================================================")
|
|
print(json_file)
|
|
|
|
# ----------------------------------------------------
|
|
# camera pose json
|
|
# ----------------------------------------------------
|
|
|
|
camera_pose_data = load_json(
|
|
json_file
|
|
)
|
|
|
|
# ----------------------------------------------------
|
|
# detection json automatisch finden
|
|
# ----------------------------------------------------
|
|
|
|
detection_json = find_detection_json(
|
|
json_file
|
|
)
|
|
|
|
print(
|
|
f"Detection JSON:\n{detection_json}"
|
|
)
|
|
|
|
detection_data = load_json(
|
|
detection_json
|
|
)
|
|
|
|
# ----------------------------------------------------
|
|
# intrinsics
|
|
# ----------------------------------------------------
|
|
|
|
K, D = extract_intrinsics(
|
|
detection_data
|
|
)
|
|
|
|
# ----------------------------------------------------
|
|
# kamerapose
|
|
# ----------------------------------------------------
|
|
|
|
R_wc, cam_world_pos = extract_camera_pose(
|
|
camera_pose_data
|
|
)
|
|
|
|
camera_name = Path(
|
|
json_file
|
|
).stem
|
|
|
|
# ----------------------------------------------------
|
|
# camera export
|
|
# ----------------------------------------------------
|
|
|
|
camera_exports.append({
|
|
"camera": camera_name,
|
|
"x": float(cam_world_pos[0]),
|
|
"y": float(cam_world_pos[1]),
|
|
"z": float(cam_world_pos[2])
|
|
})
|
|
|
|
# ----------------------------------------------------
|
|
# detections
|
|
# ----------------------------------------------------
|
|
|
|
detections = detection_data.get(
|
|
"detections",
|
|
[]
|
|
)
|
|
|
|
print(
|
|
f"Detections: {len(detections)}"
|
|
)
|
|
|
|
# ----------------------------------------------------
|
|
# marker durchlaufen
|
|
# ----------------------------------------------------
|
|
|
|
for det in detections:
|
|
|
|
marker_id = int(
|
|
det["marker_id"]
|
|
)
|
|
|
|
marker_size = float(
|
|
det["marker_size_m"]
|
|
)
|
|
|
|
pose = estimate_marker_pose_camera(
|
|
det["image_points_px"],
|
|
marker_size,
|
|
K,
|
|
D
|
|
)
|
|
|
|
if pose is None:
|
|
continue
|
|
|
|
world_pos = marker_world_position(
|
|
cam_world_pos,
|
|
R_wc,
|
|
pose["tvec"]
|
|
)
|
|
|
|
weight = compute_marker_weight(
|
|
det,
|
|
camera_pose_data
|
|
)
|
|
|
|
observations[marker_id].append({
|
|
"world_pos": world_pos,
|
|
"weight": weight,
|
|
"camera": camera_name,
|
|
"confidence": float(
|
|
det.get("confidence", 0.5)
|
|
),
|
|
"known_marker": marker_id in known_markers
|
|
})
|
|
|
|
# ========================================================
|
|
# fusion
|
|
# ========================================================
|
|
|
|
fused_markers = []
|
|
|
|
print()
|
|
print("================================================")
|
|
print("FUSE MARKERS")
|
|
print("================================================")
|
|
|
|
for marker_id, obs_list in observations.items():
|
|
|
|
points = [
|
|
o["world_pos"]
|
|
for o in obs_list
|
|
]
|
|
|
|
weights = [
|
|
o["weight"]
|
|
for o in obs_list
|
|
]
|
|
|
|
fused = weighted_average(
|
|
points,
|
|
weights
|
|
)
|
|
|
|
spread = 0.0
|
|
|
|
if len(points) > 1:
|
|
|
|
dists = [
|
|
np.linalg.norm(p - fused)
|
|
for p in points
|
|
]
|
|
|
|
spread = float(
|
|
np.mean(dists)
|
|
)
|
|
|
|
known = marker_id in known_markers
|
|
|
|
mean_conf = float(np.mean([
|
|
o["confidence"]
|
|
for o in obs_list
|
|
]))
|
|
|
|
mean_weight = float(np.mean(weights))
|
|
|
|
print(
|
|
f"Marker {marker_id:3d} | "
|
|
f"cams={len(obs_list)} | "
|
|
f"spread={spread:.4f}m | "
|
|
f"known={known}"
|
|
)
|
|
|
|
fused_markers.append({
|
|
"marker_id": marker_id,
|
|
"x": float(fused[0]),
|
|
"y": float(fused[1]),
|
|
"z": float(fused[2]),
|
|
"num_cameras": len(obs_list),
|
|
"spread_m": spread,
|
|
"known_marker": known,
|
|
"mean_confidence": mean_conf,
|
|
"mean_weight": mean_weight
|
|
})
|
|
|
|
# ========================================================
|
|
# CSV EXPORT
|
|
# ========================================================
|
|
|
|
csv_file = outdir / "fused_markers.csv"
|
|
|
|
with open(
|
|
csv_file,
|
|
"w",
|
|
newline="",
|
|
encoding="utf-8"
|
|
) as f:
|
|
|
|
writer = csv.DictWriter(
|
|
f,
|
|
fieldnames=[
|
|
"marker_id",
|
|
"x",
|
|
"y",
|
|
"z",
|
|
"num_cameras",
|
|
"spread_m",
|
|
"known_marker",
|
|
"mean_confidence",
|
|
"mean_weight"
|
|
]
|
|
)
|
|
|
|
writer.writeheader()
|
|
|
|
for row in fused_markers:
|
|
writer.writerow(row)
|
|
|
|
# ========================================================
|
|
# JSON EXPORT
|
|
# ========================================================
|
|
|
|
export_json = {
|
|
"fused_markers": fused_markers,
|
|
"cameras": camera_exports
|
|
}
|
|
|
|
json_file = outdir / "fused_markers.json"
|
|
|
|
save_json(
|
|
json_file,
|
|
export_json
|
|
)
|
|
|
|
# ========================================================
|
|
# DONE
|
|
# ========================================================
|
|
|
|
print()
|
|
print("================================================")
|
|
print("EXPORT")
|
|
print("================================================")
|
|
print(csv_file)
|
|
print(json_file)
|
|
print()
|
|
|
|
|
|
# ============================================================
|
|
# ENTRY
|
|
# ============================================================
|
|
|
|
if __name__ == "__main__":
|
|
main()
|