import os import time import json import re from ultralytics import YOLO SNAPSHOT_DIR = "/snapshots" # only process images from last N minutes MAX_AGE_MINUTES = 3 MAX_AGE_MS = MAX_AGE_MINUTES * 60 * 1000 # remember processed files (for runtime) PROCESSED = set() # strict filename pattern PATTERN = re.compile(r"^snapshot_video\d+_(\d+)\.jpg$") model = YOLO("yolov8m.pt") #model = YOLO("yolov8s-pose.pt") def extract_timestamp(filename): match = PATTERN.match(filename) if match: return int(match.group(1)) return None def get_recent_files(): now_ms = int(time.time() * 1000) files = [] for f in os.listdir(SNAPSHOT_DIR): ts = extract_timestamp(f) if ts is None: continue # filter by age if now_ms - ts > MAX_AGE_MS: continue files.append((f, ts)) # newest first files.sort(key=lambda x: x[1], reverse=True) return [f[0] for f in files] def is_file_stable(path): """Avoid processing partially written files""" try: size1 = os.path.getsize(path) time.sleep(0.1) size2 = os.path.getsize(path) return size1 == size2 except: return False def detect_and_save(file): full_path = os.path.join(SNAPSHOT_DIR, file) if not is_file_stable(full_path): return results = model(full_path, imgsz=960) detections = [] for r in results: boxes = r.boxes if boxes is None: continue for b in boxes: detections.append({ "class": model.names[int(b.cls)], "confidence": float(b.conf), "bbox_xyxy": b.xyxy[0].tolist() }) # ✅ --- NEW: create annotated image --- annotated_img = r.plot() # returns image with boxes drawn out_img_file = file.replace(".jpg", "_yolo.jpg") out_img_path = os.path.join(SNAPSHOT_DIR, out_img_file) # save using OpenCV import cv2 cv2.imwrite(out_img_path, annotated_img) out_file = file.replace(".jpg", "_yolo.json") out_path = os.path.join(SNAPSHOT_DIR, out_file) with open(out_path, "w") as f: json.dump(detections, f, indent=2) print(f"Processed {file} → {out_file}") while True: try: files = get_recent_files() for file in files[:10]: # limit per loop if file in PROCESSED: continue json_file = file.replace(".jpg", "_yolo.json") json_path = os.path.join(SNAPSHOT_DIR, json_file) # skip if already processed (persistent) if os.path.exists(json_path): PROCESSED.add(file) continue detect_and_save(file) PROCESSED.add(file) # cleanup memory if len(PROCESSED) > 1000: PROCESSED.clear() except Exception as e: print("Error:", e) time.sleep(1)