Files
appRobotVideoControls/programs/3_fuse_markers_world.py
chk 5a7176920a Phase 1 abgeschlossen: Positionen werden erkannt.
Positionen aus den Merkern heraus erkennbar. Viele Bilder gleichzeitig verarbeitbar.
2026-05-25 22:16:11 +02:00

766 lines
16 KiB
Python

#!/usr/bin/env python3
"""
3_fuse_markers_world.py
PHASE 1B
---------
Fusioniert Marker-Weltkoordinaten aus mehreren Kameras.
EINGABE:
--json *.camera_pose.json (mehrfach möglich)
--robots robot.json
Das Script findet automatisch:
*.aruco_detection.json
Beispiel:
snapshot_video0_1779690911822_aruco_detection.camera_pose.json
->
snapshot_video0_1779690911822_aruco_detection.json
FEATURES:
- mehrere Kameras (2..5)
- automatische Detection-Datei-Erkennung
- bekannte Marker aus robot.json
- unbekannte Marker triangulieren
- gewichtete Marker-Fusion
- Qualitätsmetriken
- CSV Export
- JSON Export
- Kamera Export
- robuste Fehlerbehandlung
- vorbereitet für spätere Rigid-Body Constraints
OUTPUT:
fused_markers.csv
fused_markers.json
Benötigt:
pip install opencv-python numpy
"""
import argparse
import csv
import json
import math
from collections import defaultdict
from pathlib import Path
import cv2
import numpy as np
# ============================================================
# JSON HELPERS
# ============================================================
def load_json(path):
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
def save_json(path, data):
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
# ============================================================
# FILE MATCHING
# ============================================================
def find_detection_json(camera_pose_json_path):
"""
Findet automatisch die passende
*_aruco_detection.json Datei.
Beispiel:
INPUT:
snapshot_video0_123_aruco_detection.camera_pose.json
OUTPUT:
snapshot_video0_123_aruco_detection.json
"""
pose_path = Path(camera_pose_json_path)
name = pose_path.name
if not name.endswith(".camera_pose.json"):
raise ValueError(
f"Expected .camera_pose.json: {pose_path}"
)
detection_name = name.replace(
".camera_pose.json",
".json"
)
detection_path = pose_path.with_name(
detection_name
)
if not detection_path.exists():
raise FileNotFoundError(
"Matching detection JSON not found:\n"
f"{detection_path}"
)
return detection_path
# ============================================================
# CAMERA POSE
# ============================================================
def extract_camera_pose(camera_pose_data):
if "camera_pose" not in camera_pose_data:
raise ValueError("camera_pose missing")
pose = camera_pose_data["camera_pose"]
pos = pose["position_m"]
cam_pos = np.array([
pos["x"],
pos["y"],
pos["z"]
], dtype=np.float32)
if "rotation_matrix_world_from_camera" in pose:
R_wc = np.array(
pose["rotation_matrix_world_from_camera"],
dtype=np.float32
)
else:
ori = pose["orientation_deg"]
R_wc = euler_to_rotation_matrix(
ori["roll"],
ori["pitch"],
ori["yaw"]
)
return R_wc, cam_pos
# ============================================================
# INTRINSICS
# ============================================================
def extract_intrinsics(detection_data):
if "camera" not in detection_data:
raise ValueError("camera section missing")
camera = detection_data["camera"]
if "camera_matrix" not in camera:
raise ValueError("camera_matrix missing")
if "distortion_coefficients" not in camera:
raise ValueError("distortion_coefficients missing")
K = np.array(
camera["camera_matrix"],
dtype=np.float32
)
D = np.array(
camera["distortion_coefficients"],
dtype=np.float32
).reshape(-1, 1)
return K, D
# ============================================================
# ROTATION HELPERS
# ============================================================
def euler_to_rotation_matrix(
roll_deg,
pitch_deg,
yaw_deg
):
r = math.radians(roll_deg)
p = math.radians(pitch_deg)
y = math.radians(yaw_deg)
Rx = np.array([
[1, 0, 0],
[0, math.cos(r), -math.sin(r)],
[0, math.sin(r), math.cos(r)]
])
Ry = np.array([
[math.cos(p), 0, math.sin(p)],
[0, 1, 0],
[-math.sin(p), 0, math.cos(p)]
])
Rz = np.array([
[math.cos(y), -math.sin(y), 0],
[math.sin(y), math.cos(y), 0],
[0, 0, 1]
])
return Rz @ Ry @ Rx
# ============================================================
# ROBOT MARKERS
# ============================================================
def build_known_marker_lookup(robot_data):
"""
Nur Marker mit ABSOLUTER Weltposition.
relPos wird in Phase 2 verwendet.
"""
lookup = {}
for marker in robot_data.get("Marker", []):
marker_id = int(marker.get("id", -1))
if marker_id < 0:
continue
if "position" not in marker:
continue
pos = marker["position"]
if pos is None:
continue
if len(pos) != 3:
continue
lookup[marker_id] = np.array(
pos,
dtype=np.float32
)
return lookup
# ============================================================
# MARKER POSE REL CAMERA
# ============================================================
def estimate_marker_pose_camera(
image_points,
marker_size,
K,
D
):
half = marker_size / 2.0
object_points = np.array([
[-half, half, 0],
[half, half, 0],
[half, -half, 0],
[-half, -half, 0]
], dtype=np.float32)
image_points = np.array(
image_points,
dtype=np.float32
)
success, rvec, tvec = cv2.solvePnP(
object_points,
image_points,
K,
D,
flags=cv2.SOLVEPNP_IPPE_SQUARE
)
if not success:
return None
R_mc, _ = cv2.Rodrigues(rvec)
return {
"rvec": rvec,
"tvec": tvec.reshape(3),
"R_mc": R_mc
}
# ============================================================
# MARKER WORLD TRANSFORM
# ============================================================
def marker_world_position(
cam_world_pos,
R_wc,
t_mc
):
"""
Marker Mittelpunkt in Weltkoordinaten.
X_world = R_wc * X_cam + C
"""
return (
R_wc @ t_mc.reshape(3)
) + cam_world_pos
# ============================================================
# WEIGHTING
# ============================================================
def compute_marker_weight(
detection,
camera_pose_data
):
"""
Qualitätsgewicht.
Verwendet:
- confidence
- reprojection RMS
- Bildzentrum
- Markerfläche
- Sharpness
"""
confidence = float(
detection.get("confidence", 0.5)
)
quality = detection.get("quality", {})
area_px = float(
quality.get("area_px", 1000)
)
sharpness = quality.get(
"sharpness", {}
)
lap_var = float(
sharpness.get(
"laplacian_var",
500
)
)
geometry = quality.get(
"geometry", {}
)
dist_center = float(
geometry.get(
"distance_to_center_norm",
0.5
)
)
pose_quality = camera_pose_data.get(
"quality",
{}
)
reproj = float(
pose_quality.get(
"reprojection_rms_px",
10.0
)
)
reproj_weight = 1.0 / (1.0 + reproj)
area_weight = min(
area_px / 2000.0,
1.0
)
sharpness_weight = min(
lap_var / 5000.0,
1.0
)
center_weight = 1.0 - dist_center
weight = (
confidence *
reproj_weight *
area_weight *
sharpness_weight *
center_weight
)
return max(weight, 1e-6)
# ============================================================
# FUSION
# ============================================================
def weighted_average(points, weights):
points = np.array(points)
weights = np.array(weights)
if len(points) == 1:
return points[0]
total_weight = np.sum(weights)
if total_weight < 1e-9:
return np.mean(points, axis=0)
return np.sum(
points * weights[:, None],
axis=0
) / total_weight
# ============================================================
# MAIN
# ============================================================
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"--json",
action="append",
required=True,
help="*.camera_pose.json"
)
parser.add_argument(
"--robots",
required=True,
help="robot.json"
)
parser.add_argument(
"--outdir",
default="."
)
args = parser.parse_args()
outdir = Path(args.outdir)
outdir.mkdir(
parents=True,
exist_ok=True
)
# ========================================================
# robot.json
# ========================================================
robot_data = load_json(
args.robots
)
known_markers = build_known_marker_lookup(
robot_data
)
# ========================================================
# globale Observationen
# ========================================================
observations = defaultdict(list)
camera_exports = []
# ========================================================
# alle Kameras
# ========================================================
for json_file in args.json:
print()
print("================================================")
print("LOAD CAMERA")
print("================================================")
print(json_file)
# ----------------------------------------------------
# camera pose json
# ----------------------------------------------------
camera_pose_data = load_json(
json_file
)
# ----------------------------------------------------
# detection json automatisch finden
# ----------------------------------------------------
detection_json = find_detection_json(
json_file
)
print(
f"Detection JSON:\n{detection_json}"
)
detection_data = load_json(
detection_json
)
# ----------------------------------------------------
# intrinsics
# ----------------------------------------------------
K, D = extract_intrinsics(
detection_data
)
# ----------------------------------------------------
# kamerapose
# ----------------------------------------------------
R_wc, cam_world_pos = extract_camera_pose(
camera_pose_data
)
camera_name = Path(
json_file
).stem
# ----------------------------------------------------
# camera export
# ----------------------------------------------------
camera_exports.append({
"camera": camera_name,
"x": float(cam_world_pos[0]),
"y": float(cam_world_pos[1]),
"z": float(cam_world_pos[2])
})
# ----------------------------------------------------
# detections
# ----------------------------------------------------
detections = detection_data.get(
"detections",
[]
)
print(
f"Detections: {len(detections)}"
)
# ----------------------------------------------------
# marker durchlaufen
# ----------------------------------------------------
for det in detections:
marker_id = int(
det["marker_id"]
)
marker_size = float(
det["marker_size_m"]
)
pose = estimate_marker_pose_camera(
det["image_points_px"],
marker_size,
K,
D
)
if pose is None:
continue
world_pos = marker_world_position(
cam_world_pos,
R_wc,
pose["tvec"]
)
weight = compute_marker_weight(
det,
camera_pose_data
)
observations[marker_id].append({
"world_pos": world_pos,
"weight": weight,
"camera": camera_name,
"confidence": float(
det.get("confidence", 0.5)
),
"known_marker": marker_id in known_markers
})
# ========================================================
# fusion
# ========================================================
fused_markers = []
print()
print("================================================")
print("FUSE MARKERS")
print("================================================")
for marker_id, obs_list in observations.items():
points = [
o["world_pos"]
for o in obs_list
]
weights = [
o["weight"]
for o in obs_list
]
fused = weighted_average(
points,
weights
)
spread = 0.0
if len(points) > 1:
dists = [
np.linalg.norm(p - fused)
for p in points
]
spread = float(
np.mean(dists)
)
known = marker_id in known_markers
mean_conf = float(np.mean([
o["confidence"]
for o in obs_list
]))
mean_weight = float(np.mean(weights))
print(
f"Marker {marker_id:3d} | "
f"cams={len(obs_list)} | "
f"spread={spread:.4f}m | "
f"known={known}"
)
fused_markers.append({
"marker_id": marker_id,
"x": float(fused[0]),
"y": float(fused[1]),
"z": float(fused[2]),
"num_cameras": len(obs_list),
"spread_m": spread,
"known_marker": known,
"mean_confidence": mean_conf,
"mean_weight": mean_weight
})
# ========================================================
# CSV EXPORT
# ========================================================
csv_file = outdir / "fused_markers.csv"
with open(
csv_file,
"w",
newline="",
encoding="utf-8"
) as f:
writer = csv.DictWriter(
f,
fieldnames=[
"marker_id",
"x",
"y",
"z",
"num_cameras",
"spread_m",
"known_marker",
"mean_confidence",
"mean_weight"
]
)
writer.writeheader()
for row in fused_markers:
writer.writerow(row)
# ========================================================
# JSON EXPORT
# ========================================================
export_json = {
"fused_markers": fused_markers,
"cameras": camera_exports
}
json_file = outdir / "fused_markers.json"
save_json(
json_file,
export_json
)
# ========================================================
# DONE
# ========================================================
print()
print("================================================")
print("EXPORT")
print("================================================")
print(csv_file)
print(json_file)
print()
# ============================================================
# ENTRY
# ============================================================
if __name__ == "__main__":
main()