816 lines
36 KiB
Python
816 lines
36 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
pose_estimation.py
|
||
==================
|
||
Estimate robot joint angles (x, y, z, a, b, c, e) from triangulated marker
|
||
poses, using the kinematic model in robot.json (via robot_fk.py).
|
||
|
||
Design
|
||
------
|
||
The estimator is parametrised over JOINT VARIABLES, not links. This handles the
|
||
tricky cases of this robot family generically:
|
||
* Links with zero own markers (Base/x, Hand/b, Palm/c) — their variable is
|
||
observable only through descendant markers.
|
||
* A variable shared by several links (FingerA & FingerB share 'e').
|
||
* Occluded middle links — global BA reconstructs them from the fingers.
|
||
|
||
Four switchable methods (robot.json -> pose_estimation.method):
|
||
sequential_vector : analytic per joint from marker-pair / normal vectors (fast)
|
||
sequential_fk : block-wise least squares along the chain (robust, 1 marker ok)
|
||
global_ba : all variables jointly, position + normal residuals, robust loss
|
||
hybrid : sequential_fk init -> global_ba refine (default, most stable)
|
||
|
||
Observation input (robot.json -> pose_estimation.marker_observation):
|
||
"corner_pose" (default) -> aruco_marker_poses.json: 3 pos + 3 normal residuals/marker
|
||
"corner_points" -> aruco_marker_poses.json: 12 corner residuals for
|
||
robot-link markers (4 triangulated corners vs FK
|
||
corners; no separate normal), 1 center residual for
|
||
root-link (Board: floor/rail) markers whose spin is
|
||
uncalibrated. Robust loss acts per corner. Opt-in;
|
||
needs `corners_m`. Links via corner_point_links.
|
||
"center_point" -> aruco_positions_*.json: position only
|
||
|
||
Homing integration (appRobotHoming, see doc/Homing_5_Pose.md):
|
||
--from-state <json> seed/init state (flat {var: value}, or the
|
||
{"accumulated_state": {...}} shape written by
|
||
4b_revolute_angle.py) used as x0 for
|
||
global_ba/hybrid instead of the internal
|
||
estimate_sequential_fk() cold start. Missing
|
||
variables default to 0 and are estimated/flagged
|
||
normally. Without --from-state, behaviour is
|
||
unchanged (internal cold start, as before).
|
||
robot.json -> pose_estimation.fit_origin_link = "Arm1" one switch: the
|
||
named link's jointToParent.origin Y/Z is fit
|
||
TOGETHER WITH the normal pose in the same
|
||
global_ba solve (complements the geometric
|
||
multi-pose method in doc/Kalibrierung.md
|
||
Schritt [4]) and the result is adopted
|
||
automatically -- written back into robot.json
|
||
(surgical text patch, see patch_robot_json_origin()).
|
||
Off by default (key absent/null).
|
||
|
||
Unobservable joints (confidence "none") are written as value=null in the
|
||
output JSON — never a fabricated 0 (see movements.<var>.observable).
|
||
|
||
Both the engine (estimate_pose) and a CLI (main) live here.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import math
|
||
import os
|
||
import re
|
||
import sys
|
||
import time
|
||
from collections import defaultdict
|
||
from pathlib import Path
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
|
||
import numpy as np
|
||
|
||
sys.path.insert(0, str(Path(__file__).parent))
|
||
from robot_fk import RobotFK, STATE_KEYS # noqa: E402
|
||
|
||
try:
|
||
from scipy.optimize import least_squares
|
||
HAVE_SCIPY = True
|
||
except ImportError:
|
||
HAVE_SCIPY = False
|
||
|
||
|
||
# ==================================================================
|
||
# Config
|
||
# ==================================================================
|
||
|
||
DEFAULT_CFG: Dict[str, Any] = {
|
||
"method": "hybrid",
|
||
"marker_observation": "corner_pose",
|
||
"use_normals": True,
|
||
"normal_weight": 100.0,
|
||
"use_marker_weight": False,
|
||
"robust_loss": "huber",
|
||
"huber_delta_mm": 8.0,
|
||
"max_iterations": 200,
|
||
"min_cameras_per_marker": 2,
|
||
"finger_block_joints": ["b", "c", "e"],
|
||
"per_link_method": {},
|
||
# Nur im marker_observation="corner_points"-Modus relevant: welche Links die
|
||
# 4 Eck-Residuen nutzen. None/absent = alle Nicht-Root-Links (= der Roboter);
|
||
# der Root-Link (Board mit Boden-/Rail-Markern) nutzt ein Center-Residuum.
|
||
# Hintergrund: nur die Roboter-Marker-Spins sind kalibriert/verifiziert; die
|
||
# Board/Rail-Spins nicht — deren Eckreihenfolge wäre unzuverlässig. Board ist
|
||
# zudem Root (Residuum konstant bzgl. der Gelenke). Explizite Liste möglich,
|
||
# z.B. ["Arm1","Ellbow","Arm2","Hand","Palm","FingerA","FingerB"].
|
||
"corner_point_links": None,
|
||
# One switch: if set to a link name (e.g. "Arm1"), that link's
|
||
# jointToParent.origin Y/Z is fit together with the normal pose (same
|
||
# global_ba solve) and the result is written back into robot.json
|
||
# automatically. None/absent = off, no behaviour change. See
|
||
# doc/Homing_5_Pose.md "Kalibrier-Switch".
|
||
"fit_origin_link": None,
|
||
}
|
||
|
||
|
||
def load_pose_cfg(robot_data: Dict[str, Any]) -> Dict[str, Any]:
|
||
cfg = dict(DEFAULT_CFG)
|
||
cfg.update(robot_data.get("pose_estimation", {}) or {})
|
||
return cfg
|
||
|
||
|
||
# ==================================================================
|
||
# Observations
|
||
# ==================================================================
|
||
|
||
def load_observations(path: str, use_normals: bool, min_cams: int = 2) -> Dict[int, Dict[str, Any]]:
|
||
"""
|
||
Load marker observations. Accepts aruco_marker_poses.json (with measured
|
||
normal + num_cameras + 4 triangulated corners) or aruco_positions_*.json
|
||
(position only).
|
||
Returns: marker_id -> {pos_mm:(3,), normal:(3,)|None, corners_mm:(4,3)|None,
|
||
link:str, n_cams:int, weight:float}
|
||
corners_mm (aus `corners_m`, m→mm) speist den marker_observation=
|
||
"corner_points"-Modus in residual_vector().
|
||
"""
|
||
data = json.load(open(path, "r", encoding="utf-8"))
|
||
out: Dict[int, Dict[str, Any]] = {}
|
||
for m in data.get("markers", []):
|
||
mid = int(m.get("marker_id", m.get("id", -1)))
|
||
if mid < 0:
|
||
continue
|
||
n_cams = int(m.get("num_cameras", 99))
|
||
if n_cams < min_cams:
|
||
continue
|
||
if "position_mm" in m:
|
||
pos = np.array(m["position_mm"], dtype=float)
|
||
elif "position_m" in m:
|
||
pos = np.array(m["position_m"], dtype=float) * 1000.0
|
||
else:
|
||
continue
|
||
nrm = None
|
||
if use_normals and m.get("normal") is not None:
|
||
nv = np.array(m["normal"], dtype=float)
|
||
nn = np.linalg.norm(nv)
|
||
if nn > 1e-9:
|
||
nrm = nv / nn
|
||
corners_mm = None
|
||
cm = m.get("corners_m")
|
||
if cm is not None:
|
||
arr = np.array(cm, dtype=float)
|
||
if arr.shape == (4, 3):
|
||
corners_mm = arr * 1000.0
|
||
out[mid] = {"pos_mm": pos, "normal": nrm, "corners_mm": corners_mm,
|
||
"link": m.get("link", "?"), "n_cams": n_cams,
|
||
"weight": float(m.get("weight", 1.0))}
|
||
return out
|
||
|
||
|
||
def load_seed_state(path: str) -> Dict[str, float]:
|
||
"""
|
||
Load a partial/full joint state to use as an optimisation seed (--from-state).
|
||
|
||
Accepts either a flat {variable: value} dict, or the
|
||
{"accumulated_state": {...}, ...} wrapper written by 4b_revolute_angle.py —
|
||
same unwrap rule as server/homingOrchestrator.js
|
||
(`stateData.accumulated_state ?? stateData`), so 4b's output files can be
|
||
passed in directly. Unknown keys are ignored; missing STATE_KEYS are simply
|
||
absent from the returned dict (caller defaults them, e.g. to 0.0).
|
||
"""
|
||
data = json.load(open(path, "r", encoding="utf-8"))
|
||
raw = data.get("accumulated_state", data) if isinstance(data, dict) else {}
|
||
return {k: float(v) for k, v in raw.items() if k in STATE_KEYS and v is not None}
|
||
|
||
|
||
# ==================================================================
|
||
# Kinematic chain analysis
|
||
# ==================================================================
|
||
|
||
def analyze_chain(fk: RobotFK) -> Dict[str, Any]:
|
||
"""
|
||
Derive, generically from the FK topology:
|
||
ordered_vars : movable joint variables, root->tip order, de-duplicated
|
||
var_links : variable -> list of links it drives
|
||
link_markers : link -> [model marker ids]
|
||
blocks : sequential estimation blocks; each block groups the
|
||
zero-marker ancestor variables with the next marker-
|
||
bearing joint variable, estimated from that link's own
|
||
markers (+ siblings sharing the same variable).
|
||
"""
|
||
links = fk.links
|
||
topo = fk._topo
|
||
|
||
link_markers: Dict[str, List[int]] = {}
|
||
for ln, ld in links.items():
|
||
ids = []
|
||
for mk in ld.get("markers", []) or []:
|
||
if "id" in mk and "position" in mk:
|
||
ids.append(int(mk["id"]))
|
||
link_markers[ln] = ids
|
||
|
||
link_var: Dict[str, str] = {}
|
||
for ln, ld in links.items():
|
||
j = ld.get("jointToParent", {}) or {}
|
||
if str(j.get("type", "")).lower() in ("revolute", "linear"):
|
||
v = str(j.get("variable", "")).lower()
|
||
if v:
|
||
link_var[ln] = v
|
||
|
||
var_type: Dict[str, str] = {}
|
||
var_links: Dict[str, List[str]] = defaultdict(list)
|
||
for ln, v in link_var.items():
|
||
var_links[v].append(ln)
|
||
var_type[v] = str(links[ln].get("jointToParent", {}).get("type", "")).lower()
|
||
|
||
ordered_vars: List[str] = []
|
||
for ln in topo:
|
||
if ln in link_var and link_var[ln] not in ordered_vars:
|
||
ordered_vars.append(link_var[ln])
|
||
|
||
# ---- build blocks ----
|
||
blocks: List[Dict[str, Any]] = []
|
||
var_block: Dict[str, int] = {}
|
||
pending: List[str] = []
|
||
for ln in topo:
|
||
if ln not in link_var:
|
||
continue
|
||
v = link_var[ln]
|
||
own = link_markers.get(ln, [])
|
||
if v in var_block:
|
||
# shared variable already in a block -> add this link's markers there
|
||
if own:
|
||
blocks[var_block[v]]["markers"].extend(own)
|
||
continue
|
||
if own:
|
||
bvars = []
|
||
for x in pending + [v]:
|
||
if x not in bvars and x not in var_block:
|
||
bvars.append(x)
|
||
blocks.append({"vars": bvars, "markers": list(own), "anchor": ln})
|
||
for x in bvars:
|
||
var_block[x] = len(blocks) - 1
|
||
pending = []
|
||
else:
|
||
if v not in pending:
|
||
pending.append(v)
|
||
if pending:
|
||
blocks.append({"vars": pending, "markers": [], "anchor": None})
|
||
for x in pending:
|
||
var_block[x] = len(blocks) - 1
|
||
|
||
# subtree_markers[L] = L's own markers + all descendants' markers. Lets
|
||
# observability() credit a block whose own link saw nothing this capture
|
||
# but whose CHILD link did (e.g. Ellbow has no visible markers, but Arm2's
|
||
# markers still constrain z through the chain — same idea as 4b's
|
||
# Fallback-1, just for confidence reporting here, not for the fit itself).
|
||
children: Dict[str, List[str]] = defaultdict(list)
|
||
for ln, ld in links.items():
|
||
p = ld.get("parent")
|
||
if p:
|
||
children[p].append(ln)
|
||
subtree_markers: Dict[str, List[int]] = {}
|
||
for ln in reversed(topo):
|
||
ids = list(link_markers.get(ln, []))
|
||
for c in children.get(ln, []):
|
||
ids.extend(subtree_markers.get(c, []))
|
||
subtree_markers[ln] = ids
|
||
|
||
return {
|
||
"ordered_vars": ordered_vars,
|
||
"var_type": var_type,
|
||
"var_links": dict(var_links),
|
||
"link_markers": link_markers,
|
||
"subtree_markers": subtree_markers,
|
||
"blocks": blocks,
|
||
}
|
||
|
||
|
||
# ==================================================================
|
||
# Residuals
|
||
# ==================================================================
|
||
|
||
def model_markers(fk: RobotFK, state: Dict[str, float]) -> Dict[int, Dict[str, np.ndarray]]:
|
||
T = fk.compute(state)
|
||
return fk.all_markers_world(T) # mid -> {world_mm, normal_world, link, local_mm}
|
||
|
||
|
||
def _resolve_corner_links(fk: RobotFK, cfg: Dict[str, Any]) -> set:
|
||
"""
|
||
Welche Links im "corner_points"-Modus die 4 Eck-Residuen nutzen.
|
||
Explizite Liste in cfg["corner_point_links"], sonst alle Nicht-Root-Links
|
||
(der Root-Link Board trägt die unkalibrierten Boden-/Rail-Marker und nutzt
|
||
ein Center-Residuum).
|
||
"""
|
||
explicit = cfg.get("corner_point_links")
|
||
if isinstance(explicit, list) and explicit:
|
||
return set(explicit)
|
||
links = fk.links
|
||
roots = {ln for ln, ld in links.items()
|
||
if not ld.get("parent") or ld.get("parent") not in links}
|
||
return set(links.keys()) - roots
|
||
|
||
|
||
def residual_vector(state: Dict[str, float], fk: RobotFK, obs: Dict[int, Dict[str, Any]],
|
||
marker_ids: List[int], cfg: Dict[str, Any]) -> np.ndarray:
|
||
"""
|
||
Residuen über die gegebenen Marker. Modus via pose_estimation.marker_observation:
|
||
|
||
"corner_pose" (Default): 3 Position (mm) + optional 3 Normale
|
||
(×normal_weight) je Marker — wie bisher.
|
||
"corner_points": 12 Eck-Residuen (4 Ecken × xyz, mm) NUR für Marker
|
||
auf den `corner_point_links` (z.B. Hand/Finger),
|
||
KEINE separate Normale (Orientierung steckt in den
|
||
Ecken). Alle übrigen Marker verhalten sich wie im
|
||
Default-Modus (Center + optionale Normale) — außer
|
||
dem Root-Link (Board: Boden-/Rail-Marker, Spin
|
||
unkalibriert), der nur Center bekommt ("ein Punkt
|
||
pro Marker"). So lassen sich Ecken gezielt für
|
||
Hand/Finger scharfschalten, ohne Arme/Board zu
|
||
verändern.
|
||
"""
|
||
model = model_markers(fk, state)
|
||
res: List[float] = []
|
||
use_mw = bool(cfg.get("use_marker_weight", False))
|
||
obs_mode = str(cfg.get("marker_observation", "corner_pose")).lower()
|
||
|
||
if obs_mode == "corner_points":
|
||
corner_links = _resolve_corner_links(fk, cfg)
|
||
roots = {ln for ln, ld in fk.links.items()
|
||
if not ld.get("parent") or ld.get("parent") not in fk.links}
|
||
w_n = float(cfg.get("normal_weight", 30.0))
|
||
use_n = bool(cfg.get("use_normals", True))
|
||
for mid in marker_ids:
|
||
if mid not in model or mid not in obs:
|
||
continue
|
||
mw = float(obs[mid].get("weight", 1.0)) if use_mw else 1.0
|
||
mm = model[mid]
|
||
link = mm.get("link")
|
||
oc = obs[mid].get("corners_mm")
|
||
mc = mm.get("corners_world")
|
||
if link in corner_links and oc is not None and mc is not None:
|
||
dc = (np.asarray(mc, float) - np.asarray(oc, float)) * mw # (4,3)
|
||
res.extend(dc.reshape(-1).tolist()) # 12 Werte
|
||
continue
|
||
# Nicht-Eck-Marker verhalten sich wie im Default-Modus: Center +
|
||
# optionale Normale — AUSSER auf dem Root-Link (Board: Boden-/Rail-
|
||
# Marker mit unkalibriertem Spin), der nur Center bekommt ("ein
|
||
# Punkt pro Marker"). So bleiben Arme/Board unverändert, wenn nur
|
||
# Hand/Finger über corner_point_links auf Ecken laufen.
|
||
dp = (np.asarray(mm["world_mm"], float) - obs[mid]["pos_mm"]) * mw
|
||
res.extend(dp.tolist())
|
||
if link not in roots and use_n and obs[mid]["normal"] is not None and "normal_world" in mm:
|
||
dn = (np.asarray(mm["normal_world"], float) - obs[mid]["normal"]) * w_n * mw
|
||
res.extend(dn.tolist())
|
||
return np.asarray(res, dtype=float)
|
||
|
||
# Default: Center (mm) + optionale Normale (skaliert)
|
||
w_n = float(cfg.get("normal_weight", 30.0))
|
||
use_n = bool(cfg.get("use_normals", True))
|
||
for mid in marker_ids:
|
||
if mid not in model or mid not in obs:
|
||
continue
|
||
mm = model[mid]
|
||
mw = float(obs[mid].get("weight", 1.0)) if use_mw else 1.0
|
||
dp = (np.asarray(mm["world_mm"], float) - obs[mid]["pos_mm"]) * mw
|
||
res.extend(dp.tolist())
|
||
if use_n and obs[mid]["normal"] is not None and "normal_world" in mm:
|
||
dn = (np.asarray(mm["normal_world"], float) - obs[mid]["normal"]) * w_n * mw
|
||
res.extend(dn.tolist())
|
||
return np.asarray(res, dtype=float)
|
||
|
||
|
||
def _state_from_vec(var_names: List[str], vec: np.ndarray, base: Dict[str, float]) -> Dict[str, float]:
|
||
s = dict(base)
|
||
for name, val in zip(var_names, vec):
|
||
s[name] = float(val)
|
||
return s
|
||
|
||
|
||
# ==================================================================
|
||
# Method: global bundle adjustment
|
||
# ==================================================================
|
||
|
||
def estimate_global_ba(fk: RobotFK, obs: Dict[int, Dict[str, Any]], var_names: List[str],
|
||
x0: Dict[str, float], cfg: Dict[str, Any]) -> Dict[str, float]:
|
||
if not HAVE_SCIPY:
|
||
print("[WARN] scipy missing — global_ba skipped, returning init")
|
||
return dict(x0)
|
||
marker_ids = list(obs.keys())
|
||
base = {k: 0.0 for k in STATE_KEYS}
|
||
base.update(x0)
|
||
|
||
# One switch (pose_estimation.fit_origin_link): also free that link's
|
||
# jointToParent.origin Y/Z as 2 extra parameters of THIS SAME solve, no
|
||
# separate pass. fk.links[...] is mutated in place -- compute() re-reads
|
||
# it fresh every call (robot_fk.py), so this takes effect immediately and
|
||
# is left adopted on success (main() writes it back into robot.json).
|
||
origin_link = cfg.get("fit_origin_link")
|
||
origin = fk.links.get(origin_link, {}).get("jointToParent", {}).get("origin") if origin_link else None
|
||
origin = origin if isinstance(origin, list) and len(origin) == 3 else None
|
||
origin_before = list(origin) if origin else None
|
||
n_state = len(var_names)
|
||
|
||
vec0 = np.array([base.get(v, 0.0) for v in var_names]
|
||
+ (origin_before[1:3] if origin else []), dtype=float)
|
||
|
||
def fun(vec):
|
||
st = _state_from_vec(var_names, vec[:n_state], base)
|
||
if origin:
|
||
origin[1], origin[2] = float(vec[n_state]), float(vec[n_state + 1])
|
||
return residual_vector(st, fk, obs, marker_ids, cfg)
|
||
|
||
loss = cfg.get("robust_loss", "huber")
|
||
f_scale = float(cfg.get("huber_delta_mm", 8.0))
|
||
try:
|
||
sol = least_squares(fun, vec0, loss=loss, f_scale=f_scale,
|
||
max_nfev=int(cfg.get("max_iterations", 200)) * max(1, len(vec0)))
|
||
state = _state_from_vec(var_names, sol.x[:n_state], base)
|
||
if origin:
|
||
origin[1], origin[2] = float(sol.x[n_state]), float(sol.x[n_state + 1])
|
||
print(f"[INFO] fit_origin_link={origin_link}: Y,Z {origin_before[1:3]} "
|
||
f"-> [{origin[1]:.3f}, {origin[2]:.3f}]")
|
||
return state
|
||
except Exception as exc:
|
||
print(f"[WARN] global_ba failed: {exc}")
|
||
if origin:
|
||
origin[1], origin[2] = origin_before[1], origin_before[2] # restore on failure
|
||
return dict(base)
|
||
|
||
|
||
# ==================================================================
|
||
# Method: sequential block-wise FK fit
|
||
# ==================================================================
|
||
|
||
def _multistart_values(vtype: str) -> List[float]:
|
||
# revolute: scan the circle to escape local minima at large angles
|
||
if vtype == "revolute":
|
||
return [0.0, 60.0, 120.0, 180.0, 240.0, 300.0]
|
||
return [0.0]
|
||
|
||
|
||
def estimate_sequential_fk(fk: RobotFK, obs: Dict[int, Dict[str, Any]], chain: Dict[str, Any],
|
||
cfg: Dict[str, Any], seed: Optional[Dict[str, float]] = None
|
||
) -> Dict[str, float]:
|
||
"""
|
||
Estimate block by block along the chain, freezing already-solved variables.
|
||
|
||
seed: optional partial/full state (e.g. from 4b_revolute_angle.py) to trust
|
||
as a starting point. A block is SKIPPED entirely (seed used as-is, no
|
||
re-fit) only if ALL of its variables are present in seed. Blocks with any
|
||
missing variable are still fit normally — including their own multi-start
|
||
— but using the seeded values of EARLIER blocks as fixed context instead
|
||
of 0. This keeps the local-minimum protection for whatever the seed does
|
||
NOT cover (see doc/Homing_5_Pose.md "Wichtige Einschraenkung"), while not
|
||
re-perturbing values the caller already trusts.
|
||
"""
|
||
state = {k: 0.0 for k in STATE_KEYS}
|
||
if seed:
|
||
state.update({k: v for k, v in seed.items() if k in STATE_KEYS})
|
||
var_type = chain["var_type"]
|
||
|
||
for block in chain["blocks"]:
|
||
bvars = block["vars"]
|
||
bmarkers = [m for m in block["markers"] if m in obs]
|
||
if not bvars:
|
||
continue
|
||
if seed and all(v in seed for v in bvars):
|
||
continue # fully seeded — trust it, don't re-fit
|
||
if not bmarkers:
|
||
# unobservable block: leave at seed/0, flag later
|
||
continue
|
||
|
||
if not HAVE_SCIPY:
|
||
continue
|
||
|
||
base = dict(state)
|
||
|
||
def fun(vec, _bvars=bvars, _bm=bmarkers, _base=base):
|
||
st = _state_from_vec(_bvars, vec, _base)
|
||
return residual_vector(st, fk, obs, _bm, cfg)
|
||
|
||
# multi-start over the first revolute variable in the block
|
||
starts = [[0.0] * len(bvars)]
|
||
lead_type = var_type.get(bvars[0], "linear")
|
||
if lead_type == "revolute":
|
||
starts = []
|
||
for a0 in _multistart_values("revolute"):
|
||
s = [0.0] * len(bvars)
|
||
s[0] = a0
|
||
starts.append(s)
|
||
|
||
best, best_cost = None, float("inf")
|
||
for s0 in starts:
|
||
try:
|
||
sol = least_squares(fun, np.array(s0, dtype=float),
|
||
loss=cfg.get("robust_loss", "huber"),
|
||
f_scale=float(cfg.get("huber_delta_mm", 8.0)),
|
||
max_nfev=200 * max(1, len(bvars)))
|
||
if sol.cost < best_cost:
|
||
best_cost, best = sol.cost, sol.x
|
||
except Exception:
|
||
continue
|
||
if best is not None:
|
||
for name, val in zip(bvars, best):
|
||
state[name] = float(val)
|
||
|
||
# wrap revolute angles to (-180, 180]
|
||
for v, vt in var_type.items():
|
||
if vt == "revolute":
|
||
state[v] = (state[v] + 180.0) % 360.0 - 180.0
|
||
return state
|
||
|
||
|
||
# ==================================================================
|
||
# Method: sequential analytic vector (per revolute joint)
|
||
# ==================================================================
|
||
|
||
def estimate_sequential_vector(fk: RobotFK, obs: Dict[int, Dict[str, Any]], chain: Dict[str, Any],
|
||
cfg: Dict[str, Any]) -> Dict[str, float]:
|
||
"""
|
||
Analytic angle from marker geometry where possible. For revolute joints with
|
||
>=2 markers on the link, use the perpendicular marker-pair vector. Falls back
|
||
to the FK block solver for linear / zero-marker / single-marker cases, so it
|
||
always returns a full state (still cheaper than full sequential_fk because
|
||
well-populated joints are solved in closed form).
|
||
"""
|
||
state = {k: 0.0 for k in STATE_KEYS}
|
||
var_type = chain["var_type"]
|
||
link_markers = chain["link_markers"]
|
||
var_links = chain["var_links"]
|
||
|
||
for block in chain["blocks"]:
|
||
bvars = block["vars"]
|
||
if len(bvars) == 1 and var_type.get(bvars[0]) == "revolute":
|
||
v = bvars[0]
|
||
ln = var_links[v][0]
|
||
mids = [m for m in link_markers.get(ln, []) if m in obs]
|
||
if len(mids) >= 2:
|
||
# model vectors must be expressed in the WORLD frame at angle=0
|
||
# (the link frame is already rotated by the parents y,z,...), so
|
||
# use FK marker world positions with this joint set to 0.
|
||
state_v0 = dict(state)
|
||
state_v0[v] = 0.0
|
||
model_v0 = model_markers(fk, state_v0)
|
||
axis_world = fk.joint_axis_world(ln, state_v0)
|
||
ang = _angle_from_pairs_world(mids, model_v0, obs, axis_world)
|
||
if ang is not None:
|
||
state[v] = ang
|
||
continue
|
||
# fallback: block FK fit for this single block
|
||
_fit_single_block(fk, obs, block, var_type, cfg, state)
|
||
|
||
for v, vt in var_type.items():
|
||
if vt == "revolute":
|
||
state[v] = (state[v] + 180.0) % 360.0 - 180.0
|
||
return state
|
||
|
||
|
||
def _angle_from_pairs_world(mids: List[int], model_v0: Dict[int, Dict[str, np.ndarray]],
|
||
obs: Dict[int, Dict[str, Any]], axis_world: np.ndarray) -> Optional[float]:
|
||
from itertools import combinations
|
||
a = np.asarray(axis_world, float)
|
||
a = a / (np.linalg.norm(a) + 1e-12)
|
||
angs, ws = [], []
|
||
for i, j in combinations(mids, 2):
|
||
if i not in model_v0 or j not in model_v0:
|
||
continue
|
||
vm = np.asarray(model_v0[j]["world_mm"], float) - np.asarray(model_v0[i]["world_mm"], float) # world @ angle 0
|
||
vo = obs[j]["pos_mm"] - obs[i]["pos_mm"] # observed vector (world, mm)
|
||
vm_p = vm - np.dot(vm, a) * a
|
||
vo_p = vo - np.dot(vo, a) * a
|
||
if np.linalg.norm(vm_p) < 5 or np.linalg.norm(vo_p) < 5:
|
||
continue
|
||
ang = math.atan2(float(np.dot(a, np.cross(vm_p, vo_p))), float(np.dot(vm_p, vo_p)))
|
||
angs.append(ang)
|
||
ws.append(np.linalg.norm(vm_p) * np.linalg.norm(vo_p))
|
||
if not angs:
|
||
return None
|
||
s = sum(w * math.sin(x) for w, x in zip(ws, angs))
|
||
c = sum(w * math.cos(x) for w, x in zip(ws, angs))
|
||
return math.degrees(math.atan2(s, c))
|
||
|
||
|
||
def _fit_single_block(fk, obs, block, var_type, cfg, state):
|
||
if not HAVE_SCIPY:
|
||
return
|
||
bvars = block["vars"]
|
||
bmarkers = [m for m in block["markers"] if m in obs]
|
||
if not bvars or not bmarkers:
|
||
return
|
||
base = dict(state)
|
||
|
||
def fun(vec):
|
||
return residual_vector(_state_from_vec(bvars, vec, base), fk, obs, bmarkers, cfg)
|
||
|
||
starts = [[0.0] * len(bvars)]
|
||
if var_type.get(bvars[0]) == "revolute":
|
||
starts = [[a0] + [0.0] * (len(bvars) - 1) for a0 in _multistart_values("revolute")]
|
||
best, best_cost = None, float("inf")
|
||
for s0 in starts:
|
||
try:
|
||
sol = least_squares(fun, np.array(s0, float), loss=cfg.get("robust_loss", "huber"),
|
||
f_scale=float(cfg.get("huber_delta_mm", 8.0)), max_nfev=200 * max(1, len(bvars)))
|
||
if sol.cost < best_cost:
|
||
best_cost, best = sol.cost, sol.x
|
||
except Exception:
|
||
continue
|
||
if best is not None:
|
||
for name, val in zip(bvars, best):
|
||
state[name] = float(val)
|
||
|
||
|
||
# ==================================================================
|
||
# Dispatch
|
||
# ==================================================================
|
||
|
||
def observability(chain: Dict[str, Any], obs: Dict[int, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
|
||
"""
|
||
Per-variable confidence from how well its estimation block is determined.
|
||
A block groups coupled variables (e.g. b,c,e on the fingers); confidence is
|
||
driven by markers-per-variable in that block:
|
||
high : >= 2 markers per variable (well over-determined)
|
||
medium : >= 1 marker per variable
|
||
low : fewer markers than variables (under-determined — distrust!),
|
||
OR no own markers seen but a child link's markers were
|
||
(indirect evidence through the chain, e.g. Ellbow via Arm2)
|
||
none : no markers at all, not even indirectly (variable left at 0)
|
||
"""
|
||
info: Dict[str, Dict[str, Any]] = {}
|
||
subtree_markers = chain.get("subtree_markers", {})
|
||
for block in chain["blocks"]:
|
||
seen = [m for m in block["markers"] if m in obs]
|
||
indirect = False
|
||
if not seen and block["anchor"]:
|
||
seen = [m for m in subtree_markers.get(block["anchor"], []) if m in obs]
|
||
indirect = bool(seen)
|
||
nvars = max(1, len(block["vars"]))
|
||
ratio = len(seen) / nvars
|
||
if len(seen) == 0:
|
||
conf = "none"
|
||
elif indirect:
|
||
conf = "low" # indirect/coupled through a child link, not direct
|
||
elif ratio >= 2.0:
|
||
conf = "high"
|
||
elif ratio >= 1.0:
|
||
conf = "medium"
|
||
else:
|
||
conf = "low"
|
||
for v in block["vars"]:
|
||
info[v] = {"observable": len(seen) > 0, "n_markers": len(seen),
|
||
"block_vars": len(block["vars"]), "confidence": conf,
|
||
"block_anchor": block["anchor"], "indirect": indirect}
|
||
return info
|
||
|
||
|
||
def estimate_pose(fk: RobotFK, obs: Dict[int, Dict[str, Any]], cfg: Dict[str, Any],
|
||
seed: Optional[Dict[str, float]] = None) -> Dict[str, Any]:
|
||
"""
|
||
seed: optional partial/full joint state (e.g. from load_seed_state(), the
|
||
4b_revolute_angle.py chain) to trust as a starting point for global_ba/
|
||
hybrid. Passed through to estimate_sequential_fk(), which skips re-fitting
|
||
any block that is FULLY covered by seed and otherwise still applies its
|
||
normal per-block multi-start — so variables the seed does NOT cover keep
|
||
the existing local-minimum protection (see doc/Homing_5_Pose.md "Wichtige
|
||
Einschraenkung") instead of silently defaulting to an unprotected 0.
|
||
sequential_vector ignores seed (no x0 input; left untouched on purpose —
|
||
it is the cheap analytic method, not the one this seeding targets).
|
||
"""
|
||
chain = analyze_chain(fk)
|
||
var_names = chain["ordered_vars"]
|
||
method = str(cfg.get("method", "hybrid")).lower()
|
||
obsv = observability(chain, obs)
|
||
|
||
if method == "sequential_vector":
|
||
state = estimate_sequential_vector(fk, obs, chain, cfg)
|
||
elif method == "sequential_fk":
|
||
state = estimate_sequential_fk(fk, obs, chain, cfg, seed=seed)
|
||
else: # global_ba / hybrid (default) — both use the same init->refine path
|
||
init = estimate_sequential_fk(fk, obs, chain, cfg, seed=seed)
|
||
state = estimate_global_ba(fk, obs, var_names, init, cfg)
|
||
|
||
# final residual stats over all observed markers
|
||
final_res = residual_vector(state, fk, obs, list(obs.keys()), cfg)
|
||
rms = float(np.sqrt(np.mean(final_res ** 2))) if final_res.size else 0.0
|
||
|
||
return {"state": state, "method": method, "observability": obsv,
|
||
"residual_rms": rms, "num_markers": len(obs)}
|
||
|
||
|
||
# ==================================================================
|
||
# CLI
|
||
# ==================================================================
|
||
|
||
def patch_robot_json_origin(robot_path: str, link_name: str, yz: Tuple[float, float]) -> bool:
|
||
"""
|
||
Surgically rewrite links.<link_name>.jointToParent.origin[1],[2] (Y,Z) in
|
||
the robot.json TEXT in place. robot.json has a hand-curated, compact
|
||
format (markers etc. one per line) -- a full json.load()+json.dump()
|
||
round-trip would reformat the whole file, so this only touches the one
|
||
"origin": [...] array belonging to <link_name> (X left untouched).
|
||
Returns True if a match was found and patched.
|
||
"""
|
||
with open(robot_path, "r", encoding="utf-8") as f:
|
||
text = f.read()
|
||
link_m = re.search(r'"%s"\s*:\s*\{' % re.escape(link_name), text)
|
||
if not link_m:
|
||
return False
|
||
origin_m = re.search(r'"origin"\s*:\s*\[([^\]]*)\]', text[link_m.end():])
|
||
if not origin_m:
|
||
return False
|
||
parts = [p.strip() for p in origin_m.group(1).split(",")]
|
||
if len(parts) != 3:
|
||
return False
|
||
parts[1] = f"{float(yz[0]):.4f}".rstrip("0").rstrip(".")
|
||
parts[2] = f"{float(yz[1]):.4f}".rstrip("0").rstrip(".")
|
||
new_array = f"[{parts[0]}, {parts[1]}, {parts[2]}]"
|
||
start = link_m.end() + origin_m.start()
|
||
end = link_m.end() + origin_m.end()
|
||
with open(robot_path, "w", encoding="utf-8") as f:
|
||
f.write(text[:start] + '"origin": ' + new_array + text[end:])
|
||
return True
|
||
|
||
|
||
def main() -> None:
|
||
ap = argparse.ArgumentParser(description="Estimate robot joint angles from marker poses")
|
||
ap.add_argument("markers", help="aruco_marker_poses.json (corner_pose) or aruco_positions_*.json (center)")
|
||
ap.add_argument("-robot", "--robot", required=True)
|
||
ap.add_argument("-out", "--out", default=None)
|
||
ap.add_argument("--method", default=None, help="override robot.json method")
|
||
ap.add_argument("--marker-observation", default=None, dest="marker_observation",
|
||
help="override robot.json marker_observation "
|
||
"(corner_pose | corner_points | center_point)")
|
||
ap.add_argument("--from-state", default=None, metavar="JSON",
|
||
help="Seed/init state (flat {var:value} or {accumulated_state:{...}} as "
|
||
"written by 4b_revolute_angle.py). Used as x0 for global_ba/hybrid "
|
||
"instead of the internal cold start. See doc/Homing_5_Pose.md.")
|
||
args = ap.parse_args()
|
||
|
||
robot_data = json.load(open(args.robot, "r", encoding="utf-8"))
|
||
cfg = load_pose_cfg(robot_data)
|
||
if args.method:
|
||
cfg["method"] = args.method
|
||
if args.marker_observation:
|
||
cfg["marker_observation"] = args.marker_observation
|
||
|
||
fk = RobotFK(robot_data)
|
||
obs = load_observations(args.markers, cfg.get("use_normals", True),
|
||
int(cfg.get("min_cameras_per_marker", 2)))
|
||
seed = load_seed_state(args.from_state) if args.from_state else None
|
||
print(f"[INFO] method={cfg['method']} | observed markers={len(obs)} | use_normals={cfg.get('use_normals')}"
|
||
+ (f" | seed={seed}" if seed else ""))
|
||
if cfg.get("fit_origin_link"):
|
||
print(f"[INFO] fit_origin_link={cfg['fit_origin_link']} -> robot.json wird bei Erfolg aktualisiert")
|
||
|
||
result = estimate_pose(fk, obs, cfg, seed=seed)
|
||
st = result["state"]
|
||
|
||
print("\nEstimated joint values:")
|
||
for v in ["x", "y", "z", "a", "b", "c", "e"]:
|
||
ob = result["observability"].get(v, {})
|
||
unit = "mm" if v in ("x", "e") else "deg"
|
||
conf = ob.get("confidence", "?")
|
||
tag = "" if ob.get("observable", False) else " [UNOBSERVABLE -> null]"
|
||
print(f" {v}: {st.get(v, 0.0):8.2f} {unit} (markers={ob.get('n_markers','?')}, conf={conf}){tag}")
|
||
print(f"\n[INFO] residual RMS over {result['num_markers']} markers: {result['residual_rms']:.3f}")
|
||
|
||
movements = {}
|
||
for v in ["x", "y", "z", "a", "b", "c", "e"]:
|
||
ob = result["observability"].get(v, {})
|
||
observable = ob.get("observable", False)
|
||
movements[v] = {
|
||
# Unobservable -> null, never a fabricated 0 (see module docstring).
|
||
"value": st.get(v, 0.0) if observable else None,
|
||
"unit": "mm" if v in ("x", "e") else "deg",
|
||
"observable": observable,
|
||
"confidence": ob.get("confidence", "none"),
|
||
"n_markers": ob.get("n_markers", 0),
|
||
}
|
||
out = {
|
||
"schema_version": "1.0",
|
||
"created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
|
||
"method": result["method"],
|
||
"seeded": seed is not None,
|
||
"movements": movements,
|
||
"residual_rms": result["residual_rms"],
|
||
"num_markers": result["num_markers"],
|
||
}
|
||
out_path = args.out or os.path.join(os.path.dirname(args.markers), "robot_state.json")
|
||
json.dump(out, open(out_path, "w", encoding="utf-8"), indent=2)
|
||
print(f"[INFO] wrote {out_path}")
|
||
|
||
# "Uebernehmen": fit_origin_link's Y/Z is already adopted on `fk` (see
|
||
# estimate_global_ba) -- write it back into robot.json itself.
|
||
origin_link = cfg.get("fit_origin_link")
|
||
if origin_link:
|
||
origin = fk.links.get(origin_link, {}).get("jointToParent", {}).get("origin")
|
||
if isinstance(origin, list) and len(origin) == 3:
|
||
if patch_robot_json_origin(args.robot, origin_link, (origin[1], origin[2])):
|
||
print(f"[INFO] robot.json aktualisiert: {origin_link}.jointToParent.origin "
|
||
f"= [{origin[0]}, {origin[1]:.3f}, {origin[2]:.3f}]")
|
||
else:
|
||
print(f"[WARN] {origin_link}.jointToParent.origin in {args.robot} nicht gefunden — "
|
||
f"nicht aktualisiert (Wert nur in {out_path} sichtbar)")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|