Umbau mit cameraSwitch

This commit is contained in:
chk
2026-06-05 06:36:48 +02:00
parent 0ea475d6b6
commit 8c8c769e22
10 changed files with 641 additions and 839 deletions

259
src/cameraSwitch.js Normal file
View File

@@ -0,0 +1,259 @@
'use strict';
const { spawn } = require('child_process');
const EventEmitter = require('events');
const sleep = (ms) => new Promise((r) => setTimeout(r, ms));
// Liest Bildbreite aus JPEG-Header (SOF0/SOF2-Marker) ohne externe Bibliothek.
// Gibt null zurück wenn der Marker nicht gefunden wird.
function readJpegWidth(buf) {
let i = 2; // SOI (FF D8) überspringen
while (i < buf.length - 8) {
if (buf[i] !== 0xFF) break;
const marker = buf[i + 1];
const segLen = buf.readUInt16BE(i + 2);
if (marker === 0xC0 || marker === 0xC2) {
return buf.readUInt16BE(i + 7); // Breite bei Offset +7 im SOF
}
i += 2 + segLen;
}
return null;
}
// ── Parser für FFmpeg `-f mpjpeg` ────────────────────────────────────────────
// FFmpeg schreibt pro Frame: --<boundary>\r\nContent-Type: image/jpeg\r\n
// Content-Length: <n>\r\n\r\n<n bytes JPEG>\r\n
// Wir keyen auf Content-Length → deterministisch, unabhängig vom Boundary-String.
class MpjpegParser {
constructor(onFrame) {
this.onFrame = onFrame;
this.buf = Buffer.alloc(0);
this.need = -1; // -1 = Header-Modus, sonst erwartete Body-Bytes
}
push(chunk) {
this.buf = this.buf.length ? Buffer.concat([this.buf, chunk]) : chunk;
for (;;) {
if (this.need < 0) {
const headEnd = this.buf.indexOf('\r\n\r\n');
if (headEnd < 0) {
// Schutz gegen unbegrenztes Puffern bei unerwartetem Müll
if (this.buf.length > (1 << 20)) this.buf = this.buf.subarray(this.buf.length - 4096);
return;
}
const header = this.buf.toString('latin1', 0, headEnd);
const m = /content-length:\s*(\d+)/i.exec(header);
if (!m) { this.buf = this.buf.subarray(headEnd + 4); continue; }
this.need = parseInt(m[1], 10);
this.buf = this.buf.subarray(headEnd + 4);
}
if (this.buf.length < this.need) return;
const frame = this.buf.subarray(0, this.need);
this.buf = this.buf.subarray(this.need);
this.need = -1;
try { this.onFrame(frame); } catch (_e) { /* Consumer-Fehler ignorieren */ }
}
}
}
// ── CameraSwitch ─────────────────────────────────────────────────────────────
// Eine Instanz pro physischem Gerät. Der EINZIGE Öffner von /dev/videoN.
// Hält IMMER nur EINEN FFmpeg-Prozess: entweder Live (640) oder HD-Grab (1280) —
// NIE beide. Der Übergang wird über das `close`-Event des FFmpeg-Kindprozesses
// synchronisiert: Prozess weg ⇒ Kernel hat den Device-FD geschlossen ⇒ Gerät frei.
// Genau dieses Signal verweigert go2rtcs API — deshalb der Eigenbau.
//
// Events: 'frame' (Buffer) je ein Live-JPEG
class CameraSwitch extends EventEmitter {
constructor({ id, device, liveSize = '640x480', liveFps = 30, hiresSize = '1280x960', hiresFps = 15 }) {
super();
this.setMaxListeners(0); // beliebig viele Stream-Clients
this.id = id;
this.device = device;
this.liveSize = liveSize;
this.liveFps = liveFps;
this.hiresSize = hiresSize;
this.hiresFps = hiresFps;
this.proc = null; // aktueller FFmpeg-Prozess (Live ODER Grab)
this.latest = null; // letztes Live-JPEG (für /api/snapshot)
this.state = 'stopped'; // stopped | live | grabbing
this.lock = false; // Mutex: nur ein Grab gleichzeitig
this.stopping = false; // unterscheidet absichtliches Kill von Crash
this.restartTimer = null;
}
start() {
if (this.state === 'stopped' && !this.proc) this._spawnLive();
}
// ── Live-Producer (Dauerbetrieb, Auto-Restart bei Crash) ───────────────────
_spawnLive() {
this.stopping = false;
const args = [
'-hide_banner', '-loglevel', 'warning',
'-f', 'v4l2', '-input_format', 'mjpeg',
'-video_size', this.liveSize, '-framerate', String(this.liveFps),
'-i', this.device,
'-c:v', 'copy', '-f', 'mpjpeg', 'pipe:1',
];
let p;
try {
p = spawn('ffmpeg', args, { stdio: ['ignore', 'pipe', 'pipe'] });
} catch (e) {
console.error(`[cam ${this.id}] spawn fehlgeschlagen: ${e.message} → Retry in 1.5s`);
this._scheduleRestart();
return;
}
this.proc = p;
this.state = 'live';
const parser = new MpjpegParser((frame) => {
this.latest = frame;
this.emit('frame', frame);
});
p.stdout.on('data', (c) => parser.push(c));
p.stderr.on('data', (c) => {
const s = c.toString();
if (/error|busy|invalid|no such|cannot|denied/i.test(s)) console.error(`[cam ${this.id}] ffmpeg: ${s.trim()}`);
});
p.on('error', (e) => console.error(`[cam ${this.id}] live ffmpeg error: ${e.message}`));
p.on('close', (code, sig) => {
this.proc = null;
const wasStopping = this.stopping;
if (this.state === 'live') this.state = 'stopped';
if (wasStopping) return; // beabsichtigt (HD-Grab) → grabHires startet Live neu
console.warn(`[cam ${this.id}] live ffmpeg unerwartet beendet (code=${code} sig=${sig}) → Restart`);
this._scheduleRestart();
});
console.log(`[cam ${this.id}] live gestartet (${this.liveSize}@${this.liveFps}, ${this.device})`);
}
_scheduleRestart() {
if (this.restartTimer) return;
this.restartTimer = setTimeout(() => {
this.restartTimer = null;
if (this.state === 'stopped' && !this.lock) this._spawnLive();
}, 1500);
}
// ── HD-Grab: Live sauber stoppen → 1280 greifen → Live zurück ──────────────
// Garantie: zwischen Stop und 1280-Start liegt das `close`-Event des Live-
// FFmpeg → /dev/videoN ist frei. Niemals zwei Encoder gleichzeitig.
async grabHires(opts = {}) {
const { minSize = 15000, minWidth = 1000, settleFrames = 6, maxWaitMs = 6000 } = opts;
if (this.lock) throw new Error('HD-Grab läuft bereits');
this.lock = true;
const t0 = Date.now();
if (this.restartTimer) { clearTimeout(this.restartTimer); this.restartTimer = null; }
try {
// 1. Live-FFmpeg beenden, auf Prozess-Ende warten (= Device-FD frei)
await this._killCurrentAndWait();
this.state = 'grabbing';
console.log(`[cam ${this.id}] HD: Live gestoppt nach ${Date.now() - t0}ms, Gerät frei → 1280-Grab`);
// 2. 1280-FFmpeg starten, warmlaufen lassen, besten Frame greifen
const jpeg = await this._captureHires({ minSize, minWidth, settleFrames, maxWaitMs });
console.log(`[cam ${this.id}] HD OK ${jpeg.length} bytes, Breite=${readJpegWidth(jpeg) ?? '?'} (${Date.now() - t0}ms)`);
return jpeg;
} finally {
// 3. IMMER zurück auf Live (auch bei Fehler) Live hat Priorität
this.state = 'stopped';
this._spawnLive();
this.lock = false;
console.log(`[cam ${this.id}] HD beendet, Live zurück (gesamt ${Date.now() - t0}ms)`);
}
}
// Beendet den aktuellen Prozess und resolved erst nach dessen 'close' (FD frei).
_killCurrentAndWait(timeoutMs = 4000) {
return new Promise((resolve) => {
const p = this.proc;
if (!p) return resolve();
this.stopping = true;
let done = false;
const fin = () => { if (!done) { done = true; resolve(); } };
p.once('close', fin);
try { p.kill('SIGTERM'); } catch (_e) { /* schon weg */ }
setTimeout(() => { if (!done) { try { p.kill('SIGKILL'); } catch (_e) {} } }, Math.max(500, timeoutMs - 1000));
setTimeout(fin, timeoutMs); // Sicherheitsnetz
});
}
_captureHires({ minSize, minWidth, settleFrames, maxWaitMs }) {
return new Promise((resolve, reject) => {
const args = [
'-hide_banner', '-loglevel', 'warning',
'-f', 'v4l2', '-input_format', 'mjpeg',
'-video_size', this.hiresSize, '-framerate', String(this.hiresFps),
'-i', this.device,
'-c:v', 'copy', '-f', 'mpjpeg', 'pipe:1',
];
let p;
try {
p = spawn('ffmpeg', args, { stdio: ['ignore', 'pipe', 'pipe'] });
} catch (e) { return reject(e); }
this.proc = p;
this.stopping = false;
let count = 0;
let best = null;
let decided = false;
let closed = false;
let finalized = false;
let storedResult = null;
let storedErr = null;
let timer = setTimeout(() => decide(best, best ? null : new Error('HD-Timeout')), maxWaitMs);
let hardFin = null;
function finalize(self) {
if (finalized) return;
finalized = true;
if (hardFin) clearTimeout(hardFin);
self.proc = null;
if (storedResult) resolve(storedResult);
else reject(storedErr || new Error('kein HD-Frame'));
}
const self = this;
function decide(result, err) {
if (decided) return;
decided = true;
storedResult = result;
storedErr = err;
if (timer) { clearTimeout(timer); timer = null; }
if (closed) { finalize(self); return; }
// Prozess beenden; finalize() erst nach 'close' (= FD frei)
self.stopping = true;
try { p.kill('SIGTERM'); } catch (_e) {}
setTimeout(() => { if (!closed) { try { p.kill('SIGKILL'); } catch (_e) {} } }, 800);
hardFin = setTimeout(() => finalize(self), 2500); // falls 'close' ausbleibt
}
const parser = new MpjpegParser((frame) => {
count++;
if (!best || frame.length > best.length) best = frame;
const w = readJpegWidth(frame);
if (count >= settleFrames && frame.length >= minSize && (w === null || w >= minWidth)) {
decide(Buffer.from(frame), null); // kopieren: subarray teilt den Parser-Puffer
}
});
p.stdout.on('data', (c) => parser.push(c));
p.stderr.on('data', (c) => {
const s = c.toString();
if (/error|busy|invalid|no such|cannot|denied/i.test(s)) console.error(`[cam ${this.id}] hires ffmpeg: ${s.trim()}`);
});
p.on('error', (e) => { if (!decided) decide(null, e); });
p.on('close', () => {
closed = true;
if (decided) finalize(self);
else decide(best ? Buffer.from(best) : null, best ? null : new Error('HD-FFmpeg vorzeitig beendet'));
});
});
}
}
module.exports = { CameraSwitch, MpjpegParser, readJpegWidth };

View File

@@ -1,253 +1,108 @@
'use strict';
const express = require('express');
const { readJpegWidth } = require('./cameraSwitch');
const sleep = (ms) => new Promise((r) => setTimeout(r, ms));
// Liest Bildbreite aus JPEG-Header (SOF0/SOF2-Marker) ohne externe Bibliothek.
// Gibt null zurück wenn der Marker nicht gefunden wird.
function readJpegWidth(buf) {
let i = 2; // SOI (FF D8) überspringen
while (i < buf.length - 8) {
if (buf[i] !== 0xFF) break;
const marker = buf[i + 1];
const segLen = buf.readUInt16BE(i + 2);
if (marker === 0xC0 || marker === 0xC2) {
return buf.readUInt16BE(i + 7); // Breite bei Offset +7 im SOF
}
i += 2 + segLen;
}
return null;
}
// Stabile Snapshot-Schnittstelle für das Homing-Projekt.
// Entkoppelt den Consumer von go2rtc-Interna proxied intern auf /api/frame.jpeg.
// Stabile Schnittstellen für Viewer und Homing-Projekt lesen NUR aus den
// CameraSwitch-Instanzen (RAM-Puffer + Event-Stream). Kein Gerätezugriff hier,
// keine go2rtc-Abhängigkeit mehr.
//
// GET /api/snapshot → JSON-Liste der Kameras
// GET /api/snapshot/cam0 → 640er JPEG (live)
// GET /api/snapshot/cam0/hires → 1280×960 JPEG via cam0_hires (Phase 2)
function createSnapshotRouter(go2rtcUrl) {
// GET /api/snapshot/cam0 → letztes Live-JPEG (640) aus dem Puffer
// GET /api/snapshot/cam0/hires → HD-JPEG (1280): Schalter pausiert Live kurz
// GET /api/stream/cam0 → MJPEG multipart/x-mixed-replace (Live)
function createSnapshotRouter(switches) {
const router = express.Router();
const hiresLocks = {}; // Mutex pro Kamera: { cam0: false, cam1: false, … }
// ── PHASE 2: Hi-Res-Grab via cam0_hires (rein LESEND gegenüber cam0/cam1) ────
// Voraussetzung: Client hat seinen <video-stream> bereits entfernt (Umhängen),
// BEVOR er diesen Endpunkt ruft. cam0/cam1 werden NICHT verändert.
// cam{id}_hires muss in der go2rtc-Config definiert sein (docker-compose.yaml).
//
// Ablauf: Warten bis id 0 Consumer hat → cam_hires-Frame per frame.jpeg holen.
// Eiserne Regeln (04_Delay_roadmap.md): nur GET, kein PUT/PATCH/DELETE. ✓
router.get('/', (_req, res) => {
res.json({
cameras: Object.keys(switches).map((id) => ({ id, url: `/api/snapshot/${id}` })),
});
});
// HD-Grab: delegiert an den Schalter. Der Schalter garantiert, dass Live
// sauber gestoppt ist (Prozess-close), bevor 1280 startet → kein Race.
router.get('/:id/hires', async (req, res) => {
const { id } = req.params;
const hiresId = `${id}_hires`;
if (hiresLocks[id]) {
return res.status(429).json({ error: `Hi-Res-Grab für ${id} läuft bereits bitte warten` });
}
hiresLocks[id] = true;
const t0 = Date.now();
const sw = switches[req.params.id];
if (!sw) return res.status(404).json({ error: `Unbekannte Kamera: ${req.params.id}` });
try {
// Schritt 1: Warten bis id keine Consumer mehr hat (Gerät frei, max 8 s)
const POLL_MS = 200;
const MAX_WAIT = 8000;
const MIN_SIZE = 15000; // <15KB → Warmup-Schwarzbild, retry
let deviceFree = false;
while (Date.now() - t0 < MAX_WAIT) {
try {
const r = await fetch(`${go2rtcUrl}/api/streams`, { signal: AbortSignal.timeout(1000) });
if (r.ok) {
const streams = await r.json();
const s = streams[id];
const nC = s ? (s.consumers ?? []).length : 0;
const pRunning = s ? (s.producers ?? []).some(p => (p.state ?? '') === 'running') : false;
if (nC === 0 && !pRunning) {
deviceFree = true;
console.log(`[hires][${id}] Gerät frei nach ${Date.now() - t0}ms`);
break;
}
}
} catch (e) {
console.warn(`[hires][${id}] Poll fehlgeschlagen: ${e.message}`);
}
await sleep(POLL_MS);
}
if (!deviceFree) {
return res.status(503).json({
error: `Gerät nicht frei nach ${MAX_WAIT}ms noch ${id}-Consumer aktiv?`,
});
}
// Schritt 2: Frame greifen (cam_hires on-demand, mit Warmup-Retry)
// go2rtc öffnet /dev/videoN bei der ersten Anfrage → erste Frames können
// unterbelichtet sein → Größen-Check; Retry gibt Kamera Zeit zum Einschwingen.
const MAX_RETRIES = 4;
const RETRY_MS = 800;
let jpeg = null;
let lastWidth = null;
for (let attempt = 0; attempt < MAX_RETRIES; attempt++) {
try {
const fr = await fetch(
`${go2rtcUrl}/api/frame.jpeg?src=${encodeURIComponent(hiresId)}`,
{ signal: AbortSignal.timeout(5000) }
);
if (fr.ok) {
const buf = Buffer.from(await fr.arrayBuffer());
const w = readJpegWidth(buf);
console.log(`[hires][${id}] Versuch ${attempt + 1}: ${buf.length} bytes, Breite=${w ?? '?'}`);
if (buf.length >= MIN_SIZE && (w === null || w >= 1000)) {
jpeg = buf;
lastWidth = w;
break;
}
}
} catch (e) {
console.warn(`[hires][${id}] frame.jpeg Versuch ${attempt + 1}: ${e.message}`);
}
if (attempt < MAX_RETRIES - 1) await sleep(RETRY_MS);
}
if (!jpeg) {
return res.status(503).json({ error: 'kein verwertbarer Hi-Res-Frame (Warmup-Timeout)' });
}
console.log(`[hires][${id}] OK ${jpeg.length} bytes, Breite=${lastWidth}, Dauer=${Date.now() - t0}ms`);
const jpeg = await sw.grabHires();
res.set({
'Content-Type': 'image/jpeg',
'Content-Type': 'image/jpeg',
'Content-Length': jpeg.length,
'Cache-Control': 'no-store',
'X-Camera-Id': id,
'X-Hires-Id': hiresId,
'X-Frame-Width': String(lastWidth ?? ''),
'X-Timestamp': new Date().toISOString(),
'Cache-Control': 'no-store',
'X-Camera-Id': req.params.id,
'X-Frame-Width': String(readJpegWidth(jpeg) ?? ''),
'X-Timestamp': new Date().toISOString(),
});
res.end(jpeg);
} catch (err) {
if (!res.headersSent) res.status(503).json({ error: `hires: ${err.message}` });
} finally {
hiresLocks[id] = false;
res.status(503).json({ error: `hires: ${err.message}` });
}
});
// ── 🔬 TEMPORÄR: Diagnose-Probe für den cam_hires-Teardown (Bug 106%) ─────────
// Misst REIN LESEND, wann go2rtc den cam_hires-Producer nach einem frame.jpeg
// wirklich abbaut. Beantwortet: Leert sich das producers-Array? Wann? Geht
// consumers sofort auf 0? Daraus wird der robuste Rückweg gebaut (Weg C).
//
// VORHER im Viewer die betreffende Kamera AUSschalten (⏸), damit cam frei ist
// (sonst zwei Encoder auf einem Device = genau der 106%-Konflikt).
// curl http://<host>:8444/api/snapshot/cam0/hires-probe
// Nach der Messung diese Route + doc-Eintrag wieder entfernen.
router.get('/:id/hires-probe', async (req, res) => {
const { id } = req.params;
const hiresId = `${id}_hires`;
if (hiresLocks[id]) return res.status(429).json({ error: `${id} belegt` });
hiresLocks[id] = true;
const t0 = Date.now();
const snapHires = (streams) => {
const s = streams[hiresId];
const prods = s ? (s.producers ?? []) : [];
return {
cons: s ? (s.consumers ?? []).length : 0,
prods: prods.length,
states: prods.map(p => p.state ?? '?').join(',') || '-',
};
};
try {
// Schritt 1: warten bis cam frei (max 8s) sonst messen wir den Konflikt mit
while (Date.now() - t0 < 8000) {
const r = await fetch(`${go2rtcUrl}/api/streams`, { signal: AbortSignal.timeout(1000) }).catch(() => null);
if (r && r.ok) {
const s = (await r.json())[id];
const nC = s ? (s.consumers ?? []).length : 0;
const pR = s ? (s.producers ?? []).some(p => (p.state ?? '') === 'running') : false;
if (nC === 0 && !pR) break;
}
await sleep(200);
}
// Schritt 2: einen cam_hires-Frame holen (startet den Producer)
const fr = await fetch(`${go2rtcUrl}/api/frame.jpeg?src=${encodeURIComponent(hiresId)}`,
{ signal: AbortSignal.timeout(6000) });
const buf = fr.ok ? Buffer.from(await fr.arrayBuffer()) : null;
const tFrame = Date.now();
const frameBytes = buf ? buf.length : 0;
const frameWidth = buf ? readJpegWidth(buf) : null;
console.log(`[probe][${id}] frame: ${frameBytes} bytes, Breite=${frameWidth ?? '?'} → poll teardown…`);
// Schritt 3: 12s lang alle 100ms den cam_hires-Zustand mitschreiben
const timeline = [];
let producerGoneAtMs = null;
let consumersZeroAtMs = null;
while (Date.now() - tFrame < 12000) {
const r = await fetch(`${go2rtcUrl}/api/streams`, { signal: AbortSignal.timeout(1000) }).catch(() => null);
const t = Date.now() - tFrame;
if (r && r.ok) {
const snap = snapHires(await r.json());
timeline.push({ t, ...snap });
if (producerGoneAtMs === null && snap.prods === 0) producerGoneAtMs = t;
if (consumersZeroAtMs === null && snap.cons === 0) consumersZeroAtMs = t;
} else {
timeline.push({ t, err: true });
}
await sleep(100);
}
console.log(`[probe][${id}] producerGoneAtMs=${producerGoneAtMs} consumersZeroAtMs=${consumersZeroAtMs}`);
console.log(`[probe][${id}] timeline:`, JSON.stringify(timeline));
res.json({ hiresId, frameBytes, frameWidth, producerGoneAtMs, consumersZeroAtMs, timeline });
} catch (err) {
if (!res.headersSent) res.status(503).json({ error: `probe: ${err.message}` });
} finally {
hiresLocks[id] = false;
}
});
router.get('/', async (_req, res) => {
try {
const r = await fetch(`${go2rtcUrl}/api/streams`);
if (!r.ok) throw new Error(`go2rtc HTTP ${r.status}`);
const streams = await r.json();
res.json({
cameras: Object.keys(streams)
.filter(id => !id.endsWith('_hires'))
.map(id => ({ id, url: `/api/snapshot/${id}` })),
});
} catch (err) {
res.status(503).json({ error: `go2rtc nicht erreichbar: ${err.message}` });
}
});
router.get('/:id', async (req, res) => {
const { id } = req.params;
try {
const upstream = await fetch(
`${go2rtcUrl}/api/frame.jpeg?src=${encodeURIComponent(id)}`
);
if (!upstream.ok) {
return res.status(upstream.status).json({ error: `kein Frame (${id})` });
}
const buf = Buffer.from(await upstream.arrayBuffer());
res.set({
'Content-Type': 'image/jpeg',
'Content-Length': buf.length,
'Cache-Control': 'no-store',
'X-Camera-Id': id,
'X-Timestamp': new Date().toISOString(),
});
res.end(buf);
} catch (err) {
res.status(503).json({ error: `go2rtc: ${err.message}` });
}
router.get('/:id', (req, res) => {
const sw = switches[req.params.id];
if (!sw) return res.status(404).json({ error: `Unbekannte Kamera: ${req.params.id}` });
const frame = sw.latest;
if (!frame) return res.status(503).json({ error: 'noch kein Frame verfügbar' });
res.set({
'Content-Type': 'image/jpeg',
'Content-Length': frame.length,
'Cache-Control': 'no-store',
'X-Camera-Id': req.params.id,
'X-Timestamp': new Date().toISOString(),
});
res.end(frame);
});
return router;
}
module.exports = { createSnapshotRouter };
// MJPEG-Live-Stream als multipart/x-mixed-replace. Ein FFmpeg (im Schalter) →
// Fan-out an beliebig viele Browser. Browser rendert das nativ im <img>.
function createStreamRouter(switches) {
const router = express.Router();
router.get('/:id', (req, res) => {
const sw = switches[req.params.id];
if (!sw) return res.status(404).end();
res.writeHead(200, {
'Content-Type': 'multipart/x-mixed-replace; boundary=frame',
'Cache-Control': 'no-cache, no-store, must-revalidate',
'Pragma': 'no-cache',
'Connection': 'close',
'X-Camera-Id': req.params.id,
});
let closed = false;
const cleanup = () => { if (!closed) { closed = true; sw.removeListener('frame', onFrame); } };
const onFrame = (buf) => {
if (closed) return;
// Backpressure: langsamer Client bremst die anderen nicht Frames droppen
if (res.writableLength > (1 << 20)) return;
// try/catch: ein kaputter Client darf die anderen nicht aushungern
// (ein werfender 'frame'-Listener würde sonst emit() abbrechen)
try {
res.write(`--frame\r\nContent-Type: image/jpeg\r\nContent-Length: ${buf.length}\r\n\r\n`);
res.write(buf);
res.write('\r\n');
} catch (_e) {
cleanup();
}
};
sw.on('frame', onFrame);
if (sw.latest) onFrame(sw.latest); // sofort erstes Bild
req.on('close', cleanup);
res.on('error', cleanup);
});
return router;
}
module.exports = { createSnapshotRouter, createStreamRouter };