Initial commit

This commit is contained in:
chk
2026-06-08 19:50:36 +02:00
commit 53db55ba36
39 changed files with 5860 additions and 0 deletions

149
README.md Normal file
View File

@@ -0,0 +1,149 @@
# appRobotBodyTrack
3D-Body-Tracking für Roboter aus Mehrkamera-ArUco-Bildern.
**Input**
- Bilder: `render_*.png`
- Intrinsics: `render_*.npz`
- Konfiguration: `robot.json`
**Output**
- Gelenke **R⁷**`{x, y, z, a, b, c, e}` (mm / Grad)
---
## Interfaces
Eine Logik, drei Zugänge:
- **Python**
- **CLI**
- **REST (FastAPI)**
---
## Quickstart
### Python
```python
from scripts import estimate_from_dir
result = estimate_from_dir("data/Scene8", robot_json="robot.json")
print(result.joints)
print(result.confidence)
```
---
### CLI
```bash
pip install -e .
python -m scripts data/Scene8 --robot robot.json
python -m scripts data/Scene8 --robot robot.json --cameras a,b,d
```
---
### REST API
```bash
docker compose up
```
**Request:**
```python
import requests
resp = requests.post(
"http://localhost:8446/v1/estimate",
files=[
("images", ("render_a.png", open("render_a.png", "rb"))),
("intrinsics", ("render_a.npz", open("render_a.npz", "rb"))),
],
)
print(resp.json()["joints"])
```
---
## API
| Endpoint | Methode | Zweck |
|----------|--------|------|
| `/v1/estimate` | POST | Bilder → Gelenke |
| `/v1/health` | GET | Status |
| `/v1/config` | GET | aktive Konfiguration |
**Response:**
```json
{
"joints": {"x": 50.2, "y": -2.1, "z": 94.8, "a": 20.1},
"confidence": {"x": "high", "b": "low"},
"residual_rms": 1.45,
"n_markers": 56,
"processing_ms": 1240
}
```
---
## Struktur
```
.
├── scripts/
├── config/robot.json
├── tests/
└── docker-compose.yaml
```
---
## Deployment (Docker / Portainer)
**Volume:**
```yaml
- /opt/approbot/config/robot.json:/config/robot.json:ro
```
**Healthcheck:**
```bash
curl http://<host>:8446/v1/health
```
---
## Konfiguration
Zentrale Datei: **`robot.json`**
Verwendete Bereiche:
- `links`
- `pose_estimation`
- `vision_config`
- `movements`
- `units`
---
## Stack (minimal)
- numpy
- scipy
- opencv (aruco)
- fastapi + uvicorn
---
## Naming
- **BodyTrack** → Tracking (dynamisch) ✅
- **BodyMap** → Modell / Repräsentation
- **BodySense** → Wahrnehmung (low-level)

19
config/robot.json.example Normal file
View File

@@ -0,0 +1,19 @@
{
"_comment": "Kopiere deine robot.json hierher und benenne sie in robot.json um.",
"_comment2": "Die Pipeline liest nur: links, pose_estimation, vision_config, units.",
"_comment3": "Alle anderen Abschnitte (renderingInfo, robot_test_poses) werden ignoriert.",
"units": { "length": "mm", "angle": "deg" },
"links": [],
"vision_config": { "aruco_dict": "DICT_4X4_250" },
"pose_estimation": {
"method": "hybrid",
"marker_observation": "corner_pose",
"use_normals": true,
"normal_weight": 100.0,
"robust_loss": "huber",
"huber_delta_mm": 8.0,
"max_iterations": 200,
"min_cameras_per_marker": 2,
"per_link_method": {}
}
}

239
doc/api_integration.md Normal file
View File

@@ -0,0 +1,239 @@
# API Integration — appRobotBodyTrack
Der Service läuft als HTTP-Server auf Port 8446 und ist von jeder Sprache aus
erreichbar. Alle Requests nutzen `multipart/form-data`.
---
## Endpunkte im Überblick
| Endpunkt | Methode | Input | Output |
|---|---|---|---|
| `/v1/estimate` | POST | Bilder + Intrinsiken (+ optional robot.json) | Gelenkwinkel JSON |
| `/v1/health` | GET | — | `{"status": "ok", "version": "1.0.0"}` |
| `/v1/config` | GET | — | Aktiver `pose_estimation`-Block |
---
## Python (`requests`)
```python
import requests
BASE = "http://localhost:8446"
# ── Health-Check ────────────────────────────────────────────────
resp = requests.get(f"{BASE}/v1/health")
print(resp.json()) # {"status": "ok", "version": "1.0.0"}
# ── Pose-Schätzung ───────────────────────────────────────────────
# Bilder und zugehörige Intrinsiken in der gleichen Reihenfolge übergeben.
# Dateinamen müssen render_<id>.png / render_<id>.npz sein —
# die ID (a, b, c, ...) verknüpft Bild und Intrinsik intern.
camera_ids = ["a", "b", "c"]
files = []
for cid in camera_ids:
files.append(("images", (f"render_{cid}.png", open(f"render_{cid}.png", "rb"), "image/png")))
files.append(("intrinsics", (f"render_{cid}.npz", open(f"render_{cid}.npz", "rb"), "application/octet-stream")))
resp = requests.post(f"{BASE}/v1/estimate", files=files)
resp.raise_for_status()
result = resp.json()
print(result["joints"]) # {"x": 50.2, "y": -2.1, "z": 94.8, "a": 20.1, "b": 59.9, "c": 9.0, "e": 3.0}
print(result["confidence"]) # {"x": "high", "b": "low", ...}
print(result["residual_rms"]) # 1.45
print(result["processing_ms"]) # 1240
```
### robot.json pro Request mitschicken (überschreibt Server-Konfig)
```python
files.append(("robot_json", ("robot.json", open("robot.json", "rb"), "application/json")))
resp = requests.post(f"{BASE}/v1/estimate", files=files)
```
### Fehlerbehandlung
```python
resp = requests.post(f"{BASE}/v1/estimate", files=files)
if resp.status_code == 400:
print("Ungültige Eingabe:", resp.json()["detail"])
elif resp.status_code == 500:
print("Pipeline-Fehler:", resp.json()["detail"])
else:
resp.raise_for_status()
joints = resp.json()["joints"]
```
### Async mit `httpx`
```python
import asyncio
import httpx
async def estimate(camera_ids: list[str]) -> dict:
async with httpx.AsyncClient(base_url="http://localhost:8446") as client:
files = []
for cid in camera_ids:
files.append(("images", (f"render_{cid}.png", open(f"render_{cid}.png", "rb"))))
files.append(("intrinsics", (f"render_{cid}.npz", open(f"render_{cid}.npz", "rb"))))
resp = await client.post("/v1/estimate", files=files, timeout=60.0)
resp.raise_for_status()
return resp.json()
result = asyncio.run(estimate(["a", "b", "c"]))
```
---
## Node.js
### Native `fetch` + `FormData` (Node 18+, kein extra Paket)
```js
import { readFileSync } from "fs";
const BASE = "http://localhost:8446";
const cameraIds = ["a", "b", "c"];
// ── Health-Check ────────────────────────────────────────────────
const health = await fetch(`${BASE}/v1/health`);
console.log(await health.json()); // { status: 'ok', version: '1.0.0' }
// ── Pose-Schätzung ───────────────────────────────────────────────
const form = new FormData();
for (const id of cameraIds) {
form.append(
"images",
new Blob([readFileSync(`render_${id}.png`)], { type: "image/png" }),
`render_${id}.png`
);
form.append(
"intrinsics",
new Blob([readFileSync(`render_${id}.npz`)], { type: "application/octet-stream" }),
`render_${id}.npz`
);
}
const resp = await fetch(`${BASE}/v1/estimate`, { method: "POST", body: form });
if (!resp.ok) {
const err = await resp.json();
throw new Error(`Pipeline-Fehler ${resp.status}: ${err.detail}`);
}
const result = await resp.json();
console.log(result.joints); // { x: 50.2, y: -2.1, z: 94.8, a: 20.1, b: 59.9, c: 9.0, e: 3.0 }
console.log(result.confidence); // { x: 'high', b: 'low', ... }
```
### `axios` + `form-data` (Node 16 / CommonJS-Umgebungen)
```bash
npm install axios form-data
```
```js
const axios = require("axios");
const FormData = require("form-data");
const fs = require("fs");
const BASE = "http://localhost:8446";
const cameraIds = ["a", "b", "c"];
const form = new FormData();
for (const id of cameraIds) {
form.append("images", fs.createReadStream(`render_${id}.png`), `render_${id}.png`);
form.append("intrinsics", fs.createReadStream(`render_${id}.npz`), `render_${id}.npz`);
}
const resp = await axios.post(`${BASE}/v1/estimate`, form, {
headers: form.getHeaders(),
timeout: 60_000,
});
const { joints, confidence, residual_rms, n_markers, processing_ms } = resp.data;
console.log(joints);
```
---
## Response-Format
```json
{
"joints": {
"x": 50.2,
"y": -2.1,
"z": 94.8,
"a": 20.1,
"b": 59.9,
"c": 9.0,
"e": 3.0
},
"confidence": {
"x": "high",
"y": "high",
"z": "high",
"a": "high",
"b": "low",
"c": "low",
"e": "low"
},
"residual_rms": 1.45,
"n_markers": 56,
"processing_ms": 1240
}
```
### Felder
| Feld | Typ | Einheit | Beschreibung |
|---|---|---|---|
| `joints.x` | float | mm | Linearachse X |
| `joints.y` | float | mm | Linearachse Y |
| `joints.z` | float | mm | Linearachse Z (Höhe) |
| `joints.a` | float | ° | Drehgelenk A |
| `joints.b` | float | ° | Drehgelenk B |
| `joints.c` | float | ° | Drehgelenk C |
| `joints.e` | float | ° | Fingergelenk E |
| `confidence.*` | string | — | `high` / `medium` / `low` / `none` |
| `residual_rms` | float | mm | RMS-Restfehler der Schätzung |
| `n_markers` | int | — | Anzahl triangulierter Marker |
| `processing_ms` | int | ms | Gesamtlaufzeit der Pipeline |
### Confidence-Stufen
| Wert | Bedeutung |
|---|---|
| `high` | Gelenk gut durch mehrere Marker beobachtet |
| `medium` | Gelenk beobachtet, aber mit eingeschränkter Geometrie |
| `low` | Nur indirekt oder mit wenigen Markern beobachtet |
| `none` | Gelenk nicht beobachtbar (z.B. alle Marker verdeckt) |
### HTTP-Fehlercodes
| Code | Bedeutung |
|---|---|
| `400` | Eingabefehler (fehlende Dateien, falsche Namen, keine robot.json) |
| `500` | Pipeline-Fehler (ArUco nicht gefunden, Triangulation fehlgeschlagen, …) |
---
## Dateinamens-Konvention
Die Kamera-ID in Dateinamen verknüpft Bild und Intrinsik:
```
render_a.png ←→ render_a.npz # Kamera "a"
render_b.png ←→ render_b.npz # Kamera "b"
render_c.png ←→ render_c.npz # Kamera "c"
```
Die ID kann ein Buchstabe oder eine kurze alphanumerische Zeichenkette sein.
Reihenfolge der `files`-Liste ist egal — die Zuordnung erfolgt über den Dateinamen.

280
doc/robot_json.md Normal file
View File

@@ -0,0 +1,280 @@
# robot.json — Entwurf und Schema
## Entwurfsprinzip: Eine Datei pro Roboter
`robot.json` ist die **zentrale Identitätsdatei** des Roboters. Sie beschreibt
alles, was zum Roboter gehört — Kinematik, Marker, Kamera-Setup, Rendering-Parameter
und Algorithmus-Tuning. Es gibt genau eine Datei pro Roboter.
```
robot.json → Pipeline-Service (liest: links, vision_config, pose_estimation, ...)
→ Blender-Renderer (liest: links, renderingInfo, robot_test_poses, ...)
→ Benchmark-Tools (liest: robot_test_poses, test_camera_positions, ...)
```
Jeder Konsument liest nur seine Abschnitte und ignoriert alle anderen stillschweigend.
Das macht `robot.json` **additiv erweiterbar**: neue Tools fügen neue Abschnitte hinzu,
ohne bestehende zu berühren.
**Roboter wechseln = `robot.json` austauschen.**
Alle Werkzeuge der Umgebung stellen sich damit automatisch auf den neuen Roboter ein.
---
## Abschnitts-Übersicht
| Abschnitt | Pipeline | Renderer | Benchmark | Beschreibung |
|---|:---:|:---:|:---:|---|
| `units` | ✅ | ✅ | ✅ | Maßeinheiten (mm, deg) |
| `coordinateSystem` | ✅ | ✅ | — | Basis-Koordinatensystem |
| `links` | ✅ | ✅ | — | Kinematische Kette + ArUco-Marker |
| `movements` | ✅ | ✅ | ✅ | Gelenkachsen-Definition, Ausgabeformat |
| `vision_config` | ✅ | — | — | ArUco-Dictionary, Markergröße |
| `pose_estimation` | ✅ | — | — | Algorithmus-Parameter |
| `constraint_rules` | ✅ | — | — | Gelenkwinkel-Grenzen |
| `observation_weighting` | ✅ | — | — | Gewichtung pro Gelenk/Beobachtungstyp |
| `multiview_calculation` | ✅ | — | — | Bundle-Adjustment-Einstellungen |
| `renderingInfo` | — | ✅ | — | Blender-Szene, Kamera-Rig, Materialien |
| `robot_test_poses` | — | ✅ | ✅ | Teststellungen für Rendering / Evaluierung |
| `test_camera_positions` | — | ✅ | ✅ | Kamera-Aufstellungen für Tests |
| `test_camera_targets` | — | ✅ | — | Blickziele der Test-Kameras |
| `state_pose_params` | ✅ | ✅ | — | Parameterraum-Definition (R⁷) |
| `defaultPosition` | ✅ | ✅ | — | Referenz-Nullstellung |
---
## Pflichtabschnitte
### `units`
```json
"units": {
"length": "mm",
"angle": "deg"
}
```
Definiert die Einheiten für alle Längen- und Winkelangaben in der gesamten Datei.
---
### `links`
Kinematische Kette des Roboters, von der Basis zum Endeffektor.
Jedes Glied kennt sein Gelenk, seine Transformation in Nullstellung und
die auf ihm montierten ArUco-Marker.
```json
"links": [
{
"name": "Base",
"joint": "x",
"joint_type": "prismatic",
"axis": [0, 0, 1],
"T_parent_link_home": [
[1, 0, 0, 0],
[0, 1, 0, 0],
[0, 0, 1, 100],
[0, 0, 0, 1]
],
"markers": [
{
"id": 0,
"size_mm": 60.0,
"T_link_marker": [
[1, 0, 0, 0],
[0, 1, 0, 50],
[0, 0, 1, 0],
[0, 0, 0, 1]
]
}
]
}
]
```
| Feld | Typ | Pflicht | Beschreibung |
|---|---|:---:|---|
| `name` | string | ✅ | Name des Glieds |
| `joint` | string | ✅ | Gelenkvariable: `x`, `y`, `z`, `a`, `b`, `c`, `e` |
| `joint_type` | string | ✅ | `"prismatic"` oder `"revolute"` |
| `axis` | [x,y,z] | ✅ | Gelenkachse im Eltern-KS |
| `T_parent_link_home` | 4×4 | ✅ | Transformation Eltern→Glied in Nullstellung |
| `markers` | Array | — | ArUco-Marker auf diesem Glied (kann leer sein) |
| `markers[].id` | int | ✅ | ArUco-Marker-ID |
| `markers[].size_mm` | float | ✅ | Kantenlänge in mm |
| `markers[].T_link_marker` | 4×4 | ✅ | Transformation Glied→Marker-Mittelpunkt |
---
### `movements`
Definiert die sieben Gelenkachsen des Roboters, ihre physikalischen Grenzen
und wie sie im Output (`robot_state.json`) benannt und geordnet werden.
```json
"movements": {
"x": { "type": "prismatic", "min_mm": 0, "max_mm": 800 },
"y": { "type": "prismatic", "min_mm": -400, "max_mm": 400 },
"z": { "type": "prismatic", "min_mm": 0, "max_mm": 1200 },
"a": { "type": "revolute", "min_deg": -180,"max_deg": 180 },
"b": { "type": "revolute", "min_deg": -90, "max_deg": 90 },
"c": { "type": "revolute", "min_deg": -90, "max_deg": 90 },
"e": { "type": "revolute", "min_deg": 0, "max_deg": 90 }
}
```
---
## Pipeline-Abschnitte
### `vision_config`
```json
"vision_config": {
"aruco_dict": "DICT_4X4_250",
"marker_size_mm": 20.0
}
```
| Feld | Default | Beschreibung |
|---|---|---|
| `aruco_dict` | `"DICT_4X4_250"` | OpenCV-ArUco-Dictionary |
| `marker_size_mm` | aus `links[].markers[].size_mm` | Globale Fallback-Markergröße |
---
### `pose_estimation`
Algorithmus-Parameter für die Gelenkwinkelschätzung.
Alle Felder haben Defaults — fehlende Felder werden still ignoriert.
```json
"pose_estimation": {
"method": "hybrid",
"marker_observation": "corner_pose",
"use_normals": true,
"normal_weight": 100.0,
"robust_loss": "huber",
"huber_delta_mm": 8.0,
"max_iterations": 200,
"min_cameras_per_marker": 2,
"per_link_method": {}
}
```
| Feld | Default | Beschreibung |
|---|---|---|
| `method` | `"hybrid"` | `sequential_fk` / `global_ba` / `hybrid` |
| `marker_observation` | `"corner_pose"` | `"corner_pose"` (pos+normal) oder `"center_point"` (pos only) |
| `use_normals` | `true` | Marker-Flächennormalen als Zusatz-Constraint |
| `normal_weight` | `100.0` | Gewicht Normal-Residuen vs. Positions-Residuen |
| `robust_loss` | `"huber"` | `"none"` / `"huber"` / `"cauchy"` |
| `huber_delta_mm` | `8.0` | Huber-Schwelle in mm |
| `max_iterations` | `200` | Bundle-Adjustment-Iterationslimit |
| `min_cameras_per_marker` | `2` | Mindestanzahl Kameras für Triangulation |
| `per_link_method` | `{}` | Override pro Gelenk, z.B. `{"e": "sequential_fk"}` |
---
### `observation_weighting`
Gewichtung der einzelnen Marker-Beobachtungen in der Schätzung,
z.B. um bekannte schwache Geometrien zu dämpfen.
```json
"observation_weighting": {
"default": 1.0,
"per_link": { "Hand": 0.5 }
}
```
---
### `multiview_calculation`
Einstellungen für Schritt 3 (Bundle Adjustment über alle Kameras).
```json
"multiview_calculation": {
"lambda_weight": 100.0,
"min_views": 2
}
```
---
### `constraint_rules`
Gelenkwinkel-Abhängigkeiten und -Grenzen, die in der Schätzung als
Hard- oder Soft-Constraints wirken.
```json
"constraint_rules": [
{ "joint": "b", "min_deg": 0, "max_deg": 180 }
]
```
---
## Renderer-Abschnitte
### `renderingInfo`
Blender-spezifische Szenenparameter: Pfad zur `.blend`-Datei, Materialien,
Beleuchtungssetup, Auflösung und Kamera-Rig-Konfiguration.
Wird von der Pipeline vollständig ignoriert.
---
### `robot_test_poses`
Liste von Roboterstellungen, die im Renderer gerendert und in der Evaluierung
als Ground-Truth verwendet werden. Jeder Eintrag ist ein vollständiger R⁷-Zustand.
```json
"robot_test_poses": [
{ "x": 50, "y": 0, "z": 600, "a": 30, "b": 45, "c": 0, "e": 20 },
{ "x": 200, "y": -100, "z": 700, "a": -15, "b": 60, "c": 10, "e": 0 }
]
```
---
### `test_camera_positions`
Kamera-Aufstellungen für den Renderer, als Liste von 3D-Positionen und Ausrichtungen.
---
## Extensibilität
Neue Tools oder Features fügen neue Abschnitte hinzu, ohne bestehende zu ändern:
```json
{
"units": { ... }, // alle Tools
"links": [ ... ], // alle Tools
"pose_estimation": { ... },// Pipeline
"renderingInfo": { ... }, // Renderer
"my_new_tool": { ... } // neues Tool — alle anderen ignorieren es
}
```
**Versionsregel:** Neue Felder innerhalb bestehender Abschnitte haben immer Defaults.
Felder werden nie entfernt, nur als veraltet markiert. Eine ältere `robot.json`
läuft damit auf einer neueren Pipeline-Version unverändert.
---
## Roboter wechseln
Um auf einen anderen Roboter umzustellen, wird ausschließlich `robot.json` ausgetauscht:
```
robot_A.json → robot.json # Roboter A aktiv
robot_B.json → robot.json # Roboter B aktiv
```
Pipeline, Renderer, Benchmark-Tools und Portainer-Stack lesen denselben
Volume-Mount `/config/robot.json` — kein weiterer Eingriff nötig.

View File

@@ -0,0 +1,28 @@
# robot.json — Pipeline-Schema
> Dieses Dokument beschreibt nur die Pipeline-relevanten Felder.
> Die vollständige Beschreibung aller Abschnitte und das Entwurfsprinzip
> (eine Datei für alle Werkzeuge) steht in [robot_json.md](robot_json.md).
---
## Pipeline-Pflichtfelder
| Abschnitt | Pflicht | Beschreibung |
|---|:---:|---|
| `units` | ✅ | Maßeinheiten (`mm`, `deg`) |
| `links` | ✅ | Kinematische Kette + ArUco-Marker |
| `vision_config` | ✅ | ArUco-Dictionary, Markergröße |
## Pipeline-Optionalfelder (alle mit Defaults)
| Abschnitt | Beschreibung |
|---|---|
| `pose_estimation` | Algorithmus-Parameter |
| `observation_weighting` | Gewichtung pro Glied |
| `multiview_calculation` | Bundle-Adjustment-Einstellungen |
| `constraint_rules` | Gelenkwinkel-Grenzen |
| `movements` | Parameterraum-Definition |
Alle weiteren Abschnitte (`renderingInfo`, `robot_test_poses`, …) werden von
der Pipeline stillschweigend ignoriert.

25
docker-compose.yaml Normal file
View File

@@ -0,0 +1,25 @@
services:
pipeline:
build:
context: .
dockerfile_inline: |
FROM python:3.11-slim
WORKDIR /app
COPY . .
RUN pip install --no-cache-dir .
ENV ROBOT_JSON=/config/robot.json
EXPOSE 8446
HEALTHCHECK --interval=30s --timeout=10s --start-period=15s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8446/v1/health')"
CMD ["python", "-m", "scripts.api", \
"--robot", "/config/robot.json", \
"--host", "0.0.0.0", "--port", "8446"]
image: approbot/pose-pipeline:1.0.0
container_name: appRobotBodyTracker
restart: unless-stopped
ports:
- "8446:8446"
volumes:
- ./config/robot.json:/config/robot.json:ro
environment:
- ROBOT_JSON=/config/robot.json

25
pyproject.toml Normal file
View File

@@ -0,0 +1,25 @@
[build-system]
requires = ["setuptools>=68", "wheel"]
build-backend = "setuptools.backends.legacy:build"
[project]
name = "approbot-pipeline"
version = "1.0.0"
description = "Robot pose estimation from multi-camera ArUco images"
readme = "doc/README.md"
requires-python = ">=3.10"
dependencies = [
"numpy==1.26.4",
"scipy==1.13.1",
"opencv-contrib-python-headless==4.10.0.84",
"fastapi==0.115.0",
"uvicorn[standard]==0.30.6",
"python-multipart==0.0.9",
]
[project.scripts]
approbot-pipeline = "scripts.__main__:main"
[tool.setuptools.packages.find]
where = ["."]
include = ["scripts*"]

23
scripts/__init__.py Normal file
View File

@@ -0,0 +1,23 @@
"""
scripts
=======
Roboter-Pose-Schätzung aus Mehrkamera-ArUco-Bildern.
Zwei Interfaces, gleiche Logik darunter:
(A) Python-Bibliothek — direkt einbindbar:
from scripts import estimate_from_dir, PipelineResult
result = estimate_from_dir("path/to/images", robot_json="robot.json")
print(result.joints) # {"x": 50.2, "y": -2.1, ...}
print(result.confidence) # {"x": "high", "b": "low", ...}
(B) REST-API — läuft als Service im Docker-Container:
POST /v1/estimate (multipart: images + intrinsics)
GET /v1/health
GET /v1/config
→ JSON mit joints, confidence, residual_rms, processing_ms
"""
from .pipeline import estimate_from_dir, PipelineResult
__version__ = "1.0.0"
__all__ = ["estimate_from_dir", "PipelineResult", "__version__"]

43
scripts/__main__.py Normal file
View File

@@ -0,0 +1,43 @@
"""Einstiegspunkt: python -m scripts <image_dir> [--robot ...] [--cameras ...]"""
import argparse
import json
import sys
def main() -> None:
ap = argparse.ArgumentParser(
description="appRobotBodyTrack: Bilder + robot.json → Gelenkwinkel im R⁷"
)
ap.add_argument("image_dir", help="Ordner mit render_*.png/jpg und render_*.npz")
ap.add_argument("--robot", default=None, help="Pfad zu robot.json")
ap.add_argument("--evalDir", default=None, help="Ausgabeordner (Standard: temporäres Verzeichnis)")
ap.add_argument("--cameras", default=None, help="Kamera-IDs, kommagetrennt, z.B. a,b,d")
ap.add_argument("--lambdaWeight", type=float, default=100.0)
args = ap.parse_args()
from scripts import estimate_from_dir
camera_filter = args.cameras.split(",") if args.cameras else None
try:
result = estimate_from_dir(
args.image_dir,
robot_json=args.robot,
eval_dir=args.evalDir,
lambda_weight=args.lambdaWeight,
camera_filter=camera_filter,
)
except Exception as exc:
print(f"[ERROR] {exc}", file=sys.stderr)
sys.exit(1)
print(json.dumps({
"joints": result.joints,
"confidence": result.confidence,
"n_markers": result.n_markers,
"residual_rms": result.residual_rms,
"processing_ms": result.processing_ms,
}, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()

12
scripts/api/__init__.py Normal file
View File

@@ -0,0 +1,12 @@
"""scripts.api — FastAPI REST-Service."""
from .server import create_app
def start_server(
robot_json=None,
host: str = "0.0.0.0",
port: int = 8080,
) -> None:
import uvicorn
app = create_app(robot_json=robot_json)
uvicorn.run(app, host=host, port=port)

17
scripts/api/__main__.py Normal file
View File

@@ -0,0 +1,17 @@
"""Einstiegspunkt: python -m scripts.api [--robot ...] [--host ...] [--port ...]"""
import argparse
from scripts.api import start_server
def main() -> None:
ap = argparse.ArgumentParser(description="approbot-pipeline REST-API starten")
ap.add_argument("--robot", default=None, help="Pfad zu robot.json")
ap.add_argument("--host", default="0.0.0.0")
ap.add_argument("--port", type=int, default=8080)
args = ap.parse_args()
start_server(robot_json=args.robot, host=args.host, port=args.port)
if __name__ == "__main__":
main()

83
scripts/api/server.py Normal file
View File

@@ -0,0 +1,83 @@
"""FastAPI REST-API für appRobotBodyTrack."""
from __future__ import annotations
import json
import tempfile
from pathlib import Path
from typing import List, Optional
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.responses import JSONResponse
from scripts import __version__, estimate_from_dir
_robot_json: Optional[Path] = None
def create_app(robot_json: str | Path | None = None) -> FastAPI:
"""App-Fabrik — setzt optionale Server-weite robot.json-Konfig."""
global _robot_json
if robot_json:
_robot_json = Path(robot_json).resolve()
return _app
_app = FastAPI(title="approbot-pipeline", version=__version__)
@_app.get("/v1/health")
def health():
return {"status": "ok", "version": __version__}
@_app.get("/v1/config")
def config():
if _robot_json is None or not _robot_json.exists():
raise HTTPException(404, "Keine robot.json konfiguriert")
data = json.loads(_robot_json.read_text(encoding="utf-8"))
return data.get("pose_estimation", {})
@_app.post("/v1/estimate")
async def estimate(
images: List[UploadFile] = File(..., description="Kamerabilder (render_<id>.png)"),
intrinsics: List[UploadFile] = File(..., description="Kamera-Intrinsiken (render_<id>.npz)"),
robot_json: Optional[UploadFile] = File(default=None, description="robot.json (überschreibt Server-Konfig)"),
):
"""Pose-Schätzung aus Kamerabildern.
Multipart-Upload:
images[] — render_a.png, render_b.png, ...
intrinsics[] — render_a.npz, render_b.npz, ... (gleiche Reihenfolge)
robot_json — optional, überschreibt die Server-Konfiguration
"""
with tempfile.TemporaryDirectory(prefix="approbot_req_") as tmp:
tmp_path = Path(tmp)
if robot_json is not None:
rj_path = tmp_path / "robot.json"
rj_path.write_bytes(await robot_json.read())
elif _robot_json and _robot_json.exists():
rj_path = _robot_json
else:
raise HTTPException(400, "Keine robot.json angegeben (weder Upload noch Server-Konfig)")
for img in images:
(tmp_path / img.filename).write_bytes(await img.read())
for npz in intrinsics:
(tmp_path / npz.filename).write_bytes(await npz.read())
try:
result = estimate_from_dir(tmp_path, robot_json=rj_path, eval_dir=tmp_path)
except FileNotFoundError as exc:
raise HTTPException(400, str(exc))
except Exception as exc:
raise HTTPException(500, str(exc))
return JSONResponse({
"joints": result.joints,
"confidence": result.confidence,
"residual_rms": result.residual_rms,
"n_markers": result.n_markers,
"processing_ms": result.processing_ms,
})

180
scripts/pipeline.py Normal file
View File

@@ -0,0 +1,180 @@
"""
pipeline.py
===========
Orchestrator — ruft die Pipeline-Schritte als Subprocess auf und gibt
ein strukturiertes PipelineResult zurück.
Schritte:
1 ArUco-Detektion (scripts/1_detect_aruco_observations.py)
2 Kamera-Posen (scripts/2_estimate_camera_from_observations.py)
3 Multi-View Triangulation (scripts/3_multiview_bundle_adjustment_v4.py)
3b Eck-Marker-Posen (scripts/3b_corner_marker_poses.py)
4 Pose-Estimation (scripts/pose_estimation.py)
"""
from __future__ import annotations
import glob
import json
import os
import re
import subprocess
import sys
import tempfile
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional
SCRIPTS = Path(__file__).parent / "pipeline"
PY = sys.executable
@dataclass
class PipelineResult:
"""Ergebnis der Pose-Schätzung."""
joints: Dict[str, float] # x,y,z,a,b,c,e → Wert (mm oder °)
confidence: Dict[str, str] # x,y,z,a,b,c,e → high|medium|low|none
n_markers: int # Anzahl triangulierter Marker
residual_rms: float # Residuum der Schätzung (mm)
processing_ms: float # Laufzeit der Pipeline
robot_state_path: Optional[Path] = None # Pfad zur erzeugten robot_state.json
errors: List[str] = field(default_factory=list)
def _run(cmd: list, step: str) -> None:
"""Subprocess-Aufruf mit Fehlerbehandlung."""
r = subprocess.run(cmd, capture_output=True, text=True)
if r.returncode != 0:
raise RuntimeError(f"[{step}] exit {r.returncode}:\n{r.stderr.strip()[-800:]}")
def _cam_id(path: str) -> Optional[str]:
m = re.match(r"render_([A-Za-z0-9]+)\.(png|jpg|jpeg)", os.path.basename(path), re.I)
return m.group(1) if m else None
def estimate_from_dir(
image_dir: str | Path,
robot_json: str | Path | None = None,
eval_dir: str | Path | None = None,
lambda_weight: float = 100.0,
camera_filter: Optional[List[str]] = None,
) -> PipelineResult:
"""
Pose-Schätzung aus einem Bildordner.
Parameters
----------
image_dir Ordner mit render_*.png und render_*.npz
robot_json Pfad zu robot.json (Default: ROBOT_JSON env oder Exception)
eval_dir Ausgabeordner (Default: temporäres Verzeichnis)
lambda_weight Constraint-Gewicht für Bundle Adjustment
camera_filter Liste von Kamera-IDs; None = alle
Returns
-------
PipelineResult
"""
t0 = time.time()
image_dir = Path(image_dir).resolve()
# robot.json bestimmen
if robot_json is None:
robot_json = os.environ.get("ROBOT_JSON")
if not robot_json:
raise ValueError("robot_json muss angegeben werden oder ROBOT_JSON env gesetzt sein")
robot_json = Path(robot_json).resolve()
# Ausgabeordner
_tmp = None
if eval_dir is None:
_tmp = tempfile.mkdtemp(prefix="approbot_")
eval_dir = Path(_tmp)
else:
eval_dir = Path(eval_dir).resolve()
eval_dir.mkdir(parents=True, exist_ok=True)
errors = []
try:
# Bilder sammeln
imgs = sorted(
glob.glob(str(image_dir / "render_*.png")) +
glob.glob(str(image_dir / "render_*.PNG")) +
glob.glob(str(image_dir / "render_*.jpg")) +
glob.glob(str(image_dir / "render_*.jpeg"))
)
if camera_filter:
imgs = [i for i in imgs if _cam_id(i) in set(camera_filter)]
if not imgs:
raise FileNotFoundError(f"Keine render_*.png/jpg in {image_dir}")
# ── Schritt 1: ArUco-Detektion ──────────────────────────────
for img in imgs:
cid = _cam_id(img)
if not cid:
continue
npz = image_dir / f"render_{cid}.npz"
if not npz.exists():
npz_candidates = list(image_dir.glob("*.npz"))
if not npz_candidates:
raise FileNotFoundError(f"Keine .npz-Intrinsik in {image_dir}")
npz = npz_candidates[0]
_run([PY, str(SCRIPTS / "1_detect_aruco_observations.py"),
"-i", str(img), "-npz", str(npz),
"-outDir", str(eval_dir), "-robot", str(robot_json), "-cameraId", cid],
"Schritt 1")
# ── Schritt 2: Kamera-Posen ──────────────────────────────────
dets = sorted(glob.glob(str(eval_dir / "*_aruco_detection.json")))
if not dets:
raise RuntimeError("Keine ArUco-Detektionen erzeugt (Schritt 1)")
for d in dets:
_run([PY, str(SCRIPTS / "2_estimate_camera_from_observations.py"),
"-i", d, "-robot", str(robot_json), "-outDir", str(eval_dir)],
"Schritt 2")
# ── Schritt 3: Multi-View Triangulation ──────────────────────
poses = sorted(glob.glob(str(eval_dir / "*_camera_pose.json")))
if not poses:
raise RuntimeError("Keine Kamera-Posen erzeugt (Schritt 2)")
det_args = sum([["-det", d] for d in dets], [])
pose_args = sum([["-pose", p] for p in poses], [])
_run([PY, str(SCRIPTS / "3_multiview_bundle_adjustment_v4.py"),
"-robot", str(robot_json), "-lambdaWeight", str(lambda_weight)]
+ det_args + pose_args, "Schritt 3")
# ── Schritt 3b: Eck-Marker-Posen ─────────────────────────────
_run([PY, str(SCRIPTS / "3b_corner_marker_poses.py"),
"--evalDir", str(eval_dir), "--robot", str(robot_json)], "Schritt 3b")
# ── Schritt 4: Pose-Estimation ────────────────────────────────
marker_poses = eval_dir / "aruco_marker_poses.json"
state_out = eval_dir / "robot_state.json"
_run([PY, str(SCRIPTS / "pose_estimation.py"),
str(marker_poses), "-robot", str(robot_json), "-out", str(state_out)],
"Schritt 4")
# Ergebnis lesen
state = json.load(open(state_out, "r", encoding="utf-8"))
mv = state.get("movements", {})
joints = {k: float(v.get("value", 0.0)) for k, v in mv.items() if isinstance(v, dict)}
confidence = {k: str(v.get("confidence", "none")) for k, v in mv.items() if isinstance(v, dict)}
return PipelineResult(
joints=joints,
confidence=confidence,
n_markers=int(state.get("num_markers", 0)),
residual_rms=float(state.get("residual_rms", 0.0)),
processing_ms=round((time.time() - t0) * 1000),
robot_state_path=state_out,
errors=errors,
)
except Exception as exc:
errors.append(str(exc))
raise
finally:
# Temporäres Verzeichnis bleibt absichtlich erhalten für Debugging;
# Aufrufer kann es über result.robot_state_path.parent aufräumen.
pass

View File

@@ -0,0 +1,608 @@
#!/usr/bin/env python3
import argparse
import json
import os
import hashlib
import time
import uuid
from typing import Dict, Any
import cv2
import numpy as np
# ------------------------------------------------------------
# Utilities
# ------------------------------------------------------------
def resolve_path(path):
path = os.path.expanduser(path)
# Absoluter Pfad → direkt verwenden
if os.path.isabs(path):
return path
# Relativer Pfad → absolut machen (auf Basis aktuellem cwd)
return os.path.abspath(path)
def load_intrinsics_npz(npz_path: str):
data = np.load(npz_path)
for k in ('camera_matrix', 'mtx', 'K'):
if k in data:
K = data[k].astype(np.float32)
break
else:
raise KeyError('Camera matrix not found in npz')
for k in ('dist_coeffs', 'dist', 'D'):
if k in data:
D = data[k].astype(np.float32).reshape(-1, 1)
break
else:
D = np.zeros((5, 1), dtype=np.float32)
return K, D
# ------------------------------------------------------------
def load_robot_vision_config(robot_json_path: str):
robot_json_path = resolve_path(robot_json_path)
with open(robot_json_path, 'r', encoding='utf-8') as f:
robot = json.load(f)
vision_config = robot.get('vision_config', {})
marker_type = vision_config.get('MarkerType', 'DICT_4X4_250')
marker_size = float(vision_config.get('MarkerSize', 0.025))
return {
'MarkerType': marker_type,
'MarkerSize': marker_size
}
# ------------------------------------------------------------
def get_aruco_detector(dict_name: str):
mapping = {
'DICT_4X4_250': cv2.aruco.DICT_4X4_250,
'DICT_5X5_100': cv2.aruco.DICT_5X5_100,
'DICT_6X6_250': cv2.aruco.DICT_6X6_250,
'DICT_ARUCO_ORIGINAL': cv2.aruco.DICT_ARUCO_ORIGINAL,
}
dict_id = mapping.get(dict_name, cv2.aruco.DICT_4X4_250)
dictionary = cv2.aruco.getPredefinedDictionary(dict_id)
try:
params = cv2.aruco.DetectorParameters()
except Exception:
params = cv2.aruco.DetectorParameters_create()
try:
detector = cv2.aruco.ArucoDetector(dictionary, params)
return detector, None
except Exception:
return None, (dictionary, params)
# ------------------------------------------------------------
def detect_markers(image, detector_tuple):
detector, fallback = detector_tuple
if detector is not None:
corners, ids, rejected = detector.detectMarkers(image)
else:
dictionary, params = fallback
corners, ids, rejected = cv2.aruco.detectMarkers(
image,
dictionary,
parameters=params
)
return corners, ids, rejected
# ------------------------------------------------------------
def hash_file(path):
sha = hashlib.sha256()
with open(path, 'rb') as f:
while True:
chunk = f.read(1024 * 1024)
if not chunk:
break
sha.update(chunk)
return sha.hexdigest()
# ------------------------------------------------------------
def polygon_mask(shape, polygon):
mask = np.zeros(shape, dtype=np.uint8)
cv2.fillConvexPoly(
mask,
polygon.astype(np.int32),
255
)
return mask
# ------------------------------------------------------------
def shrink_polygon(points, scale=0.80):
center = np.mean(points, axis=0)
shrunk = center + (points - center) * scale
return shrunk.astype(np.float32)
# ------------------------------------------------------------
def compute_sharpness(gray_image, polygon):
shrunk = shrink_polygon(polygon, scale=0.80)
mask = polygon_mask(gray_image.shape, shrunk)
pixels = gray_image[mask == 255]
if pixels.size == 0:
return 0.0
temp = np.zeros_like(gray_image)
temp[mask == 255] = gray_image[mask == 255]
lap = cv2.Laplacian(temp, cv2.CV_64F)
values = lap[mask == 255]
if values.size == 0:
return 0.0
return float(values.var())
# ------------------------------------------------------------
def compute_contrast(gray_image, polygon):
shrunk = shrink_polygon(polygon, scale=0.80)
mask = polygon_mask(gray_image.shape, shrunk)
pixels = gray_image[mask == 255]
if pixels.size == 0:
return {
'p05': 0.0,
'p95': 0.0,
'dynamic_range': 0.0,
'mean_gray': 0.0,
'std_gray': 0.0
}
p05 = float(np.percentile(pixels, 5))
p95 = float(np.percentile(pixels, 95))
return {
'p05': p05,
'p95': p95,
'dynamic_range': float(p95 - p05),
'mean_gray': float(np.mean(pixels)),
'std_gray': float(np.std(pixels))
}
# ------------------------------------------------------------
def compute_edge_ratio(corners):
edge_lengths = []
for k in range(4):
p1 = corners[k]
p2 = corners[(k + 1) % 4]
edge_lengths.append(
float(np.linalg.norm(p1 - p2))
)
edge_ratio = (
max(edge_lengths) /
max(1e-6, min(edge_lengths))
)
return edge_ratio, edge_lengths
# ------------------------------------------------------------
def compute_geometry_metrics(center, corners, width, height):
image_center = np.array(
[width / 2.0, height / 2.0],
dtype=np.float32
)
dist_center = np.linalg.norm(center - image_center)
max_dist = np.linalg.norm(image_center)
distance_center_norm = float(
dist_center / max(1e-6, max_dist)
)
min_x = np.min(corners[:, 0])
max_x = np.max(corners[:, 0])
min_y = np.min(corners[:, 1])
max_y = np.max(corners[:, 1])
border_distance_px = float(min(
min_x,
min_y,
width - max_x,
height - max_y
))
return {
'distance_to_center_norm': distance_center_norm,
'distance_to_border_px': border_distance_px
}
# ------------------------------------------------------------
def compute_confidence(
area_px,
sharpness,
edge_ratio,
dynamic_range,
border_distance_px
):
score = 1.0
# area
score *= min(1.0, area_px / 1500.0)
# sharpness
score *= min(1.0, sharpness / 120.0)
# edge distortion
score *= 1.0 / max(1.0, edge_ratio)
# contrast
score *= min(1.0, dynamic_range / 80.0)
# border distance
score *= min(1.0, max(0.0, border_distance_px) / 50.0)
score = max(0.0, min(1.0, score))
return float(score)
# ------------------------------------------------------------
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
'-i',
'--image',
required=True
)
parser.add_argument(
'-npz',
'--intrinsics',
required=True
)
parser.add_argument(
'-robot',
'--robot',
required=True
)
parser.add_argument(
'-cameraId',
'--cameraId',
required=True,
type=str
)
parser.add_argument(
'-outDir',
'--outDir',
required=True
)
args = parser.parse_args()
out_dir = resolve_path(args.outDir)
os.makedirs(out_dir, exist_ok=True)
# --------------------------------------------------------
# Load robot vision config
# --------------------------------------------------------
vision_config = load_robot_vision_config(args.robot)
marker_type = vision_config['MarkerType']
marker_size = vision_config['MarkerSize']
# --------------------------------------------------------
# Load image
# --------------------------------------------------------
image_path = resolve_path(args.image)
image = cv2.imread(image_path)
if image is None:
raise RuntimeError(f'Cannot read image: {args.image}')
gray = cv2.cvtColor(
image,
cv2.COLOR_BGR2GRAY
)
height, width = gray.shape[:2]
# --------------------------------------------------------
# Intrinsics
# --------------------------------------------------------
intrinsics_path = resolve_path(args.intrinsics)
K, D = load_intrinsics_npz(intrinsics_path)
# --------------------------------------------------------
# Detection
# --------------------------------------------------------
detector_tuple = get_aruco_detector(marker_type)
corners_list, ids, rejected = detect_markers(
gray,
detector_tuple
)
detections = []
# --------------------------------------------------------
# Valid detections
# --------------------------------------------------------
if ids is not None:
ids = ids.flatten().tolist()
for i, marker_id in enumerate(ids):
corners = corners_list[i].reshape((4, 2)).astype(np.float32)
center = corners.mean(axis=0)
area_px = float(
cv2.contourArea(corners)
)
perimeter_px = float(
cv2.arcLength(corners, True)
)
edge_ratio, edge_lengths = compute_edge_ratio(corners)
sharpness = compute_sharpness(
gray,
corners
)
contrast = compute_contrast(
gray,
corners
)
geometry = compute_geometry_metrics(
center,
corners,
width,
height
)
confidence = compute_confidence(
area_px=area_px,
sharpness=sharpness,
edge_ratio=edge_ratio,
dynamic_range=contrast['dynamic_range'],
border_distance_px=geometry['distance_to_border_px']
)
detection = {
'observation_id': str(uuid.uuid4()),
'type': 'aruco',
'marker_id': int(marker_id),
'marker_size_m': marker_size,
'image_points_px': corners.tolist(),
'center_px': center.tolist(),
'quality': {
'area_px': area_px,
'perimeter_px': perimeter_px,
'sharpness': {
'laplacian_var': sharpness
},
'contrast': contrast,
'geometry': geometry,
'edge_ratio': edge_ratio,
'edge_lengths_px': edge_lengths
},
'confidence': confidence
}
detections.append(detection)
# --------------------------------------------------------
# Rejected candidates
# --------------------------------------------------------
rejected_candidates = []
if rejected is not None:
for candidate in rejected:
pts = candidate.reshape((-1, 2)).astype(np.float32)
center = pts.mean(axis=0)
area_px = float(
cv2.contourArea(pts)
)
rejected_candidates.append({
'image_points_px': pts.tolist(),
'center_px': center.tolist(),
'area_px': area_px
})
# --------------------------------------------------------
# Final output
# --------------------------------------------------------
output = {
'schema_version': '1.0',
'created_utc': time.strftime(
'%Y-%m-%dT%H:%M:%SZ',
time.gmtime()
),
'vision_config': {
'MarkerType': marker_type,
'MarkerSize': marker_size
},
'camera': {
'camera_id': args.cameraId,
'intrinsics_file': os.path.abspath(args.intrinsics),
'camera_matrix': K.tolist(),
'distortion_coefficients': D.reshape(-1).tolist()
},
'image': {
'image_file': os.path.abspath(args.image),
'image_sha256': hash_file(args.image),
'width_px': int(width),
'height_px': int(height)
},
'aruco': {
'dictionary': marker_type,
'num_detected_markers': len(detections),
'num_rejected_candidates': len(rejected_candidates)
},
'detections': detections,
'rejected_candidates': rejected_candidates
}
# --------------------------------------------------------
# Output path
# --------------------------------------------------------
input_filename = os.path.basename(args.image)
input_base = os.path.splitext(input_filename)[0]
out_json = os.path.join(
out_dir,
f'{input_base}_aruco_detection.json'
)
# --------------------------------------------------------
# Save JSON
# --------------------------------------------------------
with open(out_json, 'w', encoding='utf-8') as f:
json.dump(
output,
f,
indent=2
)
print(f'Saved: {out_json}')
# ------------------------------------------------------------
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,834 @@
#!/usr/bin/env python3
"""
2_estimate_camera_from_observations.py
Estimate a single camera pose from ArUco observations stored in
*_aruco_detection.json, using marker world positions from robot.json.
This follows the same mathematical idea as readTwoImages.py:
1) use detected marker observations,
2) get an initial pose from a rigid transform,
3) refine with Levenberg-Marquardt on normalized reprojection residuals.
Difference to readTwoImages.py:
- No image processing here.
- Input is the observation JSON created by 1_detect_aruco_observations.py.
- Output is xxx_camera_pose.json.
- Unknown marker reconstruction is intentionally omitted.
Assumptions:
- robot.json contains a marker list for the board/world frame.
- At minimum, marker positions are present for the reference markers.
- The detection JSON contains camera intrinsics and marker corners.
Typical usage:
python3 2_estimate_camera_from_observations.py \
-i frame_0001_aruco_detection.json \
-robot robot.json \
-outDir results/
Output:
frame_0001_camera_pose.json
Notes on uncertainty:
- The script computes an approximate 6x6 covariance for the pose parameters
[rvec_x, rvec_y, rvec_z, t_x, t_y, t_z].
- It also propagates that covariance to camera center uncertainty in world
coordinates and to approximate roll/pitch/yaw uncertainty.
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
from typing import Any, Dict, List, Optional, Tuple
import cv2
import numpy as np
# ---------------------------------------------------------------------
# Path / JSON helpers
# ---------------------------------------------------------------------
def resolve_path(path: str) -> str:
path = os.path.expanduser(path)
if os.path.isabs(path):
return path
return os.path.abspath(path)
def load_json(path: str) -> Dict[str, Any]:
with open(resolve_path(path), "r", encoding="utf-8") as f:
return json.load(f)
def save_json(path: str, data: Dict[str, Any]) -> None:
with open(resolve_path(path), "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
# ---------------------------------------------------------------------
# Intrinsics
# ---------------------------------------------------------------------
def load_intrinsics_from_detection(detection: Dict[str, Any]) -> Tuple[np.ndarray, np.ndarray]:
"""
Primary source: the embedded camera intrinsics in the detection JSON.
"""
camera = detection.get("camera", {})
K = camera.get("camera_matrix", None)
D = camera.get("distortion_coefficients", None)
if K is None:
raise KeyError("camera_matrix missing in detection JSON.")
if D is None:
D = [0, 0, 0, 0, 0]
K = np.array(K, dtype=np.float32).reshape(3, 3)
D = np.array(D, dtype=np.float32).reshape(-1, 1)
return K, D
# ---------------------------------------------------------------------
# Robot JSON parsing
# ---------------------------------------------------------------------
def _rotation_matrix_from_any(rotation: Any) -> np.ndarray:
"""
Best-effort parser for marker rotation.
Supported inputs:
- 3x3 matrix as nested list
- flat 9 list
- dict with keys:
* rotation_matrix / matrix
* rvec / rodriques / rodrigues
* euler_deg / rpy_deg / roll_pitch_yaw_deg
* euler_rad / rpy_rad / roll_pitch_yaw_rad
* quaternion / quat (best-effort, expects [x,y,z,w] unless specified)
- None => identity
The pose estimator below only needs marker positions, but we keep
this parser for completeness and future extension.
"""
if rotation is None:
return np.eye(3, dtype=np.float32)
# Direct matrix
if isinstance(rotation, (list, tuple, np.ndarray)):
arr = np.array(rotation, dtype=np.float32)
if arr.shape == (3, 3):
return arr
if arr.size == 9:
return arr.reshape(3, 3).astype(np.float32)
if arr.size == 3:
# Treat as Rodrigues vector
R, _ = cv2.Rodrigues(arr.reshape(3, 1))
return R.astype(np.float32)
return np.eye(3, dtype=np.float32)
if isinstance(rotation, dict):
for key in ("rotation_matrix", "matrix"):
if key in rotation:
return _rotation_matrix_from_any(rotation[key])
for key in ("rvec", "rodrigues", "rodriques"):
if key in rotation:
v = np.array(rotation[key], dtype=np.float32).reshape(3, 1)
R, _ = cv2.Rodrigues(v)
return R.astype(np.float32)
def euler_to_R(roll: float, pitch: float, yaw: float, degrees: bool = True) -> np.ndarray:
if degrees:
roll = np.deg2rad(roll)
pitch = np.deg2rad(pitch)
yaw = np.deg2rad(yaw)
cr, sr = np.cos(roll), np.sin(roll)
cp, sp = np.cos(pitch), np.sin(pitch)
cy, sy = np.cos(yaw), np.sin(yaw)
Rx = np.array([[1, 0, 0],
[0, cr, -sr],
[0, sr, cr]], dtype=np.float32)
Ry = np.array([[cp, 0, sp],
[0, 1, 0],
[-sp, 0, cp]], dtype=np.float32)
Rz = np.array([[cy, -sy, 0],
[sy, cy, 0],
[0, 0, 1]], dtype=np.float32)
# ZYX convention
return (Rz @ Ry @ Rx).astype(np.float32)
for key in ("euler_deg", "rpy_deg", "roll_pitch_yaw_deg"):
if key in rotation:
vals = np.array(rotation[key], dtype=np.float32).reshape(-1)
if vals.size == 3:
return euler_to_R(float(vals[0]), float(vals[1]), float(vals[2]), degrees=True)
for key in ("euler_rad", "rpy_rad", "roll_pitch_yaw_rad"):
if key in rotation:
vals = np.array(rotation[key], dtype=np.float32).reshape(-1)
if vals.size == 3:
return euler_to_R(float(vals[0]), float(vals[1]), float(vals[2]), degrees=False)
for key in ("quaternion", "quat"):
if key in rotation:
q = np.array(rotation[key], dtype=np.float32).reshape(-1)
if q.size == 4:
# Best-effort: try [x,y,z,w]
x, y, z, w = [float(v) for v in q]
R = np.array([
[1 - 2*y*y - 2*z*z, 2*x*y - 2*z*w, 2*x*z + 2*y*w],
[2*x*y + 2*z*w, 1 - 2*x*x - 2*z*z, 2*y*z - 2*x*w],
[2*x*z - 2*y*w, 2*y*z + 2*x*w, 1 - 2*x*x - 2*y*y]
], dtype=np.float32)
return R
return np.eye(3, dtype=np.float32)
def get_marker_rotation(marker: Dict[str, Any]) -> np.ndarray:
"""
Flexible rotation extraction. Falls back to identity if absent.
"""
for key in ("rotation", "rotation_matrix", "matrix", "pose_rotation", "orientation"):
if key in marker:
return _rotation_matrix_from_any(marker[key])
# Also allow flat pose-style fields
if "rvec" in marker or "rodrigues" in marker:
return _rotation_matrix_from_any({"rvec": marker.get("rvec", marker.get("rodrigues"))})
if "euler_deg" in marker:
return _rotation_matrix_from_any({"euler_deg": marker["euler_deg"]})
if "rpy_deg" in marker:
return _rotation_matrix_from_any({"rpy_deg": marker["rpy_deg"]})
if "quaternion" in marker:
return _rotation_matrix_from_any({"quaternion": marker["quaternion"]})
return np.eye(3, dtype=np.float32)
def load_marker_lookup(robot_json_path: str) -> Dict[int, Dict[str, Any]]:
"""
Supports the new format:
robot_data["links"]["Board"]["markers"]
Fallback:
robot_data["Marker"]
"""
robot_json_path = resolve_path(robot_json_path)
with open(robot_json_path, "r", encoding="utf-8") as f:
robot_data = json.load(f)
length_units = str(robot_data.get("units", {}).get("length", "")).strip().lower()
length_scale = 1.0
if length_units in ("mm", "millimeter", "millimeters"):
length_scale = 1.0 / 1000.0
elif length_units in ("cm", "centimeter", "centimeters"):
length_scale = 1.0 / 100.0
marker_lookup: Dict[int, Dict[str, Any]] = {}
links = robot_data.get("links", {})
board = links.get("Board")
markers = None
if board and "markers" in board:
markers = board["markers"]
if not markers:
markers = robot_data.get("Marker", [])
for marker in markers:
marker_id = int(marker.get("id", -1))
if marker_id < 0:
continue
if "position" not in marker:
continue
pos = marker.get("position")
if pos is None:
continue
if len(pos) != 3:
continue
rotation = get_marker_rotation(marker)
marker_lookup[marker_id] = {
"position": np.array(pos, dtype=np.float32) * np.float32(length_scale),
"rotation": rotation,
"on": marker.get("on", "unknown"),
}
return marker_lookup
def load_robot_marker_size(robot_json_path: str) -> Optional[float]:
"""
Best-effort marker size reader from robot.json.
Returns meters if found, otherwise None.
"""
robot_json_path = resolve_path(robot_json_path)
with open(robot_json_path, "r", encoding="utf-8") as f:
robot_data = json.load(f)
vision_config = robot_data.get("vision_config", {})
size = vision_config.get("MarkerSize", None)
if size is None:
return None
try:
return float(size)
except Exception:
return None
# ---------------------------------------------------------------------
# Geometry / pose helpers
# ---------------------------------------------------------------------
def marker_local_corners(marker_size_m: float) -> np.ndarray:
half = marker_size_m / 2.0
# Same corner order as the readTwoImages.py example
return np.array([
[-half, half, 0.0],
[ half, half, 0.0],
[ half, -half, 0.0],
[-half, -half, 0.0],
], dtype=np.float32)
def rigid_transform_no_scale(A: np.ndarray, B: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
"""
Find R, t such that B ≈ R A + t.
A, B: Nx3
"""
assert A.shape == B.shape and A.shape[1] == 3, "A and B must be Nx3"
N = A.shape[0]
if N < 2:
raise ValueError("Need at least 2 points; 3+ recommended.")
centroid_A = A.mean(axis=0)
centroid_B = B.mean(axis=0)
AA = A - centroid_A
BB = B - centroid_B
H = AA.T @ BB
U, S, Vt = np.linalg.svd(H)
R = Vt.T @ U.T
if np.linalg.det(R) < 0:
Vt[-1, :] *= -1
R = Vt.T @ U.T
t = centroid_B - R @ centroid_A
return R.astype(np.float32), t.astype(np.float32)
def undistort_to_normalized(points_px: np.ndarray, K: np.ndarray, D: np.ndarray) -> np.ndarray:
pts = points_px.reshape(-1, 1, 2).astype(np.float32)
und = cv2.undistortPoints(pts, K, D, P=None)
return und.reshape(-1, 2).astype(np.float32)
def rvec_to_R(rvec: np.ndarray) -> np.ndarray:
R, _ = cv2.Rodrigues(rvec.reshape(3, 1))
return R.astype(np.float32)
def R_to_euler_zyx(R: np.ndarray) -> Tuple[float, float, float]:
"""
Return roll, pitch, yaw in degrees using ZYX convention.
"""
yaw = float(np.degrees(np.arctan2(R[1, 0], R[0, 0])))
sp = np.sqrt(R[2, 1] ** 2 + R[2, 2] ** 2)
pitch = float(np.degrees(np.arctan2(-R[2, 0], sp)))
roll = float(np.degrees(np.arctan2(R[2, 1], R[2, 2])))
return roll, pitch, yaw
def theta_to_camera_pose(theta: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
theta = [omega_x, omega_y, omega_z, t_x, t_y, t_z]
Returns:
R_wc, t_wc, camera_center_world
"""
omega = theta[0:3]
t_wc = theta[3:6].reshape(3, 1).astype(np.float32)
R_wc, _ = cv2.Rodrigues(omega.reshape(3, 1))
R_wc = R_wc.astype(np.float32)
R_cw = R_wc.T
camera_center_world = (-R_cw @ t_wc).reshape(3)
return R_wc, t_wc.reshape(3), camera_center_world
def build_projection_matrix(K: np.ndarray, R: np.ndarray, t: np.ndarray) -> np.ndarray:
return K @ np.hstack([R, t.reshape(3, 1)])
# ---------------------------------------------------------------------
# LM on normalized residuals (same style as readTwoImages.py)
# ---------------------------------------------------------------------
def pack_params(omega: np.ndarray, t: np.ndarray) -> np.ndarray:
return np.hstack([omega.reshape(3), t.reshape(3)]).astype(np.float64)
def unpack_params(theta: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
omega = theta[0:3]
t = theta[3:6]
return omega, t
def residuals_centers_normalized(theta: np.ndarray,
X_world: np.ndarray,
obs_norm: np.ndarray) -> np.ndarray:
"""
Residuals in normalized coordinates:
obs_norm - project(R X_world + t)
"""
omega, t = unpack_params(theta)
R_wc = cv2.Rodrigues(omega.reshape(3, 1))[0].astype(np.float64)
X_cam = (R_wc @ X_world.T + t.reshape(3, 1)).T
uv = X_cam[:, :2] / X_cam[:, 2:3]
r = (obs_norm - uv).reshape(-1)
return r
def numerical_jacobian(f, theta: np.ndarray, eps: float, *args) -> Tuple[np.ndarray, np.ndarray]:
r0 = f(theta, *args)
m = r0.size
n = theta.size
J = np.zeros((m, n), dtype=np.float64)
for k in range(n):
th = theta.copy()
th[k] += eps
rk = f(th, *args)
J[:, k] = (rk - r0) / eps
return J, r0
def lm_solve(theta0: np.ndarray,
X_world: np.ndarray,
obs_norm: np.ndarray,
max_iter: int = 60,
eps_jac: float = 1e-6,
lambda_init: float = 1e-3) -> Tuple[np.ndarray, Dict[str, List[float]]]:
lam = lambda_init
theta = theta0.copy().astype(np.float64)
history = {"iters": [], "rms": [], "lambda": []}
for it in range(max_iter):
J, r = numerical_jacobian(residuals_centers_normalized, theta, eps_jac, X_world, obs_norm)
rms = float(np.sqrt(np.mean(r * r))) if r.size else 0.0
history["iters"].append(it)
history["rms"].append(rms)
history["lambda"].append(lam)
JTJ = J.T @ J
g = J.T @ r
H = JTJ + lam * np.eye(JTJ.shape[0], dtype=np.float64)
try:
delta = -np.linalg.solve(H, g)
except np.linalg.LinAlgError:
delta, *_ = np.linalg.lstsq(H, -g, rcond=None)
theta_trial = theta + delta
r_trial = residuals_centers_normalized(theta_trial, X_world, obs_norm)
rms_trial = float(np.sqrt(np.mean(r_trial * r_trial))) if r_trial.size else rms
if rms_trial < rms:
theta = theta_trial
lam *= 0.5
else:
lam *= 2.0
if np.linalg.norm(delta) < 1e-10:
break
if abs(rms - rms_trial) < 1e-12:
break
return theta, history
def pose_covariance(theta: np.ndarray,
X_world: np.ndarray,
obs_norm: np.ndarray,
eps_jac: float = 1e-6) -> Tuple[np.ndarray, float, np.ndarray]:
"""
Returns:
cov_theta_6x6, sigma2, residual_vector
"""
J, r = numerical_jacobian(residuals_centers_normalized, theta, eps_jac, X_world, obs_norm)
m = r.size
n = theta.size
dof = max(1, m - n)
sigma2 = float((r @ r) / dof)
JTJ = J.T @ J
cov = sigma2 * np.linalg.pinv(JTJ)
return cov.astype(np.float64), sigma2, r
def propagate_covariance(theta: np.ndarray,
cov_theta: np.ndarray) -> Dict[str, Any]:
"""
Propagate pose covariance to camera center and Euler angle uncertainties.
"""
def camera_center_fn(th: np.ndarray) -> np.ndarray:
_, _, c = theta_to_camera_pose(th)
return c.astype(np.float64)
def euler_fn(th: np.ndarray) -> np.ndarray:
R_wc, _, _ = theta_to_camera_pose(th)
return np.array(R_to_euler_zyx(R_wc), dtype=np.float64) # deg
Jc, _ = numerical_jacobian(lambda th, *_: camera_center_fn(th), theta, 1e-6)
cov_center = Jc @ cov_theta @ Jc.T
Je, _ = numerical_jacobian(lambda th, *_: euler_fn(th), theta, 1e-6)
cov_euler = Je @ cov_theta @ Je.T
center_std_m = np.sqrt(np.maximum(0.0, np.diag(cov_center)))
euler_std_deg = np.sqrt(np.maximum(0.0, np.diag(cov_euler)))
# Parameter std directly from covariance
param_std = np.sqrt(np.maximum(0.0, np.diag(cov_theta)))
rvec_std_deg = np.degrees(param_std[0:3])
tvec_std_m = param_std[3:6]
return {
"pose_covariance_6x6": cov_theta.tolist(),
"parameter_std": {
"rvec_std_deg": [float(x) for x in rvec_std_deg],
"tvec_std_m": [float(x) for x in tvec_std_m],
},
"camera_center_std_m": [float(x) for x in center_std_m],
"camera_center_std_mm": [float(x * 1000.0) for x in center_std_m],
"orientation_std_deg": {
"roll": float(euler_std_deg[0]),
"pitch": float(euler_std_deg[1]),
"yaw": float(euler_std_deg[2]),
},
}
# ---------------------------------------------------------------------
# Marker processing
# ---------------------------------------------------------------------
def build_object_corners_from_world_position(position_m: np.ndarray,
marker_size_m: float) -> np.ndarray:
"""
Marker corners in world coordinates, assuming the marker frame is aligned
with the world frame and only translated to 'position_m'.
This is the direct analogue of readTwoImages.py using marker center positions.
"""
h = marker_size_m / 2.0
local = np.array([
[-h, h, 0.0],
[ h, h, 0.0],
[ h, -h, 0.0],
[-h, -h, 0.0],
], dtype=np.float32)
return local + position_m.reshape(1, 3)
def solve_single_marker_pose(corners_px: np.ndarray,
K: np.ndarray,
D: np.ndarray,
marker_size_m: float) -> Optional[Tuple[np.ndarray, np.ndarray]]:
obj = marker_local_corners(marker_size_m)
success, rvec, tvec = cv2.solvePnP(
obj,
corners_px.astype(np.float32),
K,
D,
flags=cv2.SOLVEPNP_IPPE_SQUARE
)
if not success:
success, rvec, tvec = cv2.solvePnP(
obj,
corners_px.astype(np.float32),
K,
D,
flags=cv2.SOLVEPNP_ITERATIVE
)
if not success:
return None
return rvec.reshape(3), tvec.reshape(3)
# ---------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(description="Estimate camera pose from ArUco observation JSON")
parser.add_argument("-i", "--input", required=True, help="*_aruco_detection.json")
parser.add_argument("-robot", "--robot", required=True, help="robot.json with board markers")
parser.add_argument("-outDir", "--outDir", default=None, help="Optional output directory")
parser.add_argument("--minConfidence", type=float, default=0.0,
help="Skip detections below this confidence")
parser.add_argument("--minCommonMarkers", type=int, default=3,
help="Minimum number of world markers required")
parser.add_argument("--maxRmsPx", type=float, default=None,
help="Optional soft warning threshold for final reprojection RMS in pixels")
parser.add_argument("--epsJac", type=float, default=1e-6, help="Finite-difference epsilon")
args = parser.parse_args()
detection_path = resolve_path(args.input)
robot_path = resolve_path(args.robot)
detection = load_json(detection_path)
marker_lookup = load_marker_lookup(robot_path)
K, D = load_intrinsics_from_detection(detection)
robot_marker_size = load_robot_marker_size(robot_path)
det_marker_size = detection.get("vision_config", {}).get("MarkerSize", None)
if det_marker_size is not None:
marker_size_m = float(det_marker_size)
elif robot_marker_size is not None:
marker_size_m = float(robot_marker_size)
else:
marker_size_m = 0.025
detections = detection.get("detections", [])
if not isinstance(detections, list):
raise TypeError("detection['detections'] must be a list")
used_ids: List[int] = []
used_world_positions: List[np.ndarray] = []
used_obs_centers_px: List[np.ndarray] = []
used_obs_centers_norm: List[np.ndarray] = []
used_marker_cam_centers: List[np.ndarray] = []
used_marker_meta: List[Dict[str, Any]] = []
sanity_notes: List[str] = []
for det in detections:
if det.get("type", "aruco") != "aruco":
continue
marker_id = int(det.get("marker_id", -1))
if marker_id < 0:
continue
if marker_id not in marker_lookup:
continue
confidence = float(det.get("confidence", 1.0))
if confidence < args.minConfidence:
continue
corners = det.get("image_points_px", None)
if corners is None:
continue
corners_px = np.array(corners, dtype=np.float32).reshape(4, 2)
center_from_corners = corners_px.mean(axis=0)
center_px = np.array(det.get("center_px", center_from_corners), dtype=np.float32).reshape(2)
center_delta = float(np.linalg.norm(center_from_corners - center_px))
if center_delta > 0.75:
sanity_notes.append(
f"marker {marker_id}: center_px differs from corner-mean by {center_delta:.2f}px"
)
pnp = solve_single_marker_pose(corners_px, K, D, marker_size_m)
if pnp is None:
continue
rvec_m, tvec_m = pnp
world_pos = marker_lookup[marker_id]["position"].astype(np.float32)
used_ids.append(marker_id)
used_world_positions.append(world_pos)
used_obs_centers_px.append(center_from_corners.astype(np.float32))
used_obs_centers_norm.append(undistort_to_normalized(center_from_corners.reshape(1, 2), K, D)[0])
used_marker_cam_centers.append(tvec_m.astype(np.float32))
used_marker_meta.append({
"marker_id": marker_id,
"confidence": confidence,
"center_px": [float(center_from_corners[0]), float(center_from_corners[1])],
"marker_size_m": marker_size_m,
})
# Unique / deduplicate by marker_id while preserving order
dedup: Dict[int, int] = {}
uniq_ids: List[int] = []
uniq_world_positions: List[np.ndarray] = []
uniq_obs_px: List[np.ndarray] = []
uniq_obs_norm: List[np.ndarray] = []
uniq_cam_centers: List[np.ndarray] = []
uniq_meta: List[Dict[str, Any]] = []
for idx, mid in enumerate(used_ids):
if mid in dedup:
continue
dedup[mid] = idx
uniq_ids.append(mid)
uniq_world_positions.append(used_world_positions[idx])
uniq_obs_px.append(used_obs_centers_px[idx])
uniq_obs_norm.append(used_obs_centers_norm[idx])
uniq_cam_centers.append(used_marker_cam_centers[idx])
uniq_meta.append(used_marker_meta[idx])
if len(uniq_ids) < args.minCommonMarkers:
raise RuntimeError(
f"Need at least {args.minCommonMarkers} common markers; found {len(uniq_ids)}: {uniq_ids}"
)
X_world = np.stack(uniq_world_positions, axis=0).astype(np.float64)
obs_px = np.stack(uniq_obs_px, axis=0).astype(np.float64)
obs_norm = np.stack(uniq_obs_norm, axis=0).astype(np.float64)
marker_cam_centers = np.stack(uniq_cam_centers, axis=0).astype(np.float64)
# Initial pose from rigid transform of per-marker camera-frame centers to world positions
# B ≈ R A + t -> world = R * camera + t
R_cw_init, t_cw_init = rigid_transform_no_scale(marker_cam_centers, X_world)
R_wc_init = R_cw_init.T
t_wc_init = -(R_wc_init @ t_cw_init).reshape(3)
omega_init = cv2.Rodrigues(R_wc_init)[0].reshape(3)
theta0 = pack_params(omega_init, t_wc_init)
theta_opt, hist = lm_solve(
theta0=theta0,
X_world=X_world,
obs_norm=obs_norm,
max_iter=60,
eps_jac=args.epsJac,
lambda_init=1e-3,
)
R_wc, t_wc, camera_center_world = theta_to_camera_pose(theta_opt)
cov_theta, sigma2, residual_vec = pose_covariance(
theta_opt, X_world, obs_norm, eps_jac=args.epsJac
)
propagated = propagate_covariance(theta_opt, cov_theta)
# Exact pixel-space reprojection statistics
proj_pts, _ = cv2.projectPoints(
X_world.reshape(-1, 1, 3).astype(np.float32),
theta_opt[0:3].reshape(3, 1).astype(np.float32),
theta_opt[3:6].reshape(3, 1).astype(np.float32),
K,
D,
)
proj_pts = proj_pts.reshape(-1, 2)
reproj_err_px = np.linalg.norm(proj_pts - obs_px, axis=1)
rms_px = float(np.sqrt(np.mean(reproj_err_px ** 2))) if reproj_err_px.size else 0.0
median_px = float(np.median(reproj_err_px)) if reproj_err_px.size else 0.0
max_px = float(np.max(reproj_err_px)) if reproj_err_px.size else 0.0
if args.maxRmsPx is not None and rms_px > args.maxRmsPx:
print(f"[WARN] Final reprojection RMS is {rms_px:.3f}px (threshold {args.maxRmsPx:.3f}px).")
# Convert outputs
roll, pitch, yaw = R_to_euler_zyx(R_wc)
position_mm = (camera_center_world * 1000.0).astype(float).tolist()
# Reproject each used marker center for QA
per_marker_results = []
proj_pts_exact, _ = cv2.projectPoints(
X_world.reshape(-1, 1, 3).astype(np.float32),
theta_opt[0:3].reshape(3, 1).astype(np.float32),
theta_opt[3:6].reshape(3, 1).astype(np.float32),
K,
D,
)
proj_pts_exact = proj_pts_exact.reshape(-1, 2)
for idx, mid in enumerate(uniq_ids):
x = proj_pts_exact[idx]
err = float(np.linalg.norm(x - obs_px[idx]))
per_marker_results.append({
"marker_id": int(mid),
"observed_center_px": [float(obs_px[idx, 0]), float(obs_px[idx, 1])],
"projected_center_px": [float(x[0]), float(x[1])],
"reprojection_error_px": err,
"confidence": float(uniq_meta[idx]["confidence"]),
})
# Output directory
in_base = os.path.splitext(os.path.basename(detection_path))[0]
out_name = in_base.replace("_aruco_detection", "_camera_pose") + ".json"
if args.outDir is not None:
out_dir = resolve_path(args.outDir)
else:
out_dir = os.path.dirname(detection_path) or "."
os.makedirs(out_dir, exist_ok=True)
out_json = os.path.join(out_dir, out_name)
output = {
"schema_version": "1.0",
"created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"source": {
"detection_json": detection_path,
"robot_json": robot_path,
},
"camera": {
"camera_id": detection.get("camera", {}).get("camera_id", "unknown"),
"camera_matrix": K.tolist(),
"distortion_coefficients": D.reshape(-1).tolist(),
},
"estimation": {
"method": "single_camera_marker_center_lm",
"description": "Rigid init from per-marker pose estimates, followed by LM on normalized marker-center reprojection residuals.",
"marker_size_m": float(marker_size_m),
"num_used_markers": int(len(uniq_ids)),
"used_marker_ids": [int(x) for x in uniq_ids],
"history": hist,
"residual_rms_px": float(rms_px),
"residual_median_px": float(median_px),
"residual_max_px": float(max_px),
"sigma2_normalized": float(sigma2),
},
"camera_pose": {
"world_to_camera": {
"rotation_matrix": R_wc.tolist(),
"translation_m": [float(x) for x in t_wc.tolist()],
"rvec_rad": [float(x) for x in theta_opt[0:3].tolist()],
},
"camera_in_world": {
"position_m": [float(x) for x in camera_center_world.tolist()],
"position_mm": [float(x) for x in position_mm],
"orientation_deg": {
"roll": float(roll),
"pitch": float(pitch),
"yaw": float(yaw),
},
},
"uncertainty": propagated,
},
"observations": {
"markers": per_marker_results,
},
"qa": {
"sanity_notes": sanity_notes,
},
}
save_json(out_json, output)
print(f"[INFO] Saved camera pose JSON: {out_json}")
if __name__ == "__main__":
try:
main()
except Exception as exc:
print(f"[ERROR] {exc}", file=sys.stderr)
sys.exit(1)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,189 @@
#!/usr/bin/env python3
"""
3b_corner_marker_poses.py
=========================
Produktiver Pipeline-Schritt: leitet aus den 4 ArUco-Ecken jedes Markers eine
volle Marker-Pose ab (Position + gemessene Normale), statt nur den Center zu
triangulieren.
Validiert in benchmark/stage0_corner_normals.py: die aus triangulierten Ecken
abgeleitete Normale ist ~1 deg genau (Median), auch fuer Finger-Marker.
Input:
--evalDir Ordner mit render_*_aruco_detection.json + _camera_pose.json
--robot robot.json (fuer marker_id -> link Zuordnung)
Output:
<evalDir>/aruco_marker_poses.json (pro Marker: position, gemessene normal,
4 triangulierte Ecken, #Kameras, Kantenlaenge)
Das Format ist kompatibel mit robot_viewer.html (marker_id, position_m/mm, normal)
und mit 9_evaluateMarker.py (position_m), erweitert um die gemessene Orientierung.
"""
from __future__ import annotations
import argparse
import glob
import json
import os
import re
import time
from typing import Dict, List, Tuple
import numpy as np
import cv2
# ------------------------------------------------------------------
# Loading
# ------------------------------------------------------------------
def load_cameras(eval_dir: str) -> Dict[str, dict]:
cams: Dict[str, dict] = {}
for det_path in glob.glob(os.path.join(eval_dir, "*_aruco_detection.json")):
base = os.path.basename(det_path)
m = re.match(r"render_([A-Za-z0-9]+)_aruco_detection\.json", base)
if not m:
continue
cam_id = m.group(1)
pose_path = os.path.join(eval_dir, f"render_{cam_id}_camera_pose.json")
if not os.path.exists(pose_path):
print(f"[WARN] no pose for camera {cam_id}, skipping")
continue
det = json.load(open(det_path, "r", encoding="utf-8"))
pose = json.load(open(pose_path, "r", encoding="utf-8"))
K = np.array(det["camera"]["camera_matrix"], dtype=float).reshape(3, 3)
D = np.array(det["camera"]["distortion_coefficients"], dtype=float).reshape(-1, 1)
w2c = pose["camera_pose"]["world_to_camera"]
R = np.array(w2c["rotation_matrix"], dtype=float).reshape(3, 3)
t = np.array(w2c["translation_m"], dtype=float).reshape(3)
markers: Dict[int, np.ndarray] = {}
for d in det.get("detections", []):
pts = d.get("image_points_px")
if pts is not None:
markers[int(d["marker_id"])] = np.array(pts, dtype=float).reshape(4, 2)
cams[cam_id] = dict(K=K, D=D, R=R, t=t, markers=markers)
return cams
def load_marker_links(robot_path: str) -> Dict[int, str]:
robot = json.load(open(robot_path, "r", encoding="utf-8"))
out: Dict[int, str] = {}
for link_name, link in (robot.get("links", {}) or {}).items():
for mk in link.get("markers", []) or []:
mid = int(mk.get("id", -1))
if mid >= 0:
out[mid] = link_name
return out
# ------------------------------------------------------------------
# Geometry (validated in stage0)
# ------------------------------------------------------------------
def triangulate_multiview(observations) -> np.ndarray:
A = []
for K, D, R, t, uv in observations:
und = cv2.undistortPoints(np.array([[uv]], dtype=np.float32), K, D).reshape(2)
x, y = float(und[0]), float(und[1])
P = np.hstack([R, t.reshape(3, 1)])
A.append(x * P[2] - P[0])
A.append(y * P[2] - P[1])
_, _, Vt = np.linalg.svd(np.asarray(A, dtype=float))
X = Vt[-1]
return np.array([np.nan] * 3) if abs(X[3]) < 1e-12 else X[:3] / X[3]
def corner_plane_normal(corners3d: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
center = corners3d.mean(axis=0)
_, _, Vt = np.linalg.svd(corners3d - center)
n = Vt[-1]
# ArUco corners clockwise from the front: outward (camera-facing) normal,
# matching the Blender/robot.json convention, points opposite cross(e01,e02).
cross = np.cross(corners3d[1] - corners3d[0], corners3d[2] - corners3d[0])
if np.dot(n, cross) > 0:
n = -n
nn = np.linalg.norm(n)
return (n / nn if nn > 1e-12 else n), center
# ------------------------------------------------------------------
# Main
# ------------------------------------------------------------------
def main() -> None:
ap = argparse.ArgumentParser(description="Derive marker poses (position + measured normal) from ArUco corners")
ap.add_argument("--evalDir", required=True, help="folder with detection + camera_pose JSONs")
ap.add_argument("--robot", required=True, help="robot.json (for marker->link)")
ap.add_argument("--minCams", type=int, default=2, help="min cameras to triangulate a marker")
ap.add_argument("--out", default=None, help="output path (default <evalDir>/aruco_marker_poses.json)")
args = ap.parse_args()
cams = load_cameras(args.evalDir)
if len(cams) < 2:
print("[ERROR] need >=2 cameras")
return
links = load_marker_links(args.robot)
print(f"[INFO] Cameras: {sorted(cams.keys())} | marker-link entries: {len(links)}")
marker_cams: Dict[int, List[str]] = {}
for cid, cam in cams.items():
for mid in cam["markers"]:
marker_cams.setdefault(mid, []).append(cid)
markers_out = []
for mid, cam_ids in sorted(marker_cams.items()):
if len(cam_ids) < args.minCams:
continue
corners3d, ok = [], True
for ci in range(4):
obs = [(cams[c]["K"], cams[c]["D"], cams[c]["R"], cams[c]["t"], cams[c]["markers"][mid][ci])
for c in cam_ids]
X = triangulate_multiview(obs)
if not np.all(np.isfinite(X)):
ok = False
break
corners3d.append(X)
if not ok:
continue
corners3d = np.array(corners3d)
normal, center = corner_plane_normal(corners3d)
edge_mm = float(np.mean([np.linalg.norm(corners3d[(i + 1) % 4] - corners3d[i]) for i in range(4)]) * 1000.0)
markers_out.append({
"marker_id": int(mid),
"link": links.get(mid, "unknown"),
"position_m": [float(v) for v in center],
"position_mm": [float(v * 1000.0) for v in center],
"normal": [float(v) for v in normal],
"corners_m": [[float(v) for v in c] for c in corners3d],
"num_cameras": len(cam_ids),
"edge_length_mm": edge_mm,
})
# camera poses in world (for viewer frusta): centre C = -R^T t, view axis = R[2]
cameras_out = []
for cid in sorted(cams.keys()):
cam = cams[cid]
C = -cam["R"].T @ cam["t"]
cameras_out.append({
"camera_id": cid,
"position_m": [float(v) for v in C],
"position_mm": [float(v * 1000.0) for v in C],
"direction": [float(v) for v in cam["R"][2]],
})
out_path = args.out or os.path.join(args.evalDir, "aruco_marker_poses.json")
output = {
"schema_version": "1.1",
"stage": "corner_marker_poses",
"created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"summary": {"num_cameras": len(cams), "num_markers": len(markers_out)},
"cameras": cameras_out,
"markers": markers_out,
}
json.dump(output, open(out_path, "w", encoding="utf-8"), indent=2)
print(f"[INFO] {len(markers_out)} marker poses -> {out_path}")
if __name__ == "__main__":
main()

Binary file not shown.

View File

@@ -0,0 +1,539 @@
#!/usr/bin/env python3
"""
pose_estimation.py
==================
Estimate robot joint angles (x, y, z, a, b, c, e) from triangulated marker
poses, using the kinematic model in robot.json (via robot_fk.py).
Design
------
The estimator is parametrised over JOINT VARIABLES, not links. This handles the
tricky cases of this robot family generically:
* Links with zero own markers (Base/x, Hand/b, Palm/c) — their variable is
observable only through descendant markers.
* A variable shared by several links (FingerA & FingerB share 'e').
* Occluded middle links — global BA reconstructs them from the fingers.
Four switchable methods (robot.json -> pose_estimation.method):
sequential_vector : analytic per joint from marker-pair / normal vectors (fast)
sequential_fk : block-wise least squares along the chain (robust, 1 marker ok)
global_ba : all variables jointly, position + normal residuals, robust loss
hybrid : sequential_fk init -> global_ba refine (default, most stable)
Observation input:
marker_observation = "corner_pose" -> aruco_marker_poses.json (pos + measured normal)
marker_observation = "center_point" -> aruco_positions_*.json (pos only)
Both the engine (estimate_pose) and a CLI (main) live here.
"""
from __future__ import annotations
import argparse
import json
import math
import os
import sys
import time
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
sys.path.insert(0, str(Path(__file__).parent))
from robot_fk import RobotFK, STATE_KEYS # noqa: E402
try:
from scipy.optimize import least_squares
HAVE_SCIPY = True
except ImportError:
HAVE_SCIPY = False
# ==================================================================
# Config
# ==================================================================
DEFAULT_CFG: Dict[str, Any] = {
"method": "hybrid",
"marker_observation": "corner_pose",
"use_normals": True,
"normal_weight": 100.0,
"robust_loss": "huber",
"huber_delta_mm": 8.0,
"max_iterations": 200,
"min_cameras_per_marker": 2,
"finger_block_joints": ["b", "c", "e"],
"per_link_method": {},
}
def load_pose_cfg(robot_data: Dict[str, Any]) -> Dict[str, Any]:
cfg = dict(DEFAULT_CFG)
cfg.update(robot_data.get("pose_estimation", {}) or {})
return cfg
# ==================================================================
# Observations
# ==================================================================
def load_observations(path: str, use_normals: bool, min_cams: int = 2) -> Dict[int, Dict[str, Any]]:
"""
Load marker observations. Accepts aruco_marker_poses.json (with measured
normal + num_cameras) or aruco_positions_*.json (position only).
Returns: marker_id -> {pos_mm:(3,), normal:(3,)|None, link:str, n_cams:int}
"""
data = json.load(open(path, "r", encoding="utf-8"))
out: Dict[int, Dict[str, Any]] = {}
for m in data.get("markers", []):
mid = int(m.get("marker_id", m.get("id", -1)))
if mid < 0:
continue
n_cams = int(m.get("num_cameras", 99))
if n_cams < min_cams:
continue
if "position_mm" in m:
pos = np.array(m["position_mm"], dtype=float)
elif "position_m" in m:
pos = np.array(m["position_m"], dtype=float) * 1000.0
else:
continue
nrm = None
if use_normals and m.get("normal") is not None:
nv = np.array(m["normal"], dtype=float)
nn = np.linalg.norm(nv)
if nn > 1e-9:
nrm = nv / nn
out[mid] = {"pos_mm": pos, "normal": nrm, "link": m.get("link", "?"), "n_cams": n_cams}
return out
# ==================================================================
# Kinematic chain analysis
# ==================================================================
def analyze_chain(fk: RobotFK) -> Dict[str, Any]:
"""
Derive, generically from the FK topology:
ordered_vars : movable joint variables, root->tip order, de-duplicated
var_links : variable -> list of links it drives
link_markers : link -> [model marker ids]
blocks : sequential estimation blocks; each block groups the
zero-marker ancestor variables with the next marker-
bearing joint variable, estimated from that link's own
markers (+ siblings sharing the same variable).
"""
links = fk.links
topo = fk._topo
link_markers: Dict[str, List[int]] = {}
for ln, ld in links.items():
ids = []
for mk in ld.get("markers", []) or []:
if "id" in mk and "position" in mk:
ids.append(int(mk["id"]))
link_markers[ln] = ids
link_var: Dict[str, str] = {}
for ln, ld in links.items():
j = ld.get("jointToParent", {}) or {}
if str(j.get("type", "")).lower() in ("revolute", "linear"):
v = str(j.get("variable", "")).lower()
if v:
link_var[ln] = v
var_type: Dict[str, str] = {}
var_links: Dict[str, List[str]] = defaultdict(list)
for ln, v in link_var.items():
var_links[v].append(ln)
var_type[v] = str(links[ln].get("jointToParent", {}).get("type", "")).lower()
ordered_vars: List[str] = []
for ln in topo:
if ln in link_var and link_var[ln] not in ordered_vars:
ordered_vars.append(link_var[ln])
# ---- build blocks ----
blocks: List[Dict[str, Any]] = []
var_block: Dict[str, int] = {}
pending: List[str] = []
for ln in topo:
if ln not in link_var:
continue
v = link_var[ln]
own = link_markers.get(ln, [])
if v in var_block:
# shared variable already in a block -> add this link's markers there
if own:
blocks[var_block[v]]["markers"].extend(own)
continue
if own:
bvars = []
for x in pending + [v]:
if x not in bvars and x not in var_block:
bvars.append(x)
blocks.append({"vars": bvars, "markers": list(own), "anchor": ln})
for x in bvars:
var_block[x] = len(blocks) - 1
pending = []
else:
if v not in pending:
pending.append(v)
if pending:
blocks.append({"vars": pending, "markers": [], "anchor": None})
for x in pending:
var_block[x] = len(blocks) - 1
return {
"ordered_vars": ordered_vars,
"var_type": var_type,
"var_links": dict(var_links),
"link_markers": link_markers,
"blocks": blocks,
}
# ==================================================================
# Residuals
# ==================================================================
def model_markers(fk: RobotFK, state: Dict[str, float]) -> Dict[int, Dict[str, np.ndarray]]:
T = fk.compute(state)
return fk.all_markers_world(T) # mid -> {world_mm, normal_world, link, local_mm}
def residual_vector(state: Dict[str, float], fk: RobotFK, obs: Dict[int, Dict[str, Any]],
marker_ids: List[int], cfg: Dict[str, Any]) -> np.ndarray:
"""Position (mm) + optional normal (scaled) residuals over the given markers."""
model = model_markers(fk, state)
res: List[float] = []
w_n = float(cfg.get("normal_weight", 30.0))
use_n = bool(cfg.get("use_normals", True))
for mid in marker_ids:
if mid not in model or mid not in obs:
continue
mm = model[mid]
dp = np.asarray(mm["world_mm"], float) - obs[mid]["pos_mm"]
res.extend(dp.tolist())
if use_n and obs[mid]["normal"] is not None and "normal_world" in mm:
dn = (np.asarray(mm["normal_world"], float) - obs[mid]["normal"]) * w_n
res.extend(dn.tolist())
return np.asarray(res, dtype=float)
def _state_from_vec(var_names: List[str], vec: np.ndarray, base: Dict[str, float]) -> Dict[str, float]:
s = dict(base)
for name, val in zip(var_names, vec):
s[name] = float(val)
return s
# ==================================================================
# Method: global bundle adjustment
# ==================================================================
def estimate_global_ba(fk: RobotFK, obs: Dict[int, Dict[str, Any]], var_names: List[str],
x0: Dict[str, float], cfg: Dict[str, Any]) -> Dict[str, float]:
if not HAVE_SCIPY:
print("[WARN] scipy missing — global_ba skipped, returning init")
return dict(x0)
marker_ids = list(obs.keys())
base = {k: 0.0 for k in STATE_KEYS}
base.update(x0)
vec0 = np.array([base.get(v, 0.0) for v in var_names], dtype=float)
def fun(vec):
st = _state_from_vec(var_names, vec, base)
return residual_vector(st, fk, obs, marker_ids, cfg)
loss = cfg.get("robust_loss", "huber")
f_scale = float(cfg.get("huber_delta_mm", 8.0))
try:
sol = least_squares(fun, vec0, loss=loss, f_scale=f_scale,
max_nfev=int(cfg.get("max_iterations", 200)) * max(1, len(var_names)))
return _state_from_vec(var_names, sol.x, base)
except Exception as exc:
print(f"[WARN] global_ba failed: {exc}")
return dict(base)
# ==================================================================
# Method: sequential block-wise FK fit
# ==================================================================
def _multistart_values(vtype: str) -> List[float]:
# revolute: scan the circle to escape local minima at large angles
if vtype == "revolute":
return [0.0, 60.0, 120.0, 180.0, 240.0, 300.0]
return [0.0]
def estimate_sequential_fk(fk: RobotFK, obs: Dict[int, Dict[str, Any]], chain: Dict[str, Any],
cfg: Dict[str, Any]) -> Dict[str, float]:
"""Estimate block by block along the chain, freezing already-solved variables."""
state = {k: 0.0 for k in STATE_KEYS}
var_type = chain["var_type"]
for block in chain["blocks"]:
bvars = block["vars"]
bmarkers = [m for m in block["markers"] if m in obs]
if not bvars:
continue
if not bmarkers:
# unobservable block: leave at 0, flag later
continue
if not HAVE_SCIPY:
continue
base = dict(state)
def fun(vec, _bvars=bvars, _bm=bmarkers, _base=base):
st = _state_from_vec(_bvars, vec, _base)
return residual_vector(st, fk, obs, _bm, cfg)
# multi-start over the first revolute variable in the block
starts = [[0.0] * len(bvars)]
lead_type = var_type.get(bvars[0], "linear")
if lead_type == "revolute":
starts = []
for a0 in _multistart_values("revolute"):
s = [0.0] * len(bvars)
s[0] = a0
starts.append(s)
best, best_cost = None, float("inf")
for s0 in starts:
try:
sol = least_squares(fun, np.array(s0, dtype=float),
loss=cfg.get("robust_loss", "huber"),
f_scale=float(cfg.get("huber_delta_mm", 8.0)),
max_nfev=200 * max(1, len(bvars)))
if sol.cost < best_cost:
best_cost, best = sol.cost, sol.x
except Exception:
continue
if best is not None:
for name, val in zip(bvars, best):
state[name] = float(val)
# wrap revolute angles to (-180, 180]
for v, vt in var_type.items():
if vt == "revolute":
state[v] = (state[v] + 180.0) % 360.0 - 180.0
return state
# ==================================================================
# Method: sequential analytic vector (per revolute joint)
# ==================================================================
def estimate_sequential_vector(fk: RobotFK, obs: Dict[int, Dict[str, Any]], chain: Dict[str, Any],
cfg: Dict[str, Any]) -> Dict[str, float]:
"""
Analytic angle from marker geometry where possible. For revolute joints with
>=2 markers on the link, use the perpendicular marker-pair vector. Falls back
to the FK block solver for linear / zero-marker / single-marker cases, so it
always returns a full state (still cheaper than full sequential_fk because
well-populated joints are solved in closed form).
"""
state = {k: 0.0 for k in STATE_KEYS}
var_type = chain["var_type"]
link_markers = chain["link_markers"]
var_links = chain["var_links"]
for block in chain["blocks"]:
bvars = block["vars"]
if len(bvars) == 1 and var_type.get(bvars[0]) == "revolute":
v = bvars[0]
ln = var_links[v][0]
mids = [m for m in link_markers.get(ln, []) if m in obs]
if len(mids) >= 2:
# model vectors must be expressed in the WORLD frame at angle=0
# (the link frame is already rotated by the parents y,z,...), so
# use FK marker world positions with this joint set to 0.
state_v0 = dict(state)
state_v0[v] = 0.0
model_v0 = model_markers(fk, state_v0)
axis_world = fk.joint_axis_world(ln, state_v0)
ang = _angle_from_pairs_world(mids, model_v0, obs, axis_world)
if ang is not None:
state[v] = ang
continue
# fallback: block FK fit for this single block
_fit_single_block(fk, obs, block, var_type, cfg, state)
for v, vt in var_type.items():
if vt == "revolute":
state[v] = (state[v] + 180.0) % 360.0 - 180.0
return state
def _angle_from_pairs_world(mids: List[int], model_v0: Dict[int, Dict[str, np.ndarray]],
obs: Dict[int, Dict[str, Any]], axis_world: np.ndarray) -> Optional[float]:
from itertools import combinations
a = np.asarray(axis_world, float)
a = a / (np.linalg.norm(a) + 1e-12)
angs, ws = [], []
for i, j in combinations(mids, 2):
if i not in model_v0 or j not in model_v0:
continue
vm = np.asarray(model_v0[j]["world_mm"], float) - np.asarray(model_v0[i]["world_mm"], float) # world @ angle 0
vo = obs[j]["pos_mm"] - obs[i]["pos_mm"] # observed vector (world, mm)
vm_p = vm - np.dot(vm, a) * a
vo_p = vo - np.dot(vo, a) * a
if np.linalg.norm(vm_p) < 5 or np.linalg.norm(vo_p) < 5:
continue
ang = math.atan2(float(np.dot(a, np.cross(vm_p, vo_p))), float(np.dot(vm_p, vo_p)))
angs.append(ang)
ws.append(np.linalg.norm(vm_p) * np.linalg.norm(vo_p))
if not angs:
return None
s = sum(w * math.sin(x) for w, x in zip(ws, angs))
c = sum(w * math.cos(x) for w, x in zip(ws, angs))
return math.degrees(math.atan2(s, c))
def _fit_single_block(fk, obs, block, var_type, cfg, state):
if not HAVE_SCIPY:
return
bvars = block["vars"]
bmarkers = [m for m in block["markers"] if m in obs]
if not bvars or not bmarkers:
return
base = dict(state)
def fun(vec):
return residual_vector(_state_from_vec(bvars, vec, base), fk, obs, bmarkers, cfg)
starts = [[0.0] * len(bvars)]
if var_type.get(bvars[0]) == "revolute":
starts = [[a0] + [0.0] * (len(bvars) - 1) for a0 in _multistart_values("revolute")]
best, best_cost = None, float("inf")
for s0 in starts:
try:
sol = least_squares(fun, np.array(s0, float), loss=cfg.get("robust_loss", "huber"),
f_scale=float(cfg.get("huber_delta_mm", 8.0)), max_nfev=200 * max(1, len(bvars)))
if sol.cost < best_cost:
best_cost, best = sol.cost, sol.x
except Exception:
continue
if best is not None:
for name, val in zip(bvars, best):
state[name] = float(val)
# ==================================================================
# Dispatch
# ==================================================================
def observability(chain: Dict[str, Any], obs: Dict[int, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
"""
Per-variable confidence from how well its estimation block is determined.
A block groups coupled variables (e.g. b,c,e on the fingers); confidence is
driven by markers-per-variable in that block:
high : >= 2 markers per variable (well over-determined)
medium : >= 1 marker per variable
low : fewer markers than variables (under-determined — distrust!)
none : no markers at all (variable left at 0)
"""
info: Dict[str, Dict[str, Any]] = {}
for block in chain["blocks"]:
seen = [m for m in block["markers"] if m in obs]
nvars = max(1, len(block["vars"]))
ratio = len(seen) / nvars
if len(seen) == 0:
conf = "none"
elif ratio >= 2.0:
conf = "high"
elif ratio >= 1.0:
conf = "medium"
else:
conf = "low"
for v in block["vars"]:
info[v] = {"observable": len(seen) > 0, "n_markers": len(seen),
"block_vars": len(block["vars"]), "confidence": conf,
"block_anchor": block["anchor"]}
return info
def estimate_pose(fk: RobotFK, obs: Dict[int, Dict[str, Any]], cfg: Dict[str, Any]) -> Dict[str, Any]:
chain = analyze_chain(fk)
var_names = chain["ordered_vars"]
method = str(cfg.get("method", "hybrid")).lower()
obsv = observability(chain, obs)
if method == "sequential_vector":
state = estimate_sequential_vector(fk, obs, chain, cfg)
elif method == "sequential_fk":
state = estimate_sequential_fk(fk, obs, chain, cfg)
elif method == "global_ba":
init = estimate_sequential_fk(fk, obs, chain, cfg) # cheap robust init
state = estimate_global_ba(fk, obs, var_names, init, cfg)
else: # hybrid (default)
init = estimate_sequential_fk(fk, obs, chain, cfg)
state = estimate_global_ba(fk, obs, var_names, init, cfg)
# final residual stats over all observed markers
final_res = residual_vector(state, fk, obs, list(obs.keys()), cfg)
rms = float(np.sqrt(np.mean(final_res ** 2))) if final_res.size else 0.0
return {"state": state, "method": method, "observability": obsv,
"residual_rms": rms, "num_markers": len(obs)}
# ==================================================================
# CLI
# ==================================================================
def main() -> None:
ap = argparse.ArgumentParser(description="Estimate robot joint angles from marker poses")
ap.add_argument("markers", help="aruco_marker_poses.json (corner_pose) or aruco_positions_*.json (center)")
ap.add_argument("-robot", "--robot", required=True)
ap.add_argument("-out", "--out", default=None)
ap.add_argument("--method", default=None, help="override robot.json method")
args = ap.parse_args()
robot_data = json.load(open(args.robot, "r", encoding="utf-8"))
cfg = load_pose_cfg(robot_data)
if args.method:
cfg["method"] = args.method
fk = RobotFK(robot_data)
obs = load_observations(args.markers, cfg.get("use_normals", True),
int(cfg.get("min_cameras_per_marker", 2)))
print(f"[INFO] method={cfg['method']} | observed markers={len(obs)} | use_normals={cfg.get('use_normals')}")
result = estimate_pose(fk, obs, cfg)
st = result["state"]
print("\nEstimated joint values:")
for v in ["x", "y", "z", "a", "b", "c", "e"]:
ob = result["observability"].get(v, {})
unit = "mm" if v in ("x", "e") else "deg"
conf = ob.get("confidence", "?")
tag = "" if ob.get("observable", False) else " [UNOBSERVABLE -> 0]"
print(f" {v}: {st.get(v, 0.0):8.2f} {unit} (markers={ob.get('n_markers','?')}, conf={conf}){tag}")
print(f"\n[INFO] residual RMS over {result['num_markers']} markers: {result['residual_rms']:.3f}")
out = {
"schema_version": "1.0",
"created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"method": result["method"],
"movements": {v: {"value": st.get(v, 0.0),
"unit": "mm" if v in ("x", "e") else "deg",
"observable": result["observability"].get(v, {}).get("observable", False),
"confidence": result["observability"].get(v, {}).get("confidence", "none"),
"n_markers": result["observability"].get(v, {}).get("n_markers", 0)}
for v in ["x", "y", "z", "a", "b", "c", "e"]},
"residual_rms": result["residual_rms"],
"num_markers": result["num_markers"],
}
out_path = args.out or os.path.join(os.path.dirname(args.markers), "robot_state.json")
json.dump(out, open(out_path, "w", encoding="utf-8"), indent=2)
print(f"[INFO] wrote {out_path}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,310 @@
#!/usr/bin/env python3
"""
robot_fk.py
-----------
Minimal forward kinematics engine for the robot.json format.
Matches the Blender hierarchy used by render_robot.py exactly:
world_T_link = world_T_parent
@ Translate(mountPosition) @ Rotate_xyz(mountRotation)
@ Translate(jointOrigin) @ Rotate_xyz(joint.rotation)
@ T_motion
T_motion = Rotate(axis, value_deg) for revolute joints
Translate(axis * value_mm) for linear joints
Units throughout: millimetres, degrees.
Public API
----------
fk = RobotFK.from_file("robot.json")
transforms = fk.compute({"x": 180, "y": 86, "z": -120,
"a": -60, "b": 22, "c": 91, "e": 10})
# → dict link_name -> 4×4 np.ndarray (world frame, mm)
p_world = fk.marker_world(transforms, "Arm1", [0, -160, 35])
# → np.ndarray shape (3,), in mm
all_m = fk.all_markers_world(transforms)
# → dict marker_id -> {"world_mm", "link", "local_mm"}
# Cumulative x-offset for a link at all-zero state
# (useful for 4a: x_slider = world_x - local_x - link_x_at_zero)
x0 = fk.link_x_at_zero_state("Arm1") # → float mm
"""
from __future__ import annotations
import json
import math
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple
import numpy as np
STATE_KEYS = ("x", "y", "z", "a", "b", "c", "e")
# ──────────────────────────────────────────────────────────────
# Low-level matrix helpers
# ──────────────────────────────────────────────────────────────
def _rot_axis_angle(axis: Sequence[float], angle_deg: float) -> np.ndarray:
"""3×3 rotation matrix via Rodrigues (axis need not be normalised)."""
ax = np.asarray(axis, dtype=float)
n = float(np.linalg.norm(ax))
if n < 1e-12:
return np.eye(3)
ax = ax / n
c = math.cos(math.radians(angle_deg))
s = math.sin(math.radians(angle_deg))
t = 1.0 - c
x, y, z = ax
return np.array([
[t*x*x + c, t*x*y - s*z, t*x*z + s*y],
[t*x*y + s*z, t*y*y + c, t*y*z - s*x],
[t*x*z - s*y, t*y*z + s*x, t*z*z + c ],
])
def _rot_xyz_euler(rx: float, ry: float, rz: float) -> np.ndarray:
"""XYZ Euler angles (degrees) → 3×3 — matches Blender XYZ Euler mode."""
return (_rot_axis_angle([0, 0, 1], rz)
@ _rot_axis_angle([0, 1, 0], ry)
@ _rot_axis_angle([1, 0, 0], rx))
def make_T(R: np.ndarray, t: Sequence[float]) -> np.ndarray:
"""4×4 homogeneous transform."""
T = np.eye(4)
T[:3, :3] = R
T[:3, 3] = np.asarray(t, dtype=float)
return T
def transform_point(T: np.ndarray, p: Sequence[float]) -> np.ndarray:
"""Apply 4×4 transform to a 3-D point."""
h = np.array([p[0], p[1], p[2], 1.0])
return (T @ h)[:3]
# ──────────────────────────────────────────────────────────────
# FK engine
# ──────────────────────────────────────────────────────────────
class RobotFK:
"""Forward kinematics for the robot.json format."""
def __init__(self, robot_data: Dict[str, Any]):
self.robot = robot_data
self.links: Dict[str, Any] = robot_data.get("links", {})
self._topo: List[str] = self._topological_sort()
# ── construction ─────────────────────────────────────────
@classmethod
def from_file(cls, path) -> "RobotFK":
with open(path, "r", encoding="utf-8") as f:
return cls(json.load(f))
def _topological_sort(self) -> List[str]:
parent_map = {n: d.get("parent") for n, d in self.links.items()}
visited, order = set(), []
def visit(name: str) -> None:
if name in visited:
return
visited.add(name)
p = parent_map.get(name)
if p and p in self.links:
visit(p)
order.append(name)
for name in self.links:
visit(name)
return order
# ── core computation ──────────────────────────────────────
def compute(self, joint_values: Dict[str, float]) -> Dict[str, np.ndarray]:
"""
Compute link world transforms for the given joint state.
Parameters
----------
joint_values : dict variable_name -> value
Linear joints (x, e): mm
Revolute joints (y, z, a, b, c): degrees
Returns
-------
dict link_name -> 4×4 np.ndarray (world frame, mm)
"""
state = {k: 0.0 for k in STATE_KEYS}
for k, v in joint_values.items():
if k in state:
state[k] = float(v)
transforms: Dict[str, np.ndarray] = {}
for link_name in self._topo:
d = self.links[link_name]
parent = d.get("parent")
T_parent = transforms.get(parent, np.eye(4)) if parent else np.eye(4)
# 1 · Mount (static in parent frame)
mp = d.get("mountPosition", [0, 0, 0])
mr = d.get("mountRotation", [0, 0, 0])
T_m = make_T(_rot_xyz_euler(*mr), mp)
# 2 · Joint origin (pivot point in mount frame)
ji = d.get("jointToParent", {}) or {}
jp = ji.get("origin", [0, 0, 0])
jr = ji.get("rotation", [0, 0, 0])
T_j = make_T(_rot_xyz_euler(*jr), jp)
# 3 · Motion
jtype = str(ji.get("type", "fixed")).lower()
var = str(ji.get("variable", "")).lower()
axis = np.asarray(ji.get("axis", [1, 0, 0]), dtype=float)
val = state.get(var, 0.0)
if jtype == "revolute":
T_mot = make_T(_rot_axis_angle(axis, val), [0, 0, 0])
elif jtype == "linear":
n = float(np.linalg.norm(axis))
u = axis / n if n > 1e-12 else np.array([1.0, 0, 0])
T_mot = make_T(np.eye(3), u * val)
else:
T_mot = np.eye(4)
transforms[link_name] = T_parent @ T_m @ T_j @ T_mot
return transforms
# ── marker helpers ────────────────────────────────────────
@staticmethod
def marker_world(transforms: Dict[str, np.ndarray],
link_name: str,
local_pos: Sequence[float]) -> np.ndarray:
"""Transform a local marker position → world (mm)."""
return transform_point(transforms.get(link_name, np.eye(4)), local_pos)
def all_markers_world(self,
transforms: Dict[str, np.ndarray]
) -> Dict[int, Dict[str, Any]]:
"""
Returns
-------
dict marker_id -> {world_mm, local_mm, link, normal_world}
"""
result: Dict[int, Dict[str, Any]] = {}
for lname, ldata in self.links.items():
T = transforms.get(lname, np.eye(4))
R = T[:3, :3]
for m in ldata.get("markers", []):
mid = int(m.get("id", -1))
if mid < 0 or "position" not in m:
continue
loc = np.array(m["position"], dtype=float)
nor = np.array(m.get("normal", [0, 0, 1]), dtype=float)
result[mid] = {
"world_mm": transform_point(T, loc),
"local_mm": loc,
"link": lname,
"normal_world": (R @ nor) / max(np.linalg.norm(R @ nor), 1e-12),
}
return result
# ── x-axis invariant helpers (used by 4a) ────────────────
def link_x_at_zero_state(self, link_name: str) -> float:
"""
Return the world x-coordinate of the link-frame origin
when ALL joint variables are zero.
For any link reachable via only x-axis rotations (Arm1, Ellbow, Arm2),
this value is constant regardless of the actual revolute angles.
Adding the slider value x_mm gives the true link origin x:
link_origin_world_x = x_mm + link_x_at_zero_state(link_name)
And for any marker in that link:
marker_world_x = x_mm + link_x_at_zero_state(link_name) + marker_local_x
"""
T = self.compute({k: 0.0 for k in STATE_KEYS})
return float(T[link_name][0, 3])
def joint_origin_world(self,
link_name: str,
joint_state: Dict[str, float]) -> np.ndarray:
"""
World position of the pivot that link_name rotates / slides around.
"""
d = self.links[link_name]
parent = d.get("parent")
T_all = self.compute(joint_state)
T_parent = T_all.get(parent, np.eye(4)) if parent else np.eye(4)
mp = d.get("mountPosition", [0, 0, 0])
mr = d.get("mountRotation", [0, 0, 0])
T_m = make_T(_rot_xyz_euler(*mr), mp)
ji = d.get("jointToParent", {}) or {}
jp = ji.get("origin", [0, 0, 0])
jr = ji.get("rotation", [0, 0, 0])
T_j = make_T(_rot_xyz_euler(*jr), jp)
return transform_point(T_parent @ T_m @ T_j, [0, 0, 0])
def joint_axis_world(self,
link_name: str,
joint_state: Dict[str, float]) -> np.ndarray:
"""
Joint axis of link_name expressed in world frame.
"""
d = self.links[link_name]
parent = d.get("parent")
T_all = self.compute(joint_state)
T_parent = T_all.get(parent, np.eye(4)) if parent else np.eye(4)
mp = d.get("mountPosition", [0, 0, 0])
mr = d.get("mountRotation", [0, 0, 0])
T_m = make_T(_rot_xyz_euler(*mr), mp)
ji = d.get("jointToParent", {}) or {}
jp = ji.get("origin", [0, 0, 0])
jr = ji.get("rotation", [0, 0, 0])
T_j = make_T(_rot_xyz_euler(*jr), jp)
R_to_pivot = (T_parent @ T_m @ T_j)[:3, :3]
axis_local = np.asarray(ji.get("axis", [1, 0, 0]), dtype=float)
world = R_to_pivot @ axis_local
n = float(np.linalg.norm(world))
return world / n if n > 1e-12 else world
# ── utility ───────────────────────────────────────────────
def chain(self, link_name: str) -> List[str]:
"""Return chain from root to link_name (inclusive)."""
out, cur = [], link_name
while cur:
out.append(cur)
cur = self.links.get(cur, {}).get("parent")
return list(reversed(out))
def board_marker_world_positions(self, length_scale: float = 1.0) -> Dict[int, np.ndarray]:
"""
Return known world positions for all Board markers (in mm).
Board is the root, so its marker positions ARE world positions.
length_scale: 1/1000 if robot.json uses mm.
"""
board = self.links.get("Board", {})
result: Dict[int, np.ndarray] = {}
for m in board.get("markers", []):
mid = int(m.get("id", -1))
if mid >= 0 and "position" in m:
p = np.array(m["position"], dtype=float) * length_scale
result[mid] = p
return result

View File

@@ -0,0 +1,91 @@
{
"name": "9a",
"position": {
"x": 60,
"y": -2,
"z": 95,
"a": 200,
"b": 60,
"c": 9,
"e": 8
},
"rendering": {
"width": 1440,
"height": 1080,
"dofFStop": 11
},
"camera_positions": {
"a": [
-300,
-800,
800
],
"b": [
300,
-900,
1200
],
"c": [
300,
-900,
400
],
"d": [
700,
-800,
400
],
"e": [
1200,
-900,
400
],
"f": [
500,
-300,
1400
],
"g": [
-200,
200,
1400
]
},
"camera_targets": {
"a": [
210,
-100,
180
],
"b": [
310,
-80,
180
],
"c": [
210,
-100,
150
],
"d": [
210,
-100,
150
],
"e": [
210,
-100,
50
],
"f": [
200,
-200,
180
],
"g": [
200,
-200,
180
]
}
}

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.9 MiB

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.8 MiB

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.7 MiB

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.7 MiB

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.7 MiB

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.9 MiB

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.8 MiB

468
tests/data/robot.json Normal file
View File

@@ -0,0 +1,468 @@
{
"coordinateSystem": {"handedness": "right", "x": "right", "y": "backward", "z": "up"},
"units": {"length": "mm", "rotation": "degree"},
"vision_config": {"MarkerType": "DICT_4X4_250", "MarkerSize": 0.025},
"renderingInfo": {
"width": 1280,
"height": 720,
"renderDefaults": {"width": 1280, "height": 720, "dofFStop": 11},
"cameraPosition__1": [-10, -800, 500],
"cameraPosition__2": [-500, 300, 1200],
"cameraPosition__3": [-200, -900, 200],
"cameraPosition__4": [1200, 200, 300],
"cameraPosition_a": [-300, -800, 500],
"cameraPosition": [-200, 200, 1400],
"cameraPosition_c": [600, -500, 600],
"cameraTarget": [200, -200, 180],
"cameraUpVector": [0, 0, 1],
"lightPosition": [-500, -500, 500],
"lightTarget": [0, 0, 0],
"lightUpVector": [0, 0, 1],
"metric": "mm",
"showSkeleton": true,
"showMarkers": true,
"backgroundColor": [0.7, 0.85, 1.0],
"backgroundStrength": 0.2,
"sunEnergy": 0.35,
"areaEnergy": 120,
"exposure": -1.5,
"lensDirt": true,
"lensDirtStrength": 0.08,
"dofEnabled": true,
"dofFStop": 11.0,
"arucoDust": true,
"arucoDustStrength": 1.6,
"markerOffsetMaxMm": 4.0,
"markerOffsetSeed": 0,
"markerRotationMaxDeg": 3,
"motionBlur": true,
"motionBlurMaxPx": 5.5,
"focalErrorPct": 0.5,
"principalErrorPx": 3.0,
"residualDistortion": [0.02, -0.01],
"localizedBlur": false,
"localizedBlurStrength": 0.15,
"vignette": true,
"vignetteStrength": 0.08,
"sensorNoise": true,
"sensorNoiseStrength": 0.01,
"lensDistortion": true,
"lensDistortionStrength": 0.002,
"materials": {
"wood": {"baseColor": [0.72, 0.52, 0.33], "roughness": 0.85, "metallic": 0.0},
"plaWhite": {"baseColor": [0.95, 0.95, 0.95], "roughness": 0.45, "metallic": 0.0},
"steel": {"baseColor": [0.72, 0.72, 0.75], "roughness": 0.25, "metallic": 1.0},
"powderCoatBlue": {"baseColor": [0.15, 0.25, 0.7], "roughness": 0.55, "metallic": 0.0},
"defaultPlastic": {"baseColor": [0.95, 0.95, 0.95], "roughness": 0.4, "metallic": 0.0},
"skeletonRed": {"baseColor": [0.85, 0.2, 0.2], "roughness": 0.35, "metallic": 0.0},
"markerBlack": {"baseColor": [0.04, 0.04, 0.04], "roughness": 0.8, "metallic": 0.0}
},
"skeletonDefaults": {"radius": 4, "color": [0.85, 0.2, 0.2]},
"markerDefaults": {"size": 25, "thickness": 1, "color": [0.04, 0.04, 0.04]},
"defaultPosition": {"x": 80, "y": 20, "z": 80, "a": -120, "b": 23, "c": 9, "e": 3}
},
"defaultPosition__": {"x": 10, "y": 4, "z": 20, "a": 10, "b": 2, "c": 9, "e": 1},
"defaultPosition": {"x": 50, "y": 4, "z": 176, "a": 20, "b": 60, "c": 9, "e": 5},
"recognized": {"x": null, "y": null, "z": null, "a": null, "b": null, "c": null, "e": null},
"constraint_rules": {
"rigid_distance": {"enabled": true, "mode": "mst", "weight": 1.0},
"joint_axis_projection": {"enabled": true, "max_pairs": 2, "weight": 0.35},
"chain_axis_projection": {"enabled": false, "max_depth": 3, "max_pairs": 2, "weight": 0.15},
"axis_alignment_threshold": 0.95
},
"observation_weighting": {"enabled": true, "distance_weight": true, "marker_size_weight": true, "view_angle_weight": true},
"multiview_calculation": {
"combine_mode": "mean",
"size_ref_px": 50.0,
"border_ref_px": 120.0,
"center_ref_norm": 0.01,
"sharpness_ref": 2500.0,
"homography_ref": 0.18,
"size_factor": 0.3,
"aspect_factor": 0.3,
"border_factor": 0.01,
"center_factor": 0.01,
"sharpness_factor": 0.5,
"homography_factor": 0.2,
"normal_visibility_factor": 0.01,
"spin_factor": 0.3,
"weight_floor": 0.3
},
"pose_estimation": {
"method": "hybrid",
"marker_observation": "corner_pose",
"use_normals": true,
"normal_weight": 100.0,
"robust_loss": "huber",
"huber_delta_mm": 8.0,
"max_iterations": 200,
"min_cameras_per_marker": 2,
"finger_block_joints": ["b", "c", "e"],
"per_link_method": {}
},
"robot_test_poses": {
"4": {"x": 70, "y": 50, "z": -70, "a": 120, "b": 50, "c": 30, "e": 20},
"5": {"x": 180, "y": 86, "z": -120, "a": -60, "b": 22, "c": 91, "e": 10},
"6": {"x": 80, "y": 20, "z": 80, "a": -120, "b": 23, "c": 9, "e": 3},
"7": {"x": 30, "y": -2, "z": 95, "a": 20, "b": 23, "c": 9, "e": 9},
"8": {"x": 50, "y": -2, "z": 95, "a": 20, "b": 60, "c": 9, "e": 3},
"9": {"x": 60, "y": -2, "z": 95, "a": 200, "b": 60, "c": 9, "e": 8},
"9a": {
"x": 60,
"y": -2,
"z": 95,
"a": 200,
"b": 60,
"c": 9,
"e": 8,
"rendering": {"width": 1440, "height": 1080, "dofFStop": 11}
},
"9b": {
"x": 60,
"y": -2,
"z": 95,
"a": 200,
"b": 60,
"c": 9,
"e": 8,
"rendering": {"width": 4896, "height": 3264, "dofFStop": 5.6}
},
"10": {"x": 120, "y": 60, "z": -110, "a": 20, "b": 30, "c": 180, "e": 4},
"11": {"x": 50, "y": 4, "z": 176, "a": 20, "b": 60, "c": 9, "e": 5},
"12": {"x": 50, "y": 0, "z": 178, "a": 210, "b": 80, "c": 90, "e": 6}
},
"test_camera_positions": {
"a": [-300, -800, 800],
"b": [300, -900, 1200],
"c": [300, -900, 400],
"d": [700, -800, 400],
"e": [1200, -900, 400],
"f": [500, -300, 1400],
"g": [-200, 200, 1400]
},
"test_camera_targets": {
"a": [210, -100, 180],
"b": [310, -80, 180],
"c": [210, -100, 150],
"d": [210, -100, 150],
"e": [210, -100, 50],
"f": [200, -200, 180],
"g": [200, -200, 180]
},
"movements": {"x": null, "y": null, "z": null, "a": null, "b": null, "c": null, "e": null},
"state_pose_params": {
"numbers_of_Elements_to_consider_start": 3,
"numbers_of_Elements_to_consider_final": 5,
"solver_in_between_geometrical": false,
"solver_after_geometrical": false,
"geometric_passes_per_stage": 2,
"revolute_search_coarse_deg": 5.0,
"revolute_search_fine_deg": 1.0,
"root_pose_min_markers": 3,
"use_marker_normals_flip_tiebreak": true,
"normal_flip_weight": 0.05
},
"links": {
"Board": {
"parent": null,
"size": [1000, 200, 25],
"mountPosition": [0, 0, 0],
"mountRotation": [0, 0, 0],
"skeleton": {"from": [0, 0, 16], "to": [1000, 0, 16], "radius": 4, "color": [0.85, 0.2, 0.2]},
"markers": [
{"id": 210, "set": "Brett", "position": [20, -20, 0.3], "normal": [0, 0, 1]},
{"id": 211, "set": "Brett", "position": [250, -10, 0.3], "normal": [0, 0, 1]},
{"id": 215, "set": "Brett", "position": [250, -90, 0.3], "normal": [0, 0, 1]},
{"id": 214, "set": "Brett", "position": [350, -10, 0.3], "normal": [0, 0, 1]},
{"id": 208, "set": "Brett", "position": [350, -90, 0.3], "normal": [0, 0, 1]},
{"id": 206, "set": "Brett", "position": [650, -10, 0.3], "normal": [0, 0, 1]},
{"id": 205, "set": "Brett", "position": [750, -90, 0.3], "normal": [0, 0, 1]},
{"id": 207, "set": "Brett", "position": [750, -10, 0.3], "normal": [0, 0, 1]},
{"id": 217, "set": "Brett", "position": [650, -90, 0.3], "normal": [0, 0, 1]},
{
"id": 46,
"set": "A0",
"position": [536.71, 185.44, -27.3],
"normal": [0, 0, 1],
"spin": 90,
"info": "is placed on a white paper, A0_60Arucos_25mm_Seet223.pdf, with the following marker placements:"
},
{"id": 47, "set": "A0", "position": [344.23, -286.54, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 48, "set": "A0", "position": [688.69, -320.72, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 49, "set": "A0", "position": [1006.0, 158.33, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 50, "set": "A0", "position": [573.41, 211.86, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 51, "set": "A0", "position": [167.8, -172.08, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 52, "set": "A0", "position": [94.68, 208.66, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 53, "set": "A0", "position": [486.25, 212.24, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 54, "set": "A0", "position": [342.27, -330.59, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 55, "set": "A0", "position": [283.72, -262.58, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 56, "set": "A0", "position": [498.68, 168.67, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 57, "set": "A0", "position": [602.86, -364.05, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 58, "set": "A0", "position": [50.09, -218.11, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 59, "set": "A0", "position": [626.21, -278.75, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 60, "set": "A0", "position": [434.36, 283.81, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 61, "set": "A0", "position": [-22.42, 335.83, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 62, "set": "A0", "position": [404.7, -175.1, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 63, "set": "A0", "position": [777.4, -236.15, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 64, "set": "A0", "position": [-21.27, -188.23, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 65, "set": "A0", "position": [803.39, -297.37, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 66, "set": "A0", "position": [209.75, -363.23, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 67, "set": "A0", "position": [523.07, 267.04, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 68, "set": "A0", "position": [573.73, 170.64, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 69, "set": "A0", "position": [7.61, -281.21, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 70, "set": "A0", "position": [601.87, 300.33, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 71, "set": "A0", "position": [749.75, -284.01, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 72, "set": "A0", "position": [440.99, 194.32, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 73, "set": "A0", "position": [221.73, 333.11, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 74, "set": "A0", "position": [93.78, 144.5, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 75, "set": "A0", "position": [-25.7, 194.58, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 76, "set": "A0", "position": [685.21, 166.8, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 77, "set": "A0", "position": [18.19, 191.57, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 78, "set": "A0", "position": [823.11, -344.38, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 79, "set": "A0", "position": [312.3, -159.11, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 80, "set": "A0", "position": [863.59, -335.92, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 81, "set": "A0", "position": [132.14, 169.03, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 82, "set": "A0", "position": [219.16, 297.24, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 83, "set": "A0", "position": [44.16, 339.22, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 84, "set": "A0", "position": [407.49, 258.42, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 85, "set": "A0", "position": [504.58, -312.75, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 86, "set": "A0", "position": [362.89, 292.01, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 87, "set": "A0", "position": [943.63, -245.76, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 88, "set": "A0", "position": [765.87, 316.04, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 89, "set": "A0", "position": [988.02, -369.14, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 90, "set": "A0", "position": [643.17, 316.43, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 91, "set": "A0", "position": [723.35, 328.05, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 92, "set": "A0", "position": [645.09, -184.84, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 93, "set": "A0", "position": [934.88, 143.6, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 94, "set": "A0", "position": [875.7, 173.65, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 95, "set": "A0", "position": [186.04, -274.07, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 96, "set": "A0", "position": [369.77, -186.49, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 97, "set": "A0", "position": [304.35, -359.67, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 98, "set": "A0", "position": [575.27, 315.06, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 99, "set": "A0", "position": [959.16, -321.55, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 100, "set": "A0", "position": [803.25, 172.36, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 101, "set": "A0", "position": [117.7, 298.66, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 102, "set": "A0", "position": [649.69, -223.0, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 103, "set": "A0", "position": [105.71, -187.71, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 104, "set": "A0", "position": [826.71, 239.16, -27.3], "normal": [0, 0, 1], "spin": 90},
{"id": 105, "set": "A0", "position": [524.84, -266.25, -27.3], "normal": [0, 0, 1], "spin": 90}
],
"model": [
{
"stlFile": "surfaces/Board.stl",
"originOfModel": [0, 0, 0],
"rotationOfModelDegree": [0, 0, -90],
"material": "wood"
},
{
"stlFile": "surfaces/BoardRail.stl",
"originOfModel": [0, 0, 0],
"rotationOfModelDegree": [0, 0, -90],
"material": "steel"
}
]
},
"Base": {
"parent": "Board",
"size": [150, 200, 150],
"mountPosition": [0, 0, 0],
"mountRotation": [0, 0, 0],
"jointToParent": {
"name": "Slider",
"type": "linear",
"axis": [1, 0, 0],
"origin": [0, 0, 16],
"rotation": [0, 0, 0],
"variable": "x"
},
"skeleton": {"from": [0, 108, 45], "to": [110, 108, 45], "radius": 4, "color": [0.2, 0.8, 0.2]},
"markers": [],
"model": [
{
"stlFile": "surfaces/Base.stl",
"originOfModel": [-30, 0, -35],
"rotationOfModelDegree": [0, 0, 0],
"material": "plaWhite"
}
]
},
"Arm1": {
"parent": "Base",
"size": [70, 250, 70],
"mountPosition": [0, 0, 0],
"mountRotation": [0, 0, 0],
"jointToParent": {
"name": "Joint1",
"type": "revolute",
"axis": [-1, 0, 0],
"origin": [110, 108, 45],
"rotation": [0, 0, 0],
"variable": "y"
},
"skeleton": {"from": [0, 0, 0], "to": [0, -250, 0], "radius": 4, "color": [0.2, 0.2, 0.9]},
"markers": [
{"id": 198, "name": "aruco_198", "position": [0, -160, 35], "normal": [0, 0, 1], "size": 25, "spin": 0},
{"id": 229, "name": "aruco_229", "position": [0, -250, 35], "normal": [0, 0, 1], "size": 25, "spin": 0},
{"id": 242, "name": "aruco_242", "position": [0, -250, -35], "normal": [0, 0, -1], "size": 25, "spin": 0},
{"id": 243, "name": "aruco_243", "position": [0, -285, 0], "normal": [0, -1, 0], "size": 25, "spin": 0}
],
"model": [
{
"stlFile": "surfaces/Holm.stl",
"originOfModel__": [-25, 29, -28.5],
"originOfModel": [-29, 25, 28.5],
"rotationOfModelDegree__": [0, 0, 0],
"rotationOfModelDegree": [180, 0, -90],
"material": "powderCoatBlue"
}
]
},
"Ellbow": {
"parent": "Arm1",
"mountPosition": [0, 0, 0],
"mountRotation": [0, 0, 0],
"jointToParent": {
"name": "Joint2",
"type": "revolute",
"axis": [-1, 0, 0],
"origin": [0, -250, 0],
"rotation": [0, 0, 0],
"variable": "z"
},
"skeleton": {"from": [0, 0, 0], "to": [90, 0, 0], "radius": 4, "color": [0.9, 0.2, 0.2]},
"model": [
{
"stlFile": "surfaces/Ellebogen.stl",
"originOfModel": [90, 0, 0],
"rotationOfModelDegree": [0, -90, -90],
"material": "defaultPlastic"
}
],
"markers": [
{"id": 244, "name": "aruco_244", "position": [125, 0, 0], "normal": [1, 0, 0], "size": 25, "spin": 0},
{"id": 245, "name": "aruco_245", "position": [90, 0, -35], "normal": [0, 0, -1], "size": 25, "spin": 0},
{"id": 246, "name": "aruco_246", "position": [90, 0, 35], "normal": [0, 0, 1], "size": 25},
{"id": 247, "name": "aruco_247", "position": [52.5, 0, 35], "normal": [0, 0, 1], "size": 25},
{"id": 248, "name": "aruco_248", "position": [52.5, 0, -35], "normal": [0, 0, -1], "size": 25},
{"id": 232, "name": "aruco_232", "position": [90, 24.75, -24.75], "normal": [0, 1, -1], "size": 25},
{"id": 231, "name": "aruco_231", "position": [90, 24.75, 24.75], "normal": [0, 1, 1], "size": 25}
]
},
"Arm2": {
"parent": "Ellbow",
"mountPosition": [0, 0, 0],
"mountRotation": [0, 0, 0],
"jointToParent": {
"name": "Joint3",
"type": "revolute",
"axis": [0, -1, 0],
"origin": [90, 0, 0],
"rotation": [0, 0, 0],
"variable": "a"
},
"skeleton": {"from": [0, 0, 0], "to": [0, -250, 0], "radius": 4, "color": [0.95, 0.85, 0.2]},
"model": [
{
"stlFile": "surfaces/Unterarm.stl",
"originOfModel": [0, -250, 0],
"rotationOfModelDegree": [180, 0, -90],
"material": "defaultPlastic"
}
],
"markers": [
{"id": 120, "position": [24.75, -112, -24.75], "normal": [1, 0, -1]},
{"id": 122, "name": "aruco_122", "position": [-35, -112, 0], "normal": [-1, 0, 0]},
{"id": 218, "name": "aruco_218", "position": [35, -112, 0], "normal": [1, 0, 0]},
{"id": 113, "name": "aruco_113", "position": [0, -182, 30], "normal": [0, 0, 1]},
{"id": 114, "name": "aruco_114", "position": [24.75, -182, -24.75], "normal": [1, 0, -1]},
{"id": 115, "name": "aruco_115", "position": [-24.75, -182, -24.75], "normal": [-1, 0, -1]},
{"id": 124, "name": "aruco_124", "position": [-35, -219, 0], "normal": [-1, 0, 0]},
{"id": 219, "name": "aruco_219", "position": [35, -219, 0], "normal": [1, 0, 0]}
]
},
"Hand": {
"parent": "Arm2",
"mountPosition": [0, 0, 0],
"mountRotation": [0, 0, 0],
"jointToParent": {
"name": "Joint4",
"type": "revolute",
"axis": [1, 0, 0],
"origin": [0, -250, 0],
"rotation": [0, 0, 0],
"variable": "b"
},
"skeleton": {"from": [0, 0, 0], "to": [0, -35, 0], "radius": 4, "color": [0.95, 0.55, 0.15]}
},
"Palm": {
"parent": "Hand",
"mountPosition": [0, 0, 0],
"mountRotation": [0, 0, 0],
"jointToParent": {
"name": "Joint3",
"type": "revolute",
"axis": [0, -1, 0],
"origin": [0, 0, 0],
"rotation": [0, 0, 0],
"variable": "c"
},
"skeleton": {"from": [-50, -35, 0], "to": [50, -35, 0], "radius": 7, "color": [0.95, 0.2, 0.2]}
},
"FingerA": {
"parent": "Palm",
"size": [80, 60, 20],
"mountPosition": [0, 0, 0],
"mountRotation": [0, 0, 0],
"jointToParent": {
"name": "Slider",
"type": "linear",
"axis": [1, 0, 0],
"origin": [4, -35, 0],
"rotation": [0, 0, 0],
"variable": "e"
},
"skeleton": {"from": [0, 0, 0], "to": [0, -60, 0], "radius": 4, "color": [0.2, 0.8, 0.2]},
"markers": [
{"id": 40, "position": [12, -24, -17.1], "normal": [-10.98, 0, -23.56]},
{"id": 41, "position": [1.5, -2.2, 25.8], "normal": [0, -25.6, 9.5]},
{"id": 42, "position": [13.9, -40, 0], "normal": [1, -0.35, 0.4], "spin": 27}
],
"model": [
{
"stlFile": "surfaces/Finger.stl",
"originOfModel": [24, 0, -9.1],
"rotationOfModelDegree": [90, -90, 0],
"material": "defaultPlastic"
}
]
},
"FingerB": {
"parent": "Palm",
"size": [80, 60, 20],
"mountPosition": [0, 0, 0],
"mountRotation": [0, 0, 0],
"jointToParent": {
"name": "Slider",
"type": "linear",
"axis": [-1, 0, 0],
"origin": [-4, -35, 0],
"rotation": [0, 0, 0],
"variable": "e"
},
"skeleton": {"from": [0, 0, 0], "to": [0, -60, 0], "radius": 4, "color": [0.2, 0.8, 0.2]},
"markers": [
{"id": 43, "position": [-12, -24, 17.1], "normal": [10.98, 0, 23.56], "spin": 90},
{"id": 44, "position": [-1.5, -2.2, -25.8], "normal": [0, -25.6, -9.5], "spin": 90},
{"id": 45, "position": [-13.9, -40, 0], "normal": [-1, -0.35, -0.4], "spin": -27}
],
"model": [
{
"stlFile": "surfaces/Finger.stl",
"originOfModel": [-24, 0, 9.1],
"rotationOfModelDegree": [90, 90, 0],
"material": "defaultPlastic"
}
]
}
}
}

19
tests/fixtures/robot_minimal.json vendored Normal file
View File

@@ -0,0 +1,19 @@
{
"units": {"length": "mm", "angle": "deg"},
"links": [],
"pose_estimation": {
"method": "hybrid",
"marker_observation": "corner_pose",
"use_normals": true,
"normal_weight": 100.0,
"robust_loss": "huber",
"huber_delta_mm": 8.0,
"max_iterations": 200,
"min_cameras_per_marker": 2,
"per_link_method": {}
},
"vision_config": {
"aruco_dict": "DICT_4X4_250",
"marker_size_mm": 20.0
}
}

123
tests/test_e2e.py Normal file
View File

@@ -0,0 +1,123 @@
"""
End-to-End Tests für die Pipeline gegen echte Daten.
Diese Tests rufen die Pipeline direkt auf und validieren,
dass sinnvolle Ergebnisse erzeugt werden.
"""
import json
from pathlib import Path
import pytest
from scripts import estimate_from_dir
DATA = Path(__file__).parent / "data"
SCENE_9A = DATA / "Scene9a"
ROBOT_JSON = DATA / "robot.json"
REFERENCE_POSE = SCENE_9A / "pose.json"
def _normalize_angle(angle: float) -> float:
"""Normalisiere Winkel auf [-180, 180]."""
while angle > 180:
angle -= 360
while angle <= -180:
angle += 360
return angle
def _angle_distance(a1: float, a2: float) -> float:
"""Berechne die kürzeste Winkel-Differenz in Grad."""
a1_norm = _normalize_angle(a1)
a2_norm = _normalize_angle(a2)
diff = abs(a1_norm - a2_norm)
if diff > 180:
diff = 360 - diff
return diff
@pytest.mark.skipif(
not SCENE_9A.exists(),
reason="Scene9a Testdaten nicht vorhanden"
)
def test_estimate_from_scene9a():
"""
E2E-Test: Scene9a-Bilder mit allen Kameras a-g,
dazugehörige Intrinsiken und robot.json auswerten.
Validiert, dass:
- Pipeline ohne Fehler läuft
- Alle 7 Gelenke Werte erhalten
- Confidence-Werte für alle Gelenke vorhanden
- Plausible numerische Ergebnisse erzeugt werden
- Ergebnisse mit Referenzhaltung übereinstimmen (±5mm, ±5°)
"""
result = estimate_from_dir(
SCENE_9A,
robot_json=ROBOT_JSON,
)
# [*] Basis-Validierungen
assert result is not None
assert isinstance(result.joints, dict)
assert isinstance(result.confidence, dict)
# [*] Gelenke prüfen
expected_joints = {"x", "y", "z", "a", "b", "c", "e"}
assert set(result.joints.keys()) == expected_joints, \
f"Erwartet {expected_joints}, aber erhalten {set(result.joints.keys())}"
# [*] Alle Werte sollten numerisch sein und nicht NaN
for joint, value in result.joints.items():
assert isinstance(value, (int, float)), \
f"Gelenk {joint} sollte numerisch sein, ist aber {type(value)}"
assert not (isinstance(value, float) and value != value), \
f"Gelenk {joint} ist NaN"
# [*] Confidence-Werte prüfen
confidence_levels = {"high", "medium", "low", "none"}
for joint, conf in result.confidence.items():
assert conf in confidence_levels, \
f"Gelenk {joint} hat ungültiges Confidence {conf}"
# [*] Marker und Residuum
assert result.n_markers > 0, "Sollte mindestens einen Marker haben"
assert result.residual_rms >= 0, "RMS-Residuum sollte nicht negativ sein"
assert result.processing_ms > 0, "Verarbeitung sollte Zeit brauchen"
# [*] Fehlerbehandlung
assert isinstance(result.errors, list)
# [*] Vergleich mit Referenzwerten aus pose.json
if REFERENCE_POSE.exists():
ref_data = json.loads(REFERENCE_POSE.read_text())
ref_position = ref_data.get("position", {})
# Toleranzen: ±2mm für Linear, ±2° für Rotation
LINEAR_TOLERANCE = 2.0 # mm
ANGULAR_TOLERANCE = 2.0 # Grad
for joint in expected_joints:
estimated = result.joints[joint]
reference = ref_position.get(joint)
if reference is not None:
if joint in {"x", "y", "z"}:
# Linear-Achsen: Toleranz ±5mm
diff = abs(estimated - reference)
assert diff <= LINEAR_TOLERANCE, \
f"Gelenk {joint}: Abweichung {diff:.2f}mm > {LINEAR_TOLERANCE}mm " \
f"(Referenz: {reference}, Gemessen: {estimated:.2f})"
else:
# Rotations-Achsen: Toleranz ±5°
angle_diff = _angle_distance(estimated, reference)
assert angle_diff <= ANGULAR_TOLERANCE, \
f"Gelenk {joint}: Abweichung {angle_diff:.2f}° > {ANGULAR_TOLERANCE}° " \
f"(Referenz: {reference}°, Gemessen: {estimated:.2f}°)"
# [OK] Ausgeben zur manuellen Validierung
print(f"\n[OK] Scene9a erfolgreich verarbeitet")
print(f" Gelenke: {result.joints}")
print(f" Confidence: {result.confidence}")
print(f" Marker: {result.n_markers}, RMS: {result.residual_rms:.3f} mm")
print(f" Zeit: {result.processing_ms} ms")

57
tests/test_pipeline.py Normal file
View File

@@ -0,0 +1,57 @@
"""
Grundlegende Unit- und Fehler-Tests für scripts.
End-to-end-Tests gegen echte Bilder sind in tests/test_e2e.py
(erfordert fixtures/Scene*-Ordner, nicht im Repo).
"""
import os
from pathlib import Path
import pytest
from scripts import PipelineResult, __version__, estimate_from_dir
FIXTURES = Path(__file__).parent / "fixtures"
def test_version_format():
parts = __version__.split(".")
assert len(parts) == 3
assert all(p.isdigit() for p in parts)
def test_pipeline_result_dataclass():
r = PipelineResult(
joints={"x": 50.0, "y": -2.0, "z": 94.0, "a": 20.0, "b": 59.0, "c": 9.0, "e": 3.0},
confidence={"x": "high", "y": "high", "z": "high", "a": "high", "b": "low", "c": "low", "e": "low"},
n_markers=42,
residual_rms=1.45,
processing_ms=1200.0,
)
assert r.joints["x"] == 50.0
assert r.confidence["b"] == "low"
assert r.n_markers == 42
assert r.errors == []
def test_estimate_raises_on_empty_dir(tmp_path):
with pytest.raises(FileNotFoundError, match="render_"):
estimate_from_dir(tmp_path, robot_json=FIXTURES / "robot_minimal.json")
def test_estimate_raises_without_robot_json(tmp_path):
env_backup = os.environ.pop("ROBOT_JSON", None)
try:
with pytest.raises(ValueError, match="robot_json"):
estimate_from_dir(tmp_path, robot_json=None)
finally:
if env_backup is not None:
os.environ["ROBOT_JSON"] = env_backup
def test_robot_minimal_json_is_valid():
import json
data = json.loads((FIXTURES / "robot_minimal.json").read_text())
assert "pose_estimation" in data
assert "links" in data
assert data["pose_estimation"]["method"] == "hybrid"