// programs/videoServer.js
'use strict';

const fs = require('fs');
const { spawn } = require('child_process');
const WebSocket = require('ws');

// JPEG stream markers: Start Of Image (FF D8) and End Of Image (FF D9).
const JPEG_SOI = Buffer.from([0xff, 0xd8]);
const JPEG_EOI = Buffer.from([0xff, 0xd9]);
const EMPTY_BUFFER = Buffer.alloc(0);

/**
 * Return `list` with duplicates removed (first occurrence wins). An entry of
 * `undefined` means "do not pass -input_format". An empty or non-array input
 * yields `[undefined]` so that format rotation always has one entry to use.
 *
 * @param {Array<string|undefined>} list
 * @returns {Array<string|undefined>}
 */
function normalizeFormats(list) {
  const unique = Array.isArray(list) ? [...new Set(list)] : [];
  return unique.length ? unique : [undefined];
}

/**
 * Incrementally splits a byte stream into JPEG frames by scanning for
 * SOI (FF D8) ... EOI (FF D9) marker pairs.
 */
class JpegFrameSplitter {
  /**
   * @param {(frame: Buffer) => void} onFrame - called once per complete frame.
   * @param {number} [maxBufferBytes] - cap on bytes retained while waiting for
   *   an EOI marker; defaults to 8 MiB.
   */
  constructor(onFrame, maxBufferBytes = 8 * 1024 * 1024) {
    this.onFrame = onFrame;
    this.maxBufferBytes = maxBufferBytes;
    this.buffer = EMPTY_BUFFER;
  }

  /**
   * Append a chunk and emit every complete frame now available.
   *
   * @param {Buffer|Uint8Array} chunk - ignored when empty or nullish.
   */
  push(chunk) {
    if (!chunk || !chunk.length) return;

    let pending = Buffer.concat([this.buffer, chunk]);
    let start = pending.indexOf(JPEG_SOI);

    while (start !== -1) {
      const end = pending.indexOf(JPEG_EOI, start + 2);
      if (end === -1) break;

      const frame = pending.subarray(start, end + 2);
      try {
        this.onFrame(frame);
      } catch (err) {
        // A failing consumer must not break stream parsing; report and go on.
        console.error('[JpegFrameSplitter] onFrame handler threw:', err);
      }

      pending = pending.subarray(end + 2);
      start = pending.indexOf(JPEG_SOI);
    }

    if (start === -1) {
      // No frame start in what is left. Keep only a trailing 0xFF, which may
      // be the first half of an SOI marker split across two chunks.
      const last = pending.length - 1;
      pending = last >= 0 && pending[last] === 0xff ? pending.subarray(last) : EMPTY_BUFFER;
    } else {
      // Drop any bytes that precede the frame start.
      if (start > 0) pending = pending.subarray(start);

      // The buffer now begins with an SOI. If it is over the cap without an
      // EOI, abandon that partial frame and resync on a later SOI; searching
      // from offset 0 would find the same SOI again and never shrink.
      while (pending.length > this.maxBufferBytes) {
        const nextStart = pending.indexOf(JPEG_SOI, 2);
        pending = nextStart === -1 ? EMPTY_BUFFER : pending.subarray(nextStart);
      }
    }

    this.buffer = pending;
  }
}

/**
 * Runs ffmpeg against a V4L2 device, splits its MJPEG stdout into frames and
 * broadcasts each frame to attached WebSocket clients. The process is
 * restarted automatically when it exits, until stop() is called.
 */
class FFmpegStreamer {
  /**
   * devicePath: '/dev/videoX'
   * options: {
   *   name, width, height, fps, quality,
   *   input: { format, fps, size, useWallclock, threadQueueSize, channel },
   *   tryFormats: [ 'mjpeg', undefined, 'yuyv422', 'rgb24' ]
   * }
   */
  constructor(devicePath, options = {}) {
    this.devicePath = devicePath;
    this.name = options.name || devicePath;

    const input = options.input;
    this.opts = {
      width: options.width ?? undefined,
      height: options.height ?? undefined,
      fps: options.fps ?? 20,
      quality: options.quality ?? 5,
      input: {
        format: input?.format,
        fps: input?.fps,
        size: input?.size,
        useWallclock: input?.useWallclock ?? true,
        threadQueueSize: input?.threadQueueSize ?? 64,
        channel: input?.channel,
      },
      tryFormats: normalizeFormats(
        options.tryFormats || [input?.format, 'yuyv422', 'mjpeg', 'rgb24'],
      ),
    };

    this.proc = null;
    this.clients = new Set();
    this.startedAt = null;
    this.latestFrame = null;
    this.splitter = null;

    this.formatIdx = 0;
    this.currentFormat = this.opts.tryFormats[this.formatIdx];

    this._restarting = false; // true while a restart()-initiated kill is in flight
    this._stopped = false; // true after stop(); suppresses automatic restarts
    this._restartTimer = null; // the single pending delayed start(), if any
    this._backoffMs = 500;
    this._maxBackoffMs = 8000;
    this._stderrBuf = [];
    this._stderrMaxLines = 8;
    this._quickFailCount = 0;
    this._quickFailLimit = 6;
    this._suspendedUntil = 0;
  }

  /** True while an ffmpeg child process is tracked. */
  get running() {
    return !!this.proc;
  }

  /** True when both output width and height are positive numbers. */
  _scaling() {
    return Number(this.opts.width) > 0 && Number(this.opts.height) > 0;
  }

  /**
   * Build the ffmpeg argument list for the current options and input format.
   *
   * @returns {string[]}
   */
  _buildFfmpegArgs() {
    const { fps: outFps, quality, width, height, input } = this.opts;
    const inFmt = this.currentFormat;
    const scaling = this._scaling();

    const args = ['-hide_banner', '-loglevel', 'error', '-nostdin', '-f', 'video4linux2'];
    if (input.threadQueueSize) args.push('-thread_queue_size', String(input.threadQueueSize));
    if (inFmt) args.push('-input_format', String(inFmt));
    if (input.fps) args.push('-framerate', String(input.fps));
    if (input.size) args.push('-video_size', String(input.size));
    if (typeof input.channel === 'number') args.push('-channel', String(input.channel));
    if (input.useWallclock) args.push('-use_wallclock_as_timestamps', '1');
    args.push('-i', this.devicePath, '-fflags', 'nobuffer', '-flags', 'low_delay', '-an', '-sn');

    // The device already delivers MJPEG and no resize is requested: pass the
    // stream through without re-encoding.
    if (inFmt === 'mjpeg' && !scaling) {
      args.push('-vsync', 'passthrough', '-c:v', 'copy', '-f', 'mjpeg', 'pipe:1');
      return args;
    }

    if (scaling) args.push('-vf', `scale=${Number(width)}:${Number(height)}`);
    if (outFps) args.push('-r', String(outFps));
    args.push('-f', 'mjpeg', '-q:v', String(quality), 'pipe:1');
    return args;
  }

  /**
   * Remember the most recent stderr lines (at most `_stderrMaxLines`) so they
   * can be printed when the process exits.
   *
   * @param {Buffer|string} d
   */
  _logStderr(d) {
    const lines = d
      .toString()
      .split(/\r?\n/)
      .map((line) => line.trim())
      .filter(Boolean);
    if (!lines.length) return;
    this._stderrBuf.push(...lines);
    const excess = this._stderrBuf.length - this._stderrMaxLines;
    if (excess > 0) this._stderrBuf.splice(0, excess);
  }

  /** Cancel the pending delayed start, if there is one. */
  _clearRestartTimer() {
    if (this._restartTimer === null) return;
    clearTimeout(this._restartTimer);
    this._restartTimer = null;
  }

  /**
   * Schedule start() after `delayMs`, replacing any earlier pending start so
   * that at most one timer exists.
   *
   * @param {number} delayMs
   * @returns {NodeJS.Timeout}
   */
  _scheduleStart(delayMs) {
    this._clearRestartTimer();
    this._restartTimer = setTimeout(() => {
      this._restartTimer = null;
      this.start();
    }, delayMs);
    return this._restartTimer;
  }

  /**
   * Spawn ffmpeg if it is not already running. While restarts are suspended
   * after repeated quick failures the start is deferred instead.
   *
   * @returns {NodeJS.Timeout|undefined} the deferral timer when suspended,
   *   otherwise undefined.
   */
  start() {
    this._stopped = false;
    this._clearRestartTimer();
    if (this.proc) return undefined;

    const now = Date.now();
    if (now < this._suspendedUntil) {
      const wait = this._suspendedUntil - now;
      console.warn(`[FFmpeg] ${this.name} suspended for ${wait}ms due to repeated failures`);
      return this._scheduleStart(wait);
    }

    const args = this._buildFfmpegArgs();
    console.log(`[FFmpeg] Start ${this.devicePath} (${this.name}) :: ${args.join(' ')}`);

    this._stderrBuf = [];
    const proc = spawn('ffmpeg', args, { stdio: ['ignore', 'pipe', 'pipe'], detached: true });
    this.proc = proc;
    this.startedAt = Date.now();

    // Bind this process's stdout to its own splitter, so bytes that arrive
    // after the process has exited cannot leak into a successor's parser.
    const splitter = new JpegFrameSplitter((frame) => {
      this.latestFrame = frame;
      this._broadcast(frame);
    });
    this.splitter = splitter;

    // The stdio streams can be null when the process failed to spawn.
    proc.stdout?.on('data', (chunk) => splitter.push(chunk));
    proc.stderr?.on('data', (d) => this._logStderr(d));

    // Without a listener an 'error' event (e.g. ffmpeg binary not found) would
    // be thrown as an uncaught exception. 'exit' is not guaranteed to follow,
    // so a failed spawn (no pid) is routed through the exit path here.
    proc.on('error', (err) => {
      console.error(`[FFmpeg] ${this.name}: process error: ${err.message}`);
      if (proc.pid === undefined) this._handleProcessEnd(proc, null, null);
    });
    proc.on('exit', (code, signal) => this._handleProcessEnd(proc, code, signal));
    return undefined;
  }

  /**
   * React to the end of `proc`: log diagnostics, then either stay stopped or
   * schedule the next start (rotating the input format after a quick failure
   * and suspending after too many of them).
   *
   * @param {import('child_process').ChildProcess} proc
   * @param {number|null} code
   * @param {string|null} signal
   */
  _handleProcessEnd(proc, code, signal) {
    // Already handled (both 'error' and 'exit' fired) or superseded.
    if (this.proc !== proc) return;

    console.warn(`[FFmpeg] ${this.devicePath} exited code=${code} sig=${signal}`);
    if (this._stderrBuf.length) {
      console.warn(`[FFmpeg] ${this.name} last errors:\n - ${this._stderrBuf.join('\n - ')}`);
    }

    this.proc = null;
    const ranMs = Date.now() - (this.startedAt ?? Date.now());
    const quick = ranMs < 2000;
    this.startedAt = null;

    // Capture and clear the flags up front; the restart timer fires later and
    // must not depend on their then-current values.
    const deliberateRestart = this._restarting;
    this._restarting = false;

    if (this._stopped) {
      // stop() was requested: do not restart and do not count it as a failure.
      this._quickFailCount = 0;
      return;
    }

    if (deliberateRestart) {
      this._quickFailCount = 0;
      this._backoffMs = 500;
      this._scheduleStart(300);
      return;
    }

    if (quick) {
      this._quickFailCount++;
      this.formatIdx = (this.formatIdx + 1) % this.opts.tryFormats.length;
      this.currentFormat = this.opts.tryFormats[this.formatIdx];
      console.warn(`[FFmpeg] ${this.name}: quick failure -> trying next format: ${this.currentFormat}`);
    } else {
      // The process ran for a while, so earlier failures are no longer relevant.
      this._quickFailCount = 0;
      this._backoffMs = 500;
    }

    if (this._quickFailCount >= this._quickFailLimit) {
      this._suspendedUntil = Date.now() + 60000; // 60s pause
      this._quickFailCount = 0;
      console.error(`[FFmpeg] ${this.name}: too many quick failures; suspending restarts for 60s`);
    }

    const delay = Math.min(this._backoffMs, this._maxBackoffMs);
    this._backoffMs = Math.min(this._backoffMs * 2, this._maxBackoffMs);
    this._scheduleStart(delay);
  }

  /**
   * Signal the child. On non-Windows platforms the signal is sent to the
   * negative pid, i.e. the process group created by `detached: true`.
   *
   * @param {string} [signal]
   */
  _killProcessGroup(signal = 'SIGTERM') {
    const proc = this.proc;
    if (!proc || proc.pid === undefined) return;
    try {
      if (process.platform !== 'win32') process.kill(-proc.pid, signal);
      else proc.kill(signal);
    } catch (err) {
      // ESRCH: the process is already gone, which is what we wanted.
      if (err?.code !== 'ESRCH') {
        console.warn(`[FFmpeg] ${this.name}: failed to send ${signal}: ${err.message}`);
      }
    }
  }

  /**
   * Stop streaming: cancel any pending restart and terminate ffmpeg. The
   * streamer stays stopped until start() or restart() is called.
   */
  stop() {
    this._stopped = true;
    this._restarting = false;
    this._clearRestartTimer();
    if (!this.proc) return;
    this._killProcessGroup('SIGTERM');
  }

  /**
   * Merge `newOpts` into the current options and restart ffmpeg with them.
   * `newOpts.input` is merged key-by-key into the existing input options.
   * An explicitly given `input.format` becomes the active format and is added
   * to the front of `tryFormats` when it is not already listed.
   *
   * @param {object} [newOpts] - same shape as the constructor options.
   */
  restart(newOpts = {}) {
    const { input: inputPatch, tryFormats, ...rest } = newOpts ?? {};
    this._stopped = false;
    this._restarting = true;

    const input = inputPatch ? { ...this.opts.input, ...inputPatch } : this.opts.input;
    this.opts = { ...this.opts, ...rest, input };
    if (Array.isArray(tryFormats)) this.opts.tryFormats = normalizeFormats(tryFormats);

    const formatGiven =
      !!inputPatch && Object.prototype.hasOwnProperty.call(inputPatch, 'format');
    if (formatGiven) {
      const wanted = this.opts.input.format;
      let idx = this.opts.tryFormats.indexOf(wanted);
      if (idx < 0) {
        this.opts.tryFormats = [wanted, ...this.opts.tryFormats];
        idx = 0;
      }
      this.formatIdx = idx;
    } else if (Array.isArray(tryFormats)) {
      // The list was replaced: keep the active format if it is still listed.
      const idx = this.opts.tryFormats.indexOf(this.currentFormat);
      this.formatIdx = idx >= 0 ? idx : 0;
    }
    this.currentFormat = this.opts.tryFormats[this.formatIdx];

    if (this.proc) {
      // The exit handler sees _restarting and schedules the quick restart.
      this._killProcessGroup('SIGTERM');
    } else {
      this._restarting = false;
      this.start();
    }
  }

  /**
   * Register a WebSocket client. The latest frame, if any, is sent at once so
   * the client does not wait for the next one; the client is removed when its
   * socket closes.
   *
   * @param {WebSocket} ws
   */
  attach(ws) {
    this.clients.add(ws);
    if (this.latestFrame && ws.readyState === WebSocket.OPEN) {
      try {
        ws.send(this.latestFrame, { binary: true });
      } catch (err) {
        console.warn(`[FFmpeg] ${this.name}: initial frame send failed: ${err.message}`);
      }
    }
    ws.on('close', () => this.clients.delete(ws));
  }

  /**
   * Write the most recent frame to `toFile` synchronously.
   *
   * @param {string} toFile
   * @returns {string} `toFile`
   * @throws {Error} when no frame has been received yet, or on write failure.
   */
  snapshot(toFile) {
    if (!this.latestFrame) throw new Error('No frame available yet');
    fs.writeFileSync(toFile, this.latestFrame);
    return toFile;
  }

  /**
   * Send `frame` to every open client, skipping clients whose outgoing buffer
   * already holds more than 512 KiB.
   *
   * @param {Buffer} frame
   */
  _broadcast(frame) {
    if (!this.clients.size) return;
    for (const ws of this.clients) {
      if (ws.readyState !== WebSocket.OPEN) continue;
      if (ws.bufferedAmount > 512 * 1024) continue; // drop if back-pressured
      try {
        ws.send(frame, { binary: true });
      } catch (err) {
        console.warn(`[FFmpeg] ${this.name}: frame send failed: ${err.message}`);
      }
    }
  }
}

module.exports = { FFmpegStreamer };