MetaFactory — reusable patterns
A two-repo content-ingestion engine: a heavy local pipeline (collector, organizer, quality gate, classifier) emits durable canonical-output artifacts per source; a thin cloud API host serves them over REST + MCP. Patterns extract the load-bearing seams — the pipeline-vs-host split, the checkpoint-resumable staged pipeline, the SHA-256 mirror with restore-verify, multi-source reconciliation with quality-flag gates, and the vendored consumer-contract pattern with explicit semver ratification.
Meta Factory — Reusable Engineering Patterns
Production-validated patterns from the Meta Factory codebase, stripped of business context, written to be dropped into any new system.
Each pattern is copy-paste-ready TypeScript. Domain-specific terminology has been removed. Citations at the bottom of each entry point to the concrete implementation. Sources are split across two repos: the active host meta-factory-prod (paths shown bare) and the heavy ingestion engine in the legacy meta-factory repo (paths cited with an `OLD-legacy:` prefix). See P03 for the rationale behind the split.
Pattern Index
| # | Pattern Name | Core Technology | Problem Solved |
|---|---|---|---|
| P01 | Checkpoint-Resumable Staged Pipeline | Node fs + per-step JSON state file | Run a long, multi-step LLM extraction that can crash on step 7 and resume without re-running steps 1–6 |
| P02 | Canonical-Output as Durable Stage Boundary | Filesystem tree + per-stage JSON | Decouple expensive producer pipelines from cheap consumer reads so consumers can ship before producers stabilize |
| P03 | Pipeline-vs-Host Two-Repo Split | Filesystem + asymmetric env wiring | Run heavy, machine-bound ingestion locally while shipping a thin cloud API host that reads its outputs |
| P04 | Cryptographic Content Mirror with Restore-Verify | SHA-256 + Supabase Storage + JSON manifest | Treat extracted artifacts as deletable cache that can be byte-identically restored from the cloud |
| P05 | Three-Tier Resource Resolver with Bundled Snapshot Fallback | env-driven precedence + bundled JSON | Serve the same API in local dev (live FS), cloud (bundled snapshot), and uninitialized (empty fallback) without code branches |
| P06 | Asset Registry with Deterministic Domain-Typed IDs | TypeScript types + path resolution table | End scattered path-guessing across a polyglot ingestion tree by giving every artifact one globally unique addressable id |
| P07 | Multi-Source Reconciler with Quality-Flag Gates | Pure reducer + validator chain | Merge per-item state from N backends into one view, with explicit quality gates that block known failure modes |
| P08 | Multi-Backend Mount Status with Last-Seen Cache | fs.existsSync probe + per-backend cache | Tolerate intermittent storage (external drives, cloud mounts) without losing the previous-session content listing |
| P09 | Resilient Extraction Wrapper with Error-Class-Aware Retry | Try/catch loop + quota-error fast-pause | Wrap third-party API calls with retry logic that distinguishes transient failures (back off) from quota exhaustion (stop) |
| P10 | Canonical Anchor Dereference with Result-Typed Errors | String-encoded path + discriminated union | Address a single leaf inside a nested JSON document with one opaque string, fail cleanly on missing/malformed addresses |
| P11 | REST + MCP Co-Located Shape Helpers | Shared helper module + density flag | Serve byte-identical payloads from REST and MCP surfaces without copy-paste drift |
| P12 | Vendored Consumer Contract with Explicit Semver Ratification | Plain copy + version constant + ratification doc | Publish a typed contract that downstream apps vendor (not npm-install), with explicit decisions captured so re-vendor diffs stay tiny |
| P13 | Multi-Candidate Consolidation Pipeline (Score → Consolidate → Fallback) | LLM judge + LLM merger + first-candidate fallback | Turn N parallel best-effort extractions into one consolidated artifact, with a graceful fallback when the merger itself fails |
| P14 | Cache-on-mtime Read-Through Snapshot | fs.statSync + cached parsed value | Serve a frequently-read JSON snapshot cheaply, auto-invalidating when the file regenerates |
Patterns
P01. Checkpoint-Resumable Staged Pipeline
Problem You run a long, multi-step pipeline against an expensive input (a book, a large transcript, a research paper) — each step is an LLM call that takes minutes and costs real money. Step 7 of 11 fails. You want to resume from step 7 on the next invocation, not re-run steps 1–6. You also want the resumption logic to be a property of the pipeline runner, not threaded through every step.
The Pattern
import * as fs from "node:fs";
import * as path from "node:path";
interface StepState {
stepId: string;
status: "pending" | "running" | "completed" | "failed" | "skipped";
startedAt?: string;
completedAt?: string;
error?: string;
outputFile?: string;
}
interface PipelineState {
runId: string;
startedAt: string;
lastUpdated: string;
steps: StepState[];
completed: boolean;
}
const statePath = (runDir: string) => path.join(runDir, "state.json");
function loadState(runDir: string): PipelineState | null {
const p = statePath(runDir);
if (!fs.existsSync(p)) return null;
try {
return JSON.parse(fs.readFileSync(p, "utf-8")) as PipelineState;
} catch {
return null;
}
}
function saveState(runDir: string, state: PipelineState): void {
state.lastUpdated = new Date().toISOString();
fs.writeFileSync(statePath(runDir), JSON.stringify(state, null, 2));
}
function isStepCompleted(state: PipelineState, stepId: string): boolean {
const step = state.steps.find((s) => s.stepId === stepId);
// Completion requires THREE things: status, output file path, and file present on disk.
// Status alone lies after manual file deletion.
return !!step && step.status === "completed" && !!step.outputFile && fs.existsSync(step.outputFile);
}
export interface PipelineStep {
id: string;
run: (ctx: Record<string, unknown>) => Promise<{ output: string; outputFile: string }>;
outputKey: string;
}
export async function runPipeline(
runDir: string,
steps: PipelineStep[],
opts: { resume?: boolean } = {}
) {
fs.mkdirSync(runDir, { recursive: true });
let state = opts.resume ? loadState(runDir) : null;
if (!state) {
state = {
runId: `run-${Date.now()}`,
startedAt: new Date().toISOString(),
lastUpdated: new Date().toISOString(),
steps: steps.map((s) => ({ stepId: s.id, status: "pending" })),
completed: false,
};
}
const context: Record<string, unknown> = {};
for (const step of steps) {
// Resume: skip if previously completed AND output file still on disk.
if (opts.resume && isStepCompleted(state, step.id)) {
const cached = fs.readFileSync(state.steps.find((s) => s.stepId === step.id)!.outputFile!, "utf-8");
context[step.outputKey] = cached;
continue;
}
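// A step added after the saved run has no entry in state yet; create one instead of crashing on undefined.
const existing = state.steps.find((x) => x.stepId === step.id);
const s: StepState = existing ?? { stepId: step.id, status: "pending" };
if (!existing) state.steps.push(s);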
s.status = "running";
s.startedAt = new Date().toISOString();
saveState(runDir, state);
try {
const result = await step.run(context);
context[step.outputKey] = result.output;
s.status = "completed";
s.completedAt = new Date().toISOString();
s.outputFile = result.outputFile;
s.error = undefined;
} catch (err) {
s.status = "failed";
s.error = (err as Error).message;
saveState(runDir, state);
throw err; // Next resume picks up here.
}
saveState(runDir, state);
}
state.completed = true;
saveState(runDir, state);
return context;
}
Design decisions
- `state.json` next to outputs. Per-run state lives in `runs/<inputId>/<runId>/state.json` alongside the actual step output files. One directory carries everything needed to resume; no separate state database to keep in sync.
- `isStepCompleted` triple-checks: status + output path + file-on-disk. Trusting the status flag alone is a hazard: a manual `rm` of an output file leaves the status saying `completed` but the file gone. Three checks make resumption robust against half-clean states.
- Failures throw and persist; success persists then continues. On crash, the failed step's state is on disk; the unfinished steps stay `pending`. Resumption is "rerun the first non-completed step."
- Context is built from cached outputs on resume. The runner reads the cached output files back into the same context map that the step would have populated. Steps after the resume point can't tell they're in a resumed run.
- One run directory per invocation. A new invocation gets a new `runId`; resumption is explicit (`{ resume: true, runId }`). This avoids the ambiguity of "which prior partial run am I picking up?"
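The triple-check and the "first non-completed step" rule can be sketched as a pure function. This is a minimal illustration, not the cited implementation: `findResumePoint` and the injected `fileExists` callback are hypothetical names, and the callback stands in for `fs.existsSync` so the logic can be exercised without touching disk.

```typescript
// Hypothetical helper, not part of the cited implementation.
type StepStatus = "pending" | "running" | "completed" | "failed" | "skipped";

interface StepRecord {
  stepId: string;
  status: StepStatus;
  outputFile?: string;
}

// Returns the id of the first step that is not verifiably complete,
// or null when every step passes all three checks.
function findResumePoint(
  steps: StepRecord[],
  fileExists: (p: string) => boolean
): string | null {
  for (const step of steps) {
    const done =
      step.status === "completed" &&
      !!step.outputFile &&
      fileExists(step.outputFile);
    if (!done) return step.stepId;
  }
  return null;
}

// Step "b" claims completion, but its output file was deleted by hand.
const onDisk = new Set(["out/a.json"]);
const resumeAt = findResumePoint(
  [
    { stepId: "a", status: "completed", outputFile: "out/a.json" },
    { stepId: "b", status: "completed", outputFile: "out/b.json" },
    { stepId: "c", status: "pending" },
  ],
  (p) => onDisk.has(p)
);
console.log(resumeAt); // prints: b
```

The status flag for "b" says `completed`, yet the function still names it as the resume point because the file check fails.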
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| Recovering from a step-7 failure costs zero re-runs of steps 1–6 | One state file per run; long-tail runs proliferate on disk |
| Manual mid-pipeline editing is supported: delete an output file to force that step to re-run | Steps must be pure functions of the context they're given; hidden state breaks resumption |
| New steps drop into the pipeline with zero schema migration | State schema changes require a one-time state.json patch or full re-run |
| Trivial to introspect: `cat state.json` tells you exactly where the pipeline is | Concurrent runs against the same runDir will corrupt the state file (no lock) |
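One possible mitigation for the no-lock weakness is an exclusive lock file created with Node's `"wx"` open flag, which fails if the file already exists. The sketch below is an assumption layered on top of the pattern, not something the cited code does; `acquireRunLock` and the `run.lock` file name are hypothetical.

```typescript
import * as fs from "node:fs";
import * as os from "node:os";
import * as path from "node:path";

// Hypothetical helper (not in the cited implementation): take an exclusive
// lock on a run directory. Returns a release function, or null if the lock
// is already held.
function acquireRunLock(runDir: string): (() => void) | null {
  const lockPath = path.join(runDir, "run.lock"); // file name is an assumption
  let fd: number;
  try {
    // "wx" = create for writing, fail with EEXIST if the file already exists.
    fd = fs.openSync(lockPath, "wx");
  } catch (err) {
    if ((err as NodeJS.ErrnoException).code === "EEXIST") return null;
    throw err;
  }
  fs.writeSync(fd, String(process.pid)); // record the owner for debugging
  fs.closeSync(fd);
  return () => fs.rmSync(lockPath, { force: true });
}

// Demo against a throwaway directory.
const demoDir = fs.mkdtempSync(path.join(os.tmpdir(), "run-lock-"));
const firstLock = acquireRunLock(demoDir);
const secondLock = acquireRunLock(demoDir);
console.log(firstLock !== null, secondLock === null); // prints: true true
```

A process that crashes while holding the lock leaves `run.lock` behind, so an operator (or a PID liveness check) would still have to clear stale locks.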
Citations
- `OLD-legacy: packages/collector/src/state.ts`: full `PipelineState` shape + load/save/isCompleted helpers.
- `OLD-legacy: packages/collector/src/pipeline.ts`: the runner that checks `isStepCompleted`, restores context from cached outputs, persists step-by-step.
- `OLD-legacy: packages/research_agent/src/processing-state.ts`: same shape applied to the research-paper extraction pipeline (different stages, identical resumption logic).
- See also: DevPlane P01 (Load–Modify–Save State File) for the simpler one-shot variant; this pattern adds the resumption discipline on top.
P02. Canonical-Output as Durable Stage Boundary
Problem You have an expensive producer pipeline (slow, model-driven, hardware-bound) and a fast consumer surface (a REST API, a web UI, an MCP server) that wants to query the producer's output. If the consumer reads the producer's in-memory representation, the two are coupled — the API can't ship until the pipeline stabilizes, and any pipeline restart blocks reads. You want the producer to emit a durable artifact the consumer reads from disk, decoupling them at a stable contract.
The Pattern
import * as fs from "node:fs";
import * as path from "node:path";
// The contract — what the consumer agrees to read.
export interface CanonicalDocument {
source_id: string;
setup?: Record<string, unknown>;
structure?: { chapters?: Array<{ id?: string; [k: string]: unknown }> };
provenance: {
pipeline_version: string;
generated_at: string;
};
}
// Producer-side: atomic write so a half-finished producer can't be read mid-flight.
export function emitCanonical(rootDir: string, sourceId: string, doc: CanonicalDocument): void {
const dir = path.join(rootDir, "canonical_outputs", sourceId);
fs.mkdirSync(dir, { recursive: true });
const finalPath = path.join(dir, "summary.json");
const tmpPath = finalPath + ".tmp";
fs.writeFileSync(tmpPath, JSON.stringify(doc, null, 2));
fs.renameSync(tmpPath, finalPath); // atomic on POSIX
}
// Consumer-side: read with explicit "missing" envelope, never throw on missing source.
export type ReadResult =
| { status: "ok"; document: CanonicalDocument }
| { status: "missing"; source_id: string }
| { status: "malformed"; source_id: string; error: string };
export function readCanonical(rootDir: string, sourceId: string): ReadResult {
const finalPath = path.join(rootDir, "canonical_outputs", sourceId, "summary.json");
if (!fs.existsSync(finalPath)) return { status: "missing", source_id: sourceId };
try {
const raw = fs.readFileSync(finalPath, "utf-8");
const doc = JSON.parse(raw) as CanonicalDocument;
return { status: "ok", document: doc };
} catch (err) {
return { status: "malformed", source_id: sourceId, error: (err as Error).message };
}
}
// Per-document side artifacts (chunks, classifications, processing state) live as
// SIBLING files inside the same per-source directory:
// canonical_outputs/<sourceId>/summary.json (main artifact)
// canonical_outputs/<sourceId>/chunks.json (extracted leaf units)
// canonical_outputs/<sourceId>/classification.json (taxonomy tags)
// canonical_outputs/<sourceId>/processing-state.json (pipeline step progress)
// canonical_outputs/<sourceId>/chapters/<chId>/... (per-subunit artifacts)
//
// The producer can update one sibling without touching the others; consumers
// who only want chunks can skip the summary parse cost entirely.
Design decisions
- Atomic write via temp + rename. The producer is slow; partial writes are the failure mode that bites hardest. POSIX `rename` gives all-or-nothing semantics; no consumer ever sees a half-written summary.
- One directory per source, sibling files per concern. The directory is the unit of identity; files inside are independently regenerable. A `re-extract chunks only` invocation overwrites `chunks.json` and leaves the rest alone.
- Read returns a discriminated envelope. "Missing" is a normal state during pipeline bring-up; "malformed" is real corruption. Consumers pattern-match instead of try/catching.
- Provenance baked into the artifact. `pipeline_version` and `generated_at` travel with the document. Consumers can detect "this was extracted with an old pipeline" without a separate index.
- No write API for consumers. The contract is read-only by construction: the consumer surface never has the option to mutate, which means the producer is the single writer and the cache-coherence question never arises.
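To show what "consumers pattern-match instead of try/catching" can look like, here is a small sketch that maps the read envelope to an HTTP-style reply. The `toHttpReply` name, the `HttpReply` shape, and the specific status codes are assumptions for illustration; the envelope type is a trimmed local copy of `ReadResult`.

```typescript
// Trimmed local copy of the envelope from the pattern above.
type ReadResult =
  | { status: "ok"; document: { source_id: string } }
  | { status: "missing"; source_id: string }
  | { status: "malformed"; source_id: string; error: string };

// Hypothetical reply shape; the status codes are illustrative choices.
interface HttpReply {
  code: number;
  body: unknown;
}

function toHttpReply(result: ReadResult): HttpReply {
  switch (result.status) {
    case "ok":
      return { code: 200, body: result.document };
    case "missing":
      // Normal during pipeline bring-up: the source simply has not been emitted yet.
      return { code: 404, body: { error: "not_found", source_id: result.source_id } };
    case "malformed":
      // Real corruption: surface it as a server-side problem.
      return { code: 500, body: { error: "malformed_artifact", source_id: result.source_id } };
    default: {
      // Compile-time exhaustiveness check: adding a new status breaks the build here.
      const unreachable: never = result;
      return unreachable;
    }
  }
}
```

The `never` branch means a fourth envelope variant cannot be added without every consumer being forced to handle it.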
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| Producer and consumer can deploy independently | Eventual-consistency window between pipeline finish and consumer-visible update |
| Reads are cheap (one file read per source) | Storage cost scales linearly with sources (no compression) |
| Pipeline restart doesn't block reads of already-emitted artifacts | Schema migrations require re-running the pipeline on every source |
| Trivially backupable (the directory IS the data) | No cross-source queries without an in-memory index layer (see P11) |
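Because provenance travels with each artifact, the migration cost in the table can at least be scoped to the sources that actually need it. The sketch below assumes `pipeline_version` is a dotted numeric string such as `1.10.0`; the source does not state the version format, and `compareVersions` and `sourcesNeedingRerun` are hypothetical helpers.

```typescript
interface Provenance {
  pipeline_version: string;
  generated_at: string;
}

interface EmittedDoc {
  source_id: string;
  provenance: Provenance;
}

// Assumes dotted numeric versions ("major.minor.patch"); missing parts count as 0.
function compareVersions(a: string, b: string): number {
  const pa = a.split(".").map(Number);
  const pb = b.split(".").map(Number);
  for (let i = 0; i < Math.max(pa.length, pb.length); i++) {
    const diff = (pa[i] ?? 0) - (pb[i] ?? 0);
    if (diff !== 0) return diff < 0 ? -1 : 1;
  }
  return 0;
}

// Lists the sources whose artifact predates the minimum acceptable pipeline version.
function sourcesNeedingRerun(docs: EmittedDoc[], minVersion: string): string[] {
  return docs
    .filter((d) => compareVersions(d.provenance.pipeline_version, minVersion) < 0)
    .map((d) => d.source_id);
}

// Sample data is made up for the demo.
const stale = sourcesNeedingRerun(
  [
    { source_id: "a", provenance: { pipeline_version: "1.9.3", generated_at: "2024-01-01T00:00:00Z" } },
    { source_id: "b", provenance: { pipeline_version: "1.10.0", generated_at: "2024-02-01T00:00:00Z" } },
  ],
  "1.10.0"
);
console.log(stale); // prints: [ 'a' ]
```

Comparing numerically matters here: a plain string comparison would rank `1.9.3` above `1.10.0`.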
Citations
- `OLD-legacy: packages/collector/src/pipeline.ts`: producer emits `canonical_outputs/<bookId>/summary.json` + per-chapter siblings under `chapters/ch<n>/`.
- `lib/v1/passages-search-helpers.ts`: consumer loads `canonical_outputs/<bookId>/chunks.json` + per-chapter chunks into an in-memory search index without ever touching the producer.
- `lib/v1/content-storage.ts`: same directory layout mirrored to Supabase Storage so the cloud-deployed host can read the canonical artifacts even when the producer machine is offline.
P03. Pipeline-vs-Host Two-Repo Split
Problem You're running an expensive ingestion pipeline (LLM-heavy, hours per source, dependent on local hardware and large external storage) and a thin cloud API host that serves the pipeline's outputs to consumers. Co-locating them in one repo makes the cloud deploy carry hundreds of MB of pipeline machinery it'll never run; co-locating also tempts the pipeline to import "convenience" helpers from the host that quietly become load-bearing and break the cloud build. You want the two responsibilities in two repos with a deliberate one-way data dependency.
The Pattern
┌──────────────────────────────────────────────────────────────────────┐
│ Repo A: <product>-pipeline (heavy, local) │
│ │
│ packages/collector/ ─┐ │
│ packages/organizer/ │ staged producer (P01) │
│ packages/quality-gate/ │ │
│ packages/classifier/ ─┘ │
│ │ │
│ ▼ │
│ canonical_outputs/<sourceId>/{summary.json,chunks.json,...} │
│ asset-registry.json ◄── single source of truth for identity (P06)│
│ │
│ [Independently invoked: `npm run pipeline -- <sourceId>`] │
└──────────────────────────────────────────────────────────────────────┘
│
│ A produces; B reads. Never the reverse.
▼
┌──────────────────────────────────────────────────────────────────────┐
│ Repo B: <product>-host (thin, cloud) │
│ │
│ app/api/v1/.../route.ts ◄── REST surface │
│ app/mcp/... ◄── MCP surface │
│ lib/v1/registry.ts ◄── resolution order: │
│ 1. live `<A>/asset-registry.json` │
│ 2. bundled snapshot under lib/v1/ │
│ 3. empty (404 to consumers) │
│ lib/content-state/mirror.ts ◄── cloud mirror of A's outputs │
│ (P04 — SHA-256 + restore-verify) │
│ │
│ [Deployed to Vercel; reads via env var META_FACTORY_ROOT in dev, │
│ bundled snapshot + cloud bucket in prod.] │
└──────────────────────────────────────────────────────────────────────┘
Design decisions
- The data dependency points one way. The host reads the pipeline's outputs; the pipeline never reads from the host. If a host-side feature needs new producer logic, it lands in the pipeline repo, the pipeline re-runs, the outputs migrate via P04's mirror, the host re-reads. No circular "host calls back into pipeline" anti-pattern.
- Identity lives with the pipeline. The `asset-registry.json` that names every artifact is in the pipeline repo; the host snapshots it (P05). This avoids a class of bugs where the host invents an id the pipeline didn't produce.
- The host deploys without the pipeline. Cloud builds don't pull the pipeline's `node_modules` (hundreds of dependencies, image-processing libs, LLM SDK retry shims, etc.). Build times are minutes, not hours.
- Each repo has its own README, CI, deployment story. No "monorepo with packages but you still need to remember which scripts only work in which subdirectory."
- The host can serve an empty registry gracefully. If the producer hasn't run yet (or its machine is offline), the host returns 404s for source lookups but stays up. Health checks pass; the API surface doesn't crash.
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| Cloud deploy stays small + fast — pipeline deps don't ship | Two repos to update when the contract changes |
| Pipeline can churn on local hardware without redeploying the host | Drift risk: pipeline emits a new field, host doesn't read it yet |
| One-way data flow rules out a class of circular bugs | "Where does code go?" requires a judgment call until the split is internalized |
| Each repo's `package.json` reflects only what it actually runs | Bootstrapping a new sister-host means recreating the resolution-order glue from scratch |
Citations
- `lib/v1/env.ts`: the `META_FACTORY_ROOT` env var that lets the host point at any pipeline checkout in dev. In prod the var is unset and the bundled snapshot serves.
- `lib/v1/registry.ts`: three-tier resolver: live filesystem → bundled snapshot → empty (see P05 for the generalized pattern).
- `lib/content-state/mirror.ts`: the one-way mirror that pushes pipeline outputs to cloud storage so the host can serve them when the pipeline machine is offline.
- `docs/decisions/MF-DEC-1.md` + `docs/decisions/MF-DEC-19.md`: the explicit decisions that locked this topology.
- `OLD-legacy: packages/` (collector/organizer/quality-gate/classifier/research_agent/deep_research_agent): the entire pipeline tree that the host never imports.
P04. Cryptographic Content Mirror with Restore-Verify
Problem
Your producer pipeline emits artifacts to a local filesystem (a per-source directory under canonical_outputs/). The disk is your machine — it can die, get reformatted, run out of space. You want every artifact byte-identically restorable from cloud storage, and you want a hash-verified manifest so a "restore" operation can prove byte-for-byte fidelity, not just "we got some bytes back."
The Pattern
import * as fs from "node:fs";
import * as path from "node:path";
import * as crypto from "node:crypto";
export interface FileManifest {
relpath: string; // relative to <sourceDir>
hash: string; // sha256 hex (full)
size_bytes: number;
mtime: string;
}
export interface MirrorManifest {
version: "1.0";
source_id: string;
generated_at: string;
bucket: string;
cloud_prefix: string;
files: FileManifest[];
cloud_synced: boolean;
}
function hashFile(filePath: string): { hash: string; size: number; mtime: Date } {
const data = fs.readFileSync(filePath);
const hash = crypto.createHash("sha256").update(data).digest("hex");
const stat = fs.statSync(filePath);
return { hash, size: stat.size, mtime: stat.mtime };
}
function walkRel(rootDir: string, sub = ""): string[] {
const out: string[] = [];
const dir = sub ? path.join(rootDir, sub) : rootDir;
for (const entry of fs.readdirSync(dir, { withFileTypes: true })) {
// Skip every dotfile: macOS junk (.DS_Store, ._*) and .mirror-manifest.json itself.
if (entry.name.startsWith(".")) continue;
const rel = sub ? path.join(sub, entry.name) : entry.name;
if (entry.isDirectory()) out.push(...walkRel(rootDir, rel));
else if (entry.isFile()) out.push(rel);
}
return out;
}
export function buildManifest(sourceDir: string, sourceId: string, bucket: string): MirrorManifest {
const files: FileManifest[] = [];
for (const rel of walkRel(sourceDir)) {
const abs = path.join(sourceDir, rel);
const h = hashFile(abs);
files.push({ relpath: rel, hash: h.hash, size_bytes: h.size, mtime: h.mtime.toISOString() });
}
return {
version: "1.0",
source_id: sourceId,
generated_at: new Date().toISOString(),
bucket,
cloud_prefix: `archive/${sourceId}`,
files,
cloud_synced: false,
};
}
// Mirror = write manifest (cloud_synced: false), upload each file, then flip the flag.
// Persisting the manifest first means an interrupted upload leaves cloud_synced: false on disk.
export async function mirror(sourceDir: string, sourceId: string, bucket: string,
upload: (relpath: string, abs: string, target: string) => Promise<void>) {
const manifest = buildManifest(sourceDir, sourceId, bucket);
const manifestPath = path.join(sourceDir, ".mirror-manifest.json");
fs.writeFileSync(manifestPath, JSON.stringify(manifest, null, 2));
for (const file of manifest.files) {
const abs = path.join(sourceDir, file.relpath);
const target = `${manifest.cloud_prefix}/${file.relpath}`;
await upload(file.relpath, abs, target);
}
manifest.cloud_synced = true;
fs.writeFileSync(manifestPath, JSON.stringify(manifest, null, 2));
return manifest;
}
// Restore = pull each file from cloud + verify hash matches manifest.
export async function restore(sourceDir: string, manifest: MirrorManifest,
download: (target: string) => Promise<Buffer>): Promise<{ ok: number; mismatched: string[] }> {
fs.mkdirSync(sourceDir, { recursive: true });
const mismatched: string[] = [];
let ok = 0;
for (const file of manifest.files) {
const target = `${manifest.cloud_prefix}/${file.relpath}`;
const data = await download(target);
const actualHash = crypto.createHash("sha256").update(data).digest("hex");
if (actualHash !== file.hash) {
mismatched.push(`${file.relpath}: manifest=${file.hash.slice(0, 8)}… actual=${actualHash.slice(0, 8)}…`);
continue;
}
const abs = path.join(sourceDir, file.relpath);
fs.mkdirSync(path.dirname(abs), { recursive: true });
fs.writeFileSync(abs, data);
ok++;
}
return { ok, mismatched };
}
Design decisions
- SHA-256 per file, full hex. Truncated hashes save bytes in the manifest at the cost of "two truncations collided" forensic ambiguity. The manifest is small; full hex pays for itself the first time you need to investigate a mismatch.
- Manifest lives WITH the files (`.mirror-manifest.json` next to the artifacts). After restore, the manifest is self-describing: anyone reading the restored directory can immediately verify integrity without external state.
- `cloud_synced` flag set last. The flag flips to `true` only after every file uploads successfully. A partial upload leaves the flag `false`, surfaced honestly to operators.
- Restore is verify-first, not write-first. A hash mismatch never writes the file to disk: the restored directory is either correct or absent for that path, never a mix of "verified bytes" and "unverified bytes."
- macOS junk filtered out (`._*` files, `.DS_Store`). These leak into directory walks on Mac dev machines and would otherwise pollute every manifest with churn that has no semantic meaning.
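The "anyone reading the restored directory can verify integrity" claim amounts to re-hashing each file and comparing against the manifest. A minimal sketch of that check follows; `verifyEntries` and its injected `read` callback are hypothetical (in practice `read` would wrap `fs.readFileSync`), and the demo uses in-memory buffers so it runs without a real directory.

```typescript
import * as crypto from "node:crypto";

interface ManifestEntry {
  relpath: string;
  hash: string; // full sha256 hex, as in FileManifest above
}

interface VerifyReport {
  verified: string[];
  missing: string[];
  mismatched: string[];
}

function sha256Hex(data: Buffer): string {
  return crypto.createHash("sha256").update(data).digest("hex");
}

// `read` returns the file's bytes, or null when the file is absent.
function verifyEntries(
  entries: ManifestEntry[],
  read: (relpath: string) => Buffer | null
): VerifyReport {
  const report: VerifyReport = { verified: [], missing: [], mismatched: [] };
  for (const entry of entries) {
    const data = read(entry.relpath);
    if (data === null) report.missing.push(entry.relpath);
    else if (sha256Hex(data) === entry.hash) report.verified.push(entry.relpath);
    else report.mismatched.push(entry.relpath);
  }
  return report;
}

// Demo with in-memory "files"; the constant is the standard SHA-256 digest of "abc".
const ABC_SHA256 = "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad";
const memoryFiles = new Map<string, Buffer>([
  ["a.txt", Buffer.from("abc")],
  ["b.txt", Buffer.from("abd")], // one byte off
]);
const verifyReport = verifyEntries(
  [
    { relpath: "a.txt", hash: ABC_SHA256 },
    { relpath: "b.txt", hash: ABC_SHA256 },
    { relpath: "c.txt", hash: ABC_SHA256 },
  ],
  (rel) => memoryFiles.get(rel) ?? null
);
console.log(verifyReport.verified, verifyReport.mismatched, verifyReport.missing);
```

Separating `missing` from `mismatched` keeps "the file was never restored" distinct from "the file was restored with the wrong bytes", which are different operational problems.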
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| Disk loss is recoverable; integrity is provable byte-for-byte | Full file rewrites on every upload (no delta) |
| Manifest is self-describing — restore needs no external state | Hash compute cost scales linearly with corpus size |
| Discriminated-result restore (ok count + mismatched list) surfaces partial failure | Doesn't dedupe across sources (same chunk in two manifests = two cloud copies) |
| Works against any object store with put/get (S3, Supabase Storage, R2, GCS) | No GC of old manifest versions in the cloud — cleanup is operator's job |
Citations
- `lib/content-state/mirror.ts`: full `MirrorManifest` shape + `hashFile` + `buildManifest` + `mirrorBook` + `restoreBook` against Supabase Storage. The "working = delete locally, restore, hashes match byte-for-byte" invariant is documented at the top.
- `scripts/restore-canonical.ts`: CLI restore entry point that prints `mismatched` list when hashes diverge.
P05. Three-Tier Resource Resolver with Bundled Snapshot Fallback
Problem Your application reads a chunk of structured data (a registry, a config, a snapshot) that has three legitimate sources depending on environment: a live filesystem file in local dev (so dev edits show up immediately), a bundled snapshot baked into the build for cloud deploys (where the live file isn't available), and an empty fallback so the app stays up if both are missing (uninitialized state isn't a crash). You want one resolver and three explicit branches, with the chosen branch announced in logs so debugging "why is my dev pulling from prod?" is one log line away.
The Pattern
import * as fs from "node:fs";
import * as path from "node:path";
export interface RegistryData {
version: "1.0";
last_updated: string;
assets: Record<string, unknown>;
}
const SNAPSHOT_PATH = path.join(process.cwd(), "lib", "data", "registry.snapshot.json");
const SNAPSHOT_VIRTUAL_ROOT = "<bundled-snapshot>";
function getLiveRoot(): string | null {
const raw = process.env.RESOURCE_ROOT?.trim();
return raw && raw.length > 0 ? path.resolve(raw) : null;
}
function tryParseRegistry(raw: string): RegistryData | null {
try {
const data = JSON.parse(raw) as RegistryData;
if (!data.assets || typeof data.assets !== "object") return null;
return data;
} catch {
return null;
}
}
function emptyRegistryData(): RegistryData {
return { version: "1.0", last_updated: new Date().toISOString(), assets: {} };
}
export function loadRegistry(): { root: string; data: RegistryData; source: "filesystem" | "bundled-snapshot" | "empty" } {
const root = getLiveRoot();
// Tier 1 — Live filesystem (preferred in local dev).
if (root) {
const livePath = path.join(root, "registry.json");
if (fs.existsSync(livePath)) {
try {
const parsed = tryParseRegistry(fs.readFileSync(livePath, "utf-8"));
if (parsed) {
console.log(`[resolver] using filesystem registry at ${livePath}`);
return { root, data: parsed, source: "filesystem" };
}
} catch {
// fall through to snapshot
}
}
}
// Tier 2 — Bundled snapshot (preferred in cloud deploys).
if (fs.existsSync(SNAPSHOT_PATH)) {
try {
const parsed = tryParseRegistry(fs.readFileSync(SNAPSHOT_PATH, "utf-8"));
if (parsed) {
console.log(`[resolver] using bundled snapshot (${SNAPSHOT_VIRTUAL_ROOT})`);
return { root: SNAPSHOT_VIRTUAL_ROOT, data: parsed, source: "bundled-snapshot" };
}
} catch {
// fall through to empty
}
}
// Tier 3 — Empty (never crashes; consumers get 404s).
console.warn(`[resolver] no live file at ${root ?? "<unset>"} and no bundled snapshot; serving empty`);
return { root: SNAPSHOT_VIRTUAL_ROOT, data: emptyRegistryData(), source: "empty" };
}
Design decisions
- Three tiers, explicit order. Each tier covers a real deployment posture: live filesystem (dev), bundled snapshot (cloud), empty (uninitialized). The order is encoded in code, not config — reading the function tells you everything.
- Live filesystem wins when present. Dev iterations don't require re-running a snapshot script; edit the live file, refresh, see the change.
- Bundled snapshot is a build artifact, not a runtime fetch. The snapshot is generated by a script (`npm run snapshot`) before deploy and committed to the repo. Cloud build picks it up automatically; no extra deploy plumbing.
- Empty fallback returns a real object, not null. Consumers don't need to null-check the registry; they just see an empty `assets` map and serve 404s for lookups. The API stays up.
- Every branch announces itself in logs. Without this, "why is the cloud serving stale data?" debugging is brutal. With it, you grep one line.
- Loaded source travels with the data. The caller can distinguish "live data" from "bundled snapshot data" at runtime, which is useful for cache-control headers, debug surfaces, or refusing to mutate when the source is read-only-by-policy (snapshot).
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| Same code serves dev, cloud, and uninitialized states | Three branches means three things to test (in practice the empty branch is often under-tested) |
| Dev iteration doesn't require regenerating the snapshot | Snapshot can drift from live if the refresh script isn't part of the deploy pipeline |
| App stays up when both sources are missing | The console.warn for empty is the only signal — easy to miss without log monitoring |
| Source-of-data attribution is preserved in the return value | Consumers can write code that branches on source, defeating the abstraction |
Citations
- `lib/v1/registry.ts`: `loadRegistry()` with the three-tier resolution against `META_FACTORY_ROOT` env, bundled snapshot at `lib/v1/data/asset-registry.snapshot.json`, empty fallback. Source is reported as `"filesystem" | "bundled-snapshot" | "empty"`.
- `scripts/snapshot-registry.ts`: the build-time script that regenerates the bundled snapshot from the current live registry. Run before deploy.
- `lib/content-state/snapshot.ts`: same pattern for the larger content-state snapshot served to integrity dashboards.
P06. Asset Registry with Deterministic Domain-Typed IDs
Problem
Your ingestion tree has dozens of artifact types across multiple domains: source PDFs, extracted text, metadata blobs, derived structured outputs, models. Without a discipline, every consumer constructs its own path to find anything: ``path.join(root, "books", "text", `${id}.txt`)`` here, `path.join(root, "research", "raw", id, "metadata.json")` there. The result is fragile string-building scattered across the codebase. You want one globally unique addressable id per artifact and one place to resolve any id to an absolute path.
The Pattern
import * as fs from "node:fs";
import * as path from "node:path";
export type AssetDomain = "books" | "research" | "compensation" | "onet" | "bls";
export type AssetType =
| "source_pdf" | "extracted_text" | "metadata" | "construct" | "model" | "registry_data";
export type StorageTier = "hot" | "warm" | "cold";
// Convention: <domain>:<asset_type>:<source_id> e.g. books:extracted_text:my_book_id
export interface Asset {
asset_id: string;
domain: AssetDomain;
asset_type: AssetType;
source_id: string;
display_name: string;
storage_tier: StorageTier;
storage_path: string; // relative to repo root
registered_at: string;
parent_id?: string;
derived_from?: string[];
content_hash?: string;
status: "registered" | "text_extracted" | "metadata_extracted" | "fully_processed";
}
export interface RegistryData {
version: "1.0";
last_updated: string;
assets: Record<string, Asset>;
}
export class AssetRegistry {
private data: RegistryData;
constructor(private readonly path: string) {
this.data = fs.existsSync(path)
? JSON.parse(fs.readFileSync(path, "utf-8"))
: { version: "1.0", last_updated: new Date().toISOString(), assets: {} };
}
static buildId(domain: string, assetType: string, sourceId: string): string {
return `${domain}:${assetType}:${sourceId}`;
}
get(assetId: string): Asset | undefined {
return this.data.assets[assetId];
}
has(assetId: string): boolean {
return assetId in this.data.assets;
}
resolveAbsolutePath(asset: Asset, repoRoot: string): string {
return path.resolve(repoRoot, asset.storage_path);
}
register(asset: Omit<Asset, "asset_id" | "registered_at">): Asset {
const asset_id = AssetRegistry.buildId(asset.domain, asset.asset_type, asset.source_id);
const full: Asset = { ...asset, asset_id, registered_at: new Date().toISOString() };
this.data.assets[asset_id] = full;
this.flush();
return full;
}
private flush(): void {
this.data.last_updated = new Date().toISOString();
const tmp = this.path + ".tmp";
fs.writeFileSync(tmp, JSON.stringify(this.data, null, 2));
fs.renameSync(tmp, this.path);
}
}
// Consumers depend on this — never on raw path joins.
export class StorageResolver {
constructor(private readonly registry: AssetRegistry, private readonly repoRoot: string) {}
resolve(assetId: string): { asset: Asset; absolutePath: string; exists: boolean } | null {
const asset = this.registry.get(assetId);
if (!asset) return null;
const absolutePath = this.registry.resolveAbsolutePath(asset, this.repoRoot);
return { asset, absolutePath, exists: fs.existsSync(absolutePath) };
}
readText(assetId: string): string | null {
const r = this.resolve(assetId);
return r && r.exists ? fs.readFileSync(r.absolutePath, "utf-8") : null;
}
readJSON<T>(assetId: string): T | null {
const text = this.readText(assetId);
return text ? (JSON.parse(text) as T) : null;
}
}
Design decisions
- `<domain>:<type>:<source_id>` is the entire ID grammar. Three colon-separated segments. Globally unique, parse-trivial, sort-stable, grep-friendly. The id IS the natural-language description of the artifact.
- Storage path is data, not code. The registry stores `storage_path` as a relative string; consumers never compute it. Moving an artifact from `hot` to `cold` storage is a registry edit, not a code change.
- The resolver is the only place that joins paths. Every consumer asks the resolver "give me the absolute path for this id", so there's exactly one path-construction routine in the codebase. When the storage layout changes, you change one function.
- Status enum tracks lifecycle. `registered → text_extracted → metadata_extracted → fully_processed` is the canonical lifecycle; consumers can filter "give me everything that's reached `fully_processed`" without inspecting individual artifacts.
- Atomic write on flush. The registry is the load-bearing identity map; a half-written `asset-registry.json` would brick the entire system. Temp + rename gives all-or-nothing semantics.
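The "parse-trivial" claim about the id grammar can be made concrete with a small sketch. Neither `parseAssetId` nor `buildAssetIdStrict` appears in the cited implementation; both are hypothetical helpers, and the rule that no segment may contain a colon is an assumption that follows from the three-segment grammar rather than something the source states.

```typescript
// Hypothetical helpers (not part of the cited AssetRegistry class).
export interface ParsedAssetId {
  domain: string;
  assetType: string;
  sourceId: string;
}

// Inverse of buildId: split "<domain>:<asset_type>:<source_id>" back into parts.
export function parseAssetId(assetId: string): ParsedAssetId | null {
  const parts = assetId.split(":");
  // Exactly three non-empty segments; anything else is ambiguous.
  if (parts.length !== 3 || parts.some((p) => p.length === 0)) return null;
  const [domain, assetType, sourceId] = parts;
  return { domain, assetType, sourceId };
}

// Guarded builder: refuses segments that would break the grammar on re-parse.
export function buildAssetIdStrict(domain: string, assetType: string, sourceId: string): string {
  for (const segment of [domain, assetType, sourceId]) {
    if (segment.length === 0 || segment.includes(":")) {
      throw new Error(`invalid id segment: "${segment}"`);
    }
  }
  return `${domain}:${assetType}:${sourceId}`;
}
```

If source ids in your system can legitimately contain colons, the grammar needs an escape rule or a different separator before a parser like this is safe.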
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| One id grammar across all domains | Registry file grows linearly with artifacts (10k+ entries is still fine, 1M is not) |
| Path construction lives in one file | Adding a new domain or asset type requires a TypeScript type bump |
| Status enum gates downstream consumers cleanly | No multi-writer support — pipeline must serialize registry writes |
| Self-describing: `cat asset-registry.json` reveals the whole inventory | Lookup is O(1) but full-table scans (find all `domain=books`) are O(n); needs an index for large stores |
Citations
- OLD-legacy: `packages/core/src/asset-registry.ts`: full `AssetRegistry` class with id-construction helper, persistence, status enum, derived-from links.
- OLD-legacy: `packages/core/src/storage-resolver.ts`: `StorageResolver` with `readText` / `readJSON` / per-domain ergonomics so consumers never call `fs.readFileSync` against raw paths.
- `lib/v1/types.ts`: the host-side `V1RegistryAsset` mirror of the pipeline-side `Asset` shape (necessary because the host doesn't import from the pipeline; see P03).
- See also: Principia P04 (Deterministic Idempotent IDs from Content Fingerprints) for the related-but-distinct pattern of deriving ids from content rather than from declaration.
P07. Multi-Source Reconciler with Quality-Flag Gates
Problem
Your view of "what content exists" is assembled from multiple backends: a primary asset registry, a snapshot bundle, on-disk pipeline outputs, a cloud storage listing. Each backend knows part of the picture; none knows the whole. You want one merged view per item, with explicit gates that block known failure modes (mislabeled files, format mismatches, corrupted extractions) from masquerading as healthy state.
The Pattern
type Backend = "registry" | "snapshot" | "filesystem" | "cloud";
type MountStatus = "online" | "offline" | "unknown";
export interface SourceLocation {
backend: Backend;
path: string;
mount_status: MountStatus;
last_seen_at?: string;
}
export interface ExtractionState {
status: "unprocessed" | "running" | "done" | "failed";
populated_count?: number;
generated_at?: string;
}
export interface QualityFlag {
validator: string;
severity: "block" | "warn";
message: string;
evidence?: Record<string, unknown>;
}
export interface ContentItem {
id: string;
source_locations: SourceLocation[];
extraction?: ExtractionState;
quality_flags: QualityFlag[];
state: "complete" | "in_progress" | "failed" | "unprocessed" | "source_missing" | "source_known_offline" | "quality_flagged";
}
// Quality validators — each runs against a partially-merged item and adds flags.
type Validator = (item: Omit<ContentItem, "state">) => QualityFlag[];
const validators: Validator[] = [
// Example: detect when the on-disk text contradicts the declared title.
(item) => {
const flags: QualityFlag[] = [];
const txt = item.source_locations.find((l) => l.path.endsWith(".txt") && l.mount_status === "online");
if (!txt) return flags;
// (read first 2KB; compare detected title to item.id-derived expectation; flag if disagreement)
return flags;
},
];
// Quality-flag-first state derivation: any block-severity flag wins, period.
function deriveState(item: Omit<ContentItem, "state">): ContentItem["state"] {
if (item.quality_flags.some((f) => f.severity === "block")) return "quality_flagged";
const hasOnline = item.source_locations.some((l) => l.mount_status === "online" && l.last_seen_at);
const hasOffline = item.source_locations.some((l) => l.mount_status === "offline");
if (!hasOnline && !hasOffline) return "source_missing";
if (!hasOnline && hasOffline) return "source_known_offline";
const ext = item.extraction;
if (ext?.status === "running") return "in_progress";
if (ext?.status === "failed") return "failed";
if (ext?.status === "done" && (ext.populated_count ?? 0) > 0) return "complete";
return "unprocessed";
}
// Reconciler — pure function over backend snapshots.
export function reconcile(
registry: Map<string, { id: string; locations: SourceLocation[] }>,
filesystem: Map<string, { id: string; extraction: ExtractionState; locations: SourceLocation[] }>,
): ContentItem[] {
const idsAll = new Set([...registry.keys(), ...filesystem.keys()]);
const out: ContentItem[] = [];
for (const id of idsAll) {
const r = registry.get(id);
const f = filesystem.get(id);
const merged: Omit<ContentItem, "state"> = {
id,
source_locations: [...(r?.locations ?? []), ...(f?.locations ?? [])],
extraction: f?.extraction,
quality_flags: [],
};
for (const v of validators) merged.quality_flags.push(...v(merged));
out.push({ ...merged, state: deriveState(merged) });
}
return out;
}
Design decisions
- Reconciler is a pure function over snapshots. Inputs are immutable maps from each backend; output is the merged view. No backends touched during merge, no side effects — trivial to test.
- Quality flags trump everything. A `block`-severity flag overrides any extraction state. "Looks done but the title doesn't match the content" must not display as healthy.
- State enum encodes the full reasoning. `source_missing` ≠ `source_known_offline` ≠ `quality_flagged` ≠ `unprocessed`. Operators reading a dashboard understand exactly what state each item is in without inspecting the raw data.
- Validators are pluggable but called in-line. A new validator is one function added to the list. No async ordering, no priority resolution; quality flags just accumulate.
- Evidence travels with each flag. A flag without evidence is just an opinion; with evidence (matching scores, file paths, first-2KB samples), an operator can audit the call.
- Locations from multiple backends are concatenated, not merged. Two backends reporting the same path produce two entries — preserves provenance over dedup convenience.
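The title-contradiction validator is only outlined in the pattern (its body is a comment), so here is one way it might be filled in. This is a sketch under stated assumptions: `makeTitleValidator`, the injected `readHead` reader, the token-overlap heuristic, and the `0.5` threshold are all hypothetical, and it assumes the item id is a slug of the title. The types are trimmed copies of the ones above so the block stands alone.

```typescript
interface SourceLocation { path: string; mount_status: "online" | "offline" | "unknown"; }
interface QualityFlag {
  validator: string;
  severity: "block" | "warn";
  message: string;
  evidence?: Record<string, unknown>;
}
interface PartialItem { id: string; source_locations: SourceLocation[]; }

// Lower-case word tokens of 3+ characters (assumed heuristic).
const tokens = (s: string): Set<string> =>
  new Set(s.toLowerCase().split(/[^a-z0-9]+/).filter((t) => t.length >= 3));

// readHead is injected so the validator does no I/O of its own and stays testable.
export function makeTitleValidator(
  readHead: (path: string) => string,
  minOverlap = 0.5, // assumed threshold; tune per corpus
) {
  return (item: PartialItem): QualityFlag[] => {
    const txt = item.source_locations.find(
      (l) => l.path.endsWith(".txt") && l.mount_status === "online",
    );
    if (!txt) return [];
    const expected = Array.from(tokens(item.id));
    if (expected.length === 0) return [];
    const seen = tokens(readHead(txt.path));
    const overlap = expected.filter((t) => seen.has(t)).length / expected.length;
    if (overlap >= minOverlap) return [];
    return [{
      validator: "title_contradiction",
      severity: "block",
      message: `head of ${txt.path} does not mention the expected title`,
      // Evidence travels with the flag so an operator can audit the call.
      evidence: { path: txt.path, expected_tokens: expected, overlap },
    }];
  };
}
```

Injecting the reader keeps `reconcile()` itself free of filesystem access: the backend snapshot step can pre-read the first 2KB and hand the validator a lookup function.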
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| Block-severity validators catch real failure modes before they pollute downstream | Validators with high false-positive rates poison the operator's attention |
| State enum is rich enough to drive a kanban-style operator UI directly | Adding a new state requires updating every consumer that pattern-matches on it |
| Pure function makes reconcile trivially testable + cache-friendly | Full re-reconcile on any backend change — no incremental updates |
| Locations from N backends preserved means provenance is auditable | Concatenated locations can grow unbounded for items in many backends |
Citations
- `lib/content-state/reconcile.ts`: full `reconcile()` over OLD-registry / library-snapshot / canonical-outputs backends with `deriveState()` quality-flag-first logic.
- `lib/content-state/validators.ts`: the quality validators (title contradiction, format mismatch, run-together HTML detection) wired into reconcile.
- `lib/content-state/backends.ts`: per-backend readers that produce the `Map<id, ...>` snapshots the reconciler consumes.
P08. Multi-Backend Mount Status with Last-Seen Cache
Problem
Your content lives across multiple storage backends: external drives that come and go, cloud mounts that lag, local-disk caches. A backend being offline shouldn't make its content invisible; the system should remember "we saw this file on the external drive yesterday, the drive is offline now, but the file is still known to exist." You want per-backend mount-status reporting and a cache that remembers the last-seen listing.
The Pattern
import * as fs from "node:fs";
import * as path from "node:path";
export type Backend = "external-drive" | "local-cache" | "cloud" | "secondary-mount";
export type MountStatus = "online" | "offline" | "unknown";
const ROOTS: Record<Backend, string> = {
"external-drive": "/Volumes/External/data",
"local-cache": "/Users/me/data-cache",
"cloud": "/Volumes/Cloud/data",
"secondary-mount": "/Volumes/Secondary/data",
};
export function detectMountStatus(): Record<Backend, MountStatus> {
const out = {} as Record<Backend, MountStatus>;
for (const [b, root] of Object.entries(ROOTS) as [Backend, string][]) {
out[b] = fs.existsSync(root) ? "online" : "offline";
}
return out;
}
interface BackendCache {
version: "1.0";
last_updated: string;
listings: Partial<Record<Backend, { last_seen_at: string; paths: string[] }>>;
}
const CACHE_PATH = path.resolve(__dirname, "data", "backend-cache.json");
function loadCache(): BackendCache {
try {
return JSON.parse(fs.readFileSync(CACHE_PATH, "utf8"));
} catch {
return { version: "1.0", last_updated: new Date().toISOString(), listings: {} };
}
}
function saveCache(cache: BackendCache): void {
fs.mkdirSync(path.dirname(CACHE_PATH), { recursive: true });
fs.writeFileSync(CACHE_PATH, JSON.stringify(cache, null, 2));
}
// When a backend is online: rescan and update its cache entry.
// When offline: return the last-seen listing so consumers can still reason about content.
export function listBackend(backend: Backend): { listing: string[]; status: MountStatus; last_seen_at: string } {
const status = detectMountStatus()[backend];
const cache = loadCache();
if (status === "online") {
const root = ROOTS[backend];
const listing = walkRel(root);
const now = new Date().toISOString();
cache.listings[backend] = { last_seen_at: now, paths: listing };
cache.last_updated = now;
saveCache(cache);
return { listing, status, last_seen_at: now };
}
// Offline: serve from cache, mark status honestly.
const cached = cache.listings[backend];
return {
listing: cached?.paths ?? [],
status,
last_seen_at: cached?.last_seen_at ?? "never",
};
}
function walkRel(root: string, sub = ""): string[] {
const out: string[] = [];
const dir = sub ? path.join(root, sub) : root;
for (const entry of fs.readdirSync(dir, { withFileTypes: true })) {
if (entry.name.startsWith(".") || entry.name.startsWith("._")) continue;
const rel = sub ? path.join(sub, entry.name) : entry.name;
if (entry.isDirectory()) out.push(...walkRel(root, rel));
else if (entry.isFile()) out.push(rel);
}
return out;
}
Design decisions
- `fs.existsSync` on the root is the mount probe. No need for `mount` parsing, no platform-specific calls. If the directory resolves, the backend is online.
- Cache writes only on online detection. Offline backends never overwrite their cache; the last good listing stays authoritative until the drive comes back.
- Caller gets `{listing, status, last_seen_at}` always. Status is part of the response, not a separate query. Consumers see "this listing is from yesterday because the drive is offline" in one read.
- "Never seen" returns empty listing with `last_seen_at: "never"`. Distinguishable from "seen once a long time ago", which is useful for first-run UX.
- macOS / system junk filtered. `.DS_Store`, `._*` AppleDouble files. These pollute listings with churn that has no semantic meaning.
- Cache is one file, one JSON blob. No per-backend file, no schema-per-backend complication. The version field exists for future migration but starts as `"1.0"`.
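The pattern deliberately keeps the probe to a single `fs.existsSync` call, and the `MountStatus` type already includes an `"unknown"` value that the probe never returns. If the edge cases matter in your environment, a slightly stricter probe might look like the sketch below. `probeMount` is a hypothetical name, and mapping permission and I/O errors to `"unknown"` is an assumption about what callers would want, not behavior taken from the cited implementation.

```typescript
import * as fs from "node:fs";

type MountStatus = "online" | "offline" | "unknown";

export function probeMount(root: string): MountStatus {
  try {
    // statSync follows symlinks, so a dangling link throws ENOENT here.
    if (!fs.statSync(root).isDirectory()) return "offline";
    // Confirm the directory can be listed and traversed, not just that it exists.
    fs.accessSync(root, fs.constants.R_OK | fs.constants.X_OK);
    return "online";
  } catch (err) {
    const code = (err as NodeJS.ErrnoException).code;
    if (code === "ENOENT" || code === "ENOTDIR") return "offline";
    // EACCES, EPERM, EIO and similar: the path may exist but cannot be verified.
    return "unknown";
  }
}
```

Returning `"unknown"` rather than `"offline"` lets the caller keep serving the cached listing without claiming the drive has gone away.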
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| Intermittent mounts don't blank out the catalog | Cache can become stale silently — a deleted file shows in listing until rescan |
| Mount status travels with every listing — no separate query | Walk cost scales with backend size; large drives need pagination or batched walks |
| Cache is portable, git-diffable JSON | No mtime/size tracking — just paths; richer cache needs schema bump |
| One probe, one file, no platform-specific code | fs.existsSync lies in edge cases (broken symlinks, permission errors) |
Citations
- `lib/content-state/backends.ts`: full `detectMountStatus()` + per-backend cache for Seagate, local-storage, OLD-registry-path, iCloud, Dropbox, cloud-supabase. Cache lives at `lib/content-state/data/backend-cache.json`.
P09. Resilient Extraction Wrapper with Error-Class-Aware Retry
Problem
You wrap a third-party API call (an LLM extractor, a search service, an enrichment provider) that fails for distinct reasons: transient network blips, server-side 5xx, hit-the-rate-limit 429s, and "you've burned your quota", and the right response is different for each. Naive exponential backoff treats them all the same and either over-retries on permanent failures (wasting budget) or under-retries on transient ones. You want one wrapper that classifies the error and adapts.
The Pattern
type ErrorClass = "transient" | "rate_limit" | "quota_exhausted" | "permanent";
function classify(err: unknown): ErrorClass {
const e = err as { status?: number; code?: string; message?: string };
if (e.status === 429) return "rate_limit";
if (e.status && e.status >= 500) return "transient";
if (e.code === "ETIMEDOUT" || e.code === "ECONNRESET") return "transient";
if (typeof e.message === "string" && /quota|billing|exceeded/i.test(e.message)) {
return "quota_exhausted";
}
return "permanent";
}
const delayMs = (attempt: number, base: number, cap: number) =>
Math.min(cap, base * 2 ** attempt) + Math.floor(Math.random() * 250);
// Process-level quota state — shared across all extractors so one quota signal
// pauses every concurrent extractor, not just the one that saw the error.
let quotaPaused = false;
let consecutiveQuotaErrors = 0;
// A union cannot be declared with `interface`; it has to be a type alias.
export type ExtractionResult<T> = { ok: true; value: T } | { ok: false; class: ErrorClass; errors: string[] };
export async function extractWithResilience<T>(
fn: () => Promise<T>,
opts: { maxRetries?: number; baseDelayMs?: number; maxDelayMs?: number } = {}
): Promise<ExtractionResult<T>> {
const maxRetries = opts.maxRetries ?? 10;
const baseDelayMs = opts.baseDelayMs ?? 3000;
const maxDelayMs = opts.maxDelayMs ?? 5 * 60_000;
const errors: string[] = [];
for (let attempt = 0; attempt <= maxRetries; attempt++) {
if (quotaPaused) {
return { ok: false, class: "quota_exhausted", errors: ["paused: prior quota exhaustion"] };
}
try {
const value = await fn();
// Fail-safe 1: null/undefined doesn't count as success.
if (value === null || value === undefined) throw new Error("extraction returned null/undefined");
// Fail-safe 2: empty object doesn't count as success (legitimate empty arrays do).
if (typeof value === "object" && !Array.isArray(value) && Object.keys(value).length === 0) {
throw new Error("extraction returned empty object");
}
consecutiveQuotaErrors = 0;
return { ok: true, value };
} catch (err) {
const cls = classify(err);
const msg = (err as Error).message ?? String(err);
errors.push(`attempt ${attempt + 1} [${cls}]: ${msg}`);
if (cls === "permanent") return { ok: false, class: "permanent", errors };
if (cls === "quota_exhausted") {
consecutiveQuotaErrors++;
if (consecutiveQuotaErrors >= 3) quotaPaused = true; // trip the global pause
return { ok: false, class: "quota_exhausted", errors };
}
// Transient or rate_limit: back off and retry. Rate-limit gets a longer base.
// Skip the sleep after the final attempt; there is nothing left to retry.
if (attempt < maxRetries) {
const base = cls === "rate_limit" ? baseDelayMs * 4 : baseDelayMs;
await new Promise((r) => setTimeout(r, delayMs(attempt, base, maxDelayMs)));
}
}
}
return { ok: false, class: "transient", errors };
}
Design decisions
- Classify before deciding. One classifier function maps the error to one of four classes; the loop body branches on the class. No regex-spaghetti in the loop body.
- Permanent errors fail fast. A 404 won't fix itself with retries; no point burning budget waiting for it.
- Quota exhaustion is sticky and process-global. One quota signal pauses every concurrent extractor (via a module-level `quotaPaused` flag). Otherwise concurrent extractors would each rediscover the quota wall and waste a retry round each.
- Three consecutive quota errors trip the global pause. A single quota error might be transient (a rate-limit edge case mis-classified); three in a row is the wall. The `consecutiveQuotaErrors` counter resets on first success.
- Empty object ≠ success. A common LLM failure mode is returning `{}`: semantically empty but technically valid JSON. The fail-safe catches this; empty arrays (legitimately "no items found") still count as success.
- Rate-limit gets a longer base backoff (4× the transient base). The server is explicitly telling you to slow down; respect it.
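To see what the defaults imply in wall-clock terms, the deterministic part of `delayMs` can be tabulated. The sketch below omits the random jitter (up to 250 ms per attempt in the pattern) so the numbers are reproducible; `backoffSchedule` is a hypothetical helper written for illustration, and the inputs are the pattern's defaults (3 s base, 5 min cap, 4× base for rate limits).

```typescript
// Deterministic part of the pattern's delayMs (jitter omitted).
const baseDelay = (attempt: number, base: number, cap: number): number =>
  Math.min(cap, base * 2 ** attempt);

export function backoffSchedule(base: number, cap: number, attempts: number): number[] {
  return Array.from({ length: attempts }, (_, attempt) => baseDelay(attempt, base, cap));
}

// Transient errors with the default 3 s base and 5 min cap:
const transient = backoffSchedule(3_000, 300_000, 9);
// [3000, 6000, 12000, 24000, 48000, 96000, 192000, 300000, 300000]

// Rate-limit errors use 4x the base, so they reach the cap two attempts sooner:
const rateLimited = backoffSchedule(12_000, 300_000, 7);
// [12000, 24000, 48000, 96000, 192000, 300000, 300000]

// Total sleep across the first ten transient delays, in milliseconds:
const firstTenTotal = backoffSchedule(3_000, 300_000, 10).reduce((a, b) => a + b, 0);
// 1281000 (about 21 minutes)
```

A single item that keeps failing transiently can therefore hold a worker for roughly twenty minutes at the defaults, which is worth knowing before choosing `maxRetries` for a batch of thousands.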
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| Quota-aware: one extractor's wall pauses all extractors | Process-global quota state doesn't survive restart — pause re-evaluates from zero |
| Permanent failures don't waste retry budget | Misclassification of a transient error as permanent stops the work too early |
| Empty-object guard catches a real LLM failure mode | Adds latency to every successful extraction (the structural check) |
| Class travels with the failure — consumers can adapt | Classifier is heuristic; adding a new provider means adding a new branch |
Citations
- OLD-legacy: `packages/research_agent/src/extractors/resilient-extraction-wrapper.ts`: `extractWithResilience()` with 10-retry max, quota-error monitor, fail-safes for null/empty extractions.
- OLD-legacy: `packages/research_agent/src/extractors/error-diagnosis.ts`: `diagnoseError()` + `calculateRetryDelay()` + `saveExtractionDiagnostic()` for the per-failure diagnostic trail.
- OLD-legacy: `packages/core/src/quota-monitor.ts`: process-level `isQuotaError` / `recordQuotaError` / `shouldPauseProcessing` / `resetQuotaErrorCount` that the wrapper consults.
- OLD-legacy: `packages/core/src/retry.ts`: lower-level retry-with-jitter utility for direct OpenAI SDK calls (used by the per-step LLM helpers in P01).
P10. Canonical Anchor Dereference with Result-Typed Errors
Problem
A consumer wants to reference a single leaf inside a nested document ("chapter 4 of book X, the third takeaway") using one opaque string token, and have your system resolve it back to the exact value. The token must round-trip (you can hand it out, the consumer hands it back, you get the value). Bad tokens must fail cleanly with a typed error, not throw; the calling code shouldn't need try/catch to handle the routine "this anchor no longer resolves."
The Pattern
export type AnchorSection = "takeaways" | "key_points" | "stories" | "questions";
export interface AnchorPath {
documentId: string;
scope: "root" | { groupId: string };
section: AnchorSection;
itemIndex: number;
}
// Format: <documentId>#<scope>.<section>.<itemIndex>
// Examples:
// how_to_measure_anything#root.takeaways.3
// how_to_measure_anything#ch4.takeaways.0
export function parseAnchor(anchor: string): AnchorPath | null {
const [documentId, frag] = anchor.split("#", 2);
if (!documentId || !frag) return null;
const parts = frag.split(".");
if (parts.length !== 3) return null;
const [scopeStr, section, indexStr] = parts;
if (!isSection(section)) return null;
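// Digits only: Number("") is 0 and Number("1e2") is 100, so a bare Number() check lets malformed tokens through.
if (!/^\d+$/.test(indexStr)) return null;
const itemIndex = Number(indexStr);
if (!Number.isSafeInteger(itemIndex)) return null;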
const scope: AnchorPath["scope"] = scopeStr === "root" ? "root" : { groupId: scopeStr };
return { documentId, scope, section: section as AnchorSection, itemIndex };
}
function isSection(s: string): boolean {
return ["takeaways", "key_points", "stories", "questions"].includes(s);
}
interface DocumentShape {
root?: Partial<Record<AnchorSection, string[]>>;
groups?: Array<{ id?: string } & Partial<Record<AnchorSection, string[]>>>;
}
export interface DereferencedBlock {
anchor: string;
documentId: string;
scope: string;
section: AnchorSection;
itemIndex: number;
text: string;
}
export type DereferenceResult =
| { ok: true; block: DereferencedBlock }
| { ok: false; code: "invalid_anchor"; message: string }
| { ok: false; code: "not_found"; message: string };
export function dereferenceAnchor(anchor: string, document: unknown): DereferenceResult {
const parsed = parseAnchor(anchor);
if (!parsed) return { ok: false, code: "invalid_anchor", message: `Anchor '${anchor}' is not in the expected shape.` };
const doc = (document ?? {}) as DocumentShape;
const sectionItems = readSectionItems(parsed, doc);
if (!sectionItems) return { ok: false, code: "not_found", message: `Anchor '${anchor}' did not resolve.` };
if (parsed.itemIndex >= sectionItems.length) {
return { ok: false, code: "not_found", message: `itemIndex ${parsed.itemIndex} out of range; section has ${sectionItems.length} items.` };
}
return {
ok: true,
block: {
anchor,
documentId: parsed.documentId,
scope: parsed.scope === "root" ? "root" : parsed.scope.groupId,
section: parsed.section,
itemIndex: parsed.itemIndex,
text: sectionItems[parsed.itemIndex],
},
};
}
function readSectionItems(p: AnchorPath, doc: DocumentShape): string[] | null {
if (p.scope === "root") {
const items = doc.root?.[p.section];
return Array.isArray(items) ? items : null;
}
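// Capture groupId first: TypeScript does not carry the narrowing of p.scope into the callback.
const { groupId } = p.scope;
const group = doc.groups?.find((g) => g.id === groupId);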
if (!group) return null;
const items = group[p.section];
return Array.isArray(items) ? items : null;
}
Design decisions
- String-encoded path with explicit grammar. `<documentId>#<scope>.<section>.<itemIndex>` is greppable, copy-pasteable, log-friendly. Hash sign separates identity from location.
- Two error codes, three result variants. `invalid_anchor` means the token is malformed; `not_found` means the token is well-formed but the underlying data has moved. Consumers can adapt differently: `invalid_anchor` is probably a bug; `not_found` is probably a stale citation.
- Returns a Result envelope, never throws. Dereferencing a missing anchor is a normal state (citations go stale, documents get re-extracted). Throwing would force try/catch at every call site.
- The parser is reversible. Same `AnchorPath` always serializes back to the same anchor string. This is what lets a citation round-trip unchanged between producer and consumers; it does not protect against a re-extraction renumbering items (see Tradeoffs).
- Section is a closed enum. `takeaways | key_points | stories | questions`: new sections require an explicit type bump. Prevents accidental support for sections that nothing else in the system understands.
- Out-of-range index is `not_found`, not `invalid_anchor`. The token is structurally valid; the data just doesn't have an item at that position.
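The reversibility claim depends on a serializer that the pattern code does not show. A minimal sketch of one follows; `formatAnchor` is a hypothetical name, and the guards (no `#` in the document id, no `.` or `#` in a group id, no group literally named `root`) are assumptions derived from how `parseAnchor` splits its input, not rules stated in the source. The types are repeated so the block stands alone.

```typescript
type AnchorSection = "takeaways" | "key_points" | "stories" | "questions";
interface AnchorPath {
  documentId: string;
  scope: "root" | { groupId: string };
  section: AnchorSection;
  itemIndex: number;
}

// Hypothetical inverse of parseAnchor. Returns null for paths that could not
// be parsed back to the same AnchorPath.
export function formatAnchor(p: AnchorPath): string | null {
  // "#" separates identity from location, so it cannot appear in the id.
  if (p.documentId.length === 0 || p.documentId.includes("#")) return null;
  if (!Number.isSafeInteger(p.itemIndex) || p.itemIndex < 0) return null;
  let scope = "root";
  if (p.scope !== "root") {
    const { groupId } = p.scope;
    // A group named "root" would parse back as the root scope; "." and "#"
    // would change how the fragment splits.
    if (groupId.length === 0 || groupId === "root" || /[.#]/.test(groupId)) return null;
    scope = groupId;
  }
  return `${p.documentId}#${scope}.${p.section}.${p.itemIndex}`;
}
```

With both directions in place, a property-style test (`parseAnchor(formatAnchor(p))` deep-equals `p` for every valid `p`) is a cheap way to keep the grammar honest as sections are added.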
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| Citations round-trip stably across consumers | Document re-extraction can renumber items, breaking citations silently |
| Result envelope rules out an entire class of try/catch noise | Adding a new addressable section is a type change, not a config |
| One opaque token replaces a path-walking API | Tokens leak document-structure assumptions to the consumer |
| Greppable in logs and citation databases | Index-based addressing is fragile; named addressing (e.g. takeaway-ids) is more robust |
Citations
- `lib/v1/canonical-anchors.ts`: full `dereferenceAnchor()` with `DereferenceResult` discriminated union, scope-vs-chapter handling, out-of-range detection.
- `lib/v1/annotations-store.ts`: `parseAnchor` / `AnchorPath` / `AnchorSection` definitions consumed by `canonical-anchors.ts`. Same module also persists annotations keyed by anchor (one annotation per addressable leaf).
P11. REST + MCP Co-Located Shape Helpers
Problem
The same capability is exposed as a REST endpoint AND an MCP tool — a browser hits /api/v1/library/items?kind=book&limit=20, an LLM agent calls library_list_items({kind: "book", limit: 20}). Both must return identical payloads (same field names, same density, same sort). Implementing the shaping logic twice means inevitable drift: the REST handler gets a new field, the MCP handler doesn't, and a year later they're producing different objects from the same underlying data.
The Pattern
// shared shape helpers — both REST and MCP import from here.
import type { Item } from "./types";
export type OutputDensity = "rich" | "summary";
export interface ItemSummary {
id: string;
title: string;
status: string;
updated_at: string;
}
export function projectSummary(item: Item): ItemSummary {
return {
id: item.id,
title: item.title,
status: item.status,
updated_at: item.updated_at,
};
}
export function projectItem(item: Item, density: OutputDensity): Item | ItemSummary {
return density === "summary" ? projectSummary(item) : item;
}
export interface ListFilter { kind?: string; q?: string; }
export interface ListOptions { limit?: number; offset?: number; density?: OutputDensity; }
export interface ListResult { items: Array<Item | ItemSummary>; total: number; limit: number; offset: number; }
export function listItems(all: Item[], filter: ListFilter, options: ListOptions): ListResult {
const filtered = all.filter((i) => matches(i, filter));
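// Newest first; returns 0 on ties so the comparator stays consistent.
filtered.sort((a, b) => (a.updated_at < b.updated_at ? 1 : a.updated_at > b.updated_at ? -1 : 0));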
const limit = clamp(options.limit ?? 50, 1, 500);
const offset = Math.max(0, options.offset ?? 0);
const density = options.density ?? "rich";
return {
items: filtered.slice(offset, offset + limit).map((i) => projectItem(i, density)),
total: filtered.length,
limit,
offset,
};
}
function matches(item: Item, f: ListFilter): boolean {
if (f.kind && item.kind !== f.kind) return false;
if (f.q && !item.title.toLowerCase().includes(f.q.toLowerCase())) return false;
return true;
}
function clamp(n: number, lo: number, hi: number): number { return Math.max(lo, Math.min(hi, n)); }
// rest-handler.ts — thin parser around the shared shape helper.
import { listItems, type OutputDensity } from "./shape";
export async function GET(req: Request): Promise<Response> {
const url = new URL(req.url);
const filter = { kind: url.searchParams.get("kind") ?? undefined, q: url.searchParams.get("q") ?? undefined };
const options = {
limit: Number(url.searchParams.get("limit")) || undefined,
offset: Number(url.searchParams.get("offset")) || 0,
density: (url.searchParams.get("output_density") as OutputDensity) || "rich",
};
const result = listItems(loadAll(), filter, options);
return Response.json(result);
}
// mcp-handler.ts — thin parser around the same helper.
import { listItems, type OutputDensity } from "./shape";
export const tool = {
name: "list_items",
inputSchema: { /* JSON Schema */ },
handler: async (input: { kind?: string; q?: string; limit?: number; offset?: number; output_density?: OutputDensity }) => {
const result = listItems(loadAll(), { kind: input.kind, q: input.q }, {
limit: input.limit, offset: input.offset, density: input.output_density ?? "rich",
});
return { content: [{ type: "text", text: JSON.stringify(result) }] };
},
};
Design decisions
- Shape helper is the contract. REST and MCP are thin adapters — they parse their transport-specific input, hand it to the shape helper, and return the shape helper's output. Behavior changes happen in one file.
- Density flag travels through the helper, not around it. Both surfaces accept `output_density` (REST as a query param, MCP as a tool input field). The helper decides what `rich` vs `summary` actually contains; the adapters don't.
- Filters and options have explicit types. `ListFilter` and `ListOptions` are exported; the test suite tests the helper alone. REST and MCP adapter tests verify only the parsing layer.
- Clamping in the helper, not the adapters. Limits, offsets, density defaults all live in the helper. An adapter that parses `limit=99999` doesn't need to know the cap; the helper enforces it.
- No code generation. The helper is hand-written TypeScript that both surfaces import. Codegen-from-OpenAPI is heavier and more brittle for the small set of endpoints; manual co-location is enough discipline.
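The REST adapter in the pattern casts `output_density` straight to `OutputDensity` and relies on `Number(...) || undefined` for numerics. If you want the parsing layer to be strict without taking over the helper's job, something like the following sketch could work. `parseListOptions` and `parseIntParam` are hypothetical names; the choice to pass `undefined` through for anything unrecognized, so that defaults and clamping stay in the shared helper, is an assumption consistent with the decisions above.

```typescript
type OutputDensity = "rich" | "summary";
interface ListOptions { limit?: number; offset?: number; density?: OutputDensity; }

const DENSITIES: readonly OutputDensity[] = ["rich", "summary"];

// Accept only plain non-negative integers; anything else becomes undefined.
function parseIntParam(raw: string | null): number | undefined {
  if (raw === null || !/^\d+$/.test(raw)) return undefined;
  return Number(raw);
}

export function parseListOptions(params: URLSearchParams): ListOptions {
  const rawDensity = params.get("output_density");
  return {
    limit: parseIntParam(params.get("limit")),
    offset: parseIntParam(params.get("offset")),
    // Unknown values fall through as undefined so the shared helper applies its default.
    density: DENSITIES.find((d) => d === rawDensity),
  };
}
```

The adapter still knows nothing about the 500-item cap: `limit=99999` parses to `99999` and the helper clamps it.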
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| REST and MCP drift becomes a TypeScript error, not a runtime divergence | Adapter-specific behavior (e.g., HTTP cache headers) has to be expressed outside the helper |
| Tests target the helper, not the adapters | If REST and MCP eventually need different shapes, the abstraction has to break |
| New surfaces (CLI, gRPC) plug in by writing one new adapter | Helper module must stay serializable — no class returns, no functions in the output |
| Density flag is a clean opt-in, not a separate endpoint | Three densities (rich/summary/state) starts to clutter the projection module |
Citations
- `lib/v1/library-routes-helpers.ts`: `projectSummary` / `projectState` / `projectItem` density projections shared between `/api/v1/library/items` (REST) and the MCP `library_*` tools.
- `lib/v1/passages-search-helpers.ts`: `parseSearchRequest` + `loadChunkIndex` + scoring shared between `/api/v1/passages/search` and MCP `passages_search`.
- `docs/CONTRACT-CHANGELOG.md` (v1.2.0 entry): the explicit rule "REST and MCP must serve identical payloads" that motivated the co-location pattern.
- See also: DevPlane P14 (Parallel API Surfaces over a Shared Core) for the generalized form spanning REST + MCP + CLI.
P12. Vendored Consumer Contract with Explicit Semver Ratification
Problem
A capability you maintain (a typed schema, a request/response surface, an SDK contract) is consumed by several downstream apps in your portfolio. You don't want to publish to npm: the producer and consumers live in different repos but on one developer's filesystem, the iteration is fast, and an npm publish cycle would slow everything down. You want consumers to vendor (copy) the contract verbatim and re-vendor on each bump, with explicit ratification of every breaking decision so the diff is tiny and reviewable.
The Pattern
/**
* <capability> — Canonical Zod contract (v1.0.0)
* ===================================================
*
* This file is the **canonical home** of the contract surface. It is NOT
* vendored from anywhere — consumers vendor FROM here.
*
* Consumer vendoring pattern: copy this file verbatim into a consumer repo
* at a stable location (e.g. `src/lib/<capability>/contract.ts`), pin the
* `CONTRACT_VERSION` constant, and add a header comment of the form:
*
* Vendored from <producer-repo> @ <SHA>
* packages/<capability>/contracts/types.ts
*
* On contract bumps, re-vendor verbatim and bump the consumer's recorded
* `CONTRACT_VERSION` to match. See ./README.md for the explicit decisions
* encoded in this surface and the semver policy.
*/
import { z } from "zod";
export const CONTRACT_VERSION = "1.0.0" as const;
export const ItemSchema = z.object({
id: z.string().min(1),
title: z.string().min(1),
description: z.string().optional(),
// ...
});
export type Item = z.infer<typeof ItemSchema>;
// Endpoint registry — consumers wire their HTTP adapter from this map.
export const DATA_CONTRACTS = {
list_items: { method: "GET", path: "/v1/items", response: z.array(ItemSchema) },
get_item: { method: "GET", path: "/v1/items/:id", response: ItemSchema },
} as const;
And the companion README documents what was decided and why:
# `<capability>` contracts — canonical Zod surface
**Status:** v1.0.0 — first stable contract surface.
**Canonical home:** this directory. Consumers vendor FROM here.
## Decisions encoded in this surface
These are called out so re-vendor reviewers know what changed.
### 1. Casing: camelCase (not snake_case)
The donor schema used snake_case. v1.0.0 ratifies camelCase as canonical.
Why: TS dominance in the consumer set; eliminates per-consumer transforms.
### 2. Field name: `brief` (not `brief_statement`)
Shorter, less verbose, same semantics.
### 3. `metadata.source` enum has FOUR values
The donor had three; we add `ai_drafted` because LLM-authoring is real.
Future: enum is additive-only across minor bumps; removal = major bump.
### 4. Version: 1.0.0 (first stable, not pre-release)
Earlier consumer integrations pinned 0.1.0 as bootstrap. v1.0.0 declares
this surface partner-ready.
## Semver policy
| Bump | Triggers |
|---|---|
| Patch (1.0.x) | Doc-only; defaults; non-rejecting refinements. |
| Minor (1.x.0) | Additive optional fields. Additive enum values. New endpoints. |
| Major (x.0.0) | Field rename. Field removal. Type/required change. Enum reordering. |
Consumers should re-vendor on every minor; majors trigger explicit
coordination filed as cards in each downstream queue.
Design decisions
- Vendoring, not npm. The contract is one file. Publishing to npm adds a registry round-trip, a version-resolution layer, and CI dependencies that don't pay back for the small consumer set. Vendoring with an explicit `CONTRACT_VERSION` constant + SHA header gives precise provenance without infrastructure.
- The producer side documents the ratification decisions. When a downstream pre-existed the canonical contract and introduced its own conventions, the canonical surface ratifies (or rejects) each one explicitly. Re-vendor reviewers don't need to re-litigate.
- Semver bumps have explicit triggers in the README. "Renaming a field is major" is not opinion; it's the policy. Removes argument time during PRs.
- The contract file is types-only. No business logic, no helpers, no class instances. Vendoring stays clean; the consumer's bundle doesn't accidentally inherit producer-side implementation.
- The endpoint registry (`DATA_CONTRACTS`) is part of the contract. Consumers wire their HTTP adapters from this map; a path change in the producer is visible at the type level in every consumer.
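As a rough illustration of wiring an adapter from the registry, the sketch below fills `:param` placeholders from a registry entry and returns the method and URL. It is a minimal sketch, not the codebase's adapter: `buildRequest`, its `baseUrl` argument, and the trimmed-down registry (Zod `response` schemas omitted so the block has no dependencies) are assumptions made for illustration.

```typescript
// Hypothetical stand-in for the vendored registry; the real one also carries
// a Zod `response` schema per endpoint, omitted here to stay dependency-free.
const DATA_CONTRACTS = {
  list_items: { method: "GET", path: "/v1/items" },
  get_item: { method: "GET", path: "/v1/items/:id" },
} as const;

type EndpointName = keyof typeof DATA_CONTRACTS;

// Build a request description from a registry entry.
// Throws if the path template names a parameter the caller did not supply.
export function buildRequest(
  baseUrl: string,
  name: EndpointName,
  params: Record<string, string> = {},
): { method: string; url: string } {
  const { method, path } = DATA_CONTRACTS[name];
  const filled = path.replace(/:([A-Za-z_]\w*)/g, (_match: string, key: string) => {
    const value = params[key];
    if (value === undefined) {
      throw new Error(`missing path param "${key}" for ${name}`);
    }
    return encodeURIComponent(value);
  });
  return { method, url: baseUrl + filled };
}
```

Because `name` is typed as a key of the registry, renaming or removing an endpoint in a re-vendored contract turns stale call sites into type errors rather than runtime 404s.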
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| Bootstrap cost is one copy — no npm registry, no package publish | No transitive dependency tracking — consumers must self-update on bumps |
| Re-vendor diff is verbatim, so reviewers can `diff -u` to verify | Drift possible if consumers patch the vendored copy locally |
| Ratification doc captures hard-won decisions inline | Documentation work scales with consumer count |
| Contract changes surface as type errors in every consumer's typecheck after re-vendor | Consumers can lag a version with no enforcement (only social) |
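The "no enforcement" weakness could be narrowed with a small check in each consumer's CI. The sketch below is one hypothetical way to do it: it compares the consumer's pinned `CONTRACT_VERSION` with the producer's and classifies the gap using the bump names from the semver policy. `classifyLag` and `shouldFailCi` are illustrative names, and how the canonical version is obtained (for example, by reading the producer checkout on the same filesystem) is left as an assumption.

```typescript
type Lag = "none" | "patch" | "minor" | "major" | "ahead";

// Parse a plain "x.y.z" string; pre-release tags are out of scope for this sketch.
function parseVersion(v: string): [number, number, number] {
  const m = /^(\d+)\.(\d+)\.(\d+)$/.exec(v);
  if (!m) throw new Error(`not a plain semver string: ${v}`);
  return [Number(m[1]), Number(m[2]), Number(m[3])];
}

// Classify how far the consumer's vendored version trails the canonical one.
// "ahead" means the vendored copy claims a newer version than the producer,
// which usually signals a local patch or a stale producer checkout.
export function classifyLag(vendored: string, canonical: string): Lag {
  const [vMaj, vMin, vPat] = parseVersion(vendored);
  const [cMaj, cMin, cPat] = parseVersion(canonical);
  if (vMaj !== cMaj) return vMaj < cMaj ? "major" : "ahead";
  if (vMin !== cMin) return vMin < cMin ? "minor" : "ahead";
  if (vPat !== cPat) return vPat < cPat ? "patch" : "ahead";
  return "none";
}

// Assumed policy, following the README: patch lag is tolerated, anything
// else fails the build so the re-vendor (or coordination card) happens.
export function shouldFailCi(lag: Lag): boolean {
  return lag !== "none" && lag !== "patch";
}
```

Tolerating patch lag is a judgment call in this sketch; a stricter consumer could fail on any non-`"none"` result.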
Citations
- `packages/job-family-agent/contracts/types.ts`: full canonical Zod surface for `job-family-agent` v1.0.0 with the `JOB_FAMILY_CONTRACT_VERSION` constant.
- `packages/job-family-agent/contracts/README.md`: ratification doc covering the four decisions (casing, brief, enum, version) carried forward from the Performix bootstrap vendor.
- `docs/AGENT-ASSIGNMENTS.md` MF-063: the assignment card that produced the contract and the explicit policy "consumers vendor FROM here; nobody vendors INTO here."
P13. Multi-Candidate Consolidation Pipeline (Score → Consolidate → Fallback)
Problem You run an extraction task that's hard for any single LLM call to nail — the document is long, the structure is irregular, the model hallucinates. Instead of one extraction, you fire N parallel candidate extractions with different prompts / models / temperatures, then need to produce one consolidated artifact. You want: a scoring pass that rates each candidate against the source, a merge pass that combines them, and a robust fallback when the merger itself fails.
The Pattern
import OpenAI from "openai";
export interface Candidate<T> { id: string; output: T; }
export interface CandidateScore { id: string; faithfulness: number; completeness: number; clarity: number; comment: string; }
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
// PHASE 1 — score each candidate against the source.
async function scoreOne<T>(source: string, candidate: Candidate<T>, promptTemplate: string): Promise<CandidateScore> {
const userPrompt = `${promptTemplate}\n\nSOURCE:\n${source}\n\nCANDIDATE:\n${JSON.stringify(candidate.output)}\n\nReturn JSON with faithfulness, completeness, clarity (1-10 each), and a comment.`;
try {
const completion = await openai.chat.completions.create({
model: "gpt-4o",
messages: [
{ role: "system", content: "You are an evaluator. Return ONLY a JSON object." },
{ role: "user", content: userPrompt },
],
temperature: 0,
});
const text = completion.choices[0]?.message?.content ?? "";
const json = stripMarkdownFences(text);
const parsed = JSON.parse(json);
return {
id: candidate.id,
faithfulness: clamp(parsed.faithfulness, 1, 10),
completeness: clamp(parsed.completeness, 1, 10),
clarity: clamp(parsed.clarity, 1, 10),
comment: parsed.comment ?? "no comment",
};
} catch (err) {
return { id: candidate.id, faithfulness: 1, completeness: 1, clarity: 1, comment: `scoring failed: ${(err as Error).message}` };
}
}
export async function scoreCandidates<T>(source: string, candidates: Candidate<T>[], promptTemplate: string): Promise<CandidateScore[]> {
// Sequential to avoid rate limits.
const out: CandidateScore[] = [];
for (const c of candidates) out.push(await scoreOne(source, c, promptTemplate));
return out;
}
// PHASE 2 — consolidate the candidates (LLM-driven merge with fallback).
export async function consolidateSection<T>(source: string, sectionName: string, candidates: Candidate<T>[], promptTemplate: string): Promise<T | null> {
const userPrompt = `${promptTemplate}\n\nSOURCE (truncated):\n${source.slice(0, 50000)}\n\nSECTION: ${sectionName}\n\nCANDIDATES:\n${JSON.stringify(candidates, null, 2)}\n\nReturn the consolidated section as JSON only.`;
try {
const completion = await openai.chat.completions.create({
model: "gpt-4o",
messages: [
{ role: "system", content: "You are a consolidation agent. Return ONLY JSON." },
{ role: "user", content: userPrompt },
],
temperature: 0,
});
const text = completion.choices[0]?.message?.content ?? "";
const json = stripMarkdownFences(text);
return JSON.parse(json) as T;
} catch (err) {
// Fallback: return the FIRST candidate's section. Never crash the pipeline.
console.error(`consolidation failed for ${sectionName}: ${(err as Error).message}`);
return candidates[0]?.output ?? null;
}
}
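function stripMarkdownFences(text: string): string {
const lines = text.trim().split("\n");
// Only touch replies that open with a fence (with or without a language tag).
if (!lines[0].startsWith("```")) return lines.join("\n");
lines.shift();
// Drop the closing fence only if it is present, so a truncated reply keeps its last line.
if (lines.length > 0 && lines[lines.length - 1].trim().startsWith("```")) lines.pop();
return lines.join("\n").trim();
}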
function clamp(n: number, lo: number, hi: number): number {
return Math.max(lo, Math.min(hi, Number(n) || lo));
}
Design decisions
- Score is per-candidate, three axes, 1–10. Faithfulness (matches the source), completeness (covers what should be covered), clarity (readable). Three is enough to differentiate; more axes invites prompt drift.
- Sequential scoring, not parallel. Rate limits matter more than wall-clock latency on these long-document calls. A 20-second penalty per call is preferable to half the calls failing on a 429.
- Consolidation is per-section, not whole-document. Sections are independent — merging the "key takeaways" section doesn't help or hurt the "tools and frameworks" section. Per-section means a section-level failure doesn't poison the whole consolidation.
- Markdown fence-stripping is mandatory. Every LLM provider occasionally wraps "pure JSON" output in ```json ... ``` despite explicit instructions. Strip the fences, then parse; if the parse still fails, fall back.
- Score-failure returns lowest score with comment. Better to mark a candidate as "we couldn't score this" than to omit it from the rank. The consolidation can decide whether to drop low-scored or "scoring failed" candidates.
- Consolidation-failure falls back to first candidate's section. Never let the pipeline crash on a merger hiccup — the first candidate is at least real extracted content, not nothing.
- Temperature 0 for both scoring and consolidating. Determinism beats creativity at this layer.
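To make the "mark it, don't omit it" decision concrete, the sketch below ranks scored candidates by the mean of the three axes and keeps failed-to-score candidates at the bottom of the list rather than dropping them. It is a hedged illustration, not the referee package's logic: `rankCandidates` and `RankedCandidate` are hypothetical names, the unweighted mean is an assumption, and failures are detected through the `scoring failed:` comment prefix written by the catch branch of `scoreOne`.

```typescript
interface CandidateScore {
  id: string;
  faithfulness: number;
  completeness: number;
  clarity: number;
  comment: string;
}

interface RankedCandidate {
  id: string;
  mean: number;
  scoringFailed: boolean;
}

// Rank candidates best-first. Candidates whose scoring call failed stay in
// the list (so nothing is silently dropped) but sort after every candidate
// that was really scored, even a genuinely poor one.
export function rankCandidates(scores: CandidateScore[]): RankedCandidate[] {
  return scores
    .map((s) => ({
      id: s.id,
      mean: (s.faithfulness + s.completeness + s.clarity) / 3,
      // Assumption: this prefix is the only failure signal available.
      scoringFailed: s.comment.startsWith("scoring failed:"),
    }))
    .sort(
      (a, b) =>
        Number(a.scoringFailed) - Number(b.scoringFailed) || b.mean - a.mean,
    );
}
```

A caller could pass only the top-ranked entries to `consolidateSection`, or use the top entry as the fallback instead of `candidates[0]`; both are variations on the pattern above, not what the cited implementation does.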
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| Per-section consolidation isolates failures | N calls per section per source — expensive |
| Fallback path means the pipeline never crashes on merger errors | "Fallback fired" can mask quality regressions if not surfaced |
| Score breakdown supports per-section quality tracking over time | Scoring is itself an LLM call — the judges aren't independent of the candidates' models |
| Markdown-fence stripping defends against the most common LLM output bug | Doesn't catch all output-format mistakes (e.g., extra prose before/after JSON) |
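The last weakness (prose before or after the JSON) can be partly covered by a more tolerant extractor. The sketch below tries three strategies in order: parse the whole reply, parse the first fenced block found anywhere in the reply, then parse the span between the first opening and last closing bracket. `extractJson` is a hypothetical helper, not part of the cited code, and the third strategy is a heuristic that still throws on replies with stray brackets in the surrounding prose.

```typescript
// Built at runtime so this example contains no literal fence sequence.
const FENCE = "`".repeat(3);
const FENCED_BLOCK = new RegExp(
  FENCE + "(?:json)?\\s*\\n([\\s\\S]*?)\\n?" + FENCE,
  "i",
);

// Best-effort JSON extraction from an LLM reply.
// Throws SyntaxError when no strategy yields valid JSON, so callers can
// keep using the same try/catch fallback as the pattern above.
export function extractJson(text: string): unknown {
  const trimmed = text.trim();

  // 1. Happy path: the whole reply is JSON.
  try {
    return JSON.parse(trimmed);
  } catch {
    // fall through to the next strategy
  }

  // 2. A fenced block anywhere in the reply, with prose around it.
  const fenced = FENCED_BLOCK.exec(trimmed);
  if (fenced) {
    try {
      return JSON.parse(fenced[1] ?? "");
    } catch {
      // fall through to the next strategy
    }
  }

  // 3. Heuristic: first opening bracket to last closing bracket.
  const start = trimmed.search(/[{\[]/);
  const end = Math.max(trimmed.lastIndexOf("}"), trimmed.lastIndexOf("]"));
  if (start !== -1 && end > start) {
    return JSON.parse(trimmed.slice(start, end + 1));
  }
  throw new SyntaxError("no JSON found in model output");
}
```

Swapping this in for `stripMarkdownFences` plus `JSON.parse` would widen what the parser accepts; it does not validate the shape of the result, so a schema check after extraction is still worth having.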
Citations
- `OLD-legacy: packages/referee/src/score.ts`: `scoreCandidatesAgainstBook()` with the three-axis 1–10 rubric, fence-stripping, per-candidate failure fallback.
- `OLD-legacy: packages/referee/src/consolidate.ts`: `consolidateSummaries()` with per-section LLM consolidation and first-candidate fallback on merger failure.
- `OLD-legacy: packages/referee/src/refine.ts`: secondary refinement pass that applies the same shape to a single best-pick candidate when consolidation is overkill.
- `OLD-legacy: packages/research_agent/src/quality-control.ts`: same shape applied to research-paper extractions.
P14. Cache-on-mtime Read-Through Snapshot
Problem You serve a frequently-read JSON snapshot (a content-state report, a registry index, a precomputed view) from a file that regenerates periodically — every minute, every hour, every deploy. Reading it on every request is wasteful (parse cost adds up); caching it forever stales when the regenerator writes a new version. You want a cache that's automatically invalidated when the underlying file changes, with no manual cache-busting and no separate refresh API.
The Pattern
import "server-only";
import * as fs from "node:fs";
import * as path from "node:path";
export interface Snapshot {
version: string;
generated_at: string;
items: unknown[];
}
let cached: { mtimeMs: number; snapshot: Snapshot } | null = null;
function snapshotPath(): string {
return path.resolve(process.cwd(), "data", "snapshot.json");
}
export function loadSnapshot(): Snapshot | null {
const p = snapshotPath();
let stat: fs.Stats;
try {
stat = fs.statSync(p);
} catch {
return null; // file missing — consumer treats as "no snapshot yet"
}
// mtime-keyed cache: if the regenerator wrote a new version, mtimeMs changes,
// we drop the cached value and re-parse. No explicit invalidation needed.
if (cached && cached.mtimeMs === stat.mtimeMs) {
return cached.snapshot;
}
try {
const raw = fs.readFileSync(p, "utf-8");
const parsed = JSON.parse(raw) as Snapshot;
cached = { mtimeMs: stat.mtimeMs, snapshot: parsed };
return parsed;
} catch {
// parse error: don't blow up; return null so consumer can serve "no data" gracefully
return null;
}
}
// Optional test hook for forcing re-load (e.g., between integration tests).
export function resetSnapshotCache(): void {
cached = null;
}
Design decisions
- `mtimeMs` is the cache key. A regenerator that writes the snapshot updates the file's mtime; the next read sees the changed mtime, drops the cached parsed value, and re-parses. No watchers, no inotify, no signals.
- `fs.statSync` is cheap. Stat is a metadata-only call, much cheaper than reading and parsing the file. The cache pays for itself as soon as parsing the snapshot costs more than a stat call, which in practice is always.
- Missing file returns `null`, doesn't throw. The snapshot may not have been generated yet (fresh deploy, never-run regenerator). Consumers see `null` and serve the "no data" UI; they're not interrupted by an unhandled exception.
- Parse errors return `null` too. A half-written snapshot from a racy regenerator surfaces as `null` instead of a thrown SyntaxError. The regenerator should write atomically (P02-style temp+rename); if it doesn't, this defends.
- Module-level cache, no class. One cache per process. Multiple processes (multiple Vercel function instances) each maintain their own cache; that's the right granularity since each is independent.
- `server-only` import. Marks the module as forbidden in client bundles: an accidental import from client code fails the build instead of shipping code that would call `fs.statSync` in the browser.
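The invalidation behaviour described in these bullets can be exercised without touching disk by writing the same logic against injected stat and read functions. The sketch below is an illustrative restatement, not the cited module: `createMtimeCache`, `MtimeSource`, and their method names are hypothetical, with `statMtimeMs` returning `null` standing in for a missing file.

```typescript
// Hypothetical seam: production code would back this with fs.statSync and
// fs.readFileSync + JSON.parse; tests can supply in-memory fakes.
interface MtimeSource<T> {
  statMtimeMs(): number | null; // null means "file missing"
  read(): T;                    // may throw on malformed content
}

export function createMtimeCache<T>(source: MtimeSource<T>) {
  let cached: { mtimeMs: number; value: T } | null = null;

  return {
    load(): T | null {
      const mtimeMs = source.statMtimeMs();
      if (mtimeMs === null) return null; // no snapshot yet

      // Same mtime as last successful read: serve the parsed value.
      if (cached && cached.mtimeMs === mtimeMs) return cached.value;

      try {
        const value = source.read();
        cached = { mtimeMs, value };
        return value;
      } catch {
        // Malformed or half-written content: report "no data" and leave
        // the previous cache entry untouched.
        return null;
      }
    },
    // Test hook, mirroring resetSnapshotCache().
    reset(): void {
      cached = null;
    },
  };
}
```

With a fake source, two consecutive `load()` calls at the same mtime should trigger one `read()`, and bumping the fake mtime should trigger exactly one more.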
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| No explicit invalidation — regenerator just writes the file | Cache lives in process memory; multi-instance deployments don't share |
| Cheap probe (`statSync`) on every read | Parse cost paid only once per mtime change |
| Tolerates missing + malformed snapshots gracefully | A constantly-rewritten file defeats the cache — high-frequency regeneration removes the benefit |
| One function, no setup | Test isolation requires the resetSnapshotCache() hook |
Citations
- `lib/content-state/snapshot.ts`: `loadContentStateSnapshot()` with mtime-keyed cache, missing-file → `null`, parse-error → `null`. Backs the `/api/v1/state/{health,items}` routes and the Console's Integrity Dashboard loaders.
This catalog is the production-validated record of engineering shapes that have hardened in Meta Factory. The OLD-vs-PROD split (P03) is the load-bearing decision; the rest are local consequences of producing one durable artifact tree (P02) that a thin host can serve cheaply via shared shape helpers (P11) over typed contracts (P12).