Files
appRobotHoming/scripts/5_pose_estimation.py
2026-06-25 19:58:23 +02:00

816 lines
36 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
pose_estimation.py
==================
Estimate robot joint angles (x, y, z, a, b, c, e) from triangulated marker
poses, using the kinematic model in robot.json (via robot_fk.py).
Design
------
The estimator is parametrised over JOINT VARIABLES, not links. This handles the
tricky cases of this robot family generically:
* Links with zero own markers (Base/x, Hand/b, Palm/c) — their variable is
observable only through descendant markers.
* A variable shared by several links (FingerA & FingerB share 'e').
* Occluded middle links — global BA reconstructs them from the fingers.
Four switchable methods (robot.json -> pose_estimation.method):
sequential_vector : analytic per joint from marker-pair / normal vectors (fast)
sequential_fk : block-wise least squares along the chain (robust, 1 marker ok)
global_ba : all variables jointly, position + normal residuals, robust loss
hybrid : sequential_fk init -> global_ba refine (default, most stable)
Observation input (robot.json -> pose_estimation.marker_observation):
"corner_pose" (default) -> aruco_marker_poses.json: 3 pos + 3 normal residuals/marker
"corner_points" -> aruco_marker_poses.json: 12 corner residuals for
robot-link markers (4 triangulated corners vs FK
corners; no separate normal), 1 center residual for
root-link (Board: floor/rail) markers whose spin is
uncalibrated. Robust loss acts per corner. Opt-in;
needs `corners_m`. Links via corner_point_links.
"center_point" -> aruco_positions_*.json: position only
Homing integration (appRobotHoming, see doc/Homing_5_Pose.md):
--from-state <json> seed/init state (flat {var: value}, or the
{"accumulated_state": {...}} shape written by
4b_revolute_angle.py) used as x0 for
global_ba/hybrid instead of the internal
estimate_sequential_fk() cold start. Missing
variables default to 0 and are estimated/flagged
normally. Without --from-state, behaviour is
unchanged (internal cold start, as before).
robot.json -> pose_estimation.fit_origin_link = "Arm1" one switch: the
named link's jointToParent.origin Y/Z is fit
TOGETHER WITH the normal pose in the same
global_ba solve (complements the geometric
multi-pose method in doc/Kalibrierung.md
Schritt [4]) and the result is adopted
automatically -- written back into robot.json
(surgical text patch, see patch_robot_json_origin()).
Off by default (key absent/null).
Unobservable joints (confidence "none") are written as value=null in the
output JSON — never a fabricated 0 (see movements.<var>.observable).
Both the engine (estimate_pose) and a CLI (main) live here.
"""
from __future__ import annotations
import argparse
import json
import math
import os
import re
import sys
import time
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
sys.path.insert(0, str(Path(__file__).parent))
from robot_fk import RobotFK, STATE_KEYS # noqa: E402
try:
from scipy.optimize import least_squares
HAVE_SCIPY = True
except ImportError:
HAVE_SCIPY = False
# ==================================================================
# Config
# ==================================================================
DEFAULT_CFG: Dict[str, Any] = {
"method": "hybrid",
"marker_observation": "corner_pose",
"use_normals": True,
"normal_weight": 100.0,
"use_marker_weight": False,
"robust_loss": "huber",
"huber_delta_mm": 8.0,
"max_iterations": 200,
"min_cameras_per_marker": 2,
"finger_block_joints": ["b", "c", "e"],
"per_link_method": {},
# Nur im marker_observation="corner_points"-Modus relevant: welche Links die
# 4 Eck-Residuen nutzen. None/absent = alle Nicht-Root-Links (= der Roboter);
# der Root-Link (Board mit Boden-/Rail-Markern) nutzt ein Center-Residuum.
# Hintergrund: nur die Roboter-Marker-Spins sind kalibriert/verifiziert; die
# Board/Rail-Spins nicht — deren Eckreihenfolge wäre unzuverlässig. Board ist
# zudem Root (Residuum konstant bzgl. der Gelenke). Explizite Liste möglich,
# z.B. ["Arm1","Ellbow","Arm2","Hand","Palm","FingerA","FingerB"].
"corner_point_links": None,
# One switch: if set to a link name (e.g. "Arm1"), that link's
# jointToParent.origin Y/Z is fit together with the normal pose (same
# global_ba solve) and the result is written back into robot.json
# automatically. None/absent = off, no behaviour change. See
# doc/Homing_5_Pose.md "Kalibrier-Switch".
"fit_origin_link": None,
}
def load_pose_cfg(robot_data: Dict[str, Any]) -> Dict[str, Any]:
cfg = dict(DEFAULT_CFG)
cfg.update(robot_data.get("pose_estimation", {}) or {})
return cfg
# ==================================================================
# Observations
# ==================================================================
def load_observations(path: str, use_normals: bool, min_cams: int = 2) -> Dict[int, Dict[str, Any]]:
"""
Load marker observations. Accepts aruco_marker_poses.json (with measured
normal + num_cameras + 4 triangulated corners) or aruco_positions_*.json
(position only).
Returns: marker_id -> {pos_mm:(3,), normal:(3,)|None, corners_mm:(4,3)|None,
link:str, n_cams:int, weight:float}
corners_mm (aus `corners_m`, m→mm) speist den marker_observation=
"corner_points"-Modus in residual_vector().
"""
data = json.load(open(path, "r", encoding="utf-8"))
out: Dict[int, Dict[str, Any]] = {}
for m in data.get("markers", []):
mid = int(m.get("marker_id", m.get("id", -1)))
if mid < 0:
continue
n_cams = int(m.get("num_cameras", 99))
if n_cams < min_cams:
continue
if "position_mm" in m:
pos = np.array(m["position_mm"], dtype=float)
elif "position_m" in m:
pos = np.array(m["position_m"], dtype=float) * 1000.0
else:
continue
nrm = None
if use_normals and m.get("normal") is not None:
nv = np.array(m["normal"], dtype=float)
nn = np.linalg.norm(nv)
if nn > 1e-9:
nrm = nv / nn
corners_mm = None
cm = m.get("corners_m")
if cm is not None:
arr = np.array(cm, dtype=float)
if arr.shape == (4, 3):
corners_mm = arr * 1000.0
out[mid] = {"pos_mm": pos, "normal": nrm, "corners_mm": corners_mm,
"link": m.get("link", "?"), "n_cams": n_cams,
"weight": float(m.get("weight", 1.0))}
return out
def load_seed_state(path: str) -> Dict[str, float]:
"""
Load a partial/full joint state to use as an optimisation seed (--from-state).
Accepts either a flat {variable: value} dict, or the
{"accumulated_state": {...}, ...} wrapper written by 4b_revolute_angle.py —
same unwrap rule as server/homingOrchestrator.js
(`stateData.accumulated_state ?? stateData`), so 4b's output files can be
passed in directly. Unknown keys are ignored; missing STATE_KEYS are simply
absent from the returned dict (caller defaults them, e.g. to 0.0).
"""
data = json.load(open(path, "r", encoding="utf-8"))
raw = data.get("accumulated_state", data) if isinstance(data, dict) else {}
return {k: float(v) for k, v in raw.items() if k in STATE_KEYS and v is not None}
# ==================================================================
# Kinematic chain analysis
# ==================================================================
def analyze_chain(fk: RobotFK) -> Dict[str, Any]:
"""
Derive, generically from the FK topology:
ordered_vars : movable joint variables, root->tip order, de-duplicated
var_links : variable -> list of links it drives
link_markers : link -> [model marker ids]
blocks : sequential estimation blocks; each block groups the
zero-marker ancestor variables with the next marker-
bearing joint variable, estimated from that link's own
markers (+ siblings sharing the same variable).
"""
links = fk.links
topo = fk._topo
link_markers: Dict[str, List[int]] = {}
for ln, ld in links.items():
ids = []
for mk in ld.get("markers", []) or []:
if "id" in mk and "position" in mk:
ids.append(int(mk["id"]))
link_markers[ln] = ids
link_var: Dict[str, str] = {}
for ln, ld in links.items():
j = ld.get("jointToParent", {}) or {}
if str(j.get("type", "")).lower() in ("revolute", "linear"):
v = str(j.get("variable", "")).lower()
if v:
link_var[ln] = v
var_type: Dict[str, str] = {}
var_links: Dict[str, List[str]] = defaultdict(list)
for ln, v in link_var.items():
var_links[v].append(ln)
var_type[v] = str(links[ln].get("jointToParent", {}).get("type", "")).lower()
ordered_vars: List[str] = []
for ln in topo:
if ln in link_var and link_var[ln] not in ordered_vars:
ordered_vars.append(link_var[ln])
# ---- build blocks ----
blocks: List[Dict[str, Any]] = []
var_block: Dict[str, int] = {}
pending: List[str] = []
for ln in topo:
if ln not in link_var:
continue
v = link_var[ln]
own = link_markers.get(ln, [])
if v in var_block:
# shared variable already in a block -> add this link's markers there
if own:
blocks[var_block[v]]["markers"].extend(own)
continue
if own:
bvars = []
for x in pending + [v]:
if x not in bvars and x not in var_block:
bvars.append(x)
blocks.append({"vars": bvars, "markers": list(own), "anchor": ln})
for x in bvars:
var_block[x] = len(blocks) - 1
pending = []
else:
if v not in pending:
pending.append(v)
if pending:
blocks.append({"vars": pending, "markers": [], "anchor": None})
for x in pending:
var_block[x] = len(blocks) - 1
# subtree_markers[L] = L's own markers + all descendants' markers. Lets
# observability() credit a block whose own link saw nothing this capture
# but whose CHILD link did (e.g. Ellbow has no visible markers, but Arm2's
# markers still constrain z through the chain — same idea as 4b's
# Fallback-1, just for confidence reporting here, not for the fit itself).
children: Dict[str, List[str]] = defaultdict(list)
for ln, ld in links.items():
p = ld.get("parent")
if p:
children[p].append(ln)
subtree_markers: Dict[str, List[int]] = {}
for ln in reversed(topo):
ids = list(link_markers.get(ln, []))
for c in children.get(ln, []):
ids.extend(subtree_markers.get(c, []))
subtree_markers[ln] = ids
return {
"ordered_vars": ordered_vars,
"var_type": var_type,
"var_links": dict(var_links),
"link_markers": link_markers,
"subtree_markers": subtree_markers,
"blocks": blocks,
}
# ==================================================================
# Residuals
# ==================================================================
def model_markers(fk: RobotFK, state: Dict[str, float]) -> Dict[int, Dict[str, np.ndarray]]:
T = fk.compute(state)
return fk.all_markers_world(T) # mid -> {world_mm, normal_world, link, local_mm}
def _resolve_corner_links(fk: RobotFK, cfg: Dict[str, Any]) -> set:
"""
Welche Links im "corner_points"-Modus die 4 Eck-Residuen nutzen.
Explizite Liste in cfg["corner_point_links"], sonst alle Nicht-Root-Links
(der Root-Link Board trägt die unkalibrierten Boden-/Rail-Marker und nutzt
ein Center-Residuum).
"""
explicit = cfg.get("corner_point_links")
if isinstance(explicit, list) and explicit:
return set(explicit)
links = fk.links
roots = {ln for ln, ld in links.items()
if not ld.get("parent") or ld.get("parent") not in links}
return set(links.keys()) - roots
def residual_vector(state: Dict[str, float], fk: RobotFK, obs: Dict[int, Dict[str, Any]],
marker_ids: List[int], cfg: Dict[str, Any]) -> np.ndarray:
"""
Residuen über die gegebenen Marker. Modus via pose_estimation.marker_observation:
"corner_pose" (Default): 3 Position (mm) + optional 3 Normale
(×normal_weight) je Marker — wie bisher.
"corner_points": 12 Eck-Residuen (4 Ecken × xyz, mm) NUR für Marker
auf den `corner_point_links` (z.B. Hand/Finger),
KEINE separate Normale (Orientierung steckt in den
Ecken). Alle übrigen Marker verhalten sich wie im
Default-Modus (Center + optionale Normale) — außer
dem Root-Link (Board: Boden-/Rail-Marker, Spin
unkalibriert), der nur Center bekommt ("ein Punkt
pro Marker"). So lassen sich Ecken gezielt für
Hand/Finger scharfschalten, ohne Arme/Board zu
verändern.
"""
model = model_markers(fk, state)
res: List[float] = []
use_mw = bool(cfg.get("use_marker_weight", False))
obs_mode = str(cfg.get("marker_observation", "corner_pose")).lower()
if obs_mode == "corner_points":
corner_links = _resolve_corner_links(fk, cfg)
roots = {ln for ln, ld in fk.links.items()
if not ld.get("parent") or ld.get("parent") not in fk.links}
w_n = float(cfg.get("normal_weight", 30.0))
use_n = bool(cfg.get("use_normals", True))
for mid in marker_ids:
if mid not in model or mid not in obs:
continue
mw = float(obs[mid].get("weight", 1.0)) if use_mw else 1.0
mm = model[mid]
link = mm.get("link")
oc = obs[mid].get("corners_mm")
mc = mm.get("corners_world")
if link in corner_links and oc is not None and mc is not None:
dc = (np.asarray(mc, float) - np.asarray(oc, float)) * mw # (4,3)
res.extend(dc.reshape(-1).tolist()) # 12 Werte
continue
# Nicht-Eck-Marker verhalten sich wie im Default-Modus: Center +
# optionale Normale — AUSSER auf dem Root-Link (Board: Boden-/Rail-
# Marker mit unkalibriertem Spin), der nur Center bekommt ("ein
# Punkt pro Marker"). So bleiben Arme/Board unverändert, wenn nur
# Hand/Finger über corner_point_links auf Ecken laufen.
dp = (np.asarray(mm["world_mm"], float) - obs[mid]["pos_mm"]) * mw
res.extend(dp.tolist())
if link not in roots and use_n and obs[mid]["normal"] is not None and "normal_world" in mm:
dn = (np.asarray(mm["normal_world"], float) - obs[mid]["normal"]) * w_n * mw
res.extend(dn.tolist())
return np.asarray(res, dtype=float)
# Default: Center (mm) + optionale Normale (skaliert)
w_n = float(cfg.get("normal_weight", 30.0))
use_n = bool(cfg.get("use_normals", True))
for mid in marker_ids:
if mid not in model or mid not in obs:
continue
mm = model[mid]
mw = float(obs[mid].get("weight", 1.0)) if use_mw else 1.0
dp = (np.asarray(mm["world_mm"], float) - obs[mid]["pos_mm"]) * mw
res.extend(dp.tolist())
if use_n and obs[mid]["normal"] is not None and "normal_world" in mm:
dn = (np.asarray(mm["normal_world"], float) - obs[mid]["normal"]) * w_n * mw
res.extend(dn.tolist())
return np.asarray(res, dtype=float)
def _state_from_vec(var_names: List[str], vec: np.ndarray, base: Dict[str, float]) -> Dict[str, float]:
s = dict(base)
for name, val in zip(var_names, vec):
s[name] = float(val)
return s
# ==================================================================
# Method: global bundle adjustment
# ==================================================================
def estimate_global_ba(fk: RobotFK, obs: Dict[int, Dict[str, Any]], var_names: List[str],
x0: Dict[str, float], cfg: Dict[str, Any]) -> Dict[str, float]:
if not HAVE_SCIPY:
print("[WARN] scipy missing — global_ba skipped, returning init")
return dict(x0)
marker_ids = list(obs.keys())
base = {k: 0.0 for k in STATE_KEYS}
base.update(x0)
# One switch (pose_estimation.fit_origin_link): also free that link's
# jointToParent.origin Y/Z as 2 extra parameters of THIS SAME solve, no
# separate pass. fk.links[...] is mutated in place -- compute() re-reads
# it fresh every call (robot_fk.py), so this takes effect immediately and
# is left adopted on success (main() writes it back into robot.json).
origin_link = cfg.get("fit_origin_link")
origin = fk.links.get(origin_link, {}).get("jointToParent", {}).get("origin") if origin_link else None
origin = origin if isinstance(origin, list) and len(origin) == 3 else None
origin_before = list(origin) if origin else None
n_state = len(var_names)
vec0 = np.array([base.get(v, 0.0) for v in var_names]
+ (origin_before[1:3] if origin else []), dtype=float)
def fun(vec):
st = _state_from_vec(var_names, vec[:n_state], base)
if origin:
origin[1], origin[2] = float(vec[n_state]), float(vec[n_state + 1])
return residual_vector(st, fk, obs, marker_ids, cfg)
loss = cfg.get("robust_loss", "huber")
f_scale = float(cfg.get("huber_delta_mm", 8.0))
try:
sol = least_squares(fun, vec0, loss=loss, f_scale=f_scale,
max_nfev=int(cfg.get("max_iterations", 200)) * max(1, len(vec0)))
state = _state_from_vec(var_names, sol.x[:n_state], base)
if origin:
origin[1], origin[2] = float(sol.x[n_state]), float(sol.x[n_state + 1])
print(f"[INFO] fit_origin_link={origin_link}: Y,Z {origin_before[1:3]} "
f"-> [{origin[1]:.3f}, {origin[2]:.3f}]")
return state
except Exception as exc:
print(f"[WARN] global_ba failed: {exc}")
if origin:
origin[1], origin[2] = origin_before[1], origin_before[2] # restore on failure
return dict(base)
# ==================================================================
# Method: sequential block-wise FK fit
# ==================================================================
def _multistart_values(vtype: str) -> List[float]:
# revolute: scan the circle to escape local minima at large angles
if vtype == "revolute":
return [0.0, 60.0, 120.0, 180.0, 240.0, 300.0]
return [0.0]
def estimate_sequential_fk(fk: RobotFK, obs: Dict[int, Dict[str, Any]], chain: Dict[str, Any],
cfg: Dict[str, Any], seed: Optional[Dict[str, float]] = None
) -> Dict[str, float]:
"""
Estimate block by block along the chain, freezing already-solved variables.
seed: optional partial/full state (e.g. from 4b_revolute_angle.py) to trust
as a starting point. A block is SKIPPED entirely (seed used as-is, no
re-fit) only if ALL of its variables are present in seed. Blocks with any
missing variable are still fit normally — including their own multi-start
— but using the seeded values of EARLIER blocks as fixed context instead
of 0. This keeps the local-minimum protection for whatever the seed does
NOT cover (see doc/Homing_5_Pose.md "Wichtige Einschraenkung"), while not
re-perturbing values the caller already trusts.
"""
state = {k: 0.0 for k in STATE_KEYS}
if seed:
state.update({k: v for k, v in seed.items() if k in STATE_KEYS})
var_type = chain["var_type"]
for block in chain["blocks"]:
bvars = block["vars"]
bmarkers = [m for m in block["markers"] if m in obs]
if not bvars:
continue
if seed and all(v in seed for v in bvars):
continue # fully seeded — trust it, don't re-fit
if not bmarkers:
# unobservable block: leave at seed/0, flag later
continue
if not HAVE_SCIPY:
continue
base = dict(state)
def fun(vec, _bvars=bvars, _bm=bmarkers, _base=base):
st = _state_from_vec(_bvars, vec, _base)
return residual_vector(st, fk, obs, _bm, cfg)
# multi-start over the first revolute variable in the block
starts = [[0.0] * len(bvars)]
lead_type = var_type.get(bvars[0], "linear")
if lead_type == "revolute":
starts = []
for a0 in _multistart_values("revolute"):
s = [0.0] * len(bvars)
s[0] = a0
starts.append(s)
best, best_cost = None, float("inf")
for s0 in starts:
try:
sol = least_squares(fun, np.array(s0, dtype=float),
loss=cfg.get("robust_loss", "huber"),
f_scale=float(cfg.get("huber_delta_mm", 8.0)),
max_nfev=200 * max(1, len(bvars)))
if sol.cost < best_cost:
best_cost, best = sol.cost, sol.x
except Exception:
continue
if best is not None:
for name, val in zip(bvars, best):
state[name] = float(val)
# wrap revolute angles to (-180, 180]
for v, vt in var_type.items():
if vt == "revolute":
state[v] = (state[v] + 180.0) % 360.0 - 180.0
return state
# ==================================================================
# Method: sequential analytic vector (per revolute joint)
# ==================================================================
def estimate_sequential_vector(fk: RobotFK, obs: Dict[int, Dict[str, Any]], chain: Dict[str, Any],
cfg: Dict[str, Any]) -> Dict[str, float]:
"""
Analytic angle from marker geometry where possible. For revolute joints with
>=2 markers on the link, use the perpendicular marker-pair vector. Falls back
to the FK block solver for linear / zero-marker / single-marker cases, so it
always returns a full state (still cheaper than full sequential_fk because
well-populated joints are solved in closed form).
"""
state = {k: 0.0 for k in STATE_KEYS}
var_type = chain["var_type"]
link_markers = chain["link_markers"]
var_links = chain["var_links"]
for block in chain["blocks"]:
bvars = block["vars"]
if len(bvars) == 1 and var_type.get(bvars[0]) == "revolute":
v = bvars[0]
ln = var_links[v][0]
mids = [m for m in link_markers.get(ln, []) if m in obs]
if len(mids) >= 2:
# model vectors must be expressed in the WORLD frame at angle=0
# (the link frame is already rotated by the parents y,z,...), so
# use FK marker world positions with this joint set to 0.
state_v0 = dict(state)
state_v0[v] = 0.0
model_v0 = model_markers(fk, state_v0)
axis_world = fk.joint_axis_world(ln, state_v0)
ang = _angle_from_pairs_world(mids, model_v0, obs, axis_world)
if ang is not None:
state[v] = ang
continue
# fallback: block FK fit for this single block
_fit_single_block(fk, obs, block, var_type, cfg, state)
for v, vt in var_type.items():
if vt == "revolute":
state[v] = (state[v] + 180.0) % 360.0 - 180.0
return state
def _angle_from_pairs_world(mids: List[int], model_v0: Dict[int, Dict[str, np.ndarray]],
obs: Dict[int, Dict[str, Any]], axis_world: np.ndarray) -> Optional[float]:
from itertools import combinations
a = np.asarray(axis_world, float)
a = a / (np.linalg.norm(a) + 1e-12)
angs, ws = [], []
for i, j in combinations(mids, 2):
if i not in model_v0 or j not in model_v0:
continue
vm = np.asarray(model_v0[j]["world_mm"], float) - np.asarray(model_v0[i]["world_mm"], float) # world @ angle 0
vo = obs[j]["pos_mm"] - obs[i]["pos_mm"] # observed vector (world, mm)
vm_p = vm - np.dot(vm, a) * a
vo_p = vo - np.dot(vo, a) * a
if np.linalg.norm(vm_p) < 5 or np.linalg.norm(vo_p) < 5:
continue
ang = math.atan2(float(np.dot(a, np.cross(vm_p, vo_p))), float(np.dot(vm_p, vo_p)))
angs.append(ang)
ws.append(np.linalg.norm(vm_p) * np.linalg.norm(vo_p))
if not angs:
return None
s = sum(w * math.sin(x) for w, x in zip(ws, angs))
c = sum(w * math.cos(x) for w, x in zip(ws, angs))
return math.degrees(math.atan2(s, c))
def _fit_single_block(fk, obs, block, var_type, cfg, state):
if not HAVE_SCIPY:
return
bvars = block["vars"]
bmarkers = [m for m in block["markers"] if m in obs]
if not bvars or not bmarkers:
return
base = dict(state)
def fun(vec):
return residual_vector(_state_from_vec(bvars, vec, base), fk, obs, bmarkers, cfg)
starts = [[0.0] * len(bvars)]
if var_type.get(bvars[0]) == "revolute":
starts = [[a0] + [0.0] * (len(bvars) - 1) for a0 in _multistart_values("revolute")]
best, best_cost = None, float("inf")
for s0 in starts:
try:
sol = least_squares(fun, np.array(s0, float), loss=cfg.get("robust_loss", "huber"),
f_scale=float(cfg.get("huber_delta_mm", 8.0)), max_nfev=200 * max(1, len(bvars)))
if sol.cost < best_cost:
best_cost, best = sol.cost, sol.x
except Exception:
continue
if best is not None:
for name, val in zip(bvars, best):
state[name] = float(val)
# ==================================================================
# Dispatch
# ==================================================================
def observability(chain: Dict[str, Any], obs: Dict[int, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
"""
Per-variable confidence from how well its estimation block is determined.
A block groups coupled variables (e.g. b,c,e on the fingers); confidence is
driven by markers-per-variable in that block:
high : >= 2 markers per variable (well over-determined)
medium : >= 1 marker per variable
low : fewer markers than variables (under-determined — distrust!),
OR no own markers seen but a child link's markers were
(indirect evidence through the chain, e.g. Ellbow via Arm2)
none : no markers at all, not even indirectly (variable left at 0)
"""
info: Dict[str, Dict[str, Any]] = {}
subtree_markers = chain.get("subtree_markers", {})
for block in chain["blocks"]:
seen = [m for m in block["markers"] if m in obs]
indirect = False
if not seen and block["anchor"]:
seen = [m for m in subtree_markers.get(block["anchor"], []) if m in obs]
indirect = bool(seen)
nvars = max(1, len(block["vars"]))
ratio = len(seen) / nvars
if len(seen) == 0:
conf = "none"
elif indirect:
conf = "low" # indirect/coupled through a child link, not direct
elif ratio >= 2.0:
conf = "high"
elif ratio >= 1.0:
conf = "medium"
else:
conf = "low"
for v in block["vars"]:
info[v] = {"observable": len(seen) > 0, "n_markers": len(seen),
"block_vars": len(block["vars"]), "confidence": conf,
"block_anchor": block["anchor"], "indirect": indirect}
return info
def estimate_pose(fk: RobotFK, obs: Dict[int, Dict[str, Any]], cfg: Dict[str, Any],
seed: Optional[Dict[str, float]] = None) -> Dict[str, Any]:
"""
seed: optional partial/full joint state (e.g. from load_seed_state(), the
4b_revolute_angle.py chain) to trust as a starting point for global_ba/
hybrid. Passed through to estimate_sequential_fk(), which skips re-fitting
any block that is FULLY covered by seed and otherwise still applies its
normal per-block multi-start — so variables the seed does NOT cover keep
the existing local-minimum protection (see doc/Homing_5_Pose.md "Wichtige
Einschraenkung") instead of silently defaulting to an unprotected 0.
sequential_vector ignores seed (no x0 input; left untouched on purpose —
it is the cheap analytic method, not the one this seeding targets).
"""
chain = analyze_chain(fk)
var_names = chain["ordered_vars"]
method = str(cfg.get("method", "hybrid")).lower()
obsv = observability(chain, obs)
if method == "sequential_vector":
state = estimate_sequential_vector(fk, obs, chain, cfg)
elif method == "sequential_fk":
state = estimate_sequential_fk(fk, obs, chain, cfg, seed=seed)
else: # global_ba / hybrid (default) — both use the same init->refine path
init = estimate_sequential_fk(fk, obs, chain, cfg, seed=seed)
state = estimate_global_ba(fk, obs, var_names, init, cfg)
# final residual stats over all observed markers
final_res = residual_vector(state, fk, obs, list(obs.keys()), cfg)
rms = float(np.sqrt(np.mean(final_res ** 2))) if final_res.size else 0.0
return {"state": state, "method": method, "observability": obsv,
"residual_rms": rms, "num_markers": len(obs)}
# ==================================================================
# CLI
# ==================================================================
def patch_robot_json_origin(robot_path: str, link_name: str, yz: Tuple[float, float]) -> bool:
"""
Surgically rewrite links.<link_name>.jointToParent.origin[1],[2] (Y,Z) in
the robot.json TEXT in place. robot.json has a hand-curated, compact
format (markers etc. one per line) -- a full json.load()+json.dump()
round-trip would reformat the whole file, so this only touches the one
"origin": [...] array belonging to <link_name> (X left untouched).
Returns True if a match was found and patched.
"""
with open(robot_path, "r", encoding="utf-8") as f:
text = f.read()
link_m = re.search(r'"%s"\s*:\s*\{' % re.escape(link_name), text)
if not link_m:
return False
origin_m = re.search(r'"origin"\s*:\s*\[([^\]]*)\]', text[link_m.end():])
if not origin_m:
return False
parts = [p.strip() for p in origin_m.group(1).split(",")]
if len(parts) != 3:
return False
parts[1] = f"{float(yz[0]):.4f}".rstrip("0").rstrip(".")
parts[2] = f"{float(yz[1]):.4f}".rstrip("0").rstrip(".")
new_array = f"[{parts[0]}, {parts[1]}, {parts[2]}]"
start = link_m.end() + origin_m.start()
end = link_m.end() + origin_m.end()
with open(robot_path, "w", encoding="utf-8") as f:
f.write(text[:start] + '"origin": ' + new_array + text[end:])
return True
def main() -> None:
ap = argparse.ArgumentParser(description="Estimate robot joint angles from marker poses")
ap.add_argument("markers", help="aruco_marker_poses.json (corner_pose) or aruco_positions_*.json (center)")
ap.add_argument("-robot", "--robot", required=True)
ap.add_argument("-out", "--out", default=None)
ap.add_argument("--method", default=None, help="override robot.json method")
ap.add_argument("--marker-observation", default=None, dest="marker_observation",
help="override robot.json marker_observation "
"(corner_pose | corner_points | center_point)")
ap.add_argument("--from-state", default=None, metavar="JSON",
help="Seed/init state (flat {var:value} or {accumulated_state:{...}} as "
"written by 4b_revolute_angle.py). Used as x0 for global_ba/hybrid "
"instead of the internal cold start. See doc/Homing_5_Pose.md.")
args = ap.parse_args()
robot_data = json.load(open(args.robot, "r", encoding="utf-8"))
cfg = load_pose_cfg(robot_data)
if args.method:
cfg["method"] = args.method
if args.marker_observation:
cfg["marker_observation"] = args.marker_observation
fk = RobotFK(robot_data)
obs = load_observations(args.markers, cfg.get("use_normals", True),
int(cfg.get("min_cameras_per_marker", 2)))
seed = load_seed_state(args.from_state) if args.from_state else None
print(f"[INFO] method={cfg['method']} | observed markers={len(obs)} | use_normals={cfg.get('use_normals')}"
+ (f" | seed={seed}" if seed else ""))
if cfg.get("fit_origin_link"):
print(f"[INFO] fit_origin_link={cfg['fit_origin_link']} -> robot.json wird bei Erfolg aktualisiert")
result = estimate_pose(fk, obs, cfg, seed=seed)
st = result["state"]
print("\nEstimated joint values:")
for v in ["x", "y", "z", "a", "b", "c", "e"]:
ob = result["observability"].get(v, {})
unit = "mm" if v in ("x", "e") else "deg"
conf = ob.get("confidence", "?")
tag = "" if ob.get("observable", False) else " [UNOBSERVABLE -> null]"
print(f" {v}: {st.get(v, 0.0):8.2f} {unit} (markers={ob.get('n_markers','?')}, conf={conf}){tag}")
print(f"\n[INFO] residual RMS over {result['num_markers']} markers: {result['residual_rms']:.3f}")
movements = {}
for v in ["x", "y", "z", "a", "b", "c", "e"]:
ob = result["observability"].get(v, {})
observable = ob.get("observable", False)
movements[v] = {
# Unobservable -> null, never a fabricated 0 (see module docstring).
"value": st.get(v, 0.0) if observable else None,
"unit": "mm" if v in ("x", "e") else "deg",
"observable": observable,
"confidence": ob.get("confidence", "none"),
"n_markers": ob.get("n_markers", 0),
}
out = {
"schema_version": "1.0",
"created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"method": result["method"],
"seeded": seed is not None,
"movements": movements,
"residual_rms": result["residual_rms"],
"num_markers": result["num_markers"],
}
out_path = args.out or os.path.join(os.path.dirname(args.markers), "robot_state.json")
json.dump(out, open(out_path, "w", encoding="utf-8"), indent=2)
print(f"[INFO] wrote {out_path}")
# "Uebernehmen": fit_origin_link's Y/Z is already adopted on `fk` (see
# estimate_global_ba) -- write it back into robot.json itself.
origin_link = cfg.get("fit_origin_link")
if origin_link:
origin = fk.links.get(origin_link, {}).get("jointToParent", {}).get("origin")
if isinstance(origin, list) and len(origin) == 3:
if patch_robot_json_origin(args.robot, origin_link, (origin[1], origin[2])):
print(f"[INFO] robot.json aktualisiert: {origin_link}.jointToParent.origin "
f"= [{origin[0]}, {origin[1]:.3f}, {origin[2]:.3f}]")
else:
print(f"[WARN] {origin_link}.jointToParent.origin in {args.robot} nicht gefunden — "
f"nicht aktualisiert (Wert nur in {out_path} sichtbar)")
if __name__ == "__main__":
main()