130 lines
3.0 KiB
Python
Executable File
130 lines
3.0 KiB
Python
Executable File
import os
|
|
import time
|
|
import json
|
|
import re
|
|
from ultralytics import YOLO
|
|
|
|
SNAPSHOT_DIR = "/snapshots"
|
|
|
|
# only process images from last N minutes
|
|
MAX_AGE_MINUTES = 3
|
|
MAX_AGE_MS = MAX_AGE_MINUTES * 60 * 1000
|
|
|
|
# remember processed files (for runtime)
|
|
PROCESSED = set()
|
|
|
|
# strict filename pattern
|
|
PATTERN = re.compile(r"^snapshot_video\d+_(\d+)\.jpg$")
|
|
|
|
model = YOLO("yolov8m.pt")
|
|
#model = YOLO("yolov8s-pose.pt")
|
|
|
|
|
|
def extract_timestamp(filename):
|
|
match = PATTERN.match(filename)
|
|
if match:
|
|
return int(match.group(1))
|
|
return None
|
|
|
|
|
|
def get_recent_files():
|
|
now_ms = int(time.time() * 1000)
|
|
files = []
|
|
|
|
for f in os.listdir(SNAPSHOT_DIR):
|
|
ts = extract_timestamp(f)
|
|
if ts is None:
|
|
continue
|
|
|
|
# filter by age
|
|
if now_ms - ts > MAX_AGE_MS:
|
|
continue
|
|
|
|
files.append((f, ts))
|
|
|
|
# newest first
|
|
files.sort(key=lambda x: x[1], reverse=True)
|
|
|
|
return [f[0] for f in files]
|
|
|
|
|
|
def is_file_stable(path):
|
|
"""Avoid processing partially written files"""
|
|
try:
|
|
size1 = os.path.getsize(path)
|
|
time.sleep(0.1)
|
|
size2 = os.path.getsize(path)
|
|
return size1 == size2
|
|
except:
|
|
return False
|
|
|
|
|
|
def detect_and_save(file):
|
|
full_path = os.path.join(SNAPSHOT_DIR, file)
|
|
|
|
if not is_file_stable(full_path):
|
|
return
|
|
|
|
results = model(full_path, imgsz=960)
|
|
|
|
detections = []
|
|
|
|
for r in results:
|
|
boxes = r.boxes
|
|
if boxes is None:
|
|
continue
|
|
|
|
for b in boxes:
|
|
detections.append({
|
|
"class": model.names[int(b.cls)],
|
|
"confidence": float(b.conf),
|
|
"bbox_xyxy": b.xyxy[0].tolist()
|
|
})
|
|
|
|
# ✅ --- NEW: create annotated image ---
|
|
annotated_img = r.plot() # returns image with boxes drawn
|
|
|
|
out_img_file = file.replace(".jpg", "_yolo.jpg")
|
|
out_img_path = os.path.join(SNAPSHOT_DIR, out_img_file)
|
|
|
|
# save using OpenCV
|
|
import cv2
|
|
cv2.imwrite(out_img_path, annotated_img)
|
|
|
|
|
|
out_file = file.replace(".jpg", "_yolo.json")
|
|
out_path = os.path.join(SNAPSHOT_DIR, out_file)
|
|
|
|
with open(out_path, "w") as f:
|
|
json.dump(detections, f, indent=2)
|
|
|
|
print(f"Processed {file} → {out_file}")
|
|
|
|
|
|
while True:
|
|
try:
|
|
files = get_recent_files()
|
|
|
|
for file in files[:10]: # limit per loop
|
|
if file in PROCESSED:
|
|
continue
|
|
|
|
json_file = file.replace(".jpg", "_yolo.json")
|
|
json_path = os.path.join(SNAPSHOT_DIR, json_file)
|
|
|
|
# skip if already processed (persistent)
|
|
if os.path.exists(json_path):
|
|
PROCESSED.add(file)
|
|
continue
|
|
|
|
detect_and_save(file)
|
|
PROCESSED.add(file)
|
|
|
|
# cleanup memory
|
|
if len(PROCESSED) > 1000:
|
|
PROCESSED.clear()
|
|
|
|
except Exception as e:
|
|
print("Error:", e)
|
|
|
|
time.sleep(1) |