Claude: über nacht arbeiten. Pipeline verbessern
This commit is contained in:
@@ -1,73 +1,145 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
9_evaluateMarker.py
|
||||
===================
|
||||
Compare reconstructed markers against ground-truth (render_*.json).
|
||||
|
||||
Reports, per link and overall:
|
||||
* 3D position error (mm)
|
||||
* orientation error (deg) between measured and GT normal — only meaningful
|
||||
when the detected file carries a MEASURED normal (aruco_marker_poses.json).
|
||||
|
||||
Backwards compatible: the original positional CLI
|
||||
python 9_evaluateMarker.py detected.json original.json
|
||||
still works and prints the familiar summary; --out adds a JSON report.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import math
|
||||
import argparse
|
||||
from collections import defaultdict
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
def calculate_distance(point1, point2):
|
||||
"""Berechnet den euklidischen Abstand zwischen zwei 3D-Punkten."""
|
||||
return math.sqrt(sum([(a - b) ** 2 for a, b in zip(point1, point2)]))
|
||||
|
||||
def analyze_marker_detection(detected_markers_file, original_markers_file):
|
||||
"""Analysiert die Markererkennung und gibt Erkennungsrate und Genauigkeit aus."""
|
||||
def dist3(a, b) -> float:
|
||||
return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))
|
||||
|
||||
with open(detected_markers_file, 'r') as f:
|
||||
detected_data = json.load(f)
|
||||
|
||||
with open(original_markers_file, 'r') as f:
|
||||
original_data = json.load(f)
|
||||
def angle_deg(a, b) -> Optional[float]:
|
||||
na = math.sqrt(sum(x * x for x in a))
|
||||
nb = math.sqrt(sum(x * x for x in b))
|
||||
if na < 1e-9 or nb < 1e-9:
|
||||
return None
|
||||
c = sum(x * y for x, y in zip(a, b)) / (na * nb)
|
||||
c = max(-1.0, min(1.0, c))
|
||||
ang = math.degrees(math.acos(c))
|
||||
return min(ang, 180.0 - ang) # flip-invariant
|
||||
|
||||
detected_marker_ids = {marker['marker_id'] for marker in detected_data['markers']}
|
||||
original_marker_ids = {marker['id'] for marker in original_data}
|
||||
|
||||
# Erkannte und nicht erkannte Marker
|
||||
recognized_markers = len(detected_marker_ids.intersection(original_marker_ids))
|
||||
missing_markers = len(original_marker_ids.difference(detected_marker_ids))
|
||||
|
||||
# Distanzen berechnen
|
||||
distances = []
|
||||
for original_marker in original_data:
|
||||
marker_id = original_marker['id']
|
||||
|
||||
# Gefundener Marker suchen
|
||||
detected_marker = next((m for m in detected_data['markers'] if m['marker_id'] == marker_id), None)
|
||||
|
||||
if detected_marker:
|
||||
distance = calculate_distance(original_marker['position_m'], detected_marker['position_m'])
|
||||
distances.append(distance)
|
||||
def _pct(values: List[float], q: float) -> float:
|
||||
if not values:
|
||||
return 0.0
|
||||
s = sorted(values)
|
||||
idx = max(0, min(len(s) - 1, int(q * len(s)) - 1))
|
||||
return s[idx]
|
||||
|
||||
if not distances:
|
||||
|
||||
def analyze(detected_file: str, original_file: str) -> Dict[str, Any]:
|
||||
detected = json.load(open(detected_file, "r", encoding="utf-8"))
|
||||
original = json.load(open(original_file, "r", encoding="utf-8"))
|
||||
|
||||
det_markers = detected.get("markers", detected if isinstance(detected, list) else [])
|
||||
det_by_id = {int(m.get("marker_id", m.get("id", -1))): m for m in det_markers}
|
||||
orig_by_id = {int(m["id"]): m for m in original}
|
||||
|
||||
det_ids = set(det_by_id) - {-1}
|
||||
orig_ids = set(orig_by_id)
|
||||
recognized = det_ids & orig_ids
|
||||
missing = orig_ids - det_ids
|
||||
|
||||
per_link_pos: Dict[str, List[float]] = defaultdict(list)
|
||||
per_link_ang: Dict[str, List[float]] = defaultdict(list)
|
||||
rows = []
|
||||
for mid in sorted(recognized):
|
||||
o = orig_by_id[mid]
|
||||
d = det_by_id[mid]
|
||||
link = o.get("link", d.get("link", "?"))
|
||||
pos_err = dist3(o["position_m"], d["position_m"]) * 1000.0 # mm
|
||||
per_link_pos[link].append(pos_err)
|
||||
ang_err = None
|
||||
if o.get("normal") is not None and d.get("normal") is not None:
|
||||
ang_err = angle_deg(o["normal"], d["normal"])
|
||||
if ang_err is not None:
|
||||
per_link_ang[link].append(ang_err)
|
||||
rows.append({"marker_id": mid, "link": link, "pos_err_mm": pos_err, "normal_err_deg": ang_err})
|
||||
|
||||
all_pos = [r["pos_err_mm"] for r in rows]
|
||||
all_ang = [r["normal_err_deg"] for r in rows if r["normal_err_deg"] is not None]
|
||||
|
||||
return {
|
||||
"n_original": len(orig_ids),
|
||||
"n_recognized": len(recognized),
|
||||
"n_missing": len(missing),
|
||||
"recognition_rate": (len(recognized) / len(orig_ids) * 100.0) if orig_ids else 0.0,
|
||||
"per_link": {
|
||||
ln: {
|
||||
"n": len(per_link_pos[ln]),
|
||||
"pos_mean_mm": sum(per_link_pos[ln]) / len(per_link_pos[ln]),
|
||||
"pos_max_mm": max(per_link_pos[ln]),
|
||||
"normal_mean_deg": (sum(per_link_ang[ln]) / len(per_link_ang[ln])) if per_link_ang.get(ln) else None,
|
||||
"normal_max_deg": max(per_link_ang[ln]) if per_link_ang.get(ln) else None,
|
||||
} for ln in sorted(per_link_pos, key=lambda k: -len(per_link_pos[k]))
|
||||
},
|
||||
"overall": {
|
||||
"pos_mean_mm": (sum(all_pos) / len(all_pos)) if all_pos else 0.0,
|
||||
"pos_p90_mm": _pct(all_pos, 0.9),
|
||||
"pos_max_mm": max(all_pos) if all_pos else 0.0,
|
||||
"normal_mean_deg": (sum(all_ang) / len(all_ang)) if all_ang else None,
|
||||
"normal_p90_deg": _pct(all_ang, 0.9) if all_ang else None,
|
||||
"normal_max_deg": max(all_ang) if all_ang else None,
|
||||
},
|
||||
"rows": rows,
|
||||
}
|
||||
|
||||
|
||||
def print_report(r: Dict[str, Any]) -> None:
|
||||
# familiar summary (backwards compatible wording)
|
||||
print(f"Erkannte Marker: {r['n_recognized']}")
|
||||
print(f"Nicht erkannte Marker: {r['n_missing']}")
|
||||
print(f"Gesamtzahl der Original-Marker: {r['n_original']}")
|
||||
print(f"Erkennungsrate: {r['recognition_rate']:.2f}%")
|
||||
o = r["overall"]
|
||||
print(f"Gemittelter 3D-Abstand: {o['pos_mean_mm']/1000.0:.4f}m")
|
||||
print(f"90%-Radius: {o['pos_p90_mm']/1000.0:.4f}m")
|
||||
print(f"Schlechtester Abstand: {o['pos_max_mm']/1000.0:.4f}m")
|
||||
if o["normal_mean_deg"] is not None:
|
||||
print(f"Normalen-Fehler: mean {o['normal_mean_deg']:.2f}deg / p90 {o['normal_p90_deg']:.2f}deg / max {o['normal_max_deg']:.2f}deg")
|
||||
|
||||
print("\nPro Glied:")
|
||||
print(f" {'link':>10} | {'n':>3} | {'pos mean/max [mm]':>18} | {'normal mean/max [deg]':>21}")
|
||||
print(" " + "-" * 60)
|
||||
for ln, s in r["per_link"].items():
|
||||
nd = f"{s['normal_mean_deg']:6.2f} /{s['normal_max_deg']:6.2f}" if s["normal_mean_deg"] is not None else " - "
|
||||
print(f" {ln:>10} | {s['n']:>3} | {s['pos_mean_mm']:7.2f} /{s['pos_max_mm']:7.2f} | {nd:>21}")
|
||||
|
||||
|
||||
def main() -> None:
|
||||
ap = argparse.ArgumentParser(description="Analysiert die Markererkennung (Position + Orientierung).")
|
||||
ap.add_argument("detected_file", help="aruco_marker_poses.json oder aruco_positions_*.json")
|
||||
ap.add_argument("original_file", help="Ground-Truth render_*.json")
|
||||
ap.add_argument("--out", default=None, help="optional JSON-Report")
|
||||
args = ap.parse_args()
|
||||
|
||||
r = analyze(args.detected_file, args.original_file)
|
||||
if r["n_recognized"] == 0:
|
||||
print("Keine gemeinsamen Marker gefunden, um die Genauigkeit zu bewerten.")
|
||||
return
|
||||
|
||||
# Statistiken berechnen
|
||||
distances.sort()
|
||||
mean_distance = sum(distances) / len(distances)
|
||||
|
||||
# 90%-Radius
|
||||
n = len(distances)
|
||||
index_90 = int(0.9 * n) - 1
|
||||
radius_90 = distances[index_90]
|
||||
|
||||
# 80%-Radius
|
||||
index_80 = int(0.8 * n) - 1
|
||||
radius_80 = distances[index_80]
|
||||
|
||||
# Schlechtester Abstand
|
||||
worst_distance = distances[-1]
|
||||
|
||||
print(f"Erkannte Marker: {recognized_markers}")
|
||||
print(f"Nicht erkannte Marker: {missing_markers}")
|
||||
print(f"Gesamtzahl der Original-Marker: {len(original_marker_ids)}")
|
||||
print(f"Erkennungsrate: {recognized_markers / len(original_marker_ids) * 100:.2f}%")
|
||||
print(f"Gemittelter 3D-Abstand: {mean_distance:.4f}m")
|
||||
print(f"90%-Radius: {radius_90:.4f}m")
|
||||
print(f"80%-Radius: {radius_80:.4f}m")
|
||||
print(f"Schlechtester Abstand: {worst_distance:.4f}m")
|
||||
print_report(r)
|
||||
if args.out:
|
||||
json.dump(r, open(args.out, "w", encoding="utf-8"), indent=2)
|
||||
print(f"\n[INFO] wrote {args.out}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Analysiert die Markererkennung.")
|
||||
parser.add_argument("detected_file", help="Pfad zur JSON-Datei mit den erkannten Markern.")
|
||||
parser.add_argument("original_file", help="Pfad zur JSON-Datei mit den originalen Markern.")
|
||||
args = parser.parse_args()
|
||||
|
||||
analyze_marker_detection(args.detected_file, args.original_file)
|
||||
main()
|
||||
|
||||
BIN
pipeline/__pycache__/pose_estimation.cpython-311.pyc
Normal file
BIN
pipeline/__pycache__/pose_estimation.cpython-311.pyc
Normal file
Binary file not shown.
539
pipeline/pose_estimation.py
Normal file
539
pipeline/pose_estimation.py
Normal file
@@ -0,0 +1,539 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
pose_estimation.py
|
||||
==================
|
||||
Estimate robot joint angles (x, y, z, a, b, c, e) from triangulated marker
|
||||
poses, using the kinematic model in robot.json (via robot_fk.py).
|
||||
|
||||
Design
|
||||
------
|
||||
The estimator is parametrised over JOINT VARIABLES, not links. This handles the
|
||||
tricky cases of this robot family generically:
|
||||
* Links with zero own markers (Base/x, Hand/b, Palm/c) — their variable is
|
||||
observable only through descendant markers.
|
||||
* A variable shared by several links (FingerA & FingerB share 'e').
|
||||
* Occluded middle links — global BA reconstructs them from the fingers.
|
||||
|
||||
Four switchable methods (robot.json -> pose_estimation.method):
|
||||
sequential_vector : analytic per joint from marker-pair / normal vectors (fast)
|
||||
sequential_fk : block-wise least squares along the chain (robust, 1 marker ok)
|
||||
global_ba : all variables jointly, position + normal residuals, robust loss
|
||||
hybrid : sequential_fk init -> global_ba refine (default, most stable)
|
||||
|
||||
Observation input:
|
||||
marker_observation = "corner_pose" -> aruco_marker_poses.json (pos + measured normal)
|
||||
marker_observation = "center_point" -> aruco_positions_*.json (pos only)
|
||||
|
||||
Both the engine (estimate_pose) and a CLI (main) live here.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import math
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
import numpy as np
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
from robot_fk import RobotFK, STATE_KEYS # noqa: E402
|
||||
|
||||
try:
|
||||
from scipy.optimize import least_squares
|
||||
HAVE_SCIPY = True
|
||||
except ImportError:
|
||||
HAVE_SCIPY = False
|
||||
|
||||
|
||||
# ==================================================================
|
||||
# Config
|
||||
# ==================================================================
|
||||
|
||||
DEFAULT_CFG: Dict[str, Any] = {
|
||||
"method": "hybrid",
|
||||
"marker_observation": "corner_pose",
|
||||
"use_normals": True,
|
||||
"normal_weight": 100.0,
|
||||
"robust_loss": "huber",
|
||||
"huber_delta_mm": 8.0,
|
||||
"max_iterations": 200,
|
||||
"min_cameras_per_marker": 2,
|
||||
"finger_block_joints": ["b", "c", "e"],
|
||||
"per_link_method": {},
|
||||
}
|
||||
|
||||
|
||||
def load_pose_cfg(robot_data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
cfg = dict(DEFAULT_CFG)
|
||||
cfg.update(robot_data.get("pose_estimation", {}) or {})
|
||||
return cfg
|
||||
|
||||
|
||||
# ==================================================================
|
||||
# Observations
|
||||
# ==================================================================
|
||||
|
||||
def load_observations(path: str, use_normals: bool, min_cams: int = 2) -> Dict[int, Dict[str, Any]]:
|
||||
"""
|
||||
Load marker observations. Accepts aruco_marker_poses.json (with measured
|
||||
normal + num_cameras) or aruco_positions_*.json (position only).
|
||||
Returns: marker_id -> {pos_mm:(3,), normal:(3,)|None, link:str, n_cams:int}
|
||||
"""
|
||||
data = json.load(open(path, "r", encoding="utf-8"))
|
||||
out: Dict[int, Dict[str, Any]] = {}
|
||||
for m in data.get("markers", []):
|
||||
mid = int(m.get("marker_id", m.get("id", -1)))
|
||||
if mid < 0:
|
||||
continue
|
||||
n_cams = int(m.get("num_cameras", 99))
|
||||
if n_cams < min_cams:
|
||||
continue
|
||||
if "position_mm" in m:
|
||||
pos = np.array(m["position_mm"], dtype=float)
|
||||
elif "position_m" in m:
|
||||
pos = np.array(m["position_m"], dtype=float) * 1000.0
|
||||
else:
|
||||
continue
|
||||
nrm = None
|
||||
if use_normals and m.get("normal") is not None:
|
||||
nv = np.array(m["normal"], dtype=float)
|
||||
nn = np.linalg.norm(nv)
|
||||
if nn > 1e-9:
|
||||
nrm = nv / nn
|
||||
out[mid] = {"pos_mm": pos, "normal": nrm, "link": m.get("link", "?"), "n_cams": n_cams}
|
||||
return out
|
||||
|
||||
|
||||
# ==================================================================
|
||||
# Kinematic chain analysis
|
||||
# ==================================================================
|
||||
|
||||
def analyze_chain(fk: RobotFK) -> Dict[str, Any]:
|
||||
"""
|
||||
Derive, generically from the FK topology:
|
||||
ordered_vars : movable joint variables, root->tip order, de-duplicated
|
||||
var_links : variable -> list of links it drives
|
||||
link_markers : link -> [model marker ids]
|
||||
blocks : sequential estimation blocks; each block groups the
|
||||
zero-marker ancestor variables with the next marker-
|
||||
bearing joint variable, estimated from that link's own
|
||||
markers (+ siblings sharing the same variable).
|
||||
"""
|
||||
links = fk.links
|
||||
topo = fk._topo
|
||||
|
||||
link_markers: Dict[str, List[int]] = {}
|
||||
for ln, ld in links.items():
|
||||
ids = []
|
||||
for mk in ld.get("markers", []) or []:
|
||||
if "id" in mk and "position" in mk:
|
||||
ids.append(int(mk["id"]))
|
||||
link_markers[ln] = ids
|
||||
|
||||
link_var: Dict[str, str] = {}
|
||||
for ln, ld in links.items():
|
||||
j = ld.get("jointToParent", {}) or {}
|
||||
if str(j.get("type", "")).lower() in ("revolute", "linear"):
|
||||
v = str(j.get("variable", "")).lower()
|
||||
if v:
|
||||
link_var[ln] = v
|
||||
|
||||
var_type: Dict[str, str] = {}
|
||||
var_links: Dict[str, List[str]] = defaultdict(list)
|
||||
for ln, v in link_var.items():
|
||||
var_links[v].append(ln)
|
||||
var_type[v] = str(links[ln].get("jointToParent", {}).get("type", "")).lower()
|
||||
|
||||
ordered_vars: List[str] = []
|
||||
for ln in topo:
|
||||
if ln in link_var and link_var[ln] not in ordered_vars:
|
||||
ordered_vars.append(link_var[ln])
|
||||
|
||||
# ---- build blocks ----
|
||||
blocks: List[Dict[str, Any]] = []
|
||||
var_block: Dict[str, int] = {}
|
||||
pending: List[str] = []
|
||||
for ln in topo:
|
||||
if ln not in link_var:
|
||||
continue
|
||||
v = link_var[ln]
|
||||
own = link_markers.get(ln, [])
|
||||
if v in var_block:
|
||||
# shared variable already in a block -> add this link's markers there
|
||||
if own:
|
||||
blocks[var_block[v]]["markers"].extend(own)
|
||||
continue
|
||||
if own:
|
||||
bvars = []
|
||||
for x in pending + [v]:
|
||||
if x not in bvars and x not in var_block:
|
||||
bvars.append(x)
|
||||
blocks.append({"vars": bvars, "markers": list(own), "anchor": ln})
|
||||
for x in bvars:
|
||||
var_block[x] = len(blocks) - 1
|
||||
pending = []
|
||||
else:
|
||||
if v not in pending:
|
||||
pending.append(v)
|
||||
if pending:
|
||||
blocks.append({"vars": pending, "markers": [], "anchor": None})
|
||||
for x in pending:
|
||||
var_block[x] = len(blocks) - 1
|
||||
|
||||
return {
|
||||
"ordered_vars": ordered_vars,
|
||||
"var_type": var_type,
|
||||
"var_links": dict(var_links),
|
||||
"link_markers": link_markers,
|
||||
"blocks": blocks,
|
||||
}
|
||||
|
||||
|
||||
# ==================================================================
|
||||
# Residuals
|
||||
# ==================================================================
|
||||
|
||||
def model_markers(fk: RobotFK, state: Dict[str, float]) -> Dict[int, Dict[str, np.ndarray]]:
|
||||
T = fk.compute(state)
|
||||
return fk.all_markers_world(T) # mid -> {world_mm, normal_world, link, local_mm}
|
||||
|
||||
|
||||
def residual_vector(state: Dict[str, float], fk: RobotFK, obs: Dict[int, Dict[str, Any]],
|
||||
marker_ids: List[int], cfg: Dict[str, Any]) -> np.ndarray:
|
||||
"""Position (mm) + optional normal (scaled) residuals over the given markers."""
|
||||
model = model_markers(fk, state)
|
||||
res: List[float] = []
|
||||
w_n = float(cfg.get("normal_weight", 30.0))
|
||||
use_n = bool(cfg.get("use_normals", True))
|
||||
for mid in marker_ids:
|
||||
if mid not in model or mid not in obs:
|
||||
continue
|
||||
mm = model[mid]
|
||||
dp = np.asarray(mm["world_mm"], float) - obs[mid]["pos_mm"]
|
||||
res.extend(dp.tolist())
|
||||
if use_n and obs[mid]["normal"] is not None and "normal_world" in mm:
|
||||
dn = (np.asarray(mm["normal_world"], float) - obs[mid]["normal"]) * w_n
|
||||
res.extend(dn.tolist())
|
||||
return np.asarray(res, dtype=float)
|
||||
|
||||
|
||||
def _state_from_vec(var_names: List[str], vec: np.ndarray, base: Dict[str, float]) -> Dict[str, float]:
|
||||
s = dict(base)
|
||||
for name, val in zip(var_names, vec):
|
||||
s[name] = float(val)
|
||||
return s
|
||||
|
||||
|
||||
# ==================================================================
|
||||
# Method: global bundle adjustment
|
||||
# ==================================================================
|
||||
|
||||
def estimate_global_ba(fk: RobotFK, obs: Dict[int, Dict[str, Any]], var_names: List[str],
|
||||
x0: Dict[str, float], cfg: Dict[str, Any]) -> Dict[str, float]:
|
||||
if not HAVE_SCIPY:
|
||||
print("[WARN] scipy missing — global_ba skipped, returning init")
|
||||
return dict(x0)
|
||||
marker_ids = list(obs.keys())
|
||||
base = {k: 0.0 for k in STATE_KEYS}
|
||||
base.update(x0)
|
||||
vec0 = np.array([base.get(v, 0.0) for v in var_names], dtype=float)
|
||||
|
||||
def fun(vec):
|
||||
st = _state_from_vec(var_names, vec, base)
|
||||
return residual_vector(st, fk, obs, marker_ids, cfg)
|
||||
|
||||
loss = cfg.get("robust_loss", "huber")
|
||||
f_scale = float(cfg.get("huber_delta_mm", 8.0))
|
||||
try:
|
||||
sol = least_squares(fun, vec0, loss=loss, f_scale=f_scale,
|
||||
max_nfev=int(cfg.get("max_iterations", 200)) * max(1, len(var_names)))
|
||||
return _state_from_vec(var_names, sol.x, base)
|
||||
except Exception as exc:
|
||||
print(f"[WARN] global_ba failed: {exc}")
|
||||
return dict(base)
|
||||
|
||||
|
||||
# ==================================================================
|
||||
# Method: sequential block-wise FK fit
|
||||
# ==================================================================
|
||||
|
||||
def _multistart_values(vtype: str) -> List[float]:
|
||||
# revolute: scan the circle to escape local minima at large angles
|
||||
if vtype == "revolute":
|
||||
return [0.0, 60.0, 120.0, 180.0, 240.0, 300.0]
|
||||
return [0.0]
|
||||
|
||||
|
||||
def estimate_sequential_fk(fk: RobotFK, obs: Dict[int, Dict[str, Any]], chain: Dict[str, Any],
|
||||
cfg: Dict[str, Any]) -> Dict[str, float]:
|
||||
"""Estimate block by block along the chain, freezing already-solved variables."""
|
||||
state = {k: 0.0 for k in STATE_KEYS}
|
||||
var_type = chain["var_type"]
|
||||
|
||||
for block in chain["blocks"]:
|
||||
bvars = block["vars"]
|
||||
bmarkers = [m for m in block["markers"] if m in obs]
|
||||
if not bvars:
|
||||
continue
|
||||
if not bmarkers:
|
||||
# unobservable block: leave at 0, flag later
|
||||
continue
|
||||
|
||||
if not HAVE_SCIPY:
|
||||
continue
|
||||
|
||||
base = dict(state)
|
||||
|
||||
def fun(vec, _bvars=bvars, _bm=bmarkers, _base=base):
|
||||
st = _state_from_vec(_bvars, vec, _base)
|
||||
return residual_vector(st, fk, obs, _bm, cfg)
|
||||
|
||||
# multi-start over the first revolute variable in the block
|
||||
starts = [[0.0] * len(bvars)]
|
||||
lead_type = var_type.get(bvars[0], "linear")
|
||||
if lead_type == "revolute":
|
||||
starts = []
|
||||
for a0 in _multistart_values("revolute"):
|
||||
s = [0.0] * len(bvars)
|
||||
s[0] = a0
|
||||
starts.append(s)
|
||||
|
||||
best, best_cost = None, float("inf")
|
||||
for s0 in starts:
|
||||
try:
|
||||
sol = least_squares(fun, np.array(s0, dtype=float),
|
||||
loss=cfg.get("robust_loss", "huber"),
|
||||
f_scale=float(cfg.get("huber_delta_mm", 8.0)),
|
||||
max_nfev=200 * max(1, len(bvars)))
|
||||
if sol.cost < best_cost:
|
||||
best_cost, best = sol.cost, sol.x
|
||||
except Exception:
|
||||
continue
|
||||
if best is not None:
|
||||
for name, val in zip(bvars, best):
|
||||
state[name] = float(val)
|
||||
|
||||
# wrap revolute angles to (-180, 180]
|
||||
for v, vt in var_type.items():
|
||||
if vt == "revolute":
|
||||
state[v] = (state[v] + 180.0) % 360.0 - 180.0
|
||||
return state
|
||||
|
||||
|
||||
# ==================================================================
|
||||
# Method: sequential analytic vector (per revolute joint)
|
||||
# ==================================================================
|
||||
|
||||
def estimate_sequential_vector(fk: RobotFK, obs: Dict[int, Dict[str, Any]], chain: Dict[str, Any],
|
||||
cfg: Dict[str, Any]) -> Dict[str, float]:
|
||||
"""
|
||||
Analytic angle from marker geometry where possible. For revolute joints with
|
||||
>=2 markers on the link, use the perpendicular marker-pair vector. Falls back
|
||||
to the FK block solver for linear / zero-marker / single-marker cases, so it
|
||||
always returns a full state (still cheaper than full sequential_fk because
|
||||
well-populated joints are solved in closed form).
|
||||
"""
|
||||
state = {k: 0.0 for k in STATE_KEYS}
|
||||
var_type = chain["var_type"]
|
||||
link_markers = chain["link_markers"]
|
||||
var_links = chain["var_links"]
|
||||
|
||||
for block in chain["blocks"]:
|
||||
bvars = block["vars"]
|
||||
if len(bvars) == 1 and var_type.get(bvars[0]) == "revolute":
|
||||
v = bvars[0]
|
||||
ln = var_links[v][0]
|
||||
mids = [m for m in link_markers.get(ln, []) if m in obs]
|
||||
if len(mids) >= 2:
|
||||
# model vectors must be expressed in the WORLD frame at angle=0
|
||||
# (the link frame is already rotated by the parents y,z,...), so
|
||||
# use FK marker world positions with this joint set to 0.
|
||||
state_v0 = dict(state)
|
||||
state_v0[v] = 0.0
|
||||
model_v0 = model_markers(fk, state_v0)
|
||||
axis_world = fk.joint_axis_world(ln, state_v0)
|
||||
ang = _angle_from_pairs_world(mids, model_v0, obs, axis_world)
|
||||
if ang is not None:
|
||||
state[v] = ang
|
||||
continue
|
||||
# fallback: block FK fit for this single block
|
||||
_fit_single_block(fk, obs, block, var_type, cfg, state)
|
||||
|
||||
for v, vt in var_type.items():
|
||||
if vt == "revolute":
|
||||
state[v] = (state[v] + 180.0) % 360.0 - 180.0
|
||||
return state
|
||||
|
||||
|
||||
def _angle_from_pairs_world(mids: List[int], model_v0: Dict[int, Dict[str, np.ndarray]],
|
||||
obs: Dict[int, Dict[str, Any]], axis_world: np.ndarray) -> Optional[float]:
|
||||
from itertools import combinations
|
||||
a = np.asarray(axis_world, float)
|
||||
a = a / (np.linalg.norm(a) + 1e-12)
|
||||
angs, ws = [], []
|
||||
for i, j in combinations(mids, 2):
|
||||
if i not in model_v0 or j not in model_v0:
|
||||
continue
|
||||
vm = np.asarray(model_v0[j]["world_mm"], float) - np.asarray(model_v0[i]["world_mm"], float) # world @ angle 0
|
||||
vo = obs[j]["pos_mm"] - obs[i]["pos_mm"] # observed vector (world, mm)
|
||||
vm_p = vm - np.dot(vm, a) * a
|
||||
vo_p = vo - np.dot(vo, a) * a
|
||||
if np.linalg.norm(vm_p) < 5 or np.linalg.norm(vo_p) < 5:
|
||||
continue
|
||||
ang = math.atan2(float(np.dot(a, np.cross(vm_p, vo_p))), float(np.dot(vm_p, vo_p)))
|
||||
angs.append(ang)
|
||||
ws.append(np.linalg.norm(vm_p) * np.linalg.norm(vo_p))
|
||||
if not angs:
|
||||
return None
|
||||
s = sum(w * math.sin(x) for w, x in zip(ws, angs))
|
||||
c = sum(w * math.cos(x) for w, x in zip(ws, angs))
|
||||
return math.degrees(math.atan2(s, c))
|
||||
|
||||
|
||||
def _fit_single_block(fk, obs, block, var_type, cfg, state):
|
||||
if not HAVE_SCIPY:
|
||||
return
|
||||
bvars = block["vars"]
|
||||
bmarkers = [m for m in block["markers"] if m in obs]
|
||||
if not bvars or not bmarkers:
|
||||
return
|
||||
base = dict(state)
|
||||
|
||||
def fun(vec):
|
||||
return residual_vector(_state_from_vec(bvars, vec, base), fk, obs, bmarkers, cfg)
|
||||
|
||||
starts = [[0.0] * len(bvars)]
|
||||
if var_type.get(bvars[0]) == "revolute":
|
||||
starts = [[a0] + [0.0] * (len(bvars) - 1) for a0 in _multistart_values("revolute")]
|
||||
best, best_cost = None, float("inf")
|
||||
for s0 in starts:
|
||||
try:
|
||||
sol = least_squares(fun, np.array(s0, float), loss=cfg.get("robust_loss", "huber"),
|
||||
f_scale=float(cfg.get("huber_delta_mm", 8.0)), max_nfev=200 * max(1, len(bvars)))
|
||||
if sol.cost < best_cost:
|
||||
best_cost, best = sol.cost, sol.x
|
||||
except Exception:
|
||||
continue
|
||||
if best is not None:
|
||||
for name, val in zip(bvars, best):
|
||||
state[name] = float(val)
|
||||
|
||||
|
||||
# ==================================================================
|
||||
# Dispatch
|
||||
# ==================================================================
|
||||
|
||||
def observability(chain: Dict[str, Any], obs: Dict[int, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
|
||||
"""
|
||||
Per-variable confidence from how well its estimation block is determined.
|
||||
A block groups coupled variables (e.g. b,c,e on the fingers); confidence is
|
||||
driven by markers-per-variable in that block:
|
||||
high : >= 2 markers per variable (well over-determined)
|
||||
medium : >= 1 marker per variable
|
||||
low : fewer markers than variables (under-determined — distrust!)
|
||||
none : no markers at all (variable left at 0)
|
||||
"""
|
||||
info: Dict[str, Dict[str, Any]] = {}
|
||||
for block in chain["blocks"]:
|
||||
seen = [m for m in block["markers"] if m in obs]
|
||||
nvars = max(1, len(block["vars"]))
|
||||
ratio = len(seen) / nvars
|
||||
if len(seen) == 0:
|
||||
conf = "none"
|
||||
elif ratio >= 2.0:
|
||||
conf = "high"
|
||||
elif ratio >= 1.0:
|
||||
conf = "medium"
|
||||
else:
|
||||
conf = "low"
|
||||
for v in block["vars"]:
|
||||
info[v] = {"observable": len(seen) > 0, "n_markers": len(seen),
|
||||
"block_vars": len(block["vars"]), "confidence": conf,
|
||||
"block_anchor": block["anchor"]}
|
||||
return info
|
||||
|
||||
|
||||
def estimate_pose(fk: RobotFK, obs: Dict[int, Dict[str, Any]], cfg: Dict[str, Any]) -> Dict[str, Any]:
|
||||
chain = analyze_chain(fk)
|
||||
var_names = chain["ordered_vars"]
|
||||
method = str(cfg.get("method", "hybrid")).lower()
|
||||
obsv = observability(chain, obs)
|
||||
|
||||
if method == "sequential_vector":
|
||||
state = estimate_sequential_vector(fk, obs, chain, cfg)
|
||||
elif method == "sequential_fk":
|
||||
state = estimate_sequential_fk(fk, obs, chain, cfg)
|
||||
elif method == "global_ba":
|
||||
init = estimate_sequential_fk(fk, obs, chain, cfg) # cheap robust init
|
||||
state = estimate_global_ba(fk, obs, var_names, init, cfg)
|
||||
else: # hybrid (default)
|
||||
init = estimate_sequential_fk(fk, obs, chain, cfg)
|
||||
state = estimate_global_ba(fk, obs, var_names, init, cfg)
|
||||
|
||||
# final residual stats over all observed markers
|
||||
final_res = residual_vector(state, fk, obs, list(obs.keys()), cfg)
|
||||
rms = float(np.sqrt(np.mean(final_res ** 2))) if final_res.size else 0.0
|
||||
|
||||
return {"state": state, "method": method, "observability": obsv,
|
||||
"residual_rms": rms, "num_markers": len(obs)}
|
||||
|
||||
|
||||
# ==================================================================
|
||||
# CLI
|
||||
# ==================================================================
|
||||
|
||||
def main() -> None:
|
||||
ap = argparse.ArgumentParser(description="Estimate robot joint angles from marker poses")
|
||||
ap.add_argument("markers", help="aruco_marker_poses.json (corner_pose) or aruco_positions_*.json (center)")
|
||||
ap.add_argument("-robot", "--robot", required=True)
|
||||
ap.add_argument("-out", "--out", default=None)
|
||||
ap.add_argument("--method", default=None, help="override robot.json method")
|
||||
args = ap.parse_args()
|
||||
|
||||
robot_data = json.load(open(args.robot, "r", encoding="utf-8"))
|
||||
cfg = load_pose_cfg(robot_data)
|
||||
if args.method:
|
||||
cfg["method"] = args.method
|
||||
|
||||
fk = RobotFK(robot_data)
|
||||
obs = load_observations(args.markers, cfg.get("use_normals", True),
|
||||
int(cfg.get("min_cameras_per_marker", 2)))
|
||||
print(f"[INFO] method={cfg['method']} | observed markers={len(obs)} | use_normals={cfg.get('use_normals')}")
|
||||
|
||||
result = estimate_pose(fk, obs, cfg)
|
||||
st = result["state"]
|
||||
|
||||
print("\nEstimated joint values:")
|
||||
for v in ["x", "y", "z", "a", "b", "c", "e"]:
|
||||
ob = result["observability"].get(v, {})
|
||||
unit = "mm" if v in ("x", "e") else "deg"
|
||||
conf = ob.get("confidence", "?")
|
||||
tag = "" if ob.get("observable", False) else " [UNOBSERVABLE -> 0]"
|
||||
print(f" {v}: {st.get(v, 0.0):8.2f} {unit} (markers={ob.get('n_markers','?')}, conf={conf}){tag}")
|
||||
print(f"\n[INFO] residual RMS over {result['num_markers']} markers: {result['residual_rms']:.3f}")
|
||||
|
||||
out = {
|
||||
"schema_version": "1.0",
|
||||
"created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
|
||||
"method": result["method"],
|
||||
"movements": {v: {"value": st.get(v, 0.0),
|
||||
"unit": "mm" if v in ("x", "e") else "deg",
|
||||
"observable": result["observability"].get(v, {}).get("observable", False),
|
||||
"confidence": result["observability"].get(v, {}).get("confidence", "none"),
|
||||
"n_markers": result["observability"].get(v, {}).get("n_markers", 0)}
|
||||
for v in ["x", "y", "z", "a", "b", "c", "e"]},
|
||||
"residual_rms": result["residual_rms"],
|
||||
"num_markers": result["num_markers"],
|
||||
}
|
||||
out_path = args.out or os.path.join(os.path.dirname(args.markers), "robot_state.json")
|
||||
json.dump(out, open(out_path, "w", encoding="utf-8"), indent=2)
|
||||
print(f"[INFO] wrote {out_path}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user