Committed and Pushed by ButtonClick
This commit is contained in:
234
programs/videoServer.js
Executable file
234
programs/videoServer.js
Executable file
@@ -0,0 +1,234 @@
|
||||
// programs/videoServer.js
|
||||
'use strict';
|
||||
|
||||
const fs = require('fs');
|
||||
const { spawn } = require('child_process');
|
||||
const WebSocket = require('ws');
|
||||
|
||||
class JpegFrameSplitter {
|
||||
constructor(onFrame) {
|
||||
this.onFrame = onFrame;
|
||||
this.buffer = Buffer.alloc(0);
|
||||
}
|
||||
push(chunk) {
|
||||
if (!chunk || !chunk.length) return;
|
||||
this.buffer = Buffer.concat([this.buffer, chunk]);
|
||||
let start = this.buffer.indexOf(Buffer.from([0xFF, 0xD8]));
|
||||
while (start !== -1) {
|
||||
const end = this.buffer.indexOf(Buffer.from([0xFF, 0xD9]), start + 2);
|
||||
if (end === -1) break;
|
||||
const frame = this.buffer.slice(start, end + 2);
|
||||
try { this.onFrame(frame); } catch {}
|
||||
this.buffer = this.buffer.slice(end + 2);
|
||||
start = this.buffer.indexOf(Buffer.from([0xFF, 0xD8]));
|
||||
}
|
||||
if (this.buffer.length > 8 * 1024 * 1024) {
|
||||
const nextSOI = this.buffer.indexOf(Buffer.from([0xFF, 0xD8]));
|
||||
this.buffer = nextSOI !== -1 ? this.buffer.slice(nextSOI) : Buffer.alloc(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class FFmpegStreamer {
|
||||
/**
|
||||
* devicePath: '/dev/videoX'
|
||||
* options: {
|
||||
* name, width, height, fps, quality,
|
||||
* input: { format, fps, size, useWallclock, threadQueueSize, channel },
|
||||
* tryFormats: [ 'mjpeg', undefined, 'yuyv422', 'rgb24' ]
|
||||
* }
|
||||
*/
|
||||
constructor(devicePath, options = {}) {
|
||||
this.devicePath = devicePath;
|
||||
this.name = options.name || devicePath;
|
||||
this.opts = {
|
||||
width: options.width ?? undefined,
|
||||
height: options.height ?? undefined,
|
||||
fps: options.fps ?? 20,
|
||||
quality: options.quality ?? 5,
|
||||
input: {
|
||||
format: options.input?.format,
|
||||
fps: options.input?.fps,
|
||||
size: options.input?.size,
|
||||
useWallclock: options.input?.useWallclock ?? true,
|
||||
threadQueueSize: options.input?.threadQueueSize ?? 64,
|
||||
channel: options.input?.channel,
|
||||
},
|
||||
tryFormats: (options.tryFormats || [options.input?.format, 'yuyv422', 'mjpeg', 'rgb24'])
|
||||
.filter((v, i, a) => a.indexOf(v) === i),
|
||||
};
|
||||
|
||||
this.proc = null;
|
||||
this.clients = new Set();
|
||||
this.startedAt = null;
|
||||
this.latestFrame = null;
|
||||
this.splitter = null;
|
||||
|
||||
this.formatIdx = 0;
|
||||
this.currentFormat = this.opts.tryFormats[this.formatIdx];
|
||||
|
||||
this._restarting = false;
|
||||
this._backoffMs = 500;
|
||||
this._maxBackoffMs = 8000;
|
||||
this._stderrBuf = [];
|
||||
this._stderrMaxLines = 8;
|
||||
|
||||
this._quickFailCount = 0;
|
||||
this._quickFailLimit = 6;
|
||||
this._suspendedUntil = 0;
|
||||
}
|
||||
|
||||
get running() { return !!this.proc; }
|
||||
_scaling() { return Number(this.opts.width) > 0 && Number(this.opts.height) > 0; }
|
||||
|
||||
_buildFfmpegArgs() {
|
||||
const outFps = this.opts.fps;
|
||||
const quality = this.opts.quality;
|
||||
const scaling = this._scaling();
|
||||
|
||||
const inFmt = this.currentFormat;
|
||||
const inFps = this.opts.input.fps;
|
||||
const inSize = this.opts.input.size;
|
||||
const useWallclock = this.opts.input.useWallclock;
|
||||
const tqs = this.opts.input.threadQueueSize;
|
||||
const inChannel = this.opts.input.channel;
|
||||
|
||||
const args = [
|
||||
'-hide_banner', '-loglevel', 'error', '-nostdin',
|
||||
'-f', 'video4linux2',
|
||||
...(tqs ? ['-thread_queue_size', String(tqs)] : []),
|
||||
...(inFmt ? ['-input_format', String(inFmt)] : []),
|
||||
...(inFps ? ['-framerate', String(inFps)] : []),
|
||||
...(inSize ? ['-video_size', String(inSize)] : []),
|
||||
...(typeof inChannel === 'number' ? ['-channel', String(inChannel)] : []),
|
||||
...(useWallclock ? ['-use_wallclock_as_timestamps', '1'] : []),
|
||||
'-i', this.devicePath,
|
||||
'-fflags', 'nobuffer', '-flags', 'low_delay', '-an', '-sn',
|
||||
];
|
||||
|
||||
if (inFmt === 'mjpeg' && !scaling) {
|
||||
args.push('-vsync', 'passthrough', '-c:v', 'copy', '-f', 'mjpeg', 'pipe:1');
|
||||
return args;
|
||||
}
|
||||
if (scaling) args.push('-vf', `scale=${Number(this.opts.width)}:${Number(this.opts.height)}`);
|
||||
if (outFps) args.push('-r', String(outFps));
|
||||
args.push('-f', 'mjpeg', '-q:v', String(quality), 'pipe:1');
|
||||
return args;
|
||||
}
|
||||
|
||||
_logStderr(d) {
|
||||
const s = d.toString().trim();
|
||||
if (!s) return;
|
||||
this._stderrBuf.push(s);
|
||||
if (this._stderrBuf.length > this._stderrMaxLines) this._stderrBuf.shift();
|
||||
}
|
||||
|
||||
start() {
|
||||
if (this.proc) return;
|
||||
if (Date.now() < this._suspendedUntil) {
|
||||
const wait = this._suspendedUntil - Date.now();
|
||||
console.warn(`[FFmpeg] ${this.name} suspended for ${wait}ms due to repeated failures`);
|
||||
return setTimeout(() => this.start(), wait);
|
||||
}
|
||||
|
||||
const args = this._buildFfmpegArgs();
|
||||
console.log(`[FFmpeg] Start ${this.devicePath} (${this.name}) :: ${args.join(' ')}`);
|
||||
|
||||
this._stderrBuf = [];
|
||||
this.proc = spawn('ffmpeg', args, { stdio: ['ignore', 'pipe', 'pipe'], detached: true });
|
||||
this.startedAt = Date.now();
|
||||
|
||||
this.splitter = new JpegFrameSplitter((frame) => {
|
||||
this.latestFrame = frame;
|
||||
this._broadcast(frame);
|
||||
});
|
||||
|
||||
this.proc.stdout.on('data', (chunk) => this.splitter?.push(chunk));
|
||||
this.proc.stderr.on('data', (d) => this._logStderr(d));
|
||||
|
||||
this.proc.on('exit', (code, signal) => {
|
||||
console.warn(`[FFmpeg] ${this.devicePath} exited code=${code} sig=${signal}`);
|
||||
if (this._stderrBuf.length) console.warn(`[FFmpeg] ${this.name} last errors:\n - ${this._stderrBuf.join('\n - ')}`);
|
||||
|
||||
this.proc = null;
|
||||
const quick = (Date.now() - (this.startedAt || Date.now())) < 2000;
|
||||
this.startedAt = null;
|
||||
|
||||
if (quick && !this._restarting) {
|
||||
this._quickFailCount++;
|
||||
this.formatIdx = (this.formatIdx + 1) % this.opts.tryFormats.length;
|
||||
this.currentFormat = this.opts.tryFormats[this.formatIdx];
|
||||
console.warn(`[FFmpeg] ${this.name}: quick failure -> trying next format: ${this.currentFormat}`);
|
||||
} else {
|
||||
this._quickFailCount = 0;
|
||||
}
|
||||
|
||||
if (this._quickFailCount >= this._quickFailLimit) {
|
||||
this._suspendedUntil = Date.now() + 60000; // 60s pause
|
||||
this._quickFailCount = 0;
|
||||
console.error(`[FFmpeg] ${this.name}: too many quick failures; suspending restarts for 60s`);
|
||||
}
|
||||
|
||||
const delay = this._restarting ? 300 : Math.min(this._backoffMs, this._maxBackoffMs);
|
||||
setTimeout(() => {
|
||||
if (!this._restarting) this._backoffMs = Math.min(this._backoffMs * 2, this._maxBackoffMs);
|
||||
else this._backoffMs = 500;
|
||||
this.start();
|
||||
}, delay);
|
||||
this._restarting = false;
|
||||
});
|
||||
}
|
||||
|
||||
_killProcessGroup(signal = 'SIGTERM') {
|
||||
if (!this.proc) return;
|
||||
try {
|
||||
if (process.platform !== 'win32') process.kill(-this.proc.pid, signal);
|
||||
else this.proc.kill(signal);
|
||||
} catch {}
|
||||
}
|
||||
|
||||
stop() {
|
||||
if (!this.proc) return;
|
||||
this._restarting = false;
|
||||
this._killProcessGroup('SIGTERM');
|
||||
}
|
||||
|
||||
restart(newOpts = {}) {
|
||||
this._restarting = true;
|
||||
if (newOpts.input) this.opts.input = { ...this.opts.input, ...newOpts.input };
|
||||
this.opts = { ...this.opts, ...newOpts, input: this.opts.input };
|
||||
if (newOpts.input && Object.prototype.hasOwnProperty.call(newOpts.input, 'format')) {
|
||||
const idx = this.opts.tryFormats.indexOf(this.opts.input.format);
|
||||
if (idx >= 0) {
|
||||
this.formatIdx = idx;
|
||||
this.currentFormat = this.opts.tryFormats[this.formatIdx];
|
||||
}
|
||||
}
|
||||
if (this.proc) this._killProcessGroup('SIGTERM'); else { this._restarting = false; this.start(); }
|
||||
}
|
||||
|
||||
attach(ws) {
|
||||
this.clients.add(ws);
|
||||
if (this.latestFrame && ws.readyState === WebSocket.OPEN) {
|
||||
try { ws.send(this.latestFrame, { binary: true }); } catch {}
|
||||
}
|
||||
ws.on('close', () => this.clients.delete(ws));
|
||||
}
|
||||
|
||||
snapshot(toFile) {
|
||||
if (!this.latestFrame) throw new Error('No frame available yet');
|
||||
fs.writeFileSync(toFile, this.latestFrame);
|
||||
return toFile;
|
||||
}
|
||||
|
||||
_broadcast(frame) {
|
||||
if (!this.clients.size) return;
|
||||
for (const ws of this.clients) {
|
||||
if (ws.readyState !== WebSocket.OPEN) continue;
|
||||
if (ws.bufferedAmount > 512 * 1024) continue; // drop if back-pressured
|
||||
try { ws.send(frame, { binary: true }); } catch {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = { FFmpegStreamer };
|
||||
Reference in New Issue
Block a user