760 lines
32 KiB
Python
760 lines
32 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
pose_estimation.py
|
|
==================
|
|
Estimate robot joint angles (x, y, z, a, b, c, e) from triangulated marker
|
|
poses, using the kinematic model in robot.json (via robot_fk.py).
|
|
|
|
Design
|
|
------
|
|
The estimator is parametrised over JOINT VARIABLES, not links. This handles the
|
|
tricky cases of this robot family generically:
|
|
* Links with zero own markers (Base/x, Hand/b, Palm/c) — their variable is
|
|
observable only through descendant markers.
|
|
* A variable shared by several links (FingerA & FingerB share 'e').
|
|
* Occluded middle links — global BA reconstructs them from the fingers.
|
|
|
|
Four switchable methods (robot.json -> pose_estimation.method):
|
|
sequential_vector : analytic per joint from marker-pair / normal vectors (fast)
|
|
sequential_fk : block-wise least squares along the chain (robust, 1 marker ok)
|
|
global_ba : all variables jointly, position + normal residuals, robust loss
|
|
hybrid : sequential_fk init -> global_ba refine (default, most stable)
|
|
|
|
Observation input:
|
|
marker_observation = "corner_pose" -> aruco_marker_poses.json (pos + measured normal)
|
|
marker_observation = "center_point" -> aruco_positions_*.json (pos only)
|
|
|
|
Homing integration (appRobotHoming, see doc/Homing_5_Pose.md):
|
|
--from-state <json> seed/init state (flat {var: value}, or the
|
|
{"accumulated_state": {...}} shape written by
|
|
4b_revolute_angle.py) used as x0 for
|
|
global_ba/hybrid instead of the internal
|
|
estimate_sequential_fk() cold start. Missing
|
|
variables default to 0 and are estimated/flagged
|
|
normally. Without --from-state, behaviour is
|
|
unchanged (internal cold start, as before).
|
|
--calibrate-origin <Link> special mode: instead of estimating the full
|
|
pose, fit <Link>'s own joint value TOGETHER WITH
|
|
its jointToParent.origin Y/Z from that link's own
|
|
markers (complements the geometric multi-pose
|
|
method in doc/Kalibrierung.md Schritt [4]).
|
|
Writes a *_origin_calibration.json report; never
|
|
modifies robot.json.
|
|
|
|
Unobservable joints (confidence "none") are written as value=null in the
|
|
output JSON — never a fabricated 0 (see movements.<var>.observable).
|
|
|
|
Both the engine (estimate_pose) and a CLI (main) live here.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import math
|
|
import os
|
|
import sys
|
|
import time
|
|
from collections import defaultdict
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
import numpy as np
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent))
|
|
from robot_fk import RobotFK, STATE_KEYS # noqa: E402
|
|
|
|
try:
|
|
from scipy.optimize import least_squares
|
|
HAVE_SCIPY = True
|
|
except ImportError:
|
|
HAVE_SCIPY = False
|
|
|
|
|
|
# ==================================================================
|
|
# Config
|
|
# ==================================================================
|
|
|
|
DEFAULT_CFG: Dict[str, Any] = {
|
|
"method": "hybrid",
|
|
"marker_observation": "corner_pose",
|
|
"use_normals": True,
|
|
"normal_weight": 100.0,
|
|
"robust_loss": "huber",
|
|
"huber_delta_mm": 8.0,
|
|
"max_iterations": 200,
|
|
"min_cameras_per_marker": 2,
|
|
"finger_block_joints": ["b", "c", "e"],
|
|
"per_link_method": {},
|
|
}
|
|
|
|
|
|
def load_pose_cfg(robot_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
cfg = dict(DEFAULT_CFG)
|
|
cfg.update(robot_data.get("pose_estimation", {}) or {})
|
|
return cfg
|
|
|
|
|
|
# ==================================================================
|
|
# Observations
|
|
# ==================================================================
|
|
|
|
def load_observations(path: str, use_normals: bool, min_cams: int = 2) -> Dict[int, Dict[str, Any]]:
|
|
"""
|
|
Load marker observations. Accepts aruco_marker_poses.json (with measured
|
|
normal + num_cameras) or aruco_positions_*.json (position only).
|
|
Returns: marker_id -> {pos_mm:(3,), normal:(3,)|None, link:str, n_cams:int}
|
|
"""
|
|
data = json.load(open(path, "r", encoding="utf-8"))
|
|
out: Dict[int, Dict[str, Any]] = {}
|
|
for m in data.get("markers", []):
|
|
mid = int(m.get("marker_id", m.get("id", -1)))
|
|
if mid < 0:
|
|
continue
|
|
n_cams = int(m.get("num_cameras", 99))
|
|
if n_cams < min_cams:
|
|
continue
|
|
if "position_mm" in m:
|
|
pos = np.array(m["position_mm"], dtype=float)
|
|
elif "position_m" in m:
|
|
pos = np.array(m["position_m"], dtype=float) * 1000.0
|
|
else:
|
|
continue
|
|
nrm = None
|
|
if use_normals and m.get("normal") is not None:
|
|
nv = np.array(m["normal"], dtype=float)
|
|
nn = np.linalg.norm(nv)
|
|
if nn > 1e-9:
|
|
nrm = nv / nn
|
|
out[mid] = {"pos_mm": pos, "normal": nrm, "link": m.get("link", "?"), "n_cams": n_cams}
|
|
return out
|
|
|
|
|
|
def load_seed_state(path: str) -> Dict[str, float]:
|
|
"""
|
|
Load a partial/full joint state to use as an optimisation seed (--from-state).
|
|
|
|
Accepts either a flat {variable: value} dict, or the
|
|
{"accumulated_state": {...}, ...} wrapper written by 4b_revolute_angle.py —
|
|
same unwrap rule as server/homingOrchestrator.js
|
|
(`stateData.accumulated_state ?? stateData`), so 4b's output files can be
|
|
passed in directly. Unknown keys are ignored; missing STATE_KEYS are simply
|
|
absent from the returned dict (caller defaults them, e.g. to 0.0).
|
|
"""
|
|
data = json.load(open(path, "r", encoding="utf-8"))
|
|
raw = data.get("accumulated_state", data) if isinstance(data, dict) else {}
|
|
return {k: float(v) for k, v in raw.items() if k in STATE_KEYS and v is not None}
|
|
|
|
|
|
# ==================================================================
|
|
# Kinematic chain analysis
|
|
# ==================================================================
|
|
|
|
def analyze_chain(fk: RobotFK) -> Dict[str, Any]:
|
|
"""
|
|
Derive, generically from the FK topology:
|
|
ordered_vars : movable joint variables, root->tip order, de-duplicated
|
|
var_links : variable -> list of links it drives
|
|
link_markers : link -> [model marker ids]
|
|
blocks : sequential estimation blocks; each block groups the
|
|
zero-marker ancestor variables with the next marker-
|
|
bearing joint variable, estimated from that link's own
|
|
markers (+ siblings sharing the same variable).
|
|
"""
|
|
links = fk.links
|
|
topo = fk._topo
|
|
|
|
link_markers: Dict[str, List[int]] = {}
|
|
for ln, ld in links.items():
|
|
ids = []
|
|
for mk in ld.get("markers", []) or []:
|
|
if "id" in mk and "position" in mk:
|
|
ids.append(int(mk["id"]))
|
|
link_markers[ln] = ids
|
|
|
|
link_var: Dict[str, str] = {}
|
|
for ln, ld in links.items():
|
|
j = ld.get("jointToParent", {}) or {}
|
|
if str(j.get("type", "")).lower() in ("revolute", "linear"):
|
|
v = str(j.get("variable", "")).lower()
|
|
if v:
|
|
link_var[ln] = v
|
|
|
|
var_type: Dict[str, str] = {}
|
|
var_links: Dict[str, List[str]] = defaultdict(list)
|
|
for ln, v in link_var.items():
|
|
var_links[v].append(ln)
|
|
var_type[v] = str(links[ln].get("jointToParent", {}).get("type", "")).lower()
|
|
|
|
ordered_vars: List[str] = []
|
|
for ln in topo:
|
|
if ln in link_var and link_var[ln] not in ordered_vars:
|
|
ordered_vars.append(link_var[ln])
|
|
|
|
# ---- build blocks ----
|
|
blocks: List[Dict[str, Any]] = []
|
|
var_block: Dict[str, int] = {}
|
|
pending: List[str] = []
|
|
for ln in topo:
|
|
if ln not in link_var:
|
|
continue
|
|
v = link_var[ln]
|
|
own = link_markers.get(ln, [])
|
|
if v in var_block:
|
|
# shared variable already in a block -> add this link's markers there
|
|
if own:
|
|
blocks[var_block[v]]["markers"].extend(own)
|
|
continue
|
|
if own:
|
|
bvars = []
|
|
for x in pending + [v]:
|
|
if x not in bvars and x not in var_block:
|
|
bvars.append(x)
|
|
blocks.append({"vars": bvars, "markers": list(own), "anchor": ln})
|
|
for x in bvars:
|
|
var_block[x] = len(blocks) - 1
|
|
pending = []
|
|
else:
|
|
if v not in pending:
|
|
pending.append(v)
|
|
if pending:
|
|
blocks.append({"vars": pending, "markers": [], "anchor": None})
|
|
for x in pending:
|
|
var_block[x] = len(blocks) - 1
|
|
|
|
return {
|
|
"ordered_vars": ordered_vars,
|
|
"var_type": var_type,
|
|
"var_links": dict(var_links),
|
|
"link_markers": link_markers,
|
|
"blocks": blocks,
|
|
}
|
|
|
|
|
|
# ==================================================================
|
|
# Residuals
|
|
# ==================================================================
|
|
|
|
def model_markers(fk: RobotFK, state: Dict[str, float]) -> Dict[int, Dict[str, np.ndarray]]:
|
|
T = fk.compute(state)
|
|
return fk.all_markers_world(T) # mid -> {world_mm, normal_world, link, local_mm}
|
|
|
|
|
|
def residual_vector(state: Dict[str, float], fk: RobotFK, obs: Dict[int, Dict[str, Any]],
|
|
marker_ids: List[int], cfg: Dict[str, Any]) -> np.ndarray:
|
|
"""Position (mm) + optional normal (scaled) residuals over the given markers."""
|
|
model = model_markers(fk, state)
|
|
res: List[float] = []
|
|
w_n = float(cfg.get("normal_weight", 30.0))
|
|
use_n = bool(cfg.get("use_normals", True))
|
|
for mid in marker_ids:
|
|
if mid not in model or mid not in obs:
|
|
continue
|
|
mm = model[mid]
|
|
dp = np.asarray(mm["world_mm"], float) - obs[mid]["pos_mm"]
|
|
res.extend(dp.tolist())
|
|
if use_n and obs[mid]["normal"] is not None and "normal_world" in mm:
|
|
dn = (np.asarray(mm["normal_world"], float) - obs[mid]["normal"]) * w_n
|
|
res.extend(dn.tolist())
|
|
return np.asarray(res, dtype=float)
|
|
|
|
|
|
def _state_from_vec(var_names: List[str], vec: np.ndarray, base: Dict[str, float]) -> Dict[str, float]:
|
|
s = dict(base)
|
|
for name, val in zip(var_names, vec):
|
|
s[name] = float(val)
|
|
return s
|
|
|
|
|
|
# ==================================================================
|
|
# Method: global bundle adjustment
|
|
# ==================================================================
|
|
|
|
def estimate_global_ba(fk: RobotFK, obs: Dict[int, Dict[str, Any]], var_names: List[str],
|
|
x0: Dict[str, float], cfg: Dict[str, Any]) -> Dict[str, float]:
|
|
if not HAVE_SCIPY:
|
|
print("[WARN] scipy missing — global_ba skipped, returning init")
|
|
return dict(x0)
|
|
marker_ids = list(obs.keys())
|
|
base = {k: 0.0 for k in STATE_KEYS}
|
|
base.update(x0)
|
|
vec0 = np.array([base.get(v, 0.0) for v in var_names], dtype=float)
|
|
|
|
def fun(vec):
|
|
st = _state_from_vec(var_names, vec, base)
|
|
return residual_vector(st, fk, obs, marker_ids, cfg)
|
|
|
|
loss = cfg.get("robust_loss", "huber")
|
|
f_scale = float(cfg.get("huber_delta_mm", 8.0))
|
|
try:
|
|
sol = least_squares(fun, vec0, loss=loss, f_scale=f_scale,
|
|
max_nfev=int(cfg.get("max_iterations", 200)) * max(1, len(var_names)))
|
|
return _state_from_vec(var_names, sol.x, base)
|
|
except Exception as exc:
|
|
print(f"[WARN] global_ba failed: {exc}")
|
|
return dict(base)
|
|
|
|
|
|
# ==================================================================
|
|
# Method: sequential block-wise FK fit
|
|
# ==================================================================
|
|
|
|
def _multistart_values(vtype: str) -> List[float]:
|
|
# revolute: scan the circle to escape local minima at large angles
|
|
if vtype == "revolute":
|
|
return [0.0, 60.0, 120.0, 180.0, 240.0, 300.0]
|
|
return [0.0]
|
|
|
|
|
|
def estimate_sequential_fk(fk: RobotFK, obs: Dict[int, Dict[str, Any]], chain: Dict[str, Any],
|
|
cfg: Dict[str, Any], seed: Optional[Dict[str, float]] = None
|
|
) -> Dict[str, float]:
|
|
"""
|
|
Estimate block by block along the chain, freezing already-solved variables.
|
|
|
|
seed: optional partial/full state (e.g. from 4b_revolute_angle.py) to trust
|
|
as a starting point. A block is SKIPPED entirely (seed used as-is, no
|
|
re-fit) only if ALL of its variables are present in seed. Blocks with any
|
|
missing variable are still fit normally — including their own multi-start
|
|
— but using the seeded values of EARLIER blocks as fixed context instead
|
|
of 0. This keeps the local-minimum protection for whatever the seed does
|
|
NOT cover (see doc/Homing_5_Pose.md "Wichtige Einschraenkung"), while not
|
|
re-perturbing values the caller already trusts.
|
|
"""
|
|
state = {k: 0.0 for k in STATE_KEYS}
|
|
if seed:
|
|
state.update({k: v for k, v in seed.items() if k in STATE_KEYS})
|
|
var_type = chain["var_type"]
|
|
|
|
for block in chain["blocks"]:
|
|
bvars = block["vars"]
|
|
bmarkers = [m for m in block["markers"] if m in obs]
|
|
if not bvars:
|
|
continue
|
|
if seed and all(v in seed for v in bvars):
|
|
continue # fully seeded — trust it, don't re-fit
|
|
if not bmarkers:
|
|
# unobservable block: leave at seed/0, flag later
|
|
continue
|
|
|
|
if not HAVE_SCIPY:
|
|
continue
|
|
|
|
base = dict(state)
|
|
|
|
def fun(vec, _bvars=bvars, _bm=bmarkers, _base=base):
|
|
st = _state_from_vec(_bvars, vec, _base)
|
|
return residual_vector(st, fk, obs, _bm, cfg)
|
|
|
|
# multi-start over the first revolute variable in the block
|
|
starts = [[0.0] * len(bvars)]
|
|
lead_type = var_type.get(bvars[0], "linear")
|
|
if lead_type == "revolute":
|
|
starts = []
|
|
for a0 in _multistart_values("revolute"):
|
|
s = [0.0] * len(bvars)
|
|
s[0] = a0
|
|
starts.append(s)
|
|
|
|
best, best_cost = None, float("inf")
|
|
for s0 in starts:
|
|
try:
|
|
sol = least_squares(fun, np.array(s0, dtype=float),
|
|
loss=cfg.get("robust_loss", "huber"),
|
|
f_scale=float(cfg.get("huber_delta_mm", 8.0)),
|
|
max_nfev=200 * max(1, len(bvars)))
|
|
if sol.cost < best_cost:
|
|
best_cost, best = sol.cost, sol.x
|
|
except Exception:
|
|
continue
|
|
if best is not None:
|
|
for name, val in zip(bvars, best):
|
|
state[name] = float(val)
|
|
|
|
# wrap revolute angles to (-180, 180]
|
|
for v, vt in var_type.items():
|
|
if vt == "revolute":
|
|
state[v] = (state[v] + 180.0) % 360.0 - 180.0
|
|
return state
|
|
|
|
|
|
# ==================================================================
|
|
# Method: sequential analytic vector (per revolute joint)
|
|
# ==================================================================
|
|
|
|
def estimate_sequential_vector(fk: RobotFK, obs: Dict[int, Dict[str, Any]], chain: Dict[str, Any],
|
|
cfg: Dict[str, Any]) -> Dict[str, float]:
|
|
"""
|
|
Analytic angle from marker geometry where possible. For revolute joints with
|
|
>=2 markers on the link, use the perpendicular marker-pair vector. Falls back
|
|
to the FK block solver for linear / zero-marker / single-marker cases, so it
|
|
always returns a full state (still cheaper than full sequential_fk because
|
|
well-populated joints are solved in closed form).
|
|
"""
|
|
state = {k: 0.0 for k in STATE_KEYS}
|
|
var_type = chain["var_type"]
|
|
link_markers = chain["link_markers"]
|
|
var_links = chain["var_links"]
|
|
|
|
for block in chain["blocks"]:
|
|
bvars = block["vars"]
|
|
if len(bvars) == 1 and var_type.get(bvars[0]) == "revolute":
|
|
v = bvars[0]
|
|
ln = var_links[v][0]
|
|
mids = [m for m in link_markers.get(ln, []) if m in obs]
|
|
if len(mids) >= 2:
|
|
# model vectors must be expressed in the WORLD frame at angle=0
|
|
# (the link frame is already rotated by the parents y,z,...), so
|
|
# use FK marker world positions with this joint set to 0.
|
|
state_v0 = dict(state)
|
|
state_v0[v] = 0.0
|
|
model_v0 = model_markers(fk, state_v0)
|
|
axis_world = fk.joint_axis_world(ln, state_v0)
|
|
ang = _angle_from_pairs_world(mids, model_v0, obs, axis_world)
|
|
if ang is not None:
|
|
state[v] = ang
|
|
continue
|
|
# fallback: block FK fit for this single block
|
|
_fit_single_block(fk, obs, block, var_type, cfg, state)
|
|
|
|
for v, vt in var_type.items():
|
|
if vt == "revolute":
|
|
state[v] = (state[v] + 180.0) % 360.0 - 180.0
|
|
return state
|
|
|
|
|
|
def _angle_from_pairs_world(mids: List[int], model_v0: Dict[int, Dict[str, np.ndarray]],
|
|
obs: Dict[int, Dict[str, Any]], axis_world: np.ndarray) -> Optional[float]:
|
|
from itertools import combinations
|
|
a = np.asarray(axis_world, float)
|
|
a = a / (np.linalg.norm(a) + 1e-12)
|
|
angs, ws = [], []
|
|
for i, j in combinations(mids, 2):
|
|
if i not in model_v0 or j not in model_v0:
|
|
continue
|
|
vm = np.asarray(model_v0[j]["world_mm"], float) - np.asarray(model_v0[i]["world_mm"], float) # world @ angle 0
|
|
vo = obs[j]["pos_mm"] - obs[i]["pos_mm"] # observed vector (world, mm)
|
|
vm_p = vm - np.dot(vm, a) * a
|
|
vo_p = vo - np.dot(vo, a) * a
|
|
if np.linalg.norm(vm_p) < 5 or np.linalg.norm(vo_p) < 5:
|
|
continue
|
|
ang = math.atan2(float(np.dot(a, np.cross(vm_p, vo_p))), float(np.dot(vm_p, vo_p)))
|
|
angs.append(ang)
|
|
ws.append(np.linalg.norm(vm_p) * np.linalg.norm(vo_p))
|
|
if not angs:
|
|
return None
|
|
s = sum(w * math.sin(x) for w, x in zip(ws, angs))
|
|
c = sum(w * math.cos(x) for w, x in zip(ws, angs))
|
|
return math.degrees(math.atan2(s, c))
|
|
|
|
|
|
def _fit_single_block(fk, obs, block, var_type, cfg, state):
|
|
if not HAVE_SCIPY:
|
|
return
|
|
bvars = block["vars"]
|
|
bmarkers = [m for m in block["markers"] if m in obs]
|
|
if not bvars or not bmarkers:
|
|
return
|
|
base = dict(state)
|
|
|
|
def fun(vec):
|
|
return residual_vector(_state_from_vec(bvars, vec, base), fk, obs, bmarkers, cfg)
|
|
|
|
starts = [[0.0] * len(bvars)]
|
|
if var_type.get(bvars[0]) == "revolute":
|
|
starts = [[a0] + [0.0] * (len(bvars) - 1) for a0 in _multistart_values("revolute")]
|
|
best, best_cost = None, float("inf")
|
|
for s0 in starts:
|
|
try:
|
|
sol = least_squares(fun, np.array(s0, float), loss=cfg.get("robust_loss", "huber"),
|
|
f_scale=float(cfg.get("huber_delta_mm", 8.0)), max_nfev=200 * max(1, len(bvars)))
|
|
if sol.cost < best_cost:
|
|
best_cost, best = sol.cost, sol.x
|
|
except Exception:
|
|
continue
|
|
if best is not None:
|
|
for name, val in zip(bvars, best):
|
|
state[name] = float(val)
|
|
|
|
|
|
# ==================================================================
|
|
# Dispatch
|
|
# ==================================================================
|
|
|
|
def observability(chain: Dict[str, Any], obs: Dict[int, Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
|
|
"""
|
|
Per-variable confidence from how well its estimation block is determined.
|
|
A block groups coupled variables (e.g. b,c,e on the fingers); confidence is
|
|
driven by markers-per-variable in that block:
|
|
high : >= 2 markers per variable (well over-determined)
|
|
medium : >= 1 marker per variable
|
|
low : fewer markers than variables (under-determined — distrust!)
|
|
none : no markers at all (variable left at 0)
|
|
"""
|
|
info: Dict[str, Dict[str, Any]] = {}
|
|
for block in chain["blocks"]:
|
|
seen = [m for m in block["markers"] if m in obs]
|
|
nvars = max(1, len(block["vars"]))
|
|
ratio = len(seen) / nvars
|
|
if len(seen) == 0:
|
|
conf = "none"
|
|
elif ratio >= 2.0:
|
|
conf = "high"
|
|
elif ratio >= 1.0:
|
|
conf = "medium"
|
|
else:
|
|
conf = "low"
|
|
for v in block["vars"]:
|
|
info[v] = {"observable": len(seen) > 0, "n_markers": len(seen),
|
|
"block_vars": len(block["vars"]), "confidence": conf,
|
|
"block_anchor": block["anchor"]}
|
|
return info
|
|
|
|
|
|
# ==================================================================
|
|
# Mode: joint-origin calibration (--calibrate-origin)
|
|
# ==================================================================
|
|
|
|
def estimate_origin_calibration(fk: RobotFK, obs: Dict[int, Dict[str, Any]],
|
|
link_name: str, cfg: Dict[str, Any],
|
|
seed: Optional[Dict[str, float]] = None,
|
|
free_axes: Tuple[int, ...] = (1, 2)) -> Dict[str, Any]:
|
|
"""
|
|
Fit `link_name`'s OWN joint variable together with its
|
|
`jointToParent.origin` components (default: indices 1,2 = Y,Z) from that
|
|
link's own markers, in a single robust least-squares solve. All other
|
|
joint variables are held fixed at `seed` (or 0) — this assumes the rest of
|
|
the chain (in particular a slider `x` seed, if relevant) is already
|
|
trustworthy, same precondition as the existing geometric method.
|
|
|
|
Complements doc/Kalibrierung.md Schritt [4] ("Arm1 Y-Rotationsachse"),
|
|
which fits the axis from a dedicated 3-pose capture using marker *centres*
|
|
only (circle fit). This fits from a single capture's marker corner poses
|
|
(position + measured normal, same residual as estimate_pose), reusing
|
|
whatever Homing run data is already on hand instead of a separate capture
|
|
session — useful for ANY revolute/linear joint's origin, not just Arm1/y.
|
|
|
|
Never writes robot.json. `fk.links[link_name]["jointToParent"]["origin"]`
|
|
is mutated transiently during the solve (RobotFK.compute() re-reads it
|
|
fresh on every call — see robot_fk.py) and always restored before
|
|
returning, success or not.
|
|
|
|
Returns a report dict; result["status"] is one of:
|
|
"ok" | "scipy_missing" | "insufficient_markers" | "unknown_link" | "failed"
|
|
"""
|
|
if link_name not in fk.links:
|
|
return {"link": link_name, "status": "unknown_link"}
|
|
|
|
chain = analyze_chain(fk)
|
|
link_var = next((v for v, links in chain["var_links"].items() if link_name in links), None)
|
|
if link_var is None:
|
|
return {"link": link_name, "status": "unknown_link",
|
|
"detail": "link has no movable jointToParent"}
|
|
|
|
own_markers = [m for m in chain["link_markers"].get(link_name, []) if m in obs]
|
|
joint = fk.links[link_name].get("jointToParent", {}) or {}
|
|
origin = joint.get("origin", [0.0, 0.0, 0.0])
|
|
if not isinstance(origin, list):
|
|
origin = list(origin)
|
|
joint["origin"] = origin
|
|
origin_before = list(origin)
|
|
var_type = chain["var_type"].get(link_var, "linear")
|
|
|
|
result: Dict[str, Any] = {
|
|
"link": link_name, "joint_variable": link_var,
|
|
"joint_unit": "mm" if var_type == "linear" else "deg",
|
|
"origin_before_mm": origin_before, "free_axes": list(free_axes),
|
|
"n_markers": len(own_markers), "status": "skipped",
|
|
}
|
|
if not HAVE_SCIPY:
|
|
result["status"] = "scipy_missing"
|
|
return result
|
|
if len(own_markers) < 2:
|
|
result["status"] = "insufficient_markers"
|
|
return result
|
|
|
|
base = {k: 0.0 for k in STATE_KEYS}
|
|
if seed:
|
|
base.update({k: v for k, v in seed.items() if k in STATE_KEYS})
|
|
|
|
def fun(vec):
|
|
st = dict(base)
|
|
st[link_var] = vec[0]
|
|
for i, ax in enumerate(free_axes):
|
|
origin[ax] = vec[1 + i]
|
|
return residual_vector(st, fk, obs, own_markers, cfg)
|
|
|
|
starts = [base.get(link_var, 0.0)] if var_type != "revolute" else _multistart_values("revolute")
|
|
best, best_cost = None, float("inf")
|
|
try:
|
|
for a0 in starts:
|
|
vec0 = np.array([a0] + [origin_before[ax] for ax in free_axes], dtype=float)
|
|
try:
|
|
sol = least_squares(fun, vec0, loss=cfg.get("robust_loss", "huber"),
|
|
f_scale=float(cfg.get("huber_delta_mm", 8.0)),
|
|
max_nfev=int(cfg.get("max_iterations", 200)) * 3)
|
|
if sol.cost < best_cost:
|
|
best_cost, best = sol.cost, sol.x
|
|
except Exception:
|
|
continue
|
|
finally:
|
|
for ax in free_axes:
|
|
origin[ax] = origin_before[ax] # always restore — report-only tool
|
|
|
|
if best is None:
|
|
result["status"] = "failed"
|
|
return result
|
|
|
|
fitted_joint = float(best[0])
|
|
if var_type == "revolute":
|
|
fitted_joint = (fitted_joint + 180.0) % 360.0 - 180.0
|
|
fitted_origin = list(origin_before)
|
|
for i, ax in enumerate(free_axes):
|
|
fitted_origin[ax] = float(best[1 + i])
|
|
|
|
final_state = dict(base)
|
|
final_state[link_var] = fitted_joint
|
|
for i, ax in enumerate(free_axes):
|
|
origin[ax] = fitted_origin[ax]
|
|
final_res = residual_vector(final_state, fk, obs, own_markers, cfg)
|
|
for ax in free_axes:
|
|
origin[ax] = origin_before[ax] # restore again after the check above
|
|
|
|
result.update({
|
|
"status": "ok",
|
|
"joint_value": fitted_joint,
|
|
"origin_after_mm": fitted_origin,
|
|
"origin_delta_mm": [round(b - a, 4) for a, b in zip(origin_before, fitted_origin)],
|
|
"residual_rms": float(np.sqrt(np.mean(final_res ** 2))) if final_res.size else 0.0,
|
|
"note": "robot.json NOT modified — apply via Kalibrierung-Tab "
|
|
"\"Joint-Origin Y/Z übernehmen\" (editRobot.js) if this looks good.",
|
|
})
|
|
return result
|
|
|
|
|
|
def estimate_pose(fk: RobotFK, obs: Dict[int, Dict[str, Any]], cfg: Dict[str, Any],
|
|
seed: Optional[Dict[str, float]] = None) -> Dict[str, Any]:
|
|
"""
|
|
seed: optional partial/full joint state (e.g. from load_seed_state(), the
|
|
4b_revolute_angle.py chain) to trust as a starting point for global_ba/
|
|
hybrid. Passed through to estimate_sequential_fk(), which skips re-fitting
|
|
any block that is FULLY covered by seed and otherwise still applies its
|
|
normal per-block multi-start — so variables the seed does NOT cover keep
|
|
the existing local-minimum protection (see doc/Homing_5_Pose.md "Wichtige
|
|
Einschraenkung") instead of silently defaulting to an unprotected 0.
|
|
sequential_vector ignores seed (no x0 input; left untouched on purpose —
|
|
it is the cheap analytic method, not the one this seeding targets).
|
|
"""
|
|
chain = analyze_chain(fk)
|
|
var_names = chain["ordered_vars"]
|
|
method = str(cfg.get("method", "hybrid")).lower()
|
|
obsv = observability(chain, obs)
|
|
|
|
if method == "sequential_vector":
|
|
state = estimate_sequential_vector(fk, obs, chain, cfg)
|
|
elif method == "sequential_fk":
|
|
state = estimate_sequential_fk(fk, obs, chain, cfg, seed=seed)
|
|
else: # global_ba / hybrid (default) — both use the same init->refine path
|
|
init = estimate_sequential_fk(fk, obs, chain, cfg, seed=seed)
|
|
state = estimate_global_ba(fk, obs, var_names, init, cfg)
|
|
|
|
# final residual stats over all observed markers
|
|
final_res = residual_vector(state, fk, obs, list(obs.keys()), cfg)
|
|
rms = float(np.sqrt(np.mean(final_res ** 2))) if final_res.size else 0.0
|
|
|
|
return {"state": state, "method": method, "observability": obsv,
|
|
"residual_rms": rms, "num_markers": len(obs)}
|
|
|
|
|
|
# ==================================================================
|
|
# CLI
|
|
# ==================================================================
|
|
|
|
def main() -> None:
|
|
ap = argparse.ArgumentParser(description="Estimate robot joint angles from marker poses")
|
|
ap.add_argument("markers", help="aruco_marker_poses.json (corner_pose) or aruco_positions_*.json (center)")
|
|
ap.add_argument("-robot", "--robot", required=True)
|
|
ap.add_argument("-out", "--out", default=None)
|
|
ap.add_argument("--method", default=None, help="override robot.json method")
|
|
ap.add_argument("--from-state", default=None, metavar="JSON",
|
|
help="Seed/init state (flat {var:value} or {accumulated_state:{...}} as "
|
|
"written by 4b_revolute_angle.py). Used as x0 for global_ba/hybrid "
|
|
"instead of the internal cold start. See doc/Homing_5_Pose.md.")
|
|
ap.add_argument("--calibrate-origin", default=None, metavar="LINK",
|
|
help="Instead of estimating the full pose, fit LINK's own joint value "
|
|
"together with its jointToParent.origin Y/Z from LINK's own markers. "
|
|
"Writes a *_origin_calibration.json report; never modifies robot.json.")
|
|
args = ap.parse_args()
|
|
|
|
robot_data = json.load(open(args.robot, "r", encoding="utf-8"))
|
|
cfg = load_pose_cfg(robot_data)
|
|
if args.method:
|
|
cfg["method"] = args.method
|
|
|
|
fk = RobotFK(robot_data)
|
|
obs = load_observations(args.markers, cfg.get("use_normals", True),
|
|
int(cfg.get("min_cameras_per_marker", 2)))
|
|
seed = load_seed_state(args.from_state) if args.from_state else None
|
|
print(f"[INFO] method={cfg['method']} | observed markers={len(obs)} | use_normals={cfg.get('use_normals')}"
|
|
+ (f" | seed={seed}" if seed else ""))
|
|
|
|
# ── Mode: joint-origin calibration ──────────────────────────────────────
|
|
if args.calibrate_origin:
|
|
calib = estimate_origin_calibration(fk, obs, args.calibrate_origin, cfg, seed=seed)
|
|
print(f"\nOrigin calibration for link={calib['link']} status={calib['status']}")
|
|
if calib["status"] == "ok":
|
|
unit = calib["joint_unit"]
|
|
print(f" joint {calib['joint_variable']}: {calib['joint_value']:.2f} {unit}")
|
|
print(f" origin before: {calib['origin_before_mm']}")
|
|
print(f" origin after: {calib['origin_after_mm']} (delta {calib['origin_delta_mm']} mm)")
|
|
print(f" residual RMS over {calib['n_markers']} markers: {calib['residual_rms']:.3f}")
|
|
print(f" {calib['note']}")
|
|
else:
|
|
print(f" (no fit — {calib.get('detail', calib['status'])}, n_markers={calib.get('n_markers', 0)})")
|
|
out_path = args.out or os.path.join(
|
|
os.path.dirname(args.markers), f"{args.calibrate_origin}_origin_calibration.json")
|
|
json.dump(calib, open(out_path, "w", encoding="utf-8"), indent=2)
|
|
print(f"[INFO] wrote {out_path}")
|
|
return
|
|
|
|
# ── Mode: full pose estimation (default) ────────────────────────────────
|
|
result = estimate_pose(fk, obs, cfg, seed=seed)
|
|
st = result["state"]
|
|
|
|
print("\nEstimated joint values:")
|
|
for v in ["x", "y", "z", "a", "b", "c", "e"]:
|
|
ob = result["observability"].get(v, {})
|
|
unit = "mm" if v in ("x", "e") else "deg"
|
|
conf = ob.get("confidence", "?")
|
|
tag = "" if ob.get("observable", False) else " [UNOBSERVABLE -> null]"
|
|
print(f" {v}: {st.get(v, 0.0):8.2f} {unit} (markers={ob.get('n_markers','?')}, conf={conf}){tag}")
|
|
print(f"\n[INFO] residual RMS over {result['num_markers']} markers: {result['residual_rms']:.3f}")
|
|
|
|
movements = {}
|
|
for v in ["x", "y", "z", "a", "b", "c", "e"]:
|
|
ob = result["observability"].get(v, {})
|
|
observable = ob.get("observable", False)
|
|
movements[v] = {
|
|
# Unobservable -> null, never a fabricated 0 (see module docstring).
|
|
"value": st.get(v, 0.0) if observable else None,
|
|
"unit": "mm" if v in ("x", "e") else "deg",
|
|
"observable": observable,
|
|
"confidence": ob.get("confidence", "none"),
|
|
"n_markers": ob.get("n_markers", 0),
|
|
}
|
|
out = {
|
|
"schema_version": "1.0",
|
|
"created_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
|
|
"method": result["method"],
|
|
"seeded": seed is not None,
|
|
"movements": movements,
|
|
"residual_rms": result["residual_rms"],
|
|
"num_markers": result["num_markers"],
|
|
}
|
|
out_path = args.out or os.path.join(os.path.dirname(args.markers), "robot_state.json")
|
|
json.dump(out, open(out_path, "w", encoding="utf-8"), indent=2)
|
|
print(f"[INFO] wrote {out_path}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|