Files
appRobotRender/pipeline/1_detect_aruco_observations.py
2026-06-19 06:44:46 +02:00

636 lines
14 KiB
Python

#!/usr/bin/env python3
import argparse
import json
import os
import hashlib
import time
import uuid
from typing import Dict, Any
import cv2
import numpy as np
# ------------------------------------------------------------
# Utilities
# ------------------------------------------------------------
def resolve_path(path):
path = os.path.expanduser(path)
# Absoluter Pfad → direkt verwenden
if os.path.isabs(path):
return path
# Relativer Pfad → absolut machen (auf Basis aktuellem cwd)
return os.path.abspath(path)
def load_intrinsics_npz(npz_path: str):
data = np.load(npz_path)
for k in ('camera_matrix', 'mtx', 'K'):
if k in data:
K = data[k].astype(np.float32)
break
else:
raise KeyError('Camera matrix not found in npz')
for k in ('dist_coeffs', 'dist', 'D'):
if k in data:
D = data[k].astype(np.float32).reshape(-1, 1)
break
else:
D = np.zeros((5, 1), dtype=np.float32)
return K, D
# ------------------------------------------------------------
def load_robot_vision_config(robot_json_path: str):
robot_json_path = resolve_path(robot_json_path)
with open(robot_json_path, 'r', encoding='utf-8') as f:
robot = json.load(f)
vision_config = robot.get('vision_config', {})
marker_type = vision_config.get('MarkerType', 'DICT_4X4_250')
marker_size = float(vision_config.get('MarkerSize', 0.025))
return {
'MarkerType': marker_type,
'MarkerSize': marker_size
}
# ------------------------------------------------------------
def get_aruco_detector(dict_name: str):
mapping = {
'DICT_4X4_250': cv2.aruco.DICT_4X4_250,
'DICT_5X5_100': cv2.aruco.DICT_5X5_100,
'DICT_6X6_250': cv2.aruco.DICT_6X6_250,
'DICT_ARUCO_ORIGINAL': cv2.aruco.DICT_ARUCO_ORIGINAL,
}
dict_id = mapping.get(dict_name, cv2.aruco.DICT_4X4_250)
dictionary = cv2.aruco.getPredefinedDictionary(dict_id)
try:
params = cv2.aruco.DetectorParameters()
except Exception:
params = cv2.aruco.DetectorParameters_create()
try:
detector = cv2.aruco.ArucoDetector(dictionary, params)
return detector, None
except Exception:
return None, (dictionary, params)
# ------------------------------------------------------------
def detect_markers(image, detector_tuple):
detector, fallback = detector_tuple
if detector is not None:
corners, ids, rejected = detector.detectMarkers(image)
else:
dictionary, params = fallback
corners, ids, rejected = cv2.aruco.detectMarkers(
image,
dictionary,
parameters=params
)
return corners, ids, rejected
# ------------------------------------------------------------
def hash_file(path):
sha = hashlib.sha256()
with open(path, 'rb') as f:
while True:
chunk = f.read(1024 * 1024)
if not chunk:
break
sha.update(chunk)
return sha.hexdigest()
# ------------------------------------------------------------
def polygon_mask(shape, polygon):
mask = np.zeros(shape, dtype=np.uint8)
cv2.fillConvexPoly(
mask,
polygon.astype(np.int32),
255
)
return mask
# ------------------------------------------------------------
def shrink_polygon(points, scale=0.80):
center = np.mean(points, axis=0)
shrunk = center + (points - center) * scale
return shrunk.astype(np.float32)
# ------------------------------------------------------------
def compute_sharpness(gray_image, polygon):
shrunk = shrink_polygon(polygon, scale=0.80)
mask = polygon_mask(gray_image.shape, shrunk)
pixels = gray_image[mask == 255]
if pixels.size == 0:
return 0.0
temp = np.zeros_like(gray_image)
temp[mask == 255] = gray_image[mask == 255]
lap = cv2.Laplacian(temp, cv2.CV_64F)
values = lap[mask == 255]
if values.size == 0:
return 0.0
return float(values.var())
# ------------------------------------------------------------
def compute_contrast(gray_image, polygon):
shrunk = shrink_polygon(polygon, scale=0.80)
mask = polygon_mask(gray_image.shape, shrunk)
pixels = gray_image[mask == 255]
if pixels.size == 0:
return {
'p05': 0.0,
'p95': 0.0,
'dynamic_range': 0.0,
'mean_gray': 0.0,
'std_gray': 0.0
}
p05 = float(np.percentile(pixels, 5))
p95 = float(np.percentile(pixels, 95))
return {
'p05': p05,
'p95': p95,
'dynamic_range': float(p95 - p05),
'mean_gray': float(np.mean(pixels)),
'std_gray': float(np.std(pixels))
}
# ------------------------------------------------------------
def compute_edge_ratio(corners):
edge_lengths = []
for k in range(4):
p1 = corners[k]
p2 = corners[(k + 1) % 4]
edge_lengths.append(
float(np.linalg.norm(p1 - p2))
)
edge_ratio = (
max(edge_lengths) /
max(1e-6, min(edge_lengths))
)
return edge_ratio, edge_lengths
# ------------------------------------------------------------
def compute_geometry_metrics(center, corners, width, height):
image_center = np.array(
[width / 2.0, height / 2.0],
dtype=np.float32
)
dist_center = np.linalg.norm(center - image_center)
max_dist = np.linalg.norm(image_center)
distance_center_norm = float(
dist_center / max(1e-6, max_dist)
)
min_x = np.min(corners[:, 0])
max_x = np.max(corners[:, 0])
min_y = np.min(corners[:, 1])
max_y = np.max(corners[:, 1])
border_distance_px = float(min(
min_x,
min_y,
width - max_x,
height - max_y
))
return {
'distance_to_center_norm': distance_center_norm,
'distance_to_border_px': border_distance_px
}
# ------------------------------------------------------------
def compute_confidence(
area_px,
sharpness,
edge_ratio,
dynamic_range,
border_distance_px
):
score = 1.0
# area
score *= min(1.0, area_px / 1500.0)
# sharpness
score *= min(1.0, sharpness / 120.0)
# edge distortion
score *= 1.0 / max(1.0, edge_ratio)
# contrast
score *= min(1.0, dynamic_range / 80.0)
# border distance
score *= min(1.0, max(0.0, border_distance_px) / 50.0)
score = max(0.0, min(1.0, score))
return float(score)
# ------------------------------------------------------------
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
'-i',
'--image',
required=True
)
parser.add_argument(
'-npz',
'--intrinsics',
required=True
)
parser.add_argument(
'-robot',
'--robot',
required=True
)
parser.add_argument(
'-cameraId',
'--cameraId',
required=True,
type=str
)
parser.add_argument(
'-outDir',
'--outDir',
required=True
)
parser.add_argument(
'--saveDebugImage',
action='store_true',
help='Speichert ein Debug-JPG mit eingezeichneten Marker-Rahmen'
)
args = parser.parse_args()
out_dir = resolve_path(args.outDir)
os.makedirs(out_dir, exist_ok=True)
# --------------------------------------------------------
# Load robot vision config
# --------------------------------------------------------
vision_config = load_robot_vision_config(args.robot)
marker_type = vision_config['MarkerType']
marker_size = vision_config['MarkerSize']
# --------------------------------------------------------
# Load image
# --------------------------------------------------------
image_path = resolve_path(args.image)
image = cv2.imread(image_path)
if image is None:
raise RuntimeError(f'Cannot read image: {args.image}')
gray = cv2.cvtColor(
image,
cv2.COLOR_BGR2GRAY
)
height, width = gray.shape[:2]
# --------------------------------------------------------
# Intrinsics
# --------------------------------------------------------
intrinsics_path = resolve_path(args.intrinsics)
K, D = load_intrinsics_npz(intrinsics_path)
# --------------------------------------------------------
# Detection
# --------------------------------------------------------
detector_tuple = get_aruco_detector(marker_type)
corners_list, ids, rejected = detect_markers(
gray,
detector_tuple
)
# ids_raw: original numpy array für drawDetectedMarkers
ids_raw = ids
detections = []
# --------------------------------------------------------
# Valid detections
# --------------------------------------------------------
if ids is not None:
ids = ids.flatten().tolist()
for i, marker_id in enumerate(ids):
corners = corners_list[i].reshape((4, 2)).astype(np.float32)
center = corners.mean(axis=0)
area_px = float(
cv2.contourArea(corners)
)
perimeter_px = float(
cv2.arcLength(corners, True)
)
edge_ratio, edge_lengths = compute_edge_ratio(corners)
sharpness = compute_sharpness(
gray,
corners
)
contrast = compute_contrast(
gray,
corners
)
geometry = compute_geometry_metrics(
center,
corners,
width,
height
)
confidence = compute_confidence(
area_px=area_px,
sharpness=sharpness,
edge_ratio=edge_ratio,
dynamic_range=contrast['dynamic_range'],
border_distance_px=geometry['distance_to_border_px']
)
detection = {
'observation_id': str(uuid.uuid4()),
'type': 'aruco',
'marker_id': int(marker_id),
'marker_size_m': marker_size,
'image_points_px': corners.tolist(),
'center_px': center.tolist(),
'quality': {
'area_px': area_px,
'perimeter_px': perimeter_px,
'sharpness': {
'laplacian_var': sharpness
},
'contrast': contrast,
'geometry': geometry,
'edge_ratio': edge_ratio,
'edge_lengths_px': edge_lengths
},
'confidence': confidence
}
detections.append(detection)
# --------------------------------------------------------
# Rejected candidates
# --------------------------------------------------------
rejected_candidates = []
if rejected is not None:
for candidate in rejected:
pts = candidate.reshape((-1, 2)).astype(np.float32)
center = pts.mean(axis=0)
area_px = float(
cv2.contourArea(pts)
)
rejected_candidates.append({
'image_points_px': pts.tolist(),
'center_px': center.tolist(),
'area_px': area_px
})
# --------------------------------------------------------
# Final output
# --------------------------------------------------------
output = {
'schema_version': '1.0',
'created_utc': time.strftime(
'%Y-%m-%dT%H:%M:%SZ',
time.gmtime()
),
'vision_config': {
'MarkerType': marker_type,
'MarkerSize': marker_size
},
'camera': {
'camera_id': args.cameraId,
'intrinsics_file': os.path.abspath(args.intrinsics),
'camera_matrix': K.tolist(),
'distortion_coefficients': D.reshape(-1).tolist()
},
'image': {
'image_file': os.path.abspath(args.image),
'image_sha256': hash_file(args.image),
'width_px': int(width),
'height_px': int(height)
},
'aruco': {
'dictionary': marker_type,
'num_detected_markers': len(detections),
'num_rejected_candidates': len(rejected_candidates)
},
'detections': detections,
'rejected_candidates': rejected_candidates
}
# --------------------------------------------------------
# Output path
# --------------------------------------------------------
input_filename = os.path.basename(args.image)
input_base = os.path.splitext(input_filename)[0]
out_json = os.path.join(
out_dir,
f'{input_base}_aruco_detection.json'
)
# --------------------------------------------------------
# Save JSON
# --------------------------------------------------------
with open(out_json, 'w', encoding='utf-8') as f:
json.dump(
output,
f,
indent=2
)
print(f'Saved: {out_json}')
# --------------------------------------------------------
# Debug-Bild mit Marker-Rahmen
# --------------------------------------------------------
if args.saveDebugImage:
debug_img = image.copy()
if corners_list and ids_raw is not None:
cv2.aruco.drawDetectedMarkers(debug_img, corners_list, ids_raw)
debug_path = os.path.join(
out_dir,
f'{input_base}_debug.jpg'
)
cv2.imwrite(debug_path, debug_img)
print(f'Saved debug: {debug_path}')
# ------------------------------------------------------------
if __name__ == '__main__':
main()