// programs/videoServer.js 'use strict'; const fs = require('fs'); const { spawn } = require('child_process'); const WebSocket = require('ws'); class JpegFrameSplitter { constructor(onFrame) { this.onFrame = onFrame; this.buffer = Buffer.alloc(0); } push(chunk) { if (!chunk || !chunk.length) return; this.buffer = Buffer.concat([this.buffer, chunk]); let start = this.buffer.indexOf(Buffer.from([0xFF, 0xD8])); while (start !== -1) { const end = this.buffer.indexOf(Buffer.from([0xFF, 0xD9]), start + 2); if (end === -1) break; const frame = this.buffer.slice(start, end + 2); try { this.onFrame(frame); } catch {} this.buffer = this.buffer.slice(end + 2); start = this.buffer.indexOf(Buffer.from([0xFF, 0xD8])); } if (this.buffer.length > 8 * 1024 * 1024) { const nextSOI = this.buffer.indexOf(Buffer.from([0xFF, 0xD8])); this.buffer = nextSOI !== -1 ? this.buffer.slice(nextSOI) : Buffer.alloc(0); } } } class FFmpegStreamer { /** * devicePath: '/dev/videoX' * options: { * name, width, height, fps, quality, * input: { format, fps, size, useWallclock, threadQueueSize, channel }, * tryFormats: [ 'mjpeg', undefined, 'yuyv422', 'rgb24' ] * } */ constructor(devicePath, options = {}) { this.devicePath = devicePath; this.name = options.name || devicePath; this.opts = { width: options.width ?? undefined, height: options.height ?? undefined, fps: options.fps ?? 20, quality: options.quality ?? 5, input: { format: options.input?.format, fps: options.input?.fps, size: options.input?.size, useWallclock: options.input?.useWallclock ?? true, threadQueueSize: options.input?.threadQueueSize ?? 64, channel: options.input?.channel, }, tryFormats: (options.tryFormats || [options.input?.format, 'yuyv422', 'mjpeg', 'rgb24']) .filter((v, i, a) => a.indexOf(v) === i), }; this.proc = null; this.clients = new Set(); this.startedAt = null; this.latestFrame = null; this.splitter = null; this.formatIdx = 0; this.currentFormat = this.opts.tryFormats[this.formatIdx]; this._restarting = false; this._backoffMs = 500; this._maxBackoffMs = 8000; this._stderrBuf = []; this._stderrMaxLines = 8; this._quickFailCount = 0; this._quickFailLimit = 6; this._suspendedUntil = 0; } get running() { return !!this.proc; } _scaling() { return Number(this.opts.width) > 0 && Number(this.opts.height) > 0; } _buildFfmpegArgs() { const outFps = this.opts.fps; const quality = this.opts.quality; const scaling = this._scaling(); const inFmt = this.currentFormat; const inFps = this.opts.input.fps; const inSize = this.opts.input.size; const useWallclock = this.opts.input.useWallclock; const tqs = this.opts.input.threadQueueSize; const inChannel = this.opts.input.channel; const args = [ '-hide_banner', '-loglevel', 'error', '-nostdin', '-f', 'video4linux2', ...(tqs ? ['-thread_queue_size', String(tqs)] : []), ...(inFmt ? ['-input_format', String(inFmt)] : []), ...(inFps ? ['-framerate', String(inFps)] : []), ...(inSize ? ['-video_size', String(inSize)] : []), ...(typeof inChannel === 'number' ? ['-channel', String(inChannel)] : []), ...(useWallclock ? ['-use_wallclock_as_timestamps', '1'] : []), '-i', this.devicePath, //'-fflags', 'nobuffer', '-flags', 'low_delay', '-an', '-sn', '-fflags', 'nobuffer', '-an', '-sn', ]; if (inFmt === 'mjpeg' && !scaling) { args.push('-vsync', 'passthrough', '-c:v', 'copy', '-f', 'mjpeg', 'pipe:1'); return args; } if (scaling) { args.push('-vf', `scale=${Number(this.opts.width)}:${Number(this.opts.height)}`); args.push('-pix_fmt', 'yuvj422p'); // für mjpeg-Encoder robust } if (outFps) args.push('-r', String(outFps)); args.push('-f', 'mjpeg', '-q:v', String(quality), 'pipe:1'); return args; } _logStderr(d) { const s = d.toString().trim(); if (!s) return; this._stderrBuf.push(s); if (this._stderrBuf.length > this._stderrMaxLines) this._stderrBuf.shift(); } start() { if (this.proc) return; if (Date.now() < this._suspendedUntil) { const wait = this._suspendedUntil - Date.now(); console.warn(`[FFmpeg] ${this.name} suspended for ${wait}ms due to repeated failures`); return setTimeout(() => this.start(), wait); } const args = this._buildFfmpegArgs(); console.log(`[FFmpeg] Start ${this.devicePath} (${this.name}) :: ${args.join(' ')}`); this._stderrBuf = []; this.proc = spawn('ffmpeg', args, { stdio: ['ignore', 'pipe', 'pipe'], detached: true }); this.startedAt = Date.now(); this.splitter = new JpegFrameSplitter((frame) => { this.latestFrame = frame; this._broadcast(frame); }); this.proc.stdout.on('data', (chunk) => this.splitter?.push(chunk)); this.proc.stderr.on('data', (d) => this._logStderr(d)); this.proc.on('exit', (code, signal) => { console.warn(`[FFmpeg] ${this.devicePath} exited code=${code} sig=${signal}`); if (this._stderrBuf.length) console.warn(`[FFmpeg] ${this.name} last errors:\n - ${this._stderrBuf.join('\n - ')}`); this.proc = null; const quick = (Date.now() - (this.startedAt || Date.now())) < 2000; this.startedAt = null; if (quick && !this._restarting) { this._quickFailCount++; this.formatIdx = (this.formatIdx + 1) % this.opts.tryFormats.length; this.currentFormat = this.opts.tryFormats[this.formatIdx]; console.warn(`[FFmpeg] ${this.name}: quick failure -> trying next format: ${this.currentFormat}`); } else { this._quickFailCount = 0; } if (this._quickFailCount >= this._quickFailLimit) { this._suspendedUntil = Date.now() + 60000; // 60s pause this._quickFailCount = 0; console.error(`[FFmpeg] ${this.name}: too many quick failures; suspending restarts for 60s`); } const delay = this._restarting ? 300 : Math.min(this._backoffMs, this._maxBackoffMs); setTimeout(() => { if (!this._restarting) this._backoffMs = Math.min(this._backoffMs * 2, this._maxBackoffMs); else this._backoffMs = 500; this.start(); }, delay); this._restarting = false; }); } _killProcessGroup(signal = 'SIGTERM') { if (!this.proc) return; try { if (process.platform !== 'win32') process.kill(-this.proc.pid, signal); else this.proc.kill(signal); } catch {} } stop() { if (!this.proc) return; this._restarting = false; this._killProcessGroup('SIGTERM'); } restart(newOpts = {}) { this._restarting = true; if (newOpts.input) this.opts.input = { ...this.opts.input, ...newOpts.input }; this.opts = { ...this.opts, ...newOpts, input: this.opts.input }; if (newOpts.input && Object.prototype.hasOwnProperty.call(newOpts.input, 'format')) { const idx = this.opts.tryFormats.indexOf(this.opts.input.format); if (idx >= 0) { this.formatIdx = idx; this.currentFormat = this.opts.tryFormats[this.formatIdx]; } } if (this.proc) this._killProcessGroup('SIGTERM'); else { this._restarting = false; this.start(); } } attach(ws) { this.clients.add(ws); if (this.latestFrame && ws.readyState === WebSocket.OPEN) { try { ws.send(this.latestFrame, { binary: true }); } catch {} } ws.on('close', () => this.clients.delete(ws)); } snapshot(toFile) { if (!this.latestFrame) throw new Error('No frame available yet'); fs.writeFileSync(toFile, this.latestFrame); return toFile; } _broadcast(frame) { if (!this.clients.size) return; for (const ws of this.clients) { if (ws.readyState !== WebSocket.OPEN) continue; if (ws.bufferedAmount > 512 * 1024) continue; // drop if back-pressured try { ws.send(frame, { binary: true }); } catch {} } } /** * Nimmt einen Snapshot in hoher Auflösung auf, unabhängig vom Stream. * Startet kurz einen separaten ffmpeg-Prozess und speichert 1 Frame als JPEG. * * @param {string} toFile - Pfad zur Zieldatei (z.B. '/tmp/snap.jpg') * @param {object} [opts] * @param {string} [opts.size] - 'WxH' z.B. '1280x960' (Default: opts.input.size) * @param {string} [opts.format] - z.B. 'mjpeg' | 'yuyv422' (Default: opts.input.format) * @param {number} [opts.quality] - FFmpeg JPEG-Qualität 2..31 (kleiner = besser). Default: 2 * @param {number} [opts.timeoutMs] - Abbruch nach ms. Default: 3000 */ async snapshotHighRes(toFile, { size, format, quality = 2, timeoutMs = 3000 } = {}) { return new Promise((resolve, reject) => { const inFmt = format ?? this.opts.input.format ?? 'mjpeg'; const inSize = size ?? this.opts.input.size; // wenn undefined, nimmt ffmpeg die Kamera-Default const fps = Math.min(5, this.opts.input.fps || 5); // niedrig reicht für Einzelbild const args = [ '-hide_banner', '-loglevel', 'error', '-f', 'video4linux2', ...(inFmt ? ['-input_format', String(inFmt)] : []), ...(fps ? ['-framerate', String(fps)] : []), ...(inSize ? ['-video_size', String(inSize)] : []), '-i', this.devicePath, '-frames:v', '1', '-q:v', String(quality), '-y', toFile, ]; const p = spawn('ffmpeg', args, { stdio: ['ignore', 'ignore', 'pipe'] }); let stderr = ''; const t = setTimeout(() => { try { p.kill('SIGKILL'); } catch {} reject(new Error(`snapshotHighRes timeout after ${timeoutMs}ms`)); }, timeoutMs); p.stderr.on('data', d => { stderr += d.toString(); }); p.on('close', code => { clearTimeout(t); if (code === 0) resolve(toFile); else reject(new Error(`ffmpeg exited ${code}: ${stderr.trim()}`)); }); }); } } module.exports = { FFmpegStreamer };