diff --git a/documentation/Phase1.aux b/documentation/Phase1.aux new file mode 100644 index 0000000..6ec367e --- /dev/null +++ b/documentation/Phase1.aux @@ -0,0 +1,14 @@ +\relax +\@writefile{toc}{\contentsline {section}{\numberline {1}Ziel}{1}{}\protected@file@percent } +\@writefile{toc}{\contentsline {section}{\numberline {2}Koordinatensysteme}{1}{}\protected@file@percent } +\@writefile{toc}{\contentsline {section}{\numberline {3}solvePnP}{1}{}\protected@file@percent } +\@writefile{toc}{\contentsline {section}{\numberline {4}Kamerapose}{2}{}\protected@file@percent } +\@writefile{toc}{\contentsline {section}{\numberline {5}Markerposition in Weltkoordinaten}{2}{}\protected@file@percent } +\@writefile{toc}{\contentsline {section}{\numberline {6}Markerrotation in Weltkoordinaten}{2}{}\protected@file@percent } +\@writefile{toc}{\contentsline {section}{\numberline {7}Gewichtung der Beobachtungen}{3}{}\protected@file@percent } +\@writefile{toc}{\contentsline {section}{\numberline {8}Gewichtete Positionsfusion}{3}{}\protected@file@percent } +\@writefile{toc}{\contentsline {section}{\numberline {9}Rotationsfusion}{3}{}\protected@file@percent } +\@writefile{toc}{\contentsline {section}{\numberline {10}Qualitätsmetriken}{3}{}\protected@file@percent } +\@writefile{toc}{\contentsline {section}{\numberline {11}Rigid-Body Erweiterung}{4}{}\protected@file@percent } +\@writefile{toc}{\contentsline {section}{\numberline {12}Spätere Erweiterungen}{4}{}\protected@file@percent } +\gdef \@abspage@last{4} diff --git a/documentation/Phase1.pdf b/documentation/Phase1.pdf new file mode 100644 index 0000000..057cc98 Binary files /dev/null and b/documentation/Phase1.pdf differ diff --git a/documentation/Phase1.tex b/documentation/Phase1.tex new file mode 100644 index 0000000..2a93e2c --- /dev/null +++ b/documentation/Phase1.tex @@ -0,0 +1,248 @@ +\documentclass[a4paper,11pt]{article} + +\usepackage[utf8]{inputenc} +\usepackage{amsmath} +\usepackage{amssymb} +\usepackage{geometry} +\geometry{margin=2.5cm} + +\title{Mathematische Beschreibung von \texttt{3\_fuse\_markers\_world.py}} +\author{} +\date{} + +\begin{document} + +\maketitle + +\section{Ziel} + +Das Script fusioniert mehrere ArUco-Detektionen aus mehreren Kameras zu einem gemeinsamen Weltmodell. + +Gegeben sind: + +\begin{itemize} +\item Kameraposen im Weltkoordinatensystem +\item 2D Marker-Detektionen pro Kamera +\item Kameraintrinsics +\item Markergröße +\end{itemize} + +Gesucht sind: + +\begin{itemize} +\item Weltposition aller Marker +\item Markerorientierungen +\item Qualitätsmetriken +\end{itemize} + +\section{Koordinatensysteme} + +Verwendete Systeme: + +\begin{itemize} +\item Weltkoordinatensystem $W$ +\item Kamerakoordinatensystem $C$ +\item Markerkoordinatensystem $M$ +\end{itemize} + +\section{solvePnP} + +Für jeden Marker wird mittels OpenCV solvePnP berechnet: + +\[ +X_C = R_{CM} X_M + t_{CM} +\] + +Dabei gilt: + +\begin{itemize} +\item $R_{CM}$ = Rotation Marker $\rightarrow$ Kamera +\item $t_{CM}$ = Translation Marker $\rightarrow$ Kamera +\end{itemize} + +\section{Kamerapose} + +Aus der vorher berechneten Kamerapose: + +\[ +X_W = R_{WC} X_C + t_{WC} +\] + +mit: + +\[ +t_{WC} = C_W +\] + +(Kameraposition in Weltkoordinaten) + +\section{Markerposition in Weltkoordinaten} + +Markerzentrum: + +\[ +p_M = +\begin{bmatrix} +0 \\ +0 \\ +0 +\end{bmatrix} +\] + +Markerposition im Kamerasystem: + +\[ +p_C = R_{CM} p_M + t_{CM} +\] + +Da $p_M = 0$: + +\[ +p_C = t_{CM} +\] + +Transformation ins Weltkoordinatensystem: + +\[ +p_W = R_{WC} p_C + t_{WC} +\] + +Somit: + +\[ +p_W = R_{WC} t_{CM} + t_{WC} +\] + +\section{Markerrotation in Weltkoordinaten} + +Die Markerrotation relativ zur Kamera: + +\[ +R_{CM} +\] + +Die Kamerarotation relativ zur Welt: + +\[ +R_{WC} +\] + +Markerrotation relativ zur Welt: + +\[ +R_{WM} = R_{WC} R_{CM} +\] + +Dies ist die zentrale Rotationsgleichung des Scripts. + +\section{Gewichtung der Beobachtungen} + +Mehrere Kameras können denselben Marker beobachten. + +Für jede Beobachtung wird ein Gewicht berechnet: + +\[ +w_i = +w_{\text{confidence}} +\cdot +w_{\text{area}} +\cdot +w_{\text{view}} +\cdot +w_{\text{reprojection}} +\] + +Typische Faktoren: + +\begin{itemize} +\item Marker Confidence +\item Markergröße in Pixel +\item Sichtwinkel +\item Distanz zum Bildrand +\item Reprojektionsfehler +\end{itemize} + +\section{Gewichtete Positionsfusion} + +Die endgültige Markerposition: + +\[ +p = +\frac{ +\sum_i w_i p_i +}{ +\sum_i w_i +} +\] + +Dies entspricht einem gewichteten Mittelwert. + +\section{Rotationsfusion} + +Rotationen werden gesammelt: + +\[ +R_1, R_2, ..., R_n +\] + +Eine einfache erste Näherung: + +\begin{itemize} +\item Eulerwinkel mitteln +\item oder Quaternionen mitteln +\end{itemize} + +Später empfohlen: + +\begin{itemize} +\item SVD-basierte Rotationsmittelung +\item Lie-Group Mittelung auf $SO(3)$ +\end{itemize} + +\section{Qualitätsmetriken} + +Das Script berechnet: + +\begin{itemize} +\item Anzahl beobachtender Kameras +\item Positionsstreuung +\item Reprojektionsfehler +\item Gesamtgewicht +\item Sichtwinkel +\end{itemize} + +Beispiel: + +\[ +\sigma = +\sqrt{ +\frac{1}{N} +\sum_i ||p_i - \bar{p}||^2 +} +\] + +\section{Rigid-Body Erweiterung} + +Später können Marker über bekannte Relativpositionen gekoppelt werden. + +Beispiel: + +\[ +p_{M2} = p_{M1} + R_{Body} \Delta p +\] + +Dadurch können Marker rekonstruiert werden, selbst wenn sie nicht direkt sichtbar sind. + +\section{Spätere Erweiterungen} + +Geplant: + +\begin{itemize} +\item Bundle Adjustment +\item Kinematic Constraints +\item Joint Solver +\item Graph Optimization +\item Temporal Tracking +\end{itemize} + +\end{document} \ No newline at end of file diff --git a/documentation/Phase1_Phase2.aux b/documentation/Phase1_Phase2.aux new file mode 100644 index 0000000..88c61cc --- /dev/null +++ b/documentation/Phase1_Phase2.aux @@ -0,0 +1,28 @@ +\relax +\@writefile{toc}{\contentsline {section}{\numberline {1}Ziel}{1}{}\protected@file@percent } +\@writefile{toc}{\contentsline {section}{\numberline {2}Ausgangslage}{1}{}\protected@file@percent } +\@writefile{toc}{\contentsline {section}{\numberline {3}Grundidee}{2}{}\protected@file@percent } +\@writefile{toc}{\contentsline {section}{\numberline {4}Vorteil}{2}{}\protected@file@percent } +\@writefile{toc}{\contentsline {section}{\numberline {5}Erwartete Verbesserungen}{3}{}\protected@file@percent } +\@writefile{toc}{\contentsline {subsection}{\numberline {5.1}Stabilität}{3}{}\protected@file@percent } +\@writefile{toc}{\contentsline {subsection}{\numberline {5.2}Konsistenz}{3}{}\protected@file@percent } +\@writefile{toc}{\contentsline {subsection}{\numberline {5.3}Multi-Camera-Verkettung}{3}{}\protected@file@percent } +\@writefile{toc}{\contentsline {section}{\numberline {6}Benötigte Daten}{4}{}\protected@file@percent } +\@writefile{toc}{\contentsline {subsection}{\numberline {6.1}Bereits vorhanden}{4}{}\protected@file@percent } +\@writefile{toc}{\contentsline {subsubsection}{\numberline {6.1.1}Absolute Marker}{4}{}\protected@file@percent } +\@writefile{toc}{\contentsline {subsubsection}{\numberline {6.1.2}Relative Markerpositionen}{4}{}\protected@file@percent } +\@writefile{toc}{\contentsline {subsubsection}{\numberline {6.1.3}Body-Zuordnung}{4}{}\protected@file@percent } +\@writefile{toc}{\contentsline {subsubsection}{\numberline {6.1.4}Gelenke}{4}{}\protected@file@percent } +\@writefile{toc}{\contentsline {section}{\numberline {7}Noch fehlende Daten}{4}{}\protected@file@percent } +\@writefile{toc}{\contentsline {subsection}{\numberline {7.1}Marker-Orientierung relativ zum Body}{4}{}\protected@file@percent } +\@writefile{toc}{\contentsline {section}{\numberline {8}Geplante Solver-Strategie}{5}{}\protected@file@percent } +\@writefile{toc}{\contentsline {subsection}{\numberline {8.1}Phase 2A --- Rigid Body Fit}{5}{}\protected@file@percent } +\@writefile{toc}{\contentsline {subsection}{\numberline {8.2}Phase 2B --- Joint Constraints}{5}{}\protected@file@percent } +\@writefile{toc}{\contentsline {subsection}{\numberline {8.3}Phase 2C --- Global Optimization}{5}{}\protected@file@percent } +\@writefile{toc}{\contentsline {section}{\numberline {9}Wichtige Architekturentscheidung}{6}{}\protected@file@percent } +\@writefile{toc}{\contentsline {section}{\numberline {10}Geplante Datenstruktur}{6}{}\protected@file@percent } +\@writefile{toc}{\contentsline {subsection}{\numberline {10.1}Weltpose eines Körpers}{6}{}\protected@file@percent } +\@writefile{toc}{\contentsline {subsection}{\numberline {10.2}Relative Markerdefinition}{6}{}\protected@file@percent } +\@writefile{toc}{\contentsline {section}{\numberline {11}Phase 1 Reminder}{6}{}\protected@file@percent } +\@writefile{toc}{\contentsline {section}{\numberline {12}Zielbild}{7}{}\protected@file@percent } +\gdef \@abspage@last{7} diff --git a/documentation/Phase1_Phase2.pdf b/documentation/Phase1_Phase2.pdf new file mode 100644 index 0000000..08b7857 Binary files /dev/null and b/documentation/Phase1_Phase2.pdf differ diff --git a/documentation/Phase1_Phase2.tex b/documentation/Phase1_Phase2.tex new file mode 100644 index 0000000..173d21e --- /dev/null +++ b/documentation/Phase1_Phase2.tex @@ -0,0 +1,348 @@ +\documentclass[a4paper,11pt]{article} + +\usepackage[utf8]{inputenc} +\usepackage[T1]{fontenc} +\usepackage{geometry} +\usepackage{amsmath} +\usepackage{amssymb} +\usepackage{listings} +\usepackage{xcolor} + + +\title{Phase 2 --- Kinematic-Constrained Multi-Camera Solver} +\author{} +\date{} + +\begin{document} + +\maketitle + +\section{Ziel} + +Nach Phase 1 existiert: + +\begin{itemize} + \item ein gemeinsames Weltkoordinatensystem + \item Kameraposen aller Kameras + \item bekannte absolute Markerpositionen + \item fusionierte Beobachtungen mehrerer Kameras +\end{itemize} + +Phase 2 erweitert das System um: + +\begin{itemize} + \item Rigid-Body Constraints + \item mechanische Zusammenhänge + \item Gelenke + \item relative Marker-Geometrien (\texttt{relPos}) + \item Stabilisierung bei wenigen sichtbaren Markern +\end{itemize} + +\section{Ausgangslage} + +Aktuell wird jeder Marker unabhängig behandelt. + +Das ist suboptimal, weil: + +\begin{itemize} + \item viele Marker nur kurz sichtbar sind + \item oft nur 1--2 Marker eines Bauteils sichtbar sind + \item solvePnP bei wenigen Markern instabil wird + \item Markerrauschen direkt in die Weltkoordinaten eingeht +\end{itemize} + +Mechanisch sind die Marker jedoch nicht unabhängig. + +Mehrere Marker gehören jeweils zu: + +\begin{itemize} + \item Arm1 + \item Arm2 + \item Joint1 + \item Base + \item Finger1 + \item Finger2 +\end{itemize} + +und bilden jeweils starre Körper (Rigid Bodies). + +\section{Grundidee} + +Statt einzelne Marker zu lösen: + +\begin{verbatim} +Marker -> Welt +\end{verbatim} + +wird gelöst: + +\begin{verbatim} +RigidBody -> Welt +\end{verbatim} + +und daraus: + +\begin{verbatim} +Marker = RigidBody * relTransform +\end{verbatim} + +\section{Vorteil} + +Schon ein einzelner sichtbarer Marker kann: + +\begin{itemize} + \item einen ganzen Körper stabilisieren + \item andere unsichtbare Marker indirekt bestimmen +\end{itemize} + +Beispiel: + +Wenn Marker 198 sichtbar ist +und Marker 229 relativ dazu bekannt ist, +dann kann Marker 229 geschätzt werden, +auch wenn er aktuell unsichtbar ist. + +\section{Erwartete Verbesserungen} + +\subsection{Stabilität} + +Deutlich stabilere Pose-Schätzung bei: + +\begin{itemize} + \item Motion Blur + \item wenigen sichtbaren Markern + \item schlechten Blickwinkeln + \item Teilverdeckungen +\end{itemize} + +\subsection{Konsistenz} + +Marker eines Bauteils bleiben: + +\begin{itemize} + \item geometrisch korrekt + \item starr + \item ohne unrealistische Verzerrungen +\end{itemize} + +\subsection{Multi-Camera-Verkettung} + +Kameras können indirekt gekoppelt werden. + +Beispiel: + +Cam1 sieht: +\begin{itemize} + \item Marker 1,2,3 +\end{itemize} + +Cam2 sieht: +\begin{itemize} + \item Marker 3,198 +\end{itemize} + +Cam3 sieht: +\begin{itemize} + \item Marker 198,229 +\end{itemize} + +Dadurch wird: +\begin{itemize} + \item Arm1 relativ zur Welt bestimmbar + \item obwohl keine einzelne Kamera alles sieht +\end{itemize} + +\section{Benötigte Daten} + +\subsection{Bereits vorhanden} + +\subsubsection{Absolute Marker} + +\begin{verbatim} +"position":[x,y,z] +\end{verbatim} + +für Board-Marker. + +\subsubsection{Relative Markerpositionen} + +\begin{verbatim} +"relPos":[x,y,z] +\end{verbatim} + +für Marker auf einem Rigid Body. + +\subsubsection{Body-Zuordnung} + +\begin{verbatim} +"on":"Arm1" +\end{verbatim} + +\subsubsection{Gelenke} + +\begin{verbatim} +"type":"revolute" +"axis":[1,0,0] +\end{verbatim} + +\section{Noch fehlende Daten} + +\subsection{Marker-Orientierung relativ zum Body} + +Aktuell existiert nur: + +\begin{verbatim} +"relPos" +\end{verbatim} + +Empfohlen wird zusätzlich: + +\begin{verbatim} +"relRot":[rx,ry,rz] +\end{verbatim} + +oder alternativ: + +\begin{verbatim} +"normal":[x,y,z] +"up":[x,y,z] +\end{verbatim} + +Denn Marker besitzen nicht nur Position, +sondern auch Orientierung. + +Das verbessert spätere Pose-Fits deutlich. + +\section{Geplante Solver-Strategie} + +\subsection{Phase 2A --- Rigid Body Fit} + +Zunächst: + +\begin{itemize} + \item pro Element einen starren Körper fitten + \item noch keine Gelenkoptimierung +\end{itemize} + +Beispiel: + +\begin{verbatim} +T_world_arm1 +\end{verbatim} + +wird geschätzt. + +Alle Marker von Arm1 folgen daraus automatisch. + +\subsection{Phase 2B --- Joint Constraints} + +Danach: + +\begin{itemize} + \item Gelenkachsen erzwingen + \item Rotationen einschränken + \item mechanische Grenzen verwenden +\end{itemize} + +Beispiel: + +\begin{verbatim} +Arm2 darf sich nur um Y drehen +\end{verbatim} + +\subsection{Phase 2C --- Global Optimization} + +Später optional: + +\begin{itemize} + \item vollständiges Bundle Adjustment + \item gleichzeitige Optimierung aller: + \begin{itemize} + \item Kameras + \item Marker + \item Bodies + \item Gelenkwinkel + \end{itemize} +\end{itemize} + +\section{Wichtige Architekturentscheidung} + +NICHT: + +\begin{verbatim} +Marker einzeln lösen +\end{verbatim} + +SONDERN: + +\begin{verbatim} +Bodies + Constraints lösen +\end{verbatim} + +Marker werden damit Beobachtungen, +nicht mehr primäre Zustände. + +\section{Geplante Datenstruktur} + +\subsection{Weltpose eines Körpers} + +\begin{verbatim} +{ + "body":"Arm1", + "worldPose":{ + "position":[x,y,z], + "rotationMatrix":[...] + } +} +\end{verbatim} + +\subsection{Relative Markerdefinition} + +\begin{verbatim} +{ + "id":198, + "on":"Arm1", + "relPos":[x,y,z], + "relRot":[rx,ry,rz] +} +\end{verbatim} + +\section{Phase 1 Reminder} + +Phase 1 bleibt weiterhin wichtig: + +\begin{itemize} + \item alle Kameras finden + \item alle Detection-Dateien laden + \item gemeinsame Marker fusionieren + \item Weltkoordinaten berechnen + \item Qualitätsmetriken speichern + \item auch schlechte / unvollständige Beobachtungen abspeichern +\end{itemize} + +Die Ergebnisse von Phase 1 dienen als Eingang für Phase 2. + +\section{Zielbild} + +Langfristig entsteht: + +\begin{itemize} + \item ein globales Robotermodell + \item mit mehreren Kameras + \item mehreren Rigid Bodies + \item Gelenken + \item Unsicherheiten + \item Qualitätsmetriken + \item temporaler Stabilisierung +\end{itemize} + +basierend auf: + +\begin{itemize} + \item ArUco-Beobachtungen + \item Mechanik + \item Kinematik + \item Multi-View-Geometrie +\end{itemize} + +\end{document} \ No newline at end of file diff --git a/programs/2_estimate_camera_pose_from_aruco_json.py b/programs/2_estimate_camera_pose_from_aruco_json.py index 553afb4..1eb2104 100644 --- a/programs/2_estimate_camera_pose_from_aruco_json.py +++ b/programs/2_estimate_camera_pose_from_aruco_json.py @@ -1,27 +1,26 @@ #!/usr/bin/env python3 """ -estimate_camera_pose_from_aruco_json.py +2_estimate_camera_pose_from_aruco_json.py Berechnet die Kameraposition im Maschinen-/Board-Koordinatensystem aus: 1. einer ArUco-Detections-JSON 2. robots.json mit bekannten Marker-Positionen -Das Script verwendet ausschließlich bekannte Marker -und bestimmt daraus die Kamera-Extrinsics mittels solvePnP. - -Ergebnis: -- Kamera-Position im Weltkoordinatensystem -- Kamera-Orientierung (Roll/Pitch/Yaw) -- optionale Reprojektion zur Qualitätskontrolle +NEU: +- Marker-Orientierungen unterstützt +- Default: Board-Marker zeigen nach +Z +- Qualitätsbewertung erweitert +- Speichert ALLE erkannten Marker +- Speichert auch fehlgeschlagene Lösungen +- Bewertet Kamerageometrie +- Bewertet Markerabdeckung +- Bewertet Sichtwinkel +- Bewertet Markeranzahl +- Speichert vollständige Rohdaten Benötigt: pip install opencv-python numpy - -Beispiel: - python 2_estimate_camera_pose_from_aruco_json.py \ - --detections detection.json \ - --robots robots.json """ import argparse @@ -37,31 +36,45 @@ import numpy as np # Hilfsfunktionen # ============================================================ +def normalize(v): + n = np.linalg.norm(v) + + if n < 1e-9: + return v + + return v / n + + +def rotation_matrix_from_axes(x_axis, y_axis, z_axis): + + R = np.column_stack([ + normalize(x_axis), + normalize(y_axis), + normalize(z_axis) + ]) + + return R.astype(np.float32) + + def rvec_tvec_to_camera_pose(rvec, tvec): """ - OpenCV liefert: + OpenCV: X_cam = R * X_world + t - Gesucht: - Kamera-Pose im Weltkoordinatensystem - - => R_wc = R^T - => C = -R^T * t + Kamera im Weltkoordinatensystem: + C = -R^T * t """ R_cw, _ = cv2.Rodrigues(rvec) R_wc = R_cw.T + cam_pos = -R_wc @ tvec return R_wc, cam_pos.reshape(3) def rotation_matrix_to_euler_zyx(R): - """ - Euler ZYX: - yaw(Z), pitch(Y), roll(X) - """ yaw = math.degrees(math.atan2(R[1, 0], R[0, 0])) @@ -74,14 +87,42 @@ def rotation_matrix_to_euler_zyx(R): return roll, pitch, yaw -def build_marker_lookup(robot_data): - """ - Liest nur Marker mit ABSOLUTER Position. +# ============================================================ +# Marker-Orientierung +# ============================================================ - Unterstützt: - "position" -> absolute Weltposition [m] - "relPos" -> wird aktuell ignoriert - """ +def get_marker_rotation(marker): + + # Explizite Rotation vorhanden? + if "rotation_matrix" in marker: + return np.array( + marker["rotation_matrix"], + dtype=np.float32 + ) + + # Default: + # Marker zeigt nach +Z + # + # x = rechts + # y = oben + # z = aus Board heraus (+Z) + + x_axis = np.array([1, 0, 0], dtype=np.float32) + y_axis = np.array([0, 1, 0], dtype=np.float32) + z_axis = np.array([0, 0, 1], dtype=np.float32) + + return rotation_matrix_from_axes( + x_axis, + y_axis, + z_axis + ) + + +# ============================================================ +# Marker Lookup +# ============================================================ + +def build_marker_lookup(robot_data): marker_lookup = {} @@ -89,11 +130,10 @@ def build_marker_lookup(robot_data): marker_id = int(marker.get("id", -1)) - # negative IDs ignorieren if marker_id < 0: continue - # Nur absolute Weltpositionen verwenden + # Nur absolute Marker verwenden if "position" not in marker: continue @@ -105,29 +145,32 @@ def build_marker_lookup(robot_data): if len(pos) != 3: continue - marker_lookup[marker_id] = np.array( - pos, - dtype=np.float32 - ) + rotation = get_marker_rotation(marker) + + marker_lookup[marker_id] = { + "position": np.array( + pos, + dtype=np.float32 + ), + "rotation": rotation, + "on": marker.get("on", "unknown") + } return marker_lookup -def build_marker_object_points(marker_center_world, marker_size_m): - """ - Baut die 3D-Eckpunkte eines Markers auf. - Wichtig: - Die Corner-Reihenfolge MUSS zur OpenCV-ArUco-Reihenfolge passen. +# ============================================================ +# Marker-Eckpunkte +# ============================================================ - Reihenfolge: - top-left - top-right - bottom-right - bottom-left - """ +def build_marker_object_points( + marker_center_world, + marker_rotation, + marker_size_m): half = marker_size_m / 2.0 + # OpenCV Corner Reihenfolge local = np.array([ [-half, half, 0.0], [ half, half, 0.0], @@ -135,7 +178,113 @@ def build_marker_object_points(marker_center_world, marker_size_m): [-half, -half, 0.0], ], dtype=np.float32) - return local + marker_center_world.reshape(1, 3) + rotated = (marker_rotation @ local.T).T + + return rotated + marker_center_world.reshape(1, 3) + + +# ============================================================ +# Qualitätsmetriken +# ============================================================ + +def compute_marker_spread(points_3d): + + if len(points_3d) < 2: + return 0.0 + + mins = np.min(points_3d, axis=0) + maxs = np.max(points_3d, axis=0) + + diag = np.linalg.norm(maxs - mins) + + return float(diag) + + +def compute_viewing_angles( + camera_position, + marker_lookup, + used_markers): + + results = [] + + for marker_id in used_markers: + + marker = marker_lookup[marker_id] + + pos = marker["position"] + + R = marker["rotation"] + + normal = R[:, 2] + + to_camera = normalize(camera_position - pos) + + dot = np.clip( + np.dot(normal, to_camera), + -1.0, + 1.0 + ) + + angle = math.degrees(math.acos(dot)) + + results.append(angle) + + if len(results) == 0: + return { + "mean": None, + "max": None + } + + return { + "mean": float(np.mean(results)), + "max": float(np.max(results)) + } + + +def compute_pose_quality( + rms, + max_err, + num_markers, + marker_spread, + viewing_angle_mean): + + score = 100.0 + + # Reprojection Error + score -= rms * 8.0 + + # Max Error + score -= max_err * 2.0 + + # Wenige Marker + if num_markers < 2: + score -= 50 + + elif num_markers < 4: + score -= 25 + + elif num_markers < 6: + score -= 10 + + # Schlechte räumliche Verteilung + if marker_spread < 0.10: + score -= 30 + + elif marker_spread < 0.25: + score -= 15 + + # Schlechter Blickwinkel + if viewing_angle_mean is not None: + + if viewing_angle_mean > 70: + score -= 25 + + elif viewing_angle_mean > 50: + score -= 10 + + score = max(0.0, min(100.0, score)) + + return float(score) # ============================================================ @@ -148,28 +297,24 @@ def main(): parser.add_argument( "--detections", - required=True, - help="ArUco detection JSON" + required=True ) parser.add_argument( "--robots", - required=True, - help="robots.json" + required=True ) parser.add_argument( "--min-confidence", type=float, - default=0.5, - help="Minimale Marker-Confidence" + default=0.5 ) parser.add_argument( "--max-reprojection-error", type=float, - default=3.0, - help="Maximal erlaubter Reprojektionsfehler in Pixel" + default=3.0 ) args = parser.parse_args() @@ -185,7 +330,7 @@ def main(): robot_data = json.load(f) # ============================================================ - # Kamera-Parameter + # Kamera # ============================================================ K = np.array( @@ -199,19 +344,20 @@ def main(): ).reshape(-1, 1) # ============================================================ - # Bekannte Marker + # Marker laden # ============================================================ known_markers = build_marker_lookup(robot_data) # ============================================================ - # 2D/3D Punktlisten aufbauen + # Punktlisten # ============================================================ object_points = [] image_points = [] used_markers = [] + rejected_markers = [] detections = detection_data["detections"] @@ -219,20 +365,41 @@ def main(): marker_id = int(det["marker_id"]) - confidence = float(det.get("confidence", 1.0)) + confidence = float( + det.get("confidence", 1.0) + ) + + reason = None if confidence < args.min_confidence: + reason = "low_confidence" + + elif marker_id not in known_markers: + reason = "unknown_marker" + + if reason is not None: + + rejected_markers.append({ + "marker_id": marker_id, + "reason": reason, + "confidence": confidence + }) + continue - if marker_id not in known_markers: - continue + marker_info = known_markers[marker_id] - marker_center_world = known_markers[marker_id] + marker_center_world = marker_info["position"] - marker_size = float(det["marker_size_m"]) + marker_rotation = marker_info["rotation"] + + marker_size = float( + det["marker_size_m"] + ) obj_pts = build_marker_object_points( marker_center_world, + marker_rotation, marker_size ) @@ -246,25 +413,60 @@ def main(): used_markers.append(marker_id) + # ============================================================ + # Ausgabe vorbereiten + # ============================================================ + + out = { + "success": False, + "camera_pose": None, + "quality": {}, + "used_markers": [], + "rejected_markers": rejected_markers, + "all_detected_markers": [ + int(d["marker_id"]) + for d in detections + ] + } + + # ============================================================ + # Zu wenige Marker + # ============================================================ + if len(object_points) == 0: - raise RuntimeError( - "Keine bekannten Marker gefunden." + + out["quality"] = { + "error": "no_known_markers", + "num_detected_markers": len(detections), + "num_used_markers": 0 + } + + out_file = Path(args.detections).with_suffix( + ".camera_pose.json" ) - object_points = np.concatenate(object_points, axis=0) - image_points = np.concatenate(image_points, axis=0) + with open(out_file, "w", encoding="utf-8") as f: + json.dump(out, f, indent=2) - print() - print("==================================================") - print("Bekannte Marker verwendet:") - print(sorted(set(used_markers))) - print("==================================================") - print() + print("Keine bekannten Marker gefunden.") + print(out_file) + + return # ============================================================ # solvePnP # ============================================================ + object_points = np.concatenate( + object_points, + axis=0 + ) + + image_points = np.concatenate( + image_points, + axis=0 + ) + success, rvec, tvec = cv2.solvePnP( object_points, image_points, @@ -274,10 +476,24 @@ def main(): ) if not success: - raise RuntimeError("solvePnP fehlgeschlagen") + + out["quality"] = { + "error": "solvepnp_failed", + "num_used_markers": len(set(used_markers)) + } + + out_file = Path(args.detections).with_suffix( + ".camera_pose.json" + ) + + with open(out_file, "w", encoding="utf-8") as f: + json.dump(out, f, indent=2) + + print("solvePnP fehlgeschlagen") + return # ============================================================ - # Kamera-Pose berechnen + # Kamera Pose # ============================================================ R_wc, cam_pos = rvec_tvec_to_camera_pose( @@ -285,10 +501,12 @@ def main(): tvec ) - roll, pitch, yaw = rotation_matrix_to_euler_zyx(R_wc) + roll, pitch, yaw = rotation_matrix_to_euler_zyx( + R_wc + ) # ============================================================ - # Reprojektionsfehler + # Reprojektion # ============================================================ projected, _ = cv2.projectPoints( @@ -306,76 +524,126 @@ def main(): axis=1 ) - rms = np.sqrt(np.mean(reproj_error ** 2)) - max_err = np.max(reproj_error) + rms = float( + np.sqrt(np.mean(reproj_error ** 2)) + ) + + max_err = float( + np.max(reproj_error) + ) + + # ============================================================ + # Qualität + # ============================================================ + + marker_positions = np.array([ + known_markers[mid]["position"] + for mid in set(used_markers) + ]) + + marker_spread = compute_marker_spread( + marker_positions + ) + + viewing = compute_viewing_angles( + cam_pos, + known_markers, + set(used_markers) + ) + + quality_score = compute_pose_quality( + rms, + max_err, + len(set(used_markers)), + marker_spread, + viewing["mean"] + ) # ============================================================ # Ausgabe # ============================================================ + print() print("==================================================") print("KAMERA-POSE") print("==================================================") - print() + print() print("Position [m]") - print(f" X = {cam_pos[0]: .6f}") - print(f" Y = {cam_pos[1]: .6f}") - print(f" Z = {cam_pos[2]: .6f}") + print(f"X = {cam_pos[0]:.6f}") + print(f"Y = {cam_pos[1]:.6f}") + print(f"Z = {cam_pos[2]:.6f}") print() - print("Orientation [deg]") - print(f" Roll = {roll: .3f}") - print(f" Pitch = {pitch: .3f}") - print(f" Yaw = {yaw: .3f}") + print(f"Roll = {roll:.3f}") + print(f"Pitch = {pitch:.3f}") + print(f"Yaw = {yaw:.3f}") print() - - print("Reprojection Error") - print(f" RMS = {rms:.3f} px") - print(f" MAX = {max_err:.3f} px") + print("Reprojection") + print(f"RMS = {rms:.3f}px") + print(f"MAX = {max_err:.3f}px") print() + print("Quality") + print(f"Score = {quality_score:.1f}/100") + print(f"Marker Spread = {marker_spread:.3f}m") - if max_err > args.max_reprojection_error: - print("[WARNUNG] Reprojektionsfehler relativ hoch") - else: - print("[OK] Pose stabil") - - print() + if viewing["mean"] is not None: + print( + f"Mean Viewing Angle = " + f"{viewing['mean']:.1f}deg" + ) # ============================================================ # JSON speichern # ============================================================ - out = { - "camera_pose": { - "position_m": { - "x": float(cam_pos[0]), - "y": float(cam_pos[1]), - "z": float(cam_pos[2]), - }, - "orientation_deg": { - "roll": float(roll), - "pitch": float(pitch), - "yaw": float(yaw), - } + out["success"] = True + + out["camera_pose"] = { + "position_m": { + "x": float(cam_pos[0]), + "y": float(cam_pos[1]), + "z": float(cam_pos[2]), }, - "quality": { - "reprojection_rms_px": float(rms), - "reprojection_max_px": float(max_err), - "num_markers_used": len(set(used_markers)), - "markers_used": sorted(set(used_markers)) - } + "orientation_deg": { + "roll": float(roll), + "pitch": float(pitch), + "yaw": float(yaw), + }, + "rotation_matrix_world_from_camera": ( + R_wc.tolist() + ) } - out_file = Path(args.detections).with_suffix(".camera_pose.json") + out["quality"] = { + "pose_quality_score": quality_score, + "reprojection_rms_px": rms, + "reprojection_max_px": max_err, + "num_detected_markers": len(detections), + "num_used_markers": len(set(used_markers)), + "marker_spread_m": marker_spread, + "mean_viewing_angle_deg": viewing["mean"], + "max_viewing_angle_deg": viewing["max"], + "pose_stable": + max_err <= args.max_reprojection_error + } + + out["used_markers"] = sorted( + list(set(used_markers)) + ) + + out_file = Path(args.detections).with_suffix( + ".camera_pose.json" + ) with open(out_file, "w", encoding="utf-8") as f: json.dump(out, f, indent=2) - print(f"Pose gespeichert in:") + print() + print("Gespeichert:") print(out_file) diff --git a/programs/3_fuse_markers_world.py b/programs/3_fuse_markers_world.py new file mode 100644 index 0000000..538a799 --- /dev/null +++ b/programs/3_fuse_markers_world.py @@ -0,0 +1,765 @@ +#!/usr/bin/env python3 +""" +3_fuse_markers_world.py + +PHASE 1B +--------- + +Fusioniert Marker-Weltkoordinaten aus mehreren Kameras. + +EINGABE: + --json *.camera_pose.json (mehrfach möglich) + --robots robot.json + +Das Script findet automatisch: + *.aruco_detection.json + +Beispiel: + snapshot_video0_1779690911822_aruco_detection.camera_pose.json + +-> + + snapshot_video0_1779690911822_aruco_detection.json + +FEATURES: + - mehrere Kameras (2..5) + - automatische Detection-Datei-Erkennung + - bekannte Marker aus robot.json + - unbekannte Marker triangulieren + - gewichtete Marker-Fusion + - Qualitätsmetriken + - CSV Export + - JSON Export + - Kamera Export + - robuste Fehlerbehandlung + - vorbereitet für spätere Rigid-Body Constraints + +OUTPUT: + fused_markers.csv + fused_markers.json + +Benötigt: + pip install opencv-python numpy +""" + +import argparse +import csv +import json +import math +from collections import defaultdict +from pathlib import Path + +import cv2 +import numpy as np + + +# ============================================================ +# JSON HELPERS +# ============================================================ + + +def load_json(path): + + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + + + +def save_json(path, data): + + with open(path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2) + + +# ============================================================ +# FILE MATCHING +# ============================================================ + + +def find_detection_json(camera_pose_json_path): + """ + Findet automatisch die passende + *_aruco_detection.json Datei. + + Beispiel: + + INPUT: + snapshot_video0_123_aruco_detection.camera_pose.json + + OUTPUT: + snapshot_video0_123_aruco_detection.json + """ + + pose_path = Path(camera_pose_json_path) + + name = pose_path.name + + if not name.endswith(".camera_pose.json"): + raise ValueError( + f"Expected .camera_pose.json: {pose_path}" + ) + + detection_name = name.replace( + ".camera_pose.json", + ".json" + ) + + detection_path = pose_path.with_name( + detection_name + ) + + if not detection_path.exists(): + + raise FileNotFoundError( + "Matching detection JSON not found:\n" + f"{detection_path}" + ) + + return detection_path + + +# ============================================================ +# CAMERA POSE +# ============================================================ + + +def extract_camera_pose(camera_pose_data): + + if "camera_pose" not in camera_pose_data: + raise ValueError("camera_pose missing") + + pose = camera_pose_data["camera_pose"] + + pos = pose["position_m"] + + cam_pos = np.array([ + pos["x"], + pos["y"], + pos["z"] + ], dtype=np.float32) + + if "rotation_matrix_world_from_camera" in pose: + + R_wc = np.array( + pose["rotation_matrix_world_from_camera"], + dtype=np.float32 + ) + + else: + + ori = pose["orientation_deg"] + + R_wc = euler_to_rotation_matrix( + ori["roll"], + ori["pitch"], + ori["yaw"] + ) + + return R_wc, cam_pos + + +# ============================================================ +# INTRINSICS +# ============================================================ + + +def extract_intrinsics(detection_data): + + if "camera" not in detection_data: + raise ValueError("camera section missing") + + camera = detection_data["camera"] + + if "camera_matrix" not in camera: + raise ValueError("camera_matrix missing") + + if "distortion_coefficients" not in camera: + raise ValueError("distortion_coefficients missing") + + K = np.array( + camera["camera_matrix"], + dtype=np.float32 + ) + + D = np.array( + camera["distortion_coefficients"], + dtype=np.float32 + ).reshape(-1, 1) + + return K, D + + +# ============================================================ +# ROTATION HELPERS +# ============================================================ + + +def euler_to_rotation_matrix( + roll_deg, + pitch_deg, + yaw_deg +): + + r = math.radians(roll_deg) + p = math.radians(pitch_deg) + y = math.radians(yaw_deg) + + Rx = np.array([ + [1, 0, 0], + [0, math.cos(r), -math.sin(r)], + [0, math.sin(r), math.cos(r)] + ]) + + Ry = np.array([ + [math.cos(p), 0, math.sin(p)], + [0, 1, 0], + [-math.sin(p), 0, math.cos(p)] + ]) + + Rz = np.array([ + [math.cos(y), -math.sin(y), 0], + [math.sin(y), math.cos(y), 0], + [0, 0, 1] + ]) + + return Rz @ Ry @ Rx + + +# ============================================================ +# ROBOT MARKERS +# ============================================================ + + +def build_known_marker_lookup(robot_data): + """ + Nur Marker mit ABSOLUTER Weltposition. + + relPos wird in Phase 2 verwendet. + """ + + lookup = {} + + for marker in robot_data.get("Marker", []): + + marker_id = int(marker.get("id", -1)) + + if marker_id < 0: + continue + + if "position" not in marker: + continue + + pos = marker["position"] + + if pos is None: + continue + + if len(pos) != 3: + continue + + lookup[marker_id] = np.array( + pos, + dtype=np.float32 + ) + + return lookup + + +# ============================================================ +# MARKER POSE REL CAMERA +# ============================================================ + + +def estimate_marker_pose_camera( + image_points, + marker_size, + K, + D +): + + half = marker_size / 2.0 + + object_points = np.array([ + [-half, half, 0], + [half, half, 0], + [half, -half, 0], + [-half, -half, 0] + ], dtype=np.float32) + + image_points = np.array( + image_points, + dtype=np.float32 + ) + + success, rvec, tvec = cv2.solvePnP( + object_points, + image_points, + K, + D, + flags=cv2.SOLVEPNP_IPPE_SQUARE + ) + + if not success: + return None + + R_mc, _ = cv2.Rodrigues(rvec) + + return { + "rvec": rvec, + "tvec": tvec.reshape(3), + "R_mc": R_mc + } + + +# ============================================================ +# MARKER WORLD TRANSFORM +# ============================================================ + + +def marker_world_position( + cam_world_pos, + R_wc, + t_mc +): + """ + Marker Mittelpunkt in Weltkoordinaten. + + X_world = R_wc * X_cam + C + """ + + return ( + R_wc @ t_mc.reshape(3) + ) + cam_world_pos + + +# ============================================================ +# WEIGHTING +# ============================================================ + + +def compute_marker_weight( + detection, + camera_pose_data +): + """ + Qualitätsgewicht. + + Verwendet: + - confidence + - reprojection RMS + - Bildzentrum + - Markerfläche + - Sharpness + """ + + confidence = float( + detection.get("confidence", 0.5) + ) + + quality = detection.get("quality", {}) + + area_px = float( + quality.get("area_px", 1000) + ) + + sharpness = quality.get( + "sharpness", {} + ) + + lap_var = float( + sharpness.get( + "laplacian_var", + 500 + ) + ) + + geometry = quality.get( + "geometry", {} + ) + + dist_center = float( + geometry.get( + "distance_to_center_norm", + 0.5 + ) + ) + + pose_quality = camera_pose_data.get( + "quality", + {} + ) + + reproj = float( + pose_quality.get( + "reprojection_rms_px", + 10.0 + ) + ) + + reproj_weight = 1.0 / (1.0 + reproj) + + area_weight = min( + area_px / 2000.0, + 1.0 + ) + + sharpness_weight = min( + lap_var / 5000.0, + 1.0 + ) + + center_weight = 1.0 - dist_center + + weight = ( + confidence * + reproj_weight * + area_weight * + sharpness_weight * + center_weight + ) + + return max(weight, 1e-6) + + +# ============================================================ +# FUSION +# ============================================================ + + +def weighted_average(points, weights): + + points = np.array(points) + weights = np.array(weights) + + if len(points) == 1: + return points[0] + + total_weight = np.sum(weights) + + if total_weight < 1e-9: + return np.mean(points, axis=0) + + return np.sum( + points * weights[:, None], + axis=0 + ) / total_weight + + +# ============================================================ +# MAIN +# ============================================================ + + +def main(): + + parser = argparse.ArgumentParser() + + parser.add_argument( + "--json", + action="append", + required=True, + help="*.camera_pose.json" + ) + + parser.add_argument( + "--robots", + required=True, + help="robot.json" + ) + + parser.add_argument( + "--outdir", + default="." + ) + + args = parser.parse_args() + + outdir = Path(args.outdir) + + outdir.mkdir( + parents=True, + exist_ok=True + ) + + # ======================================================== + # robot.json + # ======================================================== + + robot_data = load_json( + args.robots + ) + + known_markers = build_known_marker_lookup( + robot_data + ) + + # ======================================================== + # globale Observationen + # ======================================================== + + observations = defaultdict(list) + + camera_exports = [] + + # ======================================================== + # alle Kameras + # ======================================================== + + for json_file in args.json: + + print() + print("================================================") + print("LOAD CAMERA") + print("================================================") + print(json_file) + + # ---------------------------------------------------- + # camera pose json + # ---------------------------------------------------- + + camera_pose_data = load_json( + json_file + ) + + # ---------------------------------------------------- + # detection json automatisch finden + # ---------------------------------------------------- + + detection_json = find_detection_json( + json_file + ) + + print( + f"Detection JSON:\n{detection_json}" + ) + + detection_data = load_json( + detection_json + ) + + # ---------------------------------------------------- + # intrinsics + # ---------------------------------------------------- + + K, D = extract_intrinsics( + detection_data + ) + + # ---------------------------------------------------- + # kamerapose + # ---------------------------------------------------- + + R_wc, cam_world_pos = extract_camera_pose( + camera_pose_data + ) + + camera_name = Path( + json_file + ).stem + + # ---------------------------------------------------- + # camera export + # ---------------------------------------------------- + + camera_exports.append({ + "camera": camera_name, + "x": float(cam_world_pos[0]), + "y": float(cam_world_pos[1]), + "z": float(cam_world_pos[2]) + }) + + # ---------------------------------------------------- + # detections + # ---------------------------------------------------- + + detections = detection_data.get( + "detections", + [] + ) + + print( + f"Detections: {len(detections)}" + ) + + # ---------------------------------------------------- + # marker durchlaufen + # ---------------------------------------------------- + + for det in detections: + + marker_id = int( + det["marker_id"] + ) + + marker_size = float( + det["marker_size_m"] + ) + + pose = estimate_marker_pose_camera( + det["image_points_px"], + marker_size, + K, + D + ) + + if pose is None: + continue + + world_pos = marker_world_position( + cam_world_pos, + R_wc, + pose["tvec"] + ) + + weight = compute_marker_weight( + det, + camera_pose_data + ) + + observations[marker_id].append({ + "world_pos": world_pos, + "weight": weight, + "camera": camera_name, + "confidence": float( + det.get("confidence", 0.5) + ), + "known_marker": marker_id in known_markers + }) + + # ======================================================== + # fusion + # ======================================================== + + fused_markers = [] + + print() + print("================================================") + print("FUSE MARKERS") + print("================================================") + + for marker_id, obs_list in observations.items(): + + points = [ + o["world_pos"] + for o in obs_list + ] + + weights = [ + o["weight"] + for o in obs_list + ] + + fused = weighted_average( + points, + weights + ) + + spread = 0.0 + + if len(points) > 1: + + dists = [ + np.linalg.norm(p - fused) + for p in points + ] + + spread = float( + np.mean(dists) + ) + + known = marker_id in known_markers + + mean_conf = float(np.mean([ + o["confidence"] + for o in obs_list + ])) + + mean_weight = float(np.mean(weights)) + + print( + f"Marker {marker_id:3d} | " + f"cams={len(obs_list)} | " + f"spread={spread:.4f}m | " + f"known={known}" + ) + + fused_markers.append({ + "marker_id": marker_id, + "x": float(fused[0]), + "y": float(fused[1]), + "z": float(fused[2]), + "num_cameras": len(obs_list), + "spread_m": spread, + "known_marker": known, + "mean_confidence": mean_conf, + "mean_weight": mean_weight + }) + + # ======================================================== + # CSV EXPORT + # ======================================================== + + csv_file = outdir / "fused_markers.csv" + + with open( + csv_file, + "w", + newline="", + encoding="utf-8" + ) as f: + + writer = csv.DictWriter( + f, + fieldnames=[ + "marker_id", + "x", + "y", + "z", + "num_cameras", + "spread_m", + "known_marker", + "mean_confidence", + "mean_weight" + ] + ) + + writer.writeheader() + + for row in fused_markers: + writer.writerow(row) + + # ======================================================== + # JSON EXPORT + # ======================================================== + + export_json = { + "fused_markers": fused_markers, + "cameras": camera_exports + } + + json_file = outdir / "fused_markers.json" + + save_json( + json_file, + export_json + ) + + # ======================================================== + # DONE + # ======================================================== + + print() + print("================================================") + print("EXPORT") + print("================================================") + print(csv_file) + print(json_file) + print() + + +# ============================================================ +# ENTRY +# ============================================================ + +if __name__ == "__main__": + main() diff --git a/programs/3_fuse_markers_world____.py b/programs/3_fuse_markers_world____.py new file mode 100644 index 0000000..f819099 --- /dev/null +++ b/programs/3_fuse_markers_world____.py @@ -0,0 +1,1135 @@ +#!/usr/bin/env python3 +""" +3_fuse_markers_world.py + +Phase 1: +- Liest 2 bis 5 JSON-Dateien ein, die jeweils eine Kamera-Pose und ArUco-Detections enthalten. +- Rekonstruiert für jede Marker-Beobachtung eine Marker-Pose in Weltkoordinaten. +- Fusiert alle Beobachtungen pro Marker gewichtet. +- Gibt eine CSV-Liste mit Kameras und Markern aus. +- Optional wird robots.json nur für Metadaten verwendet (nicht für die Schätzung). + +Typische Eingabe-Dateien: + snapshot_video1_1779690911822_aruco_detection.camera_pose.json + snapshot_video2_1779690911822_aruco_detection.camera_pose.json + ... + + python 3_fuse_markers_world.py \ + --json snapshot_video1_1779690911822_aruco_detection.camera_pose.json \ + --json snapshot_video2_1779690911822_aruco_detection.camera_pose.json \ + --json snapshot_video3_1779690911822_aruco_detection.camera_pose.json \ + --robots robots.json + + +Benötigt: + pip install numpy opencv-python + +Hinweis: +- Das Skript nutzt die Kameraposen als fest. +- Die Markerkoordinaten werden pro Detektion aus SolvePnP bestimmt. +- Danach werden alle Beobachtungen eines Markers robust gemittelt. +""" + +from __future__ import annotations + +import argparse +import csv +import json +import math +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, List, Optional, Sequence, Tuple + +import cv2 +import numpy as np + + +# ============================================================ +# Datenklassen +# ============================================================ + +@dataclass +class CameraPose: + camera_id: str + source_file: str + position_w: np.ndarray # (3,) + rotation_w_from_c: np.ndarray # (3,3) + orientation_rpy_deg: Tuple[float, float, float] + quality: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class MarkerObservation: + marker_id: int + camera_id: str + source_file: str + image_points_px: np.ndarray # (4,2) + marker_size_m: float + confidence: float + quality_score: float + camera_weight: float + observation_weight: float + rvec_cm: np.ndarray # (3,) + tvec_cm: np.ndarray # (3,) + position_w: np.ndarray # (3,) + rotation_w_from_m: np.ndarray # (3,3) + reprojection_rms_px: float + reprojection_max_px: float + metadata: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class FusedMarkerEstimate: + marker_id: int + position_w: np.ndarray + rotation_w_from_m: np.ndarray + orientation_rpy_deg: Tuple[float, float, float] + observations: List[MarkerObservation] + num_cameras: int + camera_ids: List[str] + weight_sum: float + position_std_m: float + angular_std_deg: float + confidence_mean: float + confidence_min: float + confidence_max: float + quality_mean: float + quality_min: float + quality_max: float + reproj_rms_mean_px: float + reproj_rms_max_px: float + reliability_0_1: float + metadata: Dict[str, Any] = field(default_factory=dict) + + +# ============================================================ +# Basisfunktionen: Rotationen / Euler / Quaternions +# ============================================================ + + +def clamp(value: float, lo: float = 0.0, hi: float = 1.0) -> float: + return float(max(lo, min(hi, value))) + + + +def normalize_vec(v: np.ndarray, eps: float = 1e-12) -> np.ndarray: + n = float(np.linalg.norm(v)) + if n < eps: + return v.astype(np.float64, copy=True) + return (v / n).astype(np.float64) + + + +def rotation_matrix_to_euler_zyx(R: np.ndarray) -> Tuple[float, float, float]: + """Returns roll, pitch, yaw in degrees. + + Convention: + R = Rz(yaw) @ Ry(pitch) @ Rx(roll) + """ + yaw = math.degrees(math.atan2(R[1, 0], R[0, 0])) + sp = math.sqrt(R[2, 1] ** 2 + R[2, 2] ** 2) + pitch = math.degrees(math.atan2(-R[2, 0], sp)) + roll = math.degrees(math.atan2(R[2, 1], R[2, 2])) + return float(roll), float(pitch), float(yaw) + + + +def euler_zyx_to_rotation_matrix(roll_deg: float, pitch_deg: float, yaw_deg: float) -> np.ndarray: + roll = math.radians(roll_deg) + pitch = math.radians(pitch_deg) + yaw = math.radians(yaw_deg) + + cr, sr = math.cos(roll), math.sin(roll) + cp, sp = math.cos(pitch), math.sin(pitch) + cy, sy = math.cos(yaw), math.sin(yaw) + + Rx = np.array([[1, 0, 0], [0, cr, -sr], [0, sr, cr]], dtype=np.float64) + Ry = np.array([[cp, 0, sp], [0, 1, 0], [-sp, 0, cp]], dtype=np.float64) + Rz = np.array([[cy, -sy, 0], [sy, cy, 0], [0, 0, 1]], dtype=np.float64) + + return (Rz @ Ry @ Rx).astype(np.float64) + + + +def rotation_matrix_to_quaternion(R: np.ndarray) -> np.ndarray: + """Returns quaternion as [w, x, y, z].""" + m = R.astype(np.float64) + t = np.trace(m) + + if t > 0.0: + s = math.sqrt(t + 1.0) * 2.0 + w = 0.25 * s + x = (m[2, 1] - m[1, 2]) / s + y = (m[0, 2] - m[2, 0]) / s + z = (m[1, 0] - m[0, 1]) / s + elif m[0, 0] > m[1, 1] and m[0, 0] > m[2, 2]: + s = math.sqrt(1.0 + m[0, 0] - m[1, 1] - m[2, 2]) * 2.0 + w = (m[2, 1] - m[1, 2]) / s + x = 0.25 * s + y = (m[0, 1] + m[1, 0]) / s + z = (m[0, 2] + m[2, 0]) / s + elif m[1, 1] > m[2, 2]: + s = math.sqrt(1.0 + m[1, 1] - m[0, 0] - m[2, 2]) * 2.0 + w = (m[0, 2] - m[2, 0]) / s + x = (m[0, 1] + m[1, 0]) / s + y = 0.25 * s + z = (m[1, 2] + m[2, 1]) / s + else: + s = math.sqrt(1.0 + m[2, 2] - m[0, 0] - m[1, 1]) * 2.0 + w = (m[1, 0] - m[0, 1]) / s + x = (m[0, 2] + m[2, 0]) / s + y = (m[1, 2] + m[2, 1]) / s + z = 0.25 * s + + q = np.array([w, x, y, z], dtype=np.float64) + return normalize_vec(q) + + + +def quaternion_to_rotation_matrix(q: np.ndarray) -> np.ndarray: + q = normalize_vec(q) + w, x, y, z = q + + R = np.array([ + [1 - 2 * (y * y + z * z), 2 * (x * y - z * w), 2 * (x * z + y * w)], + [2 * (x * y + z * w), 1 - 2 * (x * x + z * z), 2 * (y * z - x * w)], + [2 * (x * z - y * w), 2 * (y * z + x * w), 1 - 2 * (x * x + y * y)], + ], dtype=np.float64) + return R + + + +def average_rotations_weighted(rotations: Sequence[np.ndarray], weights: Sequence[float]) -> np.ndarray: + """Averages rotations by weighted matrix sum + projection to SO(3).""" + if not rotations: + return np.eye(3, dtype=np.float64) + + M = np.zeros((3, 3), dtype=np.float64) + for R, w in zip(rotations, weights): + M += float(w) * R.astype(np.float64) + + U, _, Vt = np.linalg.svd(M) + R = U @ Vt + if np.linalg.det(R) < 0: + U[:, -1] *= -1 + R = U @ Vt + return R.astype(np.float64) + + + +def angular_distance_deg(R_a: np.ndarray, R_b: np.ndarray) -> float: + """Smallest angle between two rotations.""" + R = R_a.T @ R_b + tr = float(np.trace(R)) + cos_theta = clamp((tr - 1.0) / 2.0, -1.0, 1.0) + return float(math.degrees(math.acos(cos_theta))) + + +# ============================================================ +# JSON / Metadaten +# ============================================================ + + +def load_json(path: str) -> Dict[str, Any]: + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + + + +def detect_camera_id(data: Dict[str, Any], source_file: str) -> str: + camera = data.get("camera", {}) + camera_pose = data.get("camera_pose", {}) + + for key in ("camera_id", "id", "name"): + if isinstance(camera, dict) and camera.get(key): + return str(camera[key]) + if isinstance(camera_pose, dict) and camera_pose.get(key): + return str(camera_pose[key]) + + return Path(source_file).stem + + + +def extract_camera_pose(data: Dict[str, Any], source_file: str) -> CameraPose: + camera_id = detect_camera_id(data, source_file) + + # Camera pose may appear in different layouts. + pose_block = data.get("camera_pose") + if pose_block is None: + pose_block = data.get("camera", {}) + + if not isinstance(pose_block, dict): + raise ValueError(f"Camera pose block missing or invalid in: {source_file}") + + quality = pose_block.get("quality", {}) if isinstance(pose_block.get("quality", {}), dict) else {} + + # Position + pos = None + if isinstance(pose_block.get("position_m"), dict): + pm = pose_block["position_m"] + pos = np.array([pm.get("x", 0.0), pm.get("y", 0.0), pm.get("z", 0.0)], dtype=np.float64) + elif isinstance(pose_block.get("position_mm"), (list, tuple)) and len(pose_block["position_mm"]) == 3: + pos = np.array(pose_block["position_mm"], dtype=np.float64) / 1000.0 + elif isinstance(pose_block.get("position"), (list, tuple)) and len(pose_block["position"]) == 3: + pos = np.array(pose_block["position"], dtype=np.float64) + + if pos is None: + raise ValueError(f"Camera position missing in: {source_file}") + + # Rotation + if isinstance(pose_block.get("rotation_matrix_world_from_camera"), (list, tuple)): + R_wc = np.array(pose_block["rotation_matrix_world_from_camera"], dtype=np.float64) + elif isinstance(pose_block.get("rotation_matrix"), (list, tuple)): + R_wc = np.array(pose_block["rotation_matrix"], dtype=np.float64) + elif isinstance(pose_block.get("orientation_deg"), dict): + od = pose_block["orientation_deg"] + roll = float(od.get("roll", 0.0)) + pitch = float(od.get("pitch", 0.0)) + yaw = float(od.get("yaw", 0.0)) + R_wc = euler_zyx_to_rotation_matrix(roll, pitch, yaw) + else: + R_wc = np.eye(3, dtype=np.float64) + + if R_wc.shape != (3, 3): + raise ValueError(f"Invalid camera rotation matrix in: {source_file}") + + rpy = rotation_matrix_to_euler_zyx(R_wc) + + return CameraPose( + camera_id=camera_id, + source_file=source_file, + position_w=pos, + rotation_w_from_c=R_wc, + orientation_rpy_deg=rpy, + quality=quality, + ) + + + +def load_robot_metadata(robot_path: Optional[str]) -> Dict[int, List[Dict[str, Any]]]: + """Optional metadata only. Duplicate IDs are kept as a list.""" + if not robot_path: + return {} + + data = load_json(robot_path) + marker_meta: Dict[int, List[Dict[str, Any]]] = {} + + for entry in data.get("Marker", []): + try: + mid = int(entry.get("id")) + except Exception: + continue + + marker_meta.setdefault(mid, []).append({ + "on": entry.get("on"), + "position": entry.get("position"), + "relPos": entry.get("relPos"), + "name": entry.get("name"), + "relPosSource": entry.get("relPosSource"), + }) + + return marker_meta + + + +def summarize_robot_metadata(marker_id: int, robot_meta: Dict[int, List[Dict[str, Any]]]) -> Dict[str, Any]: + entries = robot_meta.get(marker_id, []) + if not entries: + return { + "robot_known": False, + "robot_on": None, + "robot_positions": None, + "robot_relpos_count": 0, + } + + on_list = [] + abs_pos_list = [] + rel_pos_list = [] + for e in entries: + if e.get("on") is not None: + on_list.append(str(e.get("on"))) + if e.get("position") is not None: + abs_pos_list.append(e.get("position")) + if e.get("relPos") is not None: + rel_pos_list.append(e.get("relPos")) + + return { + "robot_known": True, + "robot_on": "|".join(dict.fromkeys(on_list)) if on_list else None, + "robot_positions": abs_pos_list if abs_pos_list else None, + "robot_relpos_count": len(rel_pos_list), + } + + +# ============================================================ +# Detection / Gewichtung +# ============================================================ + + +def get_camera_quality_score(camera_pose: CameraPose) -> float: + q = camera_pose.quality or {} + + if "pose_quality_score" in q: + try: + return clamp(float(q["pose_quality_score"]) / 100.0) + except Exception: + pass + + if "reprojection_rms_px" in q: + try: + rms = float(q["reprojection_rms_px"]) + # Heuristik: 0 px -> 1.0, 5 px -> 0.5, 10 px -> ~0.33 + return clamp(1.0 / (1.0 + 0.2 * rms)) + except Exception: + pass + + return 1.0 + + + +def get_marker_size_from_detection(det: Dict[str, Any], default_size_m: float) -> float: + try: + return float(det.get("marker_size_m", default_size_m)) + except Exception: + return float(default_size_m) + + + +def score_from_range(value: Optional[float], low: float, high: float, invert: bool = False) -> float: + if value is None: + return 1.0 + if high <= low: + return 1.0 + t = (float(value) - low) / (high - low) + t = clamp(t) + return 1.0 - t if invert else t + + + +def detection_quality_score(det: Dict[str, Any]) -> float: + """Heuristischer Score in [0,1]. Falls Felder fehlen, wird 1.0 angenommen.""" + confidence = float(det.get("confidence", 1.0)) + quality = det.get("quality", {}) if isinstance(det.get("quality", {}), dict) else {} + + # Subscores + area_px = quality.get("area_px") + sharpness = (((quality.get("sharpness") or {}).get("laplacian_var")) if isinstance(quality.get("sharpness"), dict) else None) + contrast = (((quality.get("contrast") or {}).get("dynamic_range")) if isinstance(quality.get("contrast"), dict) else None) + dist_border = (((quality.get("geometry") or {}).get("distance_to_border_px")) if isinstance(quality.get("geometry"), dict) else None) + dist_center = (((quality.get("geometry") or {}).get("distance_to_center_norm")) if isinstance(quality.get("geometry"), dict) else None) + edge_ratio = quality.get("edge_ratio") + + area_score = score_from_range(area_px, 150.0, 3000.0) + sharp_score = score_from_range(sharpness, 500.0, 5000.0) + contrast_score = score_from_range(contrast, 40.0, 180.0) + border_score = score_from_range(dist_border, 20.0, 250.0) + center_score = score_from_range(dist_center, 0.75, 0.0, invert=True) # closer to center -> higher + + edge_ratio_score = 1.0 + if edge_ratio is not None: + try: + edge_ratio_score = clamp(1.0 - min(abs(float(edge_ratio) - 1.0) / 1.5, 1.0)) + except Exception: + edge_ratio_score = 1.0 + + # Weighted sum, confidence strongest + score = ( + 0.35 * clamp(confidence) + + 0.15 * area_score + + 0.15 * sharp_score + + 0.10 * contrast_score + + 0.10 * border_score + + 0.10 * center_score + + 0.05 * edge_ratio_score + ) + return clamp(score) + + + +def marker_frontality_score(rotation_m_from_cam: np.ndarray) -> float: + """Marker normal relative to camera optical axis (+Z camera). 1.0 is frontal.""" + normal_cam = rotation_m_from_cam[:, 2] + # absolute because the sign depends on marker axis convention; we only want frontality + return clamp(abs(float(normal_cam[2]))) + + + +def compute_observation_weight(camera_pose: CameraPose, det: Dict[str, Any]) -> Tuple[float, float, float]: + det_score = detection_quality_score(det) + cam_score = get_camera_quality_score(camera_pose) + # frontality will be added after pose estimation; here only base score. + base = clamp(det_score * cam_score) + return base, det_score, cam_score + + +# ============================================================ +# Marker Pose Schätzung pro Observation +# ============================================================ + + +def build_marker_object_points(marker_size_m: float) -> np.ndarray: + half = marker_size_m / 2.0 + # OpenCV ArUco corner order: TL, TR, BR, BL + return np.array([ + [-half, half, 0.0], + [ half, half, 0.0], + [ half, -half, 0.0], + [-half, -half, 0.0], + ], dtype=np.float32) + + + +def solve_marker_pose_in_camera(img_pts: np.ndarray, marker_size_m: float, K: np.ndarray, D: np.ndarray) -> Tuple[np.ndarray, np.ndarray, float, float]: + obj_pts = build_marker_object_points(marker_size_m) + + success, rvec, tvec = cv2.solvePnP( + obj_pts, + img_pts.astype(np.float32), + K, + D, + flags=cv2.SOLVEPNP_IPPE_SQUARE, + ) + + if not success: + success, rvec, tvec = cv2.solvePnP( + obj_pts, + img_pts.astype(np.float32), + K, + D, + flags=cv2.SOLVEPNP_ITERATIVE, + ) + + if not success: + raise RuntimeError("solvePnP failed for one marker observation") + + projected, _ = cv2.projectPoints(obj_pts, rvec, tvec, K, D) + projected = projected.reshape(-1, 2) + err = np.linalg.norm(projected - img_pts, axis=1) + + rms = float(np.sqrt(np.mean(err ** 2))) + mx = float(np.max(err)) + return rvec.reshape(3), tvec.reshape(3), rms, mx + + + +def observation_to_world( + camera_pose: CameraPose, + marker_id: int, + det: Dict[str, Any], + K: np.ndarray, + D: np.ndarray, + robot_meta: Dict[int, List[Dict[str, Any]]], + default_marker_size_m: float, +) -> Optional[MarkerObservation]: + if det.get("image_points_px") is None: + return None + + img_pts = np.array(det["image_points_px"], dtype=np.float32) + if img_pts.shape != (4, 2): + return None + + marker_size_m = get_marker_size_from_detection(det, default_marker_size_m) + + try: + rvec_cm, tvec_cm, reproj_rms_px, reproj_max_px = solve_marker_pose_in_camera(img_pts, marker_size_m, K, D) + except Exception: + return None + + R_cm, _ = cv2.Rodrigues(rvec_cm) + cam_weight_base, det_score, cam_score = compute_observation_weight(camera_pose, det) + frontality = marker_frontality_score(R_cm) + + # Final observation weight: base quality * frontality + obs_weight = clamp(cam_weight_base * (0.25 + 0.75 * frontality)) + if obs_weight <= 1e-9: + obs_weight = 1e-9 + + # World transform + R_wc = camera_pose.rotation_w_from_c + t_wc = camera_pose.position_w + + position_w = R_wc @ tvec_cm + t_wc + rotation_w_from_m = R_wc @ R_cm + + metadata = summarize_robot_metadata(marker_id, robot_meta) + + return MarkerObservation( + marker_id=marker_id, + camera_id=camera_pose.camera_id, + source_file=camera_pose.source_file, + image_points_px=img_pts, + marker_size_m=float(marker_size_m), + confidence=float(det.get("confidence", 1.0)), + quality_score=float(det_score), + camera_weight=float(cam_score), + observation_weight=float(obs_weight), + rvec_cm=rvec_cm.astype(np.float64), + tvec_cm=tvec_cm.astype(np.float64), + position_w=position_w.astype(np.float64), + rotation_w_from_m=rotation_w_from_m.astype(np.float64), + reprojection_rms_px=reproj_rms_px, + reprojection_max_px=reproj_max_px, + metadata=metadata, + ) + + +# ============================================================ +# Fusion pro Marker +# ============================================================ + + +def fuse_marker_observations(marker_id: int, observations: List[MarkerObservation], robot_meta: Dict[int, List[Dict[str, Any]]]) -> FusedMarkerEstimate: + if not observations: + raise ValueError("No observations to fuse") + + # Optional outlier suppression: one soft reweighting pass based on position and angle residuals. + pos_stack = np.stack([o.position_w for o in observations], axis=0) + weights = np.array([o.observation_weight for o in observations], dtype=np.float64) + weights = np.maximum(weights, 1e-9) + + # Initial estimate + pos0 = np.average(pos_stack, axis=0, weights=weights) + rot0 = average_rotations_weighted([o.rotation_w_from_m for o in observations], weights) + + # Residual-based robustification + pos_err = np.linalg.norm(pos_stack - pos0.reshape(1, 3), axis=1) + ang_err = np.array([angular_distance_deg(rot0, o.rotation_w_from_m) for o in observations], dtype=np.float64) + + # Huber-like soft downweighting + pos_scale = max(0.01, float(np.median(pos_err) + 1e-9)) + ang_scale = max(2.0, float(np.median(ang_err) + 1e-9)) + robust = 1.0 / (1.0 + (pos_err / pos_scale) ** 2 + (ang_err / ang_scale) ** 2) + + final_w = weights * robust + final_w = np.maximum(final_w, 1e-9) + + pos = np.average(pos_stack, axis=0, weights=final_w) + rot = average_rotations_weighted([o.rotation_w_from_m for o in observations], final_w) + rpy = rotation_matrix_to_euler_zyx(rot) + + camera_ids = sorted({o.camera_id for o in observations}) + source_files = sorted({Path(o.source_file).name for o in observations}) + + weight_sum = float(np.sum(final_w)) + weight_mean = float(np.mean(final_w)) + + confidence_vals = np.array([o.confidence for o in observations], dtype=np.float64) + quality_vals = np.array([o.quality_score for o in observations], dtype=np.float64) + reproj_rms_vals = np.array([o.reprojection_rms_px for o in observations], dtype=np.float64) + + pos_std_m = float(np.sqrt(np.average(np.sum((pos_stack - pos.reshape(1, 3)) ** 2, axis=1), weights=final_w))) + angular_std_deg = float(np.sqrt(np.average(np.array([angular_distance_deg(rot, o.rotation_w_from_m) ** 2 for o in observations]), weights=final_w))) + + # Reliability heuristic in [0,1] + num_obs = len(observations) + num_cams = len(camera_ids) + obs_factor = clamp(num_obs / 5.0) + cam_factor = clamp(num_cams / 3.0) + conf_factor = clamp(float(np.mean(confidence_vals))) + qual_factor = clamp(float(np.mean(quality_vals))) + spread_factor = clamp(1.0 - min(pos_std_m / 0.08, 1.0)) + reproj_factor = clamp(1.0 - min(float(np.mean(reproj_rms_vals)) / 5.0, 1.0)) + reliability = clamp( + 0.18 * obs_factor + + 0.15 * cam_factor + + 0.22 * conf_factor + + 0.15 * qual_factor + + 0.15 * spread_factor + + 0.15 * reproj_factor + ) + + metadata = summarize_robot_metadata(marker_id, robot_meta) + + return FusedMarkerEstimate( + marker_id=marker_id, + position_w=pos.astype(np.float64), + rotation_w_from_m=rot.astype(np.float64), + orientation_rpy_deg=rpy, + observations=observations, + num_cameras=num_cams, + camera_ids=camera_ids, + weight_sum=weight_sum, + position_std_m=pos_std_m, + angular_std_deg=angular_std_deg, + confidence_mean=float(np.mean(confidence_vals)), + confidence_min=float(np.min(confidence_vals)), + confidence_max=float(np.max(confidence_vals)), + quality_mean=float(np.mean(quality_vals)), + quality_min=float(np.min(quality_vals)), + quality_max=float(np.max(quality_vals)), + reproj_rms_mean_px=float(np.mean(reproj_rms_vals)), + reproj_rms_max_px=float(np.max([o.reprojection_max_px for o in observations])), + reliability_0_1=reliability, + metadata=metadata, + ) + + +# ============================================================ +# Input Handling +# ============================================================ + + +def collect_input_files(args: argparse.Namespace) -> List[str]: + files: List[str] = [] + + if args.json: + files.extend(args.json) + + if args.input_dir and args.timestamp: + base = Path(args.input_dir) + ts = str(args.timestamp) + patterns = [ + f"*_{ts}_aruco_detection.camera_pose.json", + f"*_{ts}_aruco_detection.json", + f"*_{ts}.camera_pose.json", + f"*_{ts}.json", + ] + for pattern in patterns: + files.extend(str(p) for p in sorted(base.glob(pattern))) + + # de-duplicate, keep order + seen = set() + uniq = [] + for f in files: + ff = str(Path(f)) + if ff not in seen: + seen.add(ff) + uniq.append(ff) + + return uniq + + +def find_detection_json(camera_pose_json_path): + """ + Findet automatisch die passende Detection-JSON. + + Beispiel: + xxx_aruco_detection.camera_pose.json + -> + xxx_aruco_detection.json + """ + + pose_path = Path(camera_pose_json_path) + + name = pose_path.name + + if not name.endswith(".camera_pose.json"): + raise ValueError( + f"Not a .camera_pose.json file: {pose_path}" + ) + + detection_name = name.replace( + ".camera_pose.json", + ".json" + ) + + detection_path = pose_path.with_name(detection_name) + + if not detection_path.exists(): + raise FileNotFoundError( + f"Detection JSON not found: {detection_path}" + ) + + return detection_path + + +def load_detection_json(camera_pose_json_path): + """ + Lädt automatisch die passende Detection-Datei. + """ + + detection_path = find_detection_json( + camera_pose_json_path + ) + + with open(detection_path, "r", encoding="utf-8") as f: + return json.load(f) + + +def extract_intrinsics_from_detection(detection_data): + + if "camera" not in detection_data: + raise ValueError("camera section missing") + + camera = detection_data["camera"] + + if "camera_matrix" not in camera: + raise ValueError("camera_matrix missing") + + if "distortion_coefficients" not in camera: + raise ValueError("distortion_coefficients missing") + + K = np.array( + camera["camera_matrix"], + dtype=np.float32 + ) + + D = np.array( + camera["distortion_coefficients"], + dtype=np.float32 + ).reshape(-1, 1) + + return K, D + +def extract_default_marker_size(data: Dict[str, Any], robot_data: Dict[str, Any]) -> float: + # Priority: detection JSON -> robots.json -> fallback 25 mm + try: + vc = data.get("vision_config", {}) + if isinstance(vc, dict) and vc.get("MarkerSize") is not None: + return float(vc["MarkerSize"]) + except Exception: + pass + + try: + vc = robot_data.get("vision_config", {}) + if isinstance(vc, dict) and vc.get("MarkerSize") is not None: + return float(vc["MarkerSize"]) + except Exception: + pass + + return 0.025 + + +# ============================================================ +# CSV / JSON Export +# ============================================================ + + +def fmt_float(x: Optional[float], nd: int = 6) -> str: + if x is None: + return "" + try: + if isinstance(x, float) and (math.isnan(x) or math.isinf(x)): + return "" + return f"{float(x):.{nd}f}" + except Exception: + return "" + + + +def marker_row_from_estimate(est: FusedMarkerEstimate) -> Dict[str, Any]: + robot_on = est.metadata.get("robot_on") if est.metadata else None + robot_known = est.metadata.get("robot_known") if est.metadata else None + robot_relpos_count = est.metadata.get("robot_relpos_count") if est.metadata else 0 + + return { + "kind": "marker", + "id": str(est.marker_id), + "source_file": "", + "camera_id": "", + "robot_on": robot_on or "", + "robot_known": int(bool(robot_known)), + "robot_relpos_count": int(robot_relpos_count or 0), + "x_m": est.position_w[0], + "y_m": est.position_w[1], + "z_m": est.position_w[2], + "roll_deg": est.orientation_rpy_deg[0], + "pitch_deg": est.orientation_rpy_deg[1], + "yaw_deg": est.orientation_rpy_deg[2], + "num_observations": len(est.observations), + "num_cameras": est.num_cameras, + "cameras_seen_by": "|".join(est.camera_ids), + "weight_sum": est.weight_sum, + "weight_mean": est.weight_sum / max(1, len(est.observations)), + "confidence_mean": est.confidence_mean, + "confidence_min": est.confidence_min, + "confidence_max": est.confidence_max, + "quality_mean": est.quality_mean, + "quality_min": est.quality_min, + "quality_max": est.quality_max, + "reproj_rms_mean_px": est.reproj_rms_mean_px, + "reproj_rms_max_px": est.reproj_rms_max_px, + "position_std_m": est.position_std_m, + "angular_std_deg": est.angular_std_deg, + "reliability_0_1": est.reliability_0_1, + "source_files": "|".join(est.metadata.get("source_files", []) if est.metadata else []), + "note": "", + } + + + +def camera_row_from_pose(cam: CameraPose) -> Dict[str, Any]: + roll, pitch, yaw = cam.orientation_rpy_deg + return { + "kind": "camera", + "id": cam.camera_id, + "source_file": Path(cam.source_file).name, + "camera_id": cam.camera_id, + "robot_on": "", + "robot_known": 0, + "robot_relpos_count": 0, + "x_m": cam.position_w[0], + "y_m": cam.position_w[1], + "z_m": cam.position_w[2], + "roll_deg": roll, + "pitch_deg": pitch, + "yaw_deg": yaw, + "num_observations": 0, + "num_cameras": 0, + "cameras_seen_by": cam.camera_id, + "weight_sum": "", + "weight_mean": "", + "confidence_mean": "", + "confidence_min": "", + "confidence_max": "", + "quality_mean": cam.quality.get("pose_quality_score", ""), + "quality_min": "", + "quality_max": "", + "reproj_rms_mean_px": cam.quality.get("reprojection_rms_px", ""), + "reproj_rms_max_px": cam.quality.get("reprojection_max_px", ""), + "position_std_m": "", + "angular_std_deg": "", + "reliability_0_1": get_camera_quality_score(cam), + "source_files": Path(cam.source_file).name, + "note": "", + } + + + +def write_csv(rows: List[Dict[str, Any]], out_csv: str) -> None: + if not rows: + return + + fieldnames = [ + "kind", "id", "source_file", "camera_id", + "robot_on", "robot_known", "robot_relpos_count", + "x_m", "y_m", "z_m", + "roll_deg", "pitch_deg", "yaw_deg", + "num_observations", "num_cameras", "cameras_seen_by", + "weight_sum", "weight_mean", + "confidence_mean", "confidence_min", "confidence_max", + "quality_mean", "quality_min", "quality_max", + "reproj_rms_mean_px", "reproj_rms_max_px", + "position_std_m", "angular_std_deg", + "reliability_0_1", "source_files", "note", + ] + + with open(out_csv, "w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + for row in rows: + cleaned = {} + for key in fieldnames: + value = row.get(key, "") + if isinstance(value, np.generic): + value = value.item() + cleaned[key] = value + writer.writerow(cleaned) + + + +def write_summary_json(summary: Dict[str, Any], out_json: str) -> None: + with open(out_json, "w", encoding="utf-8") as f: + json.dump(summary, f, indent=2, ensure_ascii=False) + + +# ============================================================ +# Main Pipeline +# ============================================================ + + +def main() -> None: + parser = argparse.ArgumentParser(description="Phase 1: multi-camera world marker fusion from camera pose JSONs") + parser.add_argument("--json", action="append", help="Input JSON file (repeatable). Expected 2 to 5 files.") + parser.add_argument("--input-dir", type=str, default=None, help="Optional directory to scan") + parser.add_argument("--timestamp", type=str, default=None, help="Optional timestamp used with --input-dir") + parser.add_argument("--robots", type=str, default=None, help="Optional robots.json for metadata only") + parser.add_argument("--out-csv", type=str, default=None, help="Output CSV path") + parser.add_argument("--out-json", type=str, default=None, help="Optional summary JSON path") + parser.add_argument("--min-confidence", type=float, default=0.0, help="Filter observations below this confidence") + parser.add_argument("--verbose", action="store_true", help="Verbose output") + args = parser.parse_args() + + files = collect_input_files(args) + if len(files) < 1: + raise SystemExit("No input JSON files provided. Use --json and/or --input-dir + --timestamp.") + if len(files) > 5: + print(f"[WARN] More than 5 input JSONs found ({len(files)}). All will be processed.") + elif len(files) < 2: + print(f"[WARN] Only {len(files)} input JSON file(s) found. The pipeline still runs.") + + robot_meta = load_robot_metadata(args.robots) + + # Parse all files first + cameras: List[CameraPose] = [] + all_observations: List[MarkerObservation] = [] + all_detections_count = 0 + total_rejected = 0 + + for file_path in files: + data = load_json(file_path) + cam = extract_camera_pose(data, file_path) + cameras.append(cam) + + + detection_data = load_detection_json(json_file) + + K, D = extract_intrinsics_from_detection(detection_data) + + + default_marker_size_m = extract_default_marker_size(data, load_json(args.robots) if args.robots else {}) + + detections = data.get("detections", []) if isinstance(data.get("detections", []), list) else [] + all_detections_count += len(detections) + + if args.verbose: + print(f"[INFO] {Path(file_path).name}: camera={cam.camera_id}, detections={len(detections)}") + + for det in detections: + marker_id = det.get("marker_id") + if marker_id is None: + total_rejected += 1 + continue + + try: + marker_id_int = int(marker_id) + except Exception: + total_rejected += 1 + continue + + confidence = float(det.get("confidence", 1.0)) + if confidence < args.min_confidence: + total_rejected += 1 + continue + + obs = observation_to_world( + camera_pose=cam, + marker_id=marker_id_int, + det=det, + K=K, + D=D, + robot_meta=robot_meta, + default_marker_size_m=default_marker_size_m, + ) + if obs is None: + total_rejected += 1 + continue + + all_observations.append(obs) + + # Group observations by marker id + obs_by_marker: Dict[int, List[MarkerObservation]] = {} + for obs in all_observations: + obs_by_marker.setdefault(obs.marker_id, []).append(obs) + + # Build fused estimates + fused_markers: List[FusedMarkerEstimate] = [] + for mid in sorted(obs_by_marker.keys()): + fused = fuse_marker_observations(mid, obs_by_marker[mid], robot_meta) + # attach source files to metadata for CSV convenience + fused.metadata = dict(fused.metadata) + fused.metadata["source_files"] = sorted({Path(o.source_file).name for o in fused.observations}) + fused_markers.append(fused) + + # CSV rows: cameras + markers + rows: List[Dict[str, Any]] = [] + for cam in cameras: + rows.append(camera_row_from_pose(cam)) + for est in fused_markers: + rows.append(marker_row_from_estimate(est)) + + # Sort: cameras first, then markers by numeric id if possible + def sort_key(row: Dict[str, Any]): + kind = row.get("kind", "") + if kind == "camera": + return (0, str(row.get("id", ""))) + try: + mid = int(row.get("id", 10**9)) + except Exception: + mid = 10**9 + return (1, mid) + + rows.sort(key=sort_key) + + # Output paths + if args.out_csv: + out_csv = args.out_csv + else: + stem = Path(files[0]).stem + if "camera_pose" in stem: + stem = stem.replace(".camera_pose", "") + out_csv = str(Path(files[0]).with_name(f"{stem}_world_markers.csv")) + + if args.out_json: + out_json = args.out_json + else: + out_json = str(Path(out_csv).with_suffix(".json")) + + write_csv(rows, out_csv) + + # Summary JSON: contains the fused marker estimates plus cameras and stats. + summary = { + "summary": { + "input_files": [Path(f).name for f in files], + "num_files": len(files), + "num_cameras": len(cameras), + "num_detections_total": all_detections_count, + "num_valid_observations": len(all_observations), + "num_markers_fused": len(fused_markers), + "num_rejected_detections": total_rejected, + }, + "cameras": [ + { + "camera_id": cam.camera_id, + "source_file": cam.source_file, + "position_m": cam.position_w.tolist(), + "rotation_matrix_world_from_camera": cam.rotation_w_from_c.tolist(), + "orientation_deg": { + "roll": cam.orientation_rpy_deg[0], + "pitch": cam.orientation_rpy_deg[1], + "yaw": cam.orientation_rpy_deg[2], + }, + "quality": cam.quality, + } + for cam in cameras + ], + "markers": [ + { + "marker_id": est.marker_id, + "position_m": est.position_w.tolist(), + "rotation_matrix_world_from_marker": est.rotation_w_from_m.tolist(), + "orientation_deg": { + "roll": est.orientation_rpy_deg[0], + "pitch": est.orientation_rpy_deg[1], + "yaw": est.orientation_rpy_deg[2], + }, + "num_observations": len(est.observations), + "num_cameras": est.num_cameras, + "camera_ids": est.camera_ids, + "weight_sum": est.weight_sum, + "position_std_m": est.position_std_m, + "angular_std_deg": est.angular_std_deg, + "confidence_mean": est.confidence_mean, + "quality_mean": est.quality_mean, + "reprojection_rms_mean_px": est.reproj_rms_mean_px, + "reliability_0_1": est.reliability_0_1, + "metadata": est.metadata, + "observations": [ + { + "camera_id": o.camera_id, + "source_file": o.source_file, + "confidence": o.confidence, + "quality_score": o.quality_score, + "camera_weight": o.camera_weight, + "observation_weight": o.observation_weight, + "position_m": o.position_w.tolist(), + "rotation_matrix_world_from_marker": o.rotation_w_from_m.tolist(), + "reprojection_rms_px": o.reprojection_rms_px, + "reprojection_max_px": o.reprojection_max_px, + "metadata": o.metadata, + } + for o in est.observations + ], + } + for est in fused_markers + ], + } + write_summary_json(summary, out_json) + + print(f"[OK] CSV written: {out_csv}") + print(f"[OK] JSON written: {out_json}") + print(f"[OK] Cameras: {len(cameras)}, markers fused: {len(fused_markers)}, observations: {len(all_observations)}") + + +if __name__ == "__main__": + main() diff --git a/test/2_estimate_camera_pose_from_aruco_json.test.js b/test/2_estimate_camera_pose_from_aruco_json.test.js index d183271..9acab6e 100644 --- a/test/2_estimate_camera_pose_from_aruco_json.test.js +++ b/test/2_estimate_camera_pose_from_aruco_json.test.js @@ -11,7 +11,7 @@ const TEST_SETUP_FILE = path.join(TEST_PATH, 'data', '0_testSetup.json'); const SCRIPT_FILE = path.join(PROJECT_PATH, 'programs', '1_detect_aruco_observations.py'); const ROBOT_PATH = path.join(__dirname, 'data', 'robot', 'robot.json'); const SCRIPT_FILE_2 = path.join(PROJECT_PATH, 'programs', '2_estimate_camera_pose_from_aruco_json.py'); - +const SCRIPT_FILE_3 = path.join(PROJECT_PATH, 'programs', '3_fuse_markers_world.py'); const cam = { id : 'cam1', @@ -19,7 +19,11 @@ const cam = { intrinsics: path.join(PROJECT_PATH, 'data', 'settings','callibration_cam0.npz') }; - +const cam2 = { + id : 'cam1', + image: 'snapshot_video0_1779690911822.jpg', + intrinsics: path.join(PROJECT_PATH, 'data', 'settings','callibration_cam0.npz') +}; describe('Check if Python 2 runs', () => { @@ -61,6 +65,61 @@ describe('Check if Python 2 runs', () => { }); + test('Second File Run of Python second script', () => { + + + console.log('Intrinsics : ', cam.intrinsics); + execFileSync(PYTHON_CMD, [ + SCRIPT_FILE, + '-i', path.join(SOURCE_DIR, cam2.image), + '-npz', cam.intrinsics, + '-robot', ROBOT_PATH, + '-cameraId', cam2.id + + , + '-outDir', TARGET_DIR + ], { + stdio: 'inherit', + cwd: SOURCE_DIR // <- wichtig + }); + + + const resultFile = path.join(TARGET_DIR, 'snapshot_video0_1779690911822_aruco_detection.json'); + + if (!fs.existsSync(resultFile)) { + throw new Error(`Erwartete Datei fehlt: ${resultFile}`); + } + + + console.log('Intrinsics : ', cam.intrinsics); + execFileSync(PYTHON_CMD, [ + SCRIPT_FILE_2, + '--detections' , resultFile, + '--robots', ROBOT_PATH + ], { + stdio: 'inherit', + cwd: SOURCE_DIR // <- wichtig + }); + + }); + + + test('Second File Run of Python second script', () => { + + + console.log('Intrinsics : ', cam.intrinsics); + execFileSync(PYTHON_CMD, [ + SCRIPT_FILE_3, + '--json', path.join(TARGET_DIR, 'snapshot_video0_1779690911822_aruco_detection.camera_pose.json'), + '--json', path.join(TARGET_DIR, 'snapshot_video1_1779690911822_aruco_detection.camera_pose.json'), + '--robots', ROBOT_PATH + ], { + stdio: 'inherit', + cwd: SOURCE_DIR // <- wichtig + }); + }); + + /* // ✅ Cleanup läuft IMMER nach jedem Test afterEach(() => { diff --git a/test/data/screenShots/fused_markers.csv b/test/data/screenShots/fused_markers.csv new file mode 100644 index 0000000..468dad7 --- /dev/null +++ b/test/data/screenShots/fused_markers.csv @@ -0,0 +1,20 @@ +marker_id,x,y,z,num_cameras,spread_m,known_marker,mean_confidence,mean_weight +219,0.43451087900435764,-0.2430617527365844,0.24864745186629233,2,0.049300733586341176,False,0.5277170316988301,0.009300142928461588 +200,0.2883150365803509,0.0027948039097262806,0.09467189774796789,2,0.033606991516837784,False,0.7555202975260764,0.06883103535083403 +210,0.002038147844813181,0.018380350725407422,-0.018394701031480637,2,0.006772066645204387,True,0.6723693051501728,0.0204532377982721 +215,0.21054334774487873,-0.0718736597935183,-0.02391368422209966,2,0.013509208968825837,True,0.7724894180619204,0.08402370302052731 +197,0.30386845803628715,-0.12692615651017353,0.06409248498248943,1,0.0,False,0.6986174216600093,0.14966530971529068 +218,0.43184321660132874,-0.1704819414678139,0.16367009746963013,2,0.03802223210213124,False,0.3413914136453039,0.010365470204791368 +229,0.3602436104705841,-0.11844379401765685,0.08440112143347289,2,0.032107134962667656,False,0.8405383706616931,0.06951698019008724 +243,0.3687471932606844,-0.14650136025597116,0.042188479232535214,1,0.0,False,0.65908988611209,0.10324035134123742 +211,0.2137591400767156,0.026959799238792163,-0.028605385312781718,2,0.010891850650024732,True,0.6444060145125223,0.050073587946979366 +198,0.35259204770580493,-0.030524117439818314,0.08694698191739618,2,0.02706837734419262,False,0.7253533793498493,0.058021822336083224 +201,0.24149429994177674,0.06896595765348834,0.09401141969091542,1,0.0,False,0.4573781640909966,0.04398530324515346 +204,0.27355941290220315,0.13289470613188933,0.1235983427523017,2,0.02985425204964259,False,0.5691140927436947,0.023331477027783525 +217,0.6516780631570386,0.043108222213155606,-0.050180784675989236,2,0.049194700206082534,True,0.5355335672966184,0.014438151705364624 +196,0.3889281825540628,-0.37463423301617327,0.2628466843421774,1,0.0,False,0.9637599331074872,0.012495067650793962 +180,0.42148181203795254,-0.3888496207938902,0.3076404324887513,1,0.0,False,0.6855326216817852,0.003758399623359111 +189,0.39699505778356714,-0.414056777239273,0.2548045860642588,1,0.0,False,0.5680798096818698,0.005206949324425661 +208,0.5002433623385828,-0.10538715198248683,-0.04743817619792867,1,0.0,True,0.7535707616050168,0.006307514070673037 +214,0.41191377730374135,-0.0008334112347655465,-0.03314121580279006,1,0.0,True,0.6847250665842876,0.005021652275542729 +226,0.4275023262853984,-0.13129556163502792,0.04582380048078871,1,0.0,False,0.10322993259729876,0.0003523077820839931