// server.js
//
// HTTPS + WebSocket entry point.
//   - Serves the static front-end from ./public behind helmet/CSP.
//   - /ws/video0 and /ws/video1 attach clients to the two FFmpegStreamer
//     instances; only /ws/video0 accepts JSON control messages.
//   - /ws/robot is handed to a separate WebSocket server whose connections
//     are wired up by ./programs/driver (setupCommandForwarding).
'use strict';

const fs = require('fs');
const path = require('path');
const https = require('https');
const express = require('express');
const helmet = require('helmet');
const compression = require('compression');
const WebSocket = require('ws');

const { FFmpegStreamer } = require('./programs/videoServer');
const { pickDevices } = require('./programs/input');
const driverWS = require("./programs/driver");
const screenShot = require("./programs/screenShot");
const {logHttpRequest, logTcpConnection, logHttpUpgrade, logWssConnected, logWssClosed, connected, connectionLost} = require('./programs/log');

const PORT = Number(process.env.PORT || 8443);
const HOST = process.env.HOST || '0.0.0.0';

// WebSocket routes.
const WS_PREFIX = '/ws/';
const ROBOT_PATH = '/ws/robot';
const VIDEO0_PATH = '/ws/video0';
const VIDEO1_PATH = '/ws/video1';

// Interval between heartbeat pings, and the hard limit for a graceful shutdown.
const HEARTBEAT_INTERVAL_MS = 30000;
const FORCE_EXIT_DELAY_MS = 1500;

// --- TLS ---
const TLS_DIR = path.resolve(__dirname, 'https');
const DHPARAM_PATH = path.join(TLS_DIR, 'dhparam.pem');
const serverOptions = {
  key: fs.readFileSync(path.join(TLS_DIR, 'server.key')),
  cert: fs.readFileSync(path.join(TLS_DIR, 'server.crt')),
  // dhparam.pem is optional; it is only passed along when the file exists.
  ...(fs.existsSync(DHPARAM_PATH) ? { dhparam: fs.readFileSync(DHPARAM_PATH) } : {}),
  requestCert: false,
  rejectUnauthorized: false, // dev-friendly for self-signed
};

// --- Express + CSP ---
const app = express();
app.disable('x-powered-by');
app.use(compression());
app.use(
  helmet({
    contentSecurityPolicy: {
      useDefaults: true,
      directives: {
        "default-src": ["'self'"],
        "script-src": ["'self'"], // no inline JS
        "style-src": ["'self'", "'unsafe-inline'"],
        "img-src": ["'self'", "data:"],
        "connect-src": ["'self'"], // same-origin WSS
        "object-src": ["'none'"],
        "base-uri": ["'self'"],
        "frame-ancestors": ["'self'"],
      },
    },
  })
);
app.use(express.static(path.join(__dirname, 'public'), { etag: true, maxAge: '1h' }));
app.get('/health', (_req, res) => res.status(200).send('ok'));

// --- HTTPS server ---
const server = https.createServer(serverOptions, app);

// Track sockets so shutdown is clean
const activeHttpSockets = new Set();
server.on('connection', (socket) => {
  activeHttpSockets.add(socket);
  socket.on('close', () => activeHttpSockets.delete(socket));
});

// --- WebSocket servers ---
// Both run in noServer mode: the 'upgrade' handler below decides which one
// takes a given request. They are declared before that handler so it never
// refers to a binding that has not been initialised yet.
const wss = new WebSocket.Server({ noServer: true, perMessageDeflate: false });
const wssInput = new WebSocket.Server({ noServer: true });

/**
 * Strip the query string from a request URL.
 *
 * @param {string|undefined} url - Raw request target (e.g. "/ws/video0?x=1").
 * @returns {string} The part before the first "?", or "" when url is not a string.
 */
function wsPathname(url) {
  if (typeof url !== 'string') return '';
  const queryStart = url.indexOf('?');
  return queryStart === -1 ? url : url.slice(0, queryStart);
}

/**
 * Mark a socket as alive and refresh that mark on every pong.
 * The ping interval below terminates sockets whose mark was not refreshed.
 *
 * @param {WebSocket} ws - Freshly connected client socket.
 */
function installHeartbeat(ws) {
  ws.isAlive = true;
  ws.on('pong', () => {
    ws.isAlive = true;
  });
}

// Every tick: terminate clients that did not answer the previous ping,
// then ping the remaining ones.
const pingInterval = setInterval(() => {
  wss.clients.forEach((ws) => {
    if (!ws.isAlive) return ws.terminate();
    ws.isAlive = false;
    try {
      ws.ping();
    } catch {
      // Best effort: a socket that cannot be pinged gets terminated on the
      // next tick because isAlive stays false.
    }
  });
}, HEARTBEAT_INTERVAL_MS);

// Upgrade routing
server.on('upgrade', (request, socket, head) => {
  try {
    socket.setNoDelay(true);
  } catch {
    // Best effort: failing to set TCP_NODELAY must not block the upgrade.
  }

  // Route on the path only, so a query string does not defeat the match.
  const pathname = wsPathname(request.url);
  if (!pathname.startsWith(WS_PREFIX)) return socket.destroy();

  if (pathname === ROBOT_PATH) {
    console.log('Robot requested');
    wssInput.handleUpgrade(request, socket, head, (ws) => {
      ws.upgradePath = pathname;
      wssInput.emit('connection', ws, request);
    });
    return;
  }

  wss.handleUpgrade(request, socket, head, (ws) => {
    ws.upgradePath = pathname;
    wss.emit('connection', ws, request);
  });
});

// --- Streams ---
const [DEV0, DEV1] = pickDevices(process.env);
console.log(`[DEV] Using devices: ${DEV0} (video0), ${DEV1} (video1)`);

/**
 * Build the FFmpegStreamer options shared by both cameras.
 * A fresh object is returned on every call so the streamers never share
 * (and cannot mutate) each other's option objects.
 *
 * @param {string} name - Stream name ('video0' or 'video1').
 * @returns {object} Options for the FFmpegStreamer constructor.
 */
function cameraOptions(name) {
  return {
    name,
    fps: 30,
    quality: 8, // 5 would be better
    input: {
      format: 'mjpeg',
      fps: 30,
      // Alternative capture size: '640x480'
      size: '1280x960',
      useWallclock: true,
      threadQueueSize: 64,
      channel: 0,
    },
    tryFormats: ['mjpeg', 'yuyv422', 'rgb24'],
  };
}

// Cam0: MJPEG pass-through if available (lowest latency)
const cam0 = new FFmpegStreamer(DEV0, cameraOptions('video0'));

// Cam1: same settings as cam0.
// NOTE(review): the previous comments said "let driver pick format first" /
// "driver decides", but the input format is explicitly 'mjpeg' — confirm which
// behaviour is intended.
const cam1 = new FFmpegStreamer(DEV1, cameraOptions('video1'));

cam0.start();
cam1.start();

// Video routes: which streamer serves a path, the label used in log output,
// and the camera name reported in the hello message.
const VIDEO_ROUTES = new Map([
  [VIDEO0_PATH, { streamer: cam0, label: 'cam0', camera: 'video0' }],
  [VIDEO1_PATH, { streamer: cam1, label: 'cam1', camera: 'video1' }],
]);

// WS connections
wss.on('connection', (ws, req) => {
  logWssConnected(req);
  installHeartbeat(ws);

  // Registered first so that connections rejected below are logged as closed too.
  ws.on('close', (code, reason) => {
    logWssClosed(req, code, reason);
  });

  const pathName = ws.upgradePath || wsPathname(req.url);
  const controlEnabled = pathName === VIDEO0_PATH;

  const route = VIDEO_ROUTES.get(pathName);
  if (!route) return ws.close(1008, 'Unknown WS path');

  route.streamer.attach(ws);

  // If the streamer isn't running when a client connects, try to start it.
  // This makes the system more resilient to transient ffmpeg failures.
  try {
    if (!route.streamer.running) {
      console.log(`[WS] client connected to ${pathName} — starting ${route.label}`);
      route.streamer.start();
    }
  } catch (err) {
    console.warn('[WS] failed to start streamer:', err?.message || err);
  }

  ws.on('message', (data, isBinary) => {
    // Only text frames on the control-enabled path are interpreted.
    if (isBinary || !controlEnabled) return;
    let msg;
    try {
      msg = JSON.parse(data.toString());
    } catch {
      return ws.send(JSON.stringify({ type: 'error', error: 'Invalid JSON' }));
    }
    handleControlMessage(ws, msg);
  });

  ws.send(JSON.stringify({
    type: 'hello',
    path: pathName,
    control: controlEnabled,
    camera: route.camera,
  }));
});

// NOTE: a former `wss.on("upgrade", ...)` listener was removed here. The ws
// WebSocketServer does not emit 'upgrade', so it never ran; /ws/robot is
// routed by the server 'upgrade' handler above.

const targetServer = process.env.TARGET_SERVER || 'wss://localhost:2095';
// NOTE(review): this disables TLS certificate verification for every outgoing
// TLS connection made by this process, not just the one to targetServer.
// Presumably needed because the target uses a self-signed certificate —
// confirm, and prefer a per-connection option in ./programs/driver.
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
driverWS.setupCommandForwarding(wssInput, targetServer);

/**
 * Handle one parsed control message (video0 only).
 *
 * Supported `msg.action` values: 'snapshot', 'setParams', 'start', 'stop'.
 * Every outcome other than 'snapshot' is answered on `ws` with an 'ack' or
 * 'error' JSON message; the snapshot reply, if any, is left to
 * screenShot.snapshot, which receives `ws`.
 *
 * @param {WebSocket} ws - Client socket that sent the message.
 * @param {any} msg - Result of JSON.parse on the incoming text frame.
 */
function handleControlMessage(ws, msg) {
  const reply = (payload) => ws.send(JSON.stringify(payload));

  if (msg?.type !== 'control') {
    reply({ type: 'error', error: 'Unsupported message type' });
    return;
  }

  try {
    switch (msg.action) {
      case 'snapshot': {
        const outDir = path.join(__dirname, 'public', 'snapshots');
        screenShot.snapshot(outDir, cam0, cam1, ws);
        break;
      }
      case 'setParams': {
        const p = msg.params || {};
        const width = Number(p.width);
        const height = Number(p.height);
        const fps = Number(p.fps);
        const quality = Number(p.quality);
        // Only a plain object is merged; spreading a string or an array would
        // inject index keys ("0", "1", ...) into the input options.
        const hasInputOverrides =
          p.input !== null && typeof p.input === 'object' && !Array.isArray(p.input);

        // NOTE(review): client-supplied values are forwarded to the streamer
        // unchanged — confirm FFmpegStreamer validates them before building
        // the ffmpeg command line.
        cam0.restart({
          // width/height are always passed; a missing, zero or non-numeric
          // value is sent as an explicit undefined.
          width: width ? width : undefined,
          height: height ? height : undefined,
          ...(fps ? { fps } : {}),
          ...(quality ? { quality } : {}),
          ...(hasInputOverrides ? { input: { ...cam0.opts.input, ...p.input } } : {}),
        });
        reply({ type: 'ack', action: 'setParams', params: { ...cam0.opts } });
        break;
      }
      case 'start':
        cam0.start();
        reply({ type: 'ack', action: 'start' });
        break;
      case 'stop':
        cam0.stop();
        reply({ type: 'ack', action: 'stop' });
        break;
      default:
        reply({ type: 'error', error: `Unknown action: ${msg.action}` });
    }
  } catch (err) {
    reply({ type: 'error', error: err?.message || String(err) });
  }
}

// Start
server.listen(PORT, HOST, () => {
  console.log(`HTTPS server listening on https://${HOST}:${PORT}`);
  console.log(`Open https://localhost:${PORT} (accept the self-signed certificate in dev)`);
});

/**
 * Run one synchronous shutdown step; a failure is logged and never aborts
 * the remaining steps.
 *
 * @param {string} label - Step name used in the warning.
 * @param {() => void} step - The action to attempt.
 */
function bestEffort(label, step) {
  try {
    step();
  } catch (err) {
    console.warn(`[shutdown] ${label} failed:`, err?.message || err);
  }
}

/**
 * Call `closable.close(callback)` and resolve once the callback fires.
 * The returned promise never rejects: errors are logged and shutdown goes on.
 *
 * @param {string} label - Name used in the warning.
 * @param {{ close: Function }} closable - Server exposing close(callback).
 * @returns {Promise<void>}
 */
function closeQuietly(label, closable) {
  return new Promise((resolve) => {
    try {
      closable.close(() => resolve());
    } catch (err) {
      console.warn(`[shutdown] closing ${label} failed:`, err?.message || err);
      resolve();
    }
  });
}

// Graceful shutdown
let shuttingDown = false;

/**
 * Stop the streamers, drop all clients, close the servers and exit.
 * Only the first call does anything; later calls return immediately.
 *
 * @param {number} [code=0] - Process exit code.
 * @returns {Promise<void>}
 */
async function shutdown(code = 0) {
  if (shuttingDown) return;
  shuttingDown = true;
  console.log('\nShutting down…');

  // Arm the watchdog BEFORE any awaited step so that a close() which never
  // calls back cannot hang the process. It is deliberately not unref()'d:
  // it must be able to fire with the requested exit code even if nothing
  // else keeps the event loop alive.
  setTimeout(() => {
    console.warn('Force exiting…');
    process.exit(code);
  }, FORCE_EXIT_DELAY_MS);

  bestEffort('heartbeat timer', () => clearInterval(pingInterval));
  bestEffort('cam0', () => cam0.stop());
  bestEffort('cam1', () => cam1.stop());

  for (const wsServer of [wss, wssInput]) {
    bestEffort('client termination', () => {
      wsServer.clients.forEach((client) => {
        bestEffort('client terminate', () => client.terminate());
      });
    });
  }
  await closeQuietly('video WebSocket server', wss);
  await closeQuietly('robot WebSocket server', wssInput);

  for (const socket of activeHttpSockets) {
    bestEffort('socket destroy', () => socket.destroy());
  }
  await closeQuietly('HTTPS server', server);

  process.exit(code);
}

process.once('SIGINT', () => shutdown(0));
process.once('SIGTERM', () => shutdown(0));
process.once('uncaughtException', (e) => {
  console.error(e);
  shutdown(1);
});
process.once('unhandledRejection', (r) => {
  console.error(r);
  shutdown(1);
});