MetaFactory — reusable patterns

A two-repo content-ingestion engine: a heavy local pipeline (collector, organizer, quality gate, classifier) emits durable canonical-output artifacts per source; a thin cloud API host serves them over REST + MCP. Patterns extract the load-bearing seams — the pipeline-vs-host split, the checkpoint-resumable staged pipeline, the SHA-256 mirror with restore-verify, multi-source reconciliation with quality-flag gates, and the vendored consumer-contract pattern with explicit semver ratification.

14 patterns · source: people-analyst/meta-factory + people-analyst/meta-factory-prod/docs/REUSABLE_PATTERNS.md

Meta Factory — Reusable Engineering Patterns

Production-validated patterns from the Meta Factory codebase, stripped of business context, written to be dropped into any new system.

Each pattern is copy-paste-ready TypeScript. Domain-specific terminology has been removed. Each entry ends with citations pointing to the concrete implementation. Sources are split across two repos: the active host meta-factory-prod (paths shown bare) and the heavy ingestion engine in the legacy meta-factory repo (paths prefixed with OLD-legacy:). See P03 for the rationale behind the split.


Pattern Index

| # | Pattern Name | Core Technology | Problem Solved |
|---|---|---|---|
| P01 | Checkpoint-Resumable Staged Pipeline | Node fs + per-step JSON state file | Run a long, multi-step LLM extraction that can crash on step 7 and resume without re-running steps 1–6 |
| P02 | Canonical-Output as Durable Stage Boundary | Filesystem tree + per-stage JSON | Decouple expensive producer pipelines from cheap consumer reads so consumers can ship before producers stabilize |
| P03 | Pipeline-vs-Host Two-Repo Split | Filesystem + asymmetric env wiring | Run heavy, machine-bound ingestion locally while shipping a thin cloud API host that reads its outputs |
| P04 | Cryptographic Content Mirror with Restore-Verify | SHA-256 + Supabase Storage + JSON manifest | Treat extracted artifacts as deletable cache that can be byte-identically restored from the cloud |
| P05 | Three-Tier Resource Resolver with Bundled Snapshot Fallback | env-driven precedence + bundled JSON | Serve the same API in local dev (live FS), cloud (bundled snapshot), and uninitialized (empty fallback) without environment-specific code paths |
| P06 | Asset Registry with Deterministic Domain-Typed IDs | TypeScript types + path resolution table | End scattered path-guessing across a polyglot ingestion tree by giving every artifact one globally unique addressable id |
| P07 | Multi-Source Reconciler with Quality-Flag Gates | Pure reducer + validator chain | Merge per-item state from N backends into one view, with explicit quality gates that block known failure modes |
| P08 | Multi-Backend Mount Status with Last-Seen Cache | fs.existsSync probe + per-backend cache | Tolerate intermittent storage (external drives, cloud mounts) without losing the previous-session content listing |
| P09 | Resilient Extraction Wrapper with Error-Class-Aware Retry | Try/catch loop + quota-error fast-pause | Wrap third-party API calls with retry logic that distinguishes transient failures (back off) from quota exhaustion (stop) |
| P10 | Canonical Anchor Dereference with Result-Typed Errors | String-encoded path + discriminated union | Address a single leaf inside a nested JSON document with one opaque string, and fail cleanly on missing/malformed addresses |
| P11 | REST + MCP Co-Located Shape Helpers | Shared helper module + density flag | Serve byte-identical payloads from REST and MCP surfaces without copy-paste drift |
| P12 | Vendored Consumer Contract with Explicit Semver Ratification | Plain copy + version constant + ratification doc | Publish a typed contract that downstream apps vendor (not npm-install), with explicit decisions captured so re-vendor diffs stay tiny |
| P13 | Multi-Candidate Consolidation Pipeline (Score → Consolidate → Fallback) | LLM judge + LLM merger + first-candidate fallback | Turn N parallel best-effort extractions into one consolidated artifact, with a graceful fallback when the merger itself fails |
| P14 | Cache-on-mtime Read-Through Snapshot | fs.statSync + cached parsed value | Serve a frequently-read JSON snapshot cheaply, auto-invalidating when the file regenerates |

Patterns

P01. Checkpoint-Resumable Staged Pipeline

Problem: You run a long, multi-step pipeline against an expensive input (a book, a large transcript, a research paper). Each step is an LLM call that takes minutes and costs real money. Step 7 of 11 fails. You want to resume from step 7 on the next invocation, not re-run steps 1–6. You also want the resumption logic to be a property of the pipeline runner, not threaded through every step.

The Pattern

import * as fs from "node:fs";
import * as path from "node:path";

interface StepState {
  stepId: string;
  status: "pending" | "running" | "completed" | "failed" | "skipped";
  startedAt?: string;
  completedAt?: string;
  error?: string;
  outputFile?: string;
}

interface PipelineState {
  runId: string;
  startedAt: string;
  lastUpdated: string;
  steps: StepState[];
  completed: boolean;
}

const statePath = (runDir: string) => path.join(runDir, "state.json");

function loadState(runDir: string): PipelineState | null {
  const p = statePath(runDir);
  if (!fs.existsSync(p)) return null;
  try {
    return JSON.parse(fs.readFileSync(p, "utf-8")) as PipelineState;
  } catch {
    return null;
  }
}

function saveState(runDir: string, state: PipelineState): void {
  state.lastUpdated = new Date().toISOString();
  fs.writeFileSync(statePath(runDir), JSON.stringify(state, null, 2));
}

function isStepCompleted(state: PipelineState, stepId: string): boolean {
  const step = state.steps.find((s) => s.stepId === stepId);
  // Completion requires THREE things: status, output file path, and file present on disk.
  // Status alone lies after manual file deletion.
  return !!step && step.status === "completed" && !!step.outputFile && fs.existsSync(step.outputFile);
}

export interface PipelineStep {
  id: string;
  run: (ctx: Record<string, unknown>) => Promise<{ output: string; outputFile: string }>;
  outputKey: string;
}

export async function runPipeline(
  runDir: string,
  steps: PipelineStep[],
  opts: { resume?: boolean } = {}
) {
  fs.mkdirSync(runDir, { recursive: true });

  let state = opts.resume ? loadState(runDir) : null;
  if (!state) {
    state = {
      runId: `run-${Date.now()}`,
      startedAt: new Date().toISOString(),
      lastUpdated: new Date().toISOString(),
      steps: steps.map((s) => ({ stepId: s.id, status: "pending" })),
      completed: false,
    };
  }

  const context: Record<string, unknown> = {};
  for (const step of steps) {
    // Resume: skip if previously completed AND output file still on disk.
    if (opts.resume && isStepCompleted(state, step.id)) {
      const cached = fs.readFileSync(state.steps.find((s) => s.stepId === step.id)!.outputFile!, "utf-8");
      context[step.outputKey] = cached;
      continue;
    }

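    // A step added since state.json was written has no entry yet; register it as pending.
    let s = state.steps.find((x) => x.stepId === step.id);
    if (!s) {
      s = { stepId: step.id, status: "pending" };
      state.steps.push(s);
    }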
    s.status = "running";
    s.startedAt = new Date().toISOString();
    saveState(runDir, state);

    try {
      const result = await step.run(context);
      context[step.outputKey] = result.output;
      s.status = "completed";
      s.completedAt = new Date().toISOString();
      s.outputFile = result.outputFile;
      s.error = undefined;
    } catch (err) {
      s.status = "failed";
      s.error = (err as Error).message;
      saveState(runDir, state);
      throw err; // Next resume picks up here.
    }
    saveState(runDir, state);
  }

  state.completed = true;
  saveState(runDir, state);
  return context;
}

Design decisions

  • State.json next to outputs. Per-run state lives in runs/<inputId>/<runId>/state.json alongside the actual step output files. One directory carries everything needed to resume; no separate state-database to keep in sync.
  • isStepCompleted triple-checks: status + output path + file-on-disk. Trusting the status flag alone is a hazard — a manual rm of an output file leaves the status saying completed but the file gone. Three checks make resumption robust against half-clean states.
  • Failures throw and persist; success persists then continues. On crash, the failed step's state is on disk; the unfinished steps stay pending. Resumption is "rerun the first non-completed step."
  • Context is built from cached outputs on resume. The runner reads the cached output files back into the same context map that the step would have populated. Steps after the resume point can't tell they're in a resumed run.
  • One run directory per invocation. A fresh invocation gets a new runId and its own runDir. Resumption is explicit: the caller passes the earlier run's directory together with { resume: true }. This avoids the ambiguity of "which prior partial run am I picking up?"
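The rule "rerun the first non-completed step" has one subtlety. A thrown error persists `failed`. A hard kill (SIGKILL, power loss) mid-step never reaches the catch block, so that step stays marked `running`. The sketch below shows why this is still safe. It is a minimal status-only view: `firstRerunnableStep` is a hypothetical helper, and `StepState` is trimmed and redeclared so the block stands alone. Only `completed` counts as done, so `running`, `failed`, `pending` and even `skipped` steps all rerun. The runner above adds the output-file check on top of this.

```typescript
type StepStatus = "pending" | "running" | "completed" | "failed" | "skipped";

interface StepState {
  stepId: string;
  status: StepStatus;
}

// Hypothetical helper: where would a resume start, judging by status alone?
// "running" means the previous process died mid-step without reaching its catch
// block, so it is treated like "failed" or "pending". As in isStepCompleted, only
// "completed" counts as done (the real check also requires the output file on disk).
function firstRerunnableStep(steps: StepState[]): string | null {
  const next = steps.find((s) => s.status !== "completed");
  return next ? next.stepId : null; // null: nothing left to run
}

const afterHardKill: StepState[] = [
  { stepId: "extract", status: "completed" },
  { stepId: "summarize", status: "running" }, // process was killed here
  { stepId: "classify", status: "pending" },
];

console.log(firstRerunnableStep(afterHardKill)); // prints: summarize
```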

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| Recovering from a step-7 failure costs zero re-runs of steps 1–6 | One state file per run; abandoned runs accumulate on disk |
| Manual mid-pipeline editing is supported: delete an output file to force that step to re-run | Steps must be pure functions of the context they're given; hidden state breaks resumption |
| New steps drop into the pipeline with zero schema migration | State schema changes require a one-time state.json patch or a full re-run |
| Trivial to introspect: cat state.json tells you exactly where the pipeline is | Concurrent runs against the same runDir can corrupt the state file (no lock) |
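The missing lock is cheap to add. Below is a minimal sketch that assumes a hypothetical `state.lock` file next to `state.json` and a hypothetical `withRunLock` wrapper; neither appears in the source. Node's `fs.openSync(path, "wx")` creates the file exclusively and throws `EEXIST` if it already exists. A second runner against the same runDir therefore fails fast instead of interleaving writes.

```typescript
import * as fs from "node:fs";
import * as os from "node:os";
import * as path from "node:path";

// Create the lock file exclusively: the "wx" flag fails with EEXIST if it already exists.
function acquireLock(lockPath: string): number {
  try {
    return fs.openSync(lockPath, "wx");
  } catch (err) {
    if ((err as NodeJS.ErrnoException).code === "EEXIST") {
      throw new Error(`locked: ${lockPath} exists (another run may be active)`);
    }
    throw err;
  }
}

// Hypothetical wrapper: hold <runDir>/state.lock for the whole async run and
// release it even when fn throws.
async function withRunLock<T>(runDir: string, fn: () => Promise<T>): Promise<T> {
  fs.mkdirSync(runDir, { recursive: true });
  const lockPath = path.join(runDir, "state.lock");
  const fd = acquireLock(lockPath);
  try {
    fs.writeSync(fd, String(process.pid)); // record the holder for operators
    return await fn();
  } finally {
    fs.closeSync(fd);
    fs.unlinkSync(lockPath);
  }
}

// Demo: a second lock attempt on the same runDir is rejected while the first is held.
async function demo(): Promise<string[]> {
  const runDir = fs.mkdtempSync(path.join(os.tmpdir(), "run-"));
  const events: string[] = [];
  await withRunLock(runDir, async () => {
    events.push("outer acquired");
    try {
      await withRunLock(runDir, async () => {
        events.push("inner acquired");
      });
    } catch (err) {
      events.push((err as Error).message.startsWith("locked") ? "inner rejected" : "unexpected");
    }
  });
  events.push(fs.existsSync(path.join(runDir, "state.lock")) ? "lock leaked" : "lock released");
  return events;
}

demo().then((events) => console.log(events.join(", "))); // prints: outer acquired, inner rejected, lock released
```

With the runner above, the call would look like `withRunLock(runDir, () => runPipeline(runDir, steps, { resume: true }))`. A crashed process leaves `state.lock` behind. The pid inside lets an operator confirm the holder is gone before deleting the file by hand.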

Citations

  • OLD-legacy: packages/collector/src/state.ts — full PipelineState shape + load/save/isCompleted helpers.
  • OLD-legacy: packages/collector/src/pipeline.ts — the runner that checks isStepCompleted, restores context from cached outputs, persists step-by-step.
  • OLD-legacy: packages/research_agent/src/processing-state.ts — same shape applied to the research-paper extraction pipeline (different stages, identical resumption logic).
  • See also: DevPlane P01 (Load–Modify–Save State File) for the simpler one-shot variant; this pattern adds the resumption discipline on top.

P02. Canonical-Output as Durable Stage Boundary

Problem: You have an expensive producer pipeline (slow, model-driven, hardware-bound) and a fast consumer surface (a REST API, a web UI, an MCP server) that wants to query the producer's output. If the consumer reads the producer's in-memory representation, the two are coupled. The API can't ship until the pipeline stabilizes, and any pipeline restart blocks reads. You want the producer to emit a durable artifact that the consumer reads from disk, decoupling the two at a stable contract.

The Pattern

import * as fs from "node:fs";
import * as path from "node:path";

// The contract — what the consumer agrees to read.
export interface CanonicalDocument {
  source_id: string;
  setup?: Record<string, unknown>;
  structure?: { chapters?: Array<{ id?: string; [k: string]: unknown }> };
  provenance: {
    pipeline_version: string;
    generated_at: string;
  };
}

// Producer-side: atomic write so a half-finished producer can't be read mid-flight.
export function emitCanonical(rootDir: string, sourceId: string, doc: CanonicalDocument): void {
  const dir = path.join(rootDir, "canonical_outputs", sourceId);
  fs.mkdirSync(dir, { recursive: true });
  const finalPath = path.join(dir, "summary.json");
  const tmpPath = finalPath + ".tmp";
  fs.writeFileSync(tmpPath, JSON.stringify(doc, null, 2));
  fs.renameSync(tmpPath, finalPath); // atomic on POSIX
}

// Consumer-side: read with explicit "missing" envelope, never throw on missing source.
export type ReadResult =
  | { status: "ok"; document: CanonicalDocument }
  | { status: "missing"; source_id: string }
  | { status: "malformed"; source_id: string; error: string };

export function readCanonical(rootDir: string, sourceId: string): ReadResult {
  const finalPath = path.join(rootDir, "canonical_outputs", sourceId, "summary.json");
  if (!fs.existsSync(finalPath)) return { status: "missing", source_id: sourceId };
  try {
    const raw = fs.readFileSync(finalPath, "utf-8");
    const doc = JSON.parse(raw) as CanonicalDocument;
    return { status: "ok", document: doc };
  } catch (err) {
    return { status: "malformed", source_id: sourceId, error: (err as Error).message };
  }
}

// Per-document side artifacts (chunks, classifications, processing state) live as
// SIBLING files inside the same per-source directory:
//   canonical_outputs/<sourceId>/summary.json           (main artifact)
//   canonical_outputs/<sourceId>/chunks.json            (extracted leaf units)
//   canonical_outputs/<sourceId>/classification.json    (taxonomy tags)
//   canonical_outputs/<sourceId>/processing-state.json  (pipeline step progress)
//   canonical_outputs/<sourceId>/chapters/<chId>/...    (per-subunit artifacts)
//
// The producer can update one sibling without touching the others; consumers
// who only want chunks can skip the summary parse cost entirely.

Design decisions

  • Atomic write via temp + rename. The producer is slow, so partial writes are the failure mode that bites hardest. The temp file sits in the same directory as the target, so the rename never crosses filesystems. POSIX rename then gives all-or-nothing semantics: no consumer ever sees a half-written summary.
  • One directory per source, sibling files per concern. The directory is the unit of identity; the files inside are independently regenerable. A "re-extract chunks only" invocation overwrites chunks.json and leaves the rest alone.
  • Read returns a discriminated envelope. "Missing" is a normal state during pipeline bring-up; "malformed" is real corruption. Consumers pattern-match instead of try/catching.
  • Provenance baked into the artifact. pipeline_version and generated_at travel with the document. Consumers can detect "this was extracted with an old pipeline" without a separate index.
  • No write API for consumers. The contract is read-only by construction — the consumer surface never has the option to mutate, which means the producer is the single writer and the cache-coherence question never arises.
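On the consumer side, the envelope collapses into one exhaustive switch, and the provenance block gives a cheap staleness check. This is a minimal sketch assuming a REST-style consumer. The names `toHttpResponse` and `MIN_PIPELINE_VERSION`, the `X-Stale-Pipeline` header and the dotted-numeric `pipeline_version` format are all hypothetical. The `CanonicalDocument` and `ReadResult` shapes are trimmed and redeclared so the block stands alone.

```typescript
interface CanonicalDocument {
  source_id: string;
  provenance: { pipeline_version: string; generated_at: string };
}

type ReadResult =
  | { status: "ok"; document: CanonicalDocument }
  | { status: "missing"; source_id: string }
  | { status: "malformed"; source_id: string; error: string };

// Hypothetical policy: artifacts produced by older pipelines are served but flagged.
const MIN_PIPELINE_VERSION = "2.0.0";

// Compares dotted numeric versions ("2.10.0" > "2.9.1"); assumes no pre-release tags.
function compareVersions(a: string, b: string): number {
  const pa = a.split(".").map(Number);
  const pb = b.split(".").map(Number);
  for (let i = 0; i < Math.max(pa.length, pb.length); i++) {
    const diff = (pa[i] ?? 0) - (pb[i] ?? 0);
    if (diff !== 0) return diff;
  }
  return 0;
}

interface HttpResponse {
  status: number;
  body: unknown;
  headers: Record<string, string>;
}

// Exhaustive over the envelope: adding a new status variant becomes a compile error here.
function toHttpResponse(result: ReadResult): HttpResponse {
  switch (result.status) {
    case "ok": {
      const version = result.document.provenance.pipeline_version;
      const stale = compareVersions(version, MIN_PIPELINE_VERSION) < 0;
      // Staleness is surfaced as a (hypothetical) header, not as a failed read.
      return { status: 200, body: result.document, headers: stale ? { "X-Stale-Pipeline": version } : {} };
    }
    case "missing":
      // Normal during pipeline bring-up: a 404, not a 500.
      return { status: 404, body: { error: "not_found", source_id: result.source_id }, headers: {} };
    case "malformed":
      // Real corruption: report it without leaking the parser message to clients.
      return { status: 500, body: { error: "corrupt_artifact", source_id: result.source_id }, headers: {} };
  }
}

const stale = toHttpResponse({
  status: "ok",
  document: { source_id: "book-1", provenance: { pipeline_version: "1.9.0", generated_at: "2024-01-01T00:00:00.000Z" } },
});
console.log(stale.status, stale.headers["X-Stale-Pipeline"]); // prints: 200 1.9.0
```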

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| Producer and consumer can deploy independently | Eventual-consistency window between pipeline finish and consumer-visible update |
| Reads are cheap (one file read per source) | Storage cost scales linearly with sources (no compression) |
| Pipeline restart doesn't block reads of already-emitted artifacts | Schema migrations require re-running the pipeline on every source |
| Trivially backupable (the directory IS the data) | No cross-source queries without an in-memory index layer (see P11) |

Citations

  • OLD-legacy: packages/collector/src/pipeline.ts — producer emits canonical_outputs/<bookId>/summary.json + per-chapter siblings under chapters/ch<n>/.
  • lib/v1/passages-search-helpers.ts — consumer loads canonical_outputs/<bookId>/chunks.json + per-chapter chunks into an in-memory search index without ever touching the producer.
  • lib/v1/content-storage.ts — same directory layout mirrored to Supabase Storage so the cloud-deployed host can read the canonical artifacts even when the producer machine is offline.

P03. Pipeline-vs-Host Two-Repo Split

Problem: You're running an expensive ingestion pipeline (LLM-heavy, hours per source, dependent on local hardware and large external storage) and a thin cloud API host that serves the pipeline's outputs to consumers. Co-locating them in one repo makes the cloud deploy carry hundreds of MB of pipeline machinery it will never run. Co-location also tempts the pipeline to import "convenience" helpers from the host that quietly become load-bearing and break the cloud build. You want the two responsibilities in two repos with a deliberate one-way data dependency.

The Pattern

┌──────────────────────────────────────────────────────────────────────┐
│ Repo A: <product>-pipeline (heavy, local)                            │
│                                                                      │
│   packages/collector/      ─┐                                        │
│   packages/organizer/       │  staged producer (P01)                 │
│   packages/quality-gate/    │                                        │
│   packages/classifier/     ─┘                                        │
│           │                                                          │
│           ▼                                                          │
│   canonical_outputs/<sourceId>/{summary.json,chunks.json,...}        │
│   asset-registry.json   ◄── single source of truth for identity (P06)│
│                                                                      │
│   [Independently invoked: `npm run pipeline -- <sourceId>`]          │
└──────────────────────────────────────────────────────────────────────┘
                          │
                          │ A produces; B reads. Never the reverse.
                          ▼
┌──────────────────────────────────────────────────────────────────────┐
│ Repo B: <product>-host (thin, cloud)                                 │
│                                                                      │
│   app/api/v1/.../route.ts   ◄── REST surface                         │
│   app/mcp/...               ◄── MCP surface                          │
│   lib/v1/registry.ts        ◄── resolution order:                    │
│                                  1. live `<A>/asset-registry.json`   │
│                                  2. bundled snapshot under lib/v1/   │
│                                  3. empty (404 to consumers)         │
│   lib/content-state/mirror.ts ◄── cloud mirror of A's outputs        │
│                                  (P04 — SHA-256 + restore-verify)    │
│                                                                      │
│   [Deployed to Vercel; reads via env var META_FACTORY_ROOT in dev,   │
│    bundled snapshot + cloud bucket in prod.]                         │
└──────────────────────────────────────────────────────────────────────┘

Design decisions

  • The data dependency points one way. The host reads the pipeline's outputs; the pipeline never reads from the host. If a host-side feature needs new producer logic, it lands in the pipeline repo, the pipeline re-runs, the outputs migrate via P04's mirror, the host re-reads. No circular "host calls back into pipeline" anti-pattern.
  • Identity lives with the pipeline. The asset-registry.json that names every artifact is in the pipeline repo; the host snapshots it (P05). This avoids a class of bugs where the host invents an id the pipeline didn't produce.
  • The host deploys without the pipeline. Cloud builds don't pull the pipeline's node_modules (hundreds of dependencies, image-processing libs, LLM SDK retry shims, etc.). Build times are minutes, not hours.
  • Each repo has its own README, CI, deployment story. No "monorepo with packages but you still need to remember which scripts only work in which subdirectory."
  • The host can serve an empty registry gracefully. If the producer hasn't run yet (or its machine is offline), the host returns 404s for source lookups but stays up. Health checks pass; the API surface doesn't crash.
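The one-way rule is easiest to keep if CI enforces it. The sketch below scans import, export-from and require specifiers and reports any that reach into the pipeline tree. It assumes the host's sources can be handed over as path-to-content pairs, and that pipeline code would only be reached through specifiers containing fragments like `packages/collector`. The `FORBIDDEN` list and the `findForbiddenImports` name are hypothetical. The check covers code imports only; the host's runtime reads of pipeline outputs through META_FACTORY_ROOT are data access and stay allowed.

```typescript
// Hypothetical fragments that identify pipeline-repo code (adjust to the real layout).
const FORBIDDEN = ["packages/collector", "packages/organizer", "packages/quality-gate", "packages/classifier"];

// Captures the specifier of: import ... from "x", export ... from "x", import "x",
// import("x"), require("x"). A regex-level check, not a parser: a matching quoted
// string after the word "from" inside a comment would also be reported.
const SPECIFIER = /(?:\bfrom\s*|\bimport\s*\(?\s*|\brequire\s*\(\s*)["']([^"']+)["']/g;

interface Violation {
  file: string;
  specifier: string;
}

function findForbiddenImports(files: Record<string, string>, forbidden: string[] = FORBIDDEN): Violation[] {
  const violations: Violation[] = [];
  for (const [file, source] of Object.entries(files)) {
    for (const match of source.matchAll(SPECIFIER)) {
      const specifier = match[1] ?? "";
      if (forbidden.some((fragment) => specifier.includes(fragment))) {
        violations.push({ file, specifier });
      }
    }
  }
  return violations;
}

// Demo with in-memory host sources; a CI step would read them from the host repo instead.
const hostSources: Record<string, string> = {
  "app/api/v1/sources/route.ts":
    `import { loadState } from "../../../../packages/collector/src/state";\nimport * as fs from "node:fs";`,
  "lib/v1/env.ts": `export const root = process.env.META_FACTORY_ROOT; // runtime data path, not an import`,
  "lib/v1/legacy.ts": `const classifier = require("packages/classifier");`,
};

console.log(findForbiddenImports(hostSources));
```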

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| Cloud deploy stays small + fast: pipeline deps don't ship | Two repos to update when the contract changes |
| Pipeline can churn on local hardware without redeploying the host | Drift risk: pipeline emits a new field, host doesn't read it yet |
| One-way data flow rules out a class of circular bugs | "Where does code go?" requires a judgment call until the split is internalized |
| Each repo's package.json reflects only what it actually runs | Bootstrapping a new sister-host means recreating the resolution-order glue from scratch |

Citations

  • lib/v1/env.ts — the META_FACTORY_ROOT env var that lets the host point at any pipeline checkout in dev. In prod the var is unset and the bundled snapshot serves.
  • lib/v1/registry.ts — three-tier resolver: live filesystem → bundled snapshot → empty (see P05 for the generalized pattern).
  • lib/content-state/mirror.ts — the one-way mirror that pushes pipeline outputs to cloud storage so the host can serve them when the pipeline machine is offline.
  • docs/decisions/MF-DEC-1.md + docs/decisions/MF-DEC-19.md — the explicit decisions that locked this topology.
  • OLD-legacy: packages/ (collector/organizer/quality-gate/classifier/research_agent/deep_research_agent) — the entire pipeline tree that the host never imports.

P04. Cryptographic Content Mirror with Restore-Verify

Problem: Your producer pipeline emits artifacts to a local filesystem (a per-source directory under canonical_outputs/). That disk is part of your machine: it can die, get reformatted or run out of space. You want every artifact byte-identically restorable from cloud storage. You also want a hash-verified manifest, so a "restore" operation can prove byte-for-byte fidelity rather than just "we got some bytes back."

The Pattern

import * as fs from "node:fs";
import * as path from "node:path";
import * as crypto from "node:crypto";

export interface FileManifest {
  relpath: string;     // relative to <sourceDir>
  hash: string;        // sha256 hex (full)
  size_bytes: number;
  mtime: string;
}

export interface MirrorManifest {
  version: "1.0";
  source_id: string;
  generated_at: string;
  bucket: string;
  cloud_prefix: string;
  files: FileManifest[];
  cloud_synced: boolean;
}

function hashFile(filePath: string): { hash: string; size: number; mtime: Date } {
  const data = fs.readFileSync(filePath);
  const hash = crypto.createHash("sha256").update(data).digest("hex");
  const stat = fs.statSync(filePath);
  return { hash, size: stat.size, mtime: stat.mtime };
}

function walkRel(rootDir: string, sub = ""): string[] {
  const out: string[] = [];
  const dir = sub ? path.join(rootDir, sub) : rootDir;
  for (const entry of fs.readdirSync(dir, { withFileTypes: true })) {
    // Skip every dotfile/dot-directory: macOS junk (.DS_Store, ._*) and the
    // .mirror-manifest.json itself, which must never list itself.
    if (entry.name.startsWith(".")) continue;
    const rel = sub ? path.join(sub, entry.name) : entry.name;
    if (entry.isDirectory()) out.push(...walkRel(rootDir, rel));
    else if (entry.isFile()) out.push(rel);
  }
  return out;
}

export function buildManifest(sourceDir: string, sourceId: string, bucket: string): MirrorManifest {
  const files: FileManifest[] = [];
  for (const rel of walkRel(sourceDir)) {
    const abs = path.join(sourceDir, rel);
    const h = hashFile(abs);
    files.push({ relpath: rel, hash: h.hash, size_bytes: h.size, mtime: h.mtime.toISOString() });
  }
  return {
    version: "1.0",
    source_id: sourceId,
    generated_at: new Date().toISOString(),
    bucket,
    cloud_prefix: `archive/${sourceId}`,
    files,
    cloud_synced: false,
  };
}

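// Mirror = upload each file, then mark the manifest synced. The manifest is written once with
// cloud_synced=false before any upload, so an interrupted run leaves an honest "not synced"
// record on disk; it is rewritten with cloud_synced=true only after every upload succeeds.
export async function mirror(sourceDir: string, sourceId: string, bucket: string,
                              upload: (relpath: string, abs: string, target: string) => Promise<void>) {
  const manifestPath = path.join(sourceDir, ".mirror-manifest.json");
  const manifest = buildManifest(sourceDir, sourceId, bucket);
  fs.writeFileSync(manifestPath, JSON.stringify(manifest, null, 2));
  for (const file of manifest.files) {
    const abs = path.join(sourceDir, file.relpath);
    const target = `${manifest.cloud_prefix}/${file.relpath}`;
    await upload(file.relpath, abs, target); // a throw here leaves cloud_synced=false on disk
  }
  manifest.cloud_synced = true;
  fs.writeFileSync(manifestPath, JSON.stringify(manifest, null, 2));
  return manifest;
}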

// Restore = pull each file from cloud + verify hash matches manifest.
export async function restore(sourceDir: string, manifest: MirrorManifest,
                                download: (target: string) => Promise<Buffer>): Promise<{ ok: number; mismatched: string[] }> {
  fs.mkdirSync(sourceDir, { recursive: true });
  const mismatched: string[] = [];
  let ok = 0;
  for (const file of manifest.files) {
    const target = `${manifest.cloud_prefix}/${file.relpath}`;
    const data = await download(target);
    const actualHash = crypto.createHash("sha256").update(data).digest("hex");
    if (actualHash !== file.hash) {
      mismatched.push(`${file.relpath}: manifest=${file.hash.slice(0, 8)}… actual=${actualHash.slice(0, 8)}…`);
      continue;
    }
    const abs = path.join(sourceDir, file.relpath);
    fs.mkdirSync(path.dirname(abs), { recursive: true });
    fs.writeFileSync(abs, data);
    ok++;
  }
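  // Write the manifest next to the restored files so the directory is self-describing again
  // (files that failed verification are absent; the manifest still lists their expected hashes).
  fs.writeFileSync(path.join(sourceDir, ".mirror-manifest.json"), JSON.stringify(manifest, null, 2));
  return { ok, mismatched };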
}

Design decisions

  • SHA-256 per file, full hex. Truncated hashes save bytes in the manifest at the cost of "two truncations collided" forensic ambiguity. The manifest is small; full hex pays for itself the first time you need to investigate a mismatch.
  • Manifest lives WITH the files (.mirror-manifest.json next to the artifacts). After restore, the manifest is self-describing — anyone reading the restored directory can immediately verify integrity without external state.
  • cloud_synced flag set last. The flag flips to true only after every file uploads successfully. A partial upload leaves the flag false, surfaced honestly to operators.
  • Restore is verify-first, not write-first. A hash mismatch never writes the file to disk — the restored directory is either correct or absent for that path, never a mix of "verified bytes" and "unverified bytes."
  • Dotfiles skipped. This covers macOS junk (._* files, .DS_Store) and the manifest itself. The junk leaks into directory walks on Mac dev machines and would otherwise pollute every manifest with churn that has no semantic meaning.
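Because the manifest travels with the files, integrity can also be checked locally without touching the cloud. The sketch below uses a hypothetical `verifyLocal` helper that is not in the source. It re-hashes each listed file and reports paths that are missing or have drifted. `FileManifest` is redeclared so the block stands alone, and a size mismatch short-circuits the hash.

```typescript
import * as crypto from "node:crypto";
import * as fs from "node:fs";
import * as os from "node:os";
import * as path from "node:path";

interface FileManifest {
  relpath: string;
  hash: string; // sha256 hex (full)
  size_bytes: number;
  mtime: string;
}

const sha256 = (data: Buffer): string => crypto.createHash("sha256").update(data).digest("hex");

// Hypothetical helper: compare files on disk with the manifest, no cloud access needed.
function verifyLocal(sourceDir: string, files: FileManifest[]): { missing: string[]; drifted: string[] } {
  const missing: string[] = [];
  const drifted: string[] = [];
  for (const f of files) {
    const abs = path.join(sourceDir, f.relpath);
    if (!fs.existsSync(abs)) {
      missing.push(f.relpath);
      continue;
    }
    // A size mismatch proves drift without hashing; otherwise compare the full hash.
    if (fs.statSync(abs).size !== f.size_bytes || sha256(fs.readFileSync(abs)) !== f.hash) {
      drifted.push(f.relpath);
    }
  }
  return { missing, drifted };
}

// Demo: record two files, then edit one (same size, different bytes) and delete the other.
const dir = fs.mkdtempSync(path.join(os.tmpdir(), "verify-"));
fs.writeFileSync(path.join(dir, "summary.json"), '{"v":1}');
fs.writeFileSync(path.join(dir, "chunks.json"), "[]");
const manifestFiles: FileManifest[] = ["summary.json", "chunks.json"].map((relpath) => {
  const data = fs.readFileSync(path.join(dir, relpath));
  return { relpath, hash: sha256(data), size_bytes: data.length, mtime: new Date().toISOString() };
});
const clean = verifyLocal(dir, manifestFiles);
fs.writeFileSync(path.join(dir, "summary.json"), '{"v":2}');
fs.unlinkSync(path.join(dir, "chunks.json"));
const damaged = verifyLocal(dir, manifestFiles);
console.log(clean, damaged);
```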

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| Disk loss is recoverable; integrity is provable byte-for-byte | Full file rewrites on every upload (no delta) |
| Manifest is self-describing: restore needs no external state | Hash compute cost scales linearly with corpus size |
| Structured restore result (ok count + mismatched list) surfaces partial failure | Doesn't dedupe across sources (same chunk in two manifests = two cloud copies) |
| Works against any object store with put/get (S3, Supabase Storage, R2, GCS) | No GC of old manifest versions in the cloud; cleanup is the operator's job |

Citations

  • lib/content-state/mirror.ts — full MirrorManifest shape + hashFile + buildManifest + mirrorBook + restoreBook against Supabase Storage. The "working = delete locally, restore, hashes match byte-for-byte" invariant is documented at the top.
  • scripts/restore-canonical.ts — CLI restore entry point that prints mismatched list when hashes diverge.

P05. Three-Tier Resource Resolver with Bundled Snapshot Fallback

Problem: Your application reads a chunk of structured data (a registry, a config, a snapshot) that has three legitimate sources depending on environment:
- a live filesystem file in local dev, so dev edits show up immediately;
- a bundled snapshot baked into the build for cloud deploys, where the live file isn't available;
- an empty fallback so the app stays up if both are missing (an uninitialized state isn't a crash).

You want one resolver with three explicit branches, and you want the chosen branch announced in logs so debugging "why is my dev pulling from prod?" is one log line away.

The Pattern

import * as fs from "node:fs";
import * as path from "node:path";

export interface RegistryData {
  version: "1.0";
  last_updated: string;
  assets: Record<string, unknown>;
}

const SNAPSHOT_PATH = path.join(process.cwd(), "lib", "data", "registry.snapshot.json");
const SNAPSHOT_VIRTUAL_ROOT = "<bundled-snapshot>";

function getLiveRoot(): string | null {
  const raw = process.env.RESOURCE_ROOT?.trim();
  return raw && raw.length > 0 ? path.resolve(raw) : null;
}

function tryParseRegistry(raw: string): RegistryData | null {
  try {
    const data = JSON.parse(raw) as RegistryData;
    if (!data.assets || typeof data.assets !== "object") return null;
    return data;
  } catch {
    return null;
  }
}

function emptyRegistryData(): RegistryData {
  return { version: "1.0", last_updated: new Date().toISOString(), assets: {} };
}

export function loadRegistry(): { root: string; data: RegistryData; source: "filesystem" | "bundled-snapshot" | "empty" } {
  const root = getLiveRoot();

  // Tier 1 — Live filesystem (preferred in local dev).
  if (root) {
    const livePath = path.join(root, "registry.json");
    if (fs.existsSync(livePath)) {
      try {
        const parsed = tryParseRegistry(fs.readFileSync(livePath, "utf-8"));
        if (parsed) {
          console.log(`[resolver] using filesystem registry at ${livePath}`);
          return { root, data: parsed, source: "filesystem" };
        }
      } catch {
        // fall through to snapshot
      }
    }
  }

  // Tier 2 — Bundled snapshot (preferred in cloud deploys).
  if (fs.existsSync(SNAPSHOT_PATH)) {
    try {
      const parsed = tryParseRegistry(fs.readFileSync(SNAPSHOT_PATH, "utf-8"));
      if (parsed) {
        console.log(`[resolver] using bundled snapshot (${SNAPSHOT_VIRTUAL_ROOT})`);
        return { root: SNAPSHOT_VIRTUAL_ROOT, data: parsed, source: "bundled-snapshot" };
      }
    } catch {
      // fall through to empty
    }
  }

  // Tier 3 — Empty (never crashes; consumers get 404s).
  console.warn(`[resolver] no live file at ${root ?? "<unset>"} and no bundled snapshot; serving empty`);
  return { root: SNAPSHOT_VIRTUAL_ROOT, data: emptyRegistryData(), source: "empty" };
}

Design decisions

  • Three tiers, explicit order. Each tier covers a real deployment posture: live filesystem (dev), bundled snapshot (cloud), empty (uninitialized). The order is encoded in code, not config — reading the function tells you everything.
  • Live filesystem wins when present. Dev iterations don't require re-running a snapshot script; edit the live file, refresh, see the change.
  • Bundled snapshot is a build artifact, not a runtime fetch. The snapshot is generated by a script (npm run snapshot) before deploy and committed to the repo. Cloud build picks it up automatically; no extra deploy plumbing.
  • Empty fallback returns a real object, not null. Consumers don't need to null-check the registry; they just see an empty assets map and serve 404s for lookups. The API stays up.
  • Every branch announces itself in logs. Without this, "why is the cloud serving stale data?" debugging is brutal. With it, you grep one line.
  • Loaded source travels with the data. The caller can distinguish "live data" from "bundled snapshot data" at runtime — useful for cache-control headers, debug surfaces, or refusing to mutate when the source is read-only-by-policy (snapshot).
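The bundled-snapshot decision implies a small build-time step. The sketch below shows what such a step might look like, using the registry.json filename and the snapshot path shape from the resolver above. The `writeSnapshot` name is hypothetical, and so is its refusal to overwrite the snapshot with an invalid registry; this is not the source's scripts/snapshot-registry.ts. It writes via temp file + rename, the same trick as P02.

```typescript
import * as fs from "node:fs";
import * as os from "node:os";
import * as path from "node:path";

// Hypothetical snapshot step: copy <liveRoot>/registry.json to the bundled snapshot path.
// Validates first (never replace a good snapshot with a broken one) and writes via
// temp file + rename so an interrupted run cannot leave a half-written snapshot.
function writeSnapshot(liveRoot: string, snapshotPath: string): number {
  const raw = fs.readFileSync(path.join(liveRoot, "registry.json"), "utf-8");
  const data = JSON.parse(raw) as { assets?: unknown };
  const assets = data.assets;
  if (!assets || typeof assets !== "object") {
    throw new Error("live registry has no assets map; refusing to snapshot");
  }
  fs.mkdirSync(path.dirname(snapshotPath), { recursive: true });
  const tmp = snapshotPath + ".tmp";
  fs.writeFileSync(tmp, JSON.stringify(data, null, 2));
  fs.renameSync(tmp, snapshotPath);
  return Object.keys(assets).length; // asset count, handy for the build log
}

// Demo against temporary directories standing in for RESOURCE_ROOT and the build tree.
const liveRoot = fs.mkdtempSync(path.join(os.tmpdir(), "live-"));
fs.writeFileSync(
  path.join(liveRoot, "registry.json"),
  JSON.stringify({ version: "1.0", last_updated: "2024-01-01T00:00:00.000Z", assets: { "books:metadata:b1": {} } }),
);
const snapshotPath = path.join(fs.mkdtempSync(path.join(os.tmpdir(), "build-")), "lib", "data", "registry.snapshot.json");
console.log(`snapshot written with ${writeSnapshot(liveRoot, snapshotPath)} asset(s)`); // prints: snapshot written with 1 asset(s)
```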

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| Same code serves dev, cloud, and uninitialized states | Three branches mean three things to test (in practice the empty branch is often under-tested) |
| Dev iteration doesn't require regenerating the snapshot | Snapshot can drift from live if the refresh script isn't part of the deploy pipeline |
| App stays up when both sources are missing | The console.warn for empty is the only signal; easy to miss without log monitoring |
| Source-of-data attribution is preserved in the return value | Consumers can write code that branches on source, defeating the abstraction |
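The under-tested empty branch is easier to cover if the tier order is a pure function of what each tier could read. In the sketch below, a hypothetical `resolveTier` takes the already-read contents of the live file and the snapshot, with null meaning absent or RESOURCE_ROOT unset. Filesystem access and logging stay in the caller, and each tier becomes a one-line test. `parseRegistry` restates the validity rule of `tryParseRegistry` so the block stands alone.

```typescript
type Source = "filesystem" | "bundled-snapshot" | "empty";

interface RegistryData {
  version: "1.0";
  last_updated: string;
  assets: Record<string, unknown>;
}

// Same validity rule as tryParseRegistry above: valid JSON with an object-valued assets field.
function parseRegistry(raw: string | null): RegistryData | null {
  if (raw === null) return null;
  try {
    const data = JSON.parse(raw) as RegistryData;
    return data.assets && typeof data.assets === "object" ? data : null;
  } catch {
    return null; // unparseable JSON, or JSON null (reading .assets throws)
  }
}

// Hypothetical pure core of the resolver: first valid tier wins, empty registry otherwise.
function resolveTier(liveRaw: string | null, snapshotRaw: string | null): { source: Source; data: RegistryData } {
  const live = parseRegistry(liveRaw);
  if (live) return { source: "filesystem", data: live };
  const snapshot = parseRegistry(snapshotRaw);
  if (snapshot) return { source: "bundled-snapshot", data: snapshot };
  return { source: "empty", data: { version: "1.0", last_updated: new Date().toISOString(), assets: {} } };
}

const valid = JSON.stringify({ version: "1.0", last_updated: "2024-01-01T00:00:00.000Z", assets: { a: {} } });
console.log(resolveTier(valid, valid).source); // prints: filesystem
console.log(resolveTier("{not json", valid).source); // prints: bundled-snapshot
console.log(resolveTier(null, null).source); // prints: empty
```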

Citations

  • lib/v1/registry.ts: loadRegistry() with the three-tier resolution against the META_FACTORY_ROOT env var, the bundled snapshot at lib/v1/data/asset-registry.snapshot.json, and the empty fallback. Source is reported as "filesystem" | "bundled-snapshot" | "empty".
  • scripts/snapshot-registry.ts — the build-time script that regenerates the bundled snapshot from the current live registry. Run before deploy.
  • lib/content-state/snapshot.ts — same pattern for the larger content-state snapshot served to integrity dashboards.

P06. Asset Registry with Deterministic Domain-Typed IDs

Problem: Your ingestion tree has dozens of artifact types across multiple domains: source PDFs, extracted text, metadata blobs, derived structured outputs, models. Without a discipline, every consumer constructs its own path to find anything: path.join(root, "books", "text", `${id}.txt`) here, path.join(root, "research", "raw", id, "metadata.json") there. The result is fragile string-building scattered across the codebase. You want one globally unique addressable id per artifact and one place to resolve any id to an absolute path.

The Pattern

import * as fs from "node:fs";
import * as path from "node:path";

export type AssetDomain = "books" | "research" | "compensation" | "onet" | "bls";
export type AssetType =
  | "source_pdf" | "extracted_text" | "metadata" | "construct" | "model" | "registry_data";
export type StorageTier = "hot" | "warm" | "cold";

// Convention: <domain>:<asset_type>:<source_id>  e.g.  books:extracted_text:my_book_id
export interface Asset {
  asset_id: string;
  domain: AssetDomain;
  asset_type: AssetType;
  source_id: string;
  display_name: string;
  storage_tier: StorageTier;
  storage_path: string; // relative to repo root
  registered_at: string;
  parent_id?: string;
  derived_from?: string[];
  content_hash?: string;
  status: "registered" | "text_extracted" | "metadata_extracted" | "fully_processed";
}

export interface RegistryData {
  version: "1.0";
  last_updated: string;
  assets: Record<string, Asset>;
}

export class AssetRegistry {
  private data: RegistryData;
  constructor(private readonly path: string) {
    this.data = fs.existsSync(path)
      ? JSON.parse(fs.readFileSync(path, "utf-8"))
      : { version: "1.0", last_updated: new Date().toISOString(), assets: {} };
  }

  static buildId(domain: string, assetType: string, sourceId: string): string {
    return `${domain}:${assetType}:${sourceId}`;
  }

  get(assetId: string): Asset | undefined {
    return this.data.assets[assetId];
  }

  has(assetId: string): boolean {
    return assetId in this.data.assets;
  }

  resolveAbsolutePath(asset: Asset, repoRoot: string): string {
    return path.resolve(repoRoot, asset.storage_path);
  }

  register(asset: Omit<Asset, "asset_id" | "registered_at">): Asset {
    const asset_id = AssetRegistry.buildId(asset.domain, asset.asset_type, asset.source_id);
    const full: Asset = { ...asset, asset_id, registered_at: new Date().toISOString() };
    this.data.assets[asset_id] = full;
    this.flush();
    return full;
  }

  private flush(): void {
    this.data.last_updated = new Date().toISOString();
    const tmp = this.path + ".tmp";
    fs.writeFileSync(tmp, JSON.stringify(this.data, null, 2));
    fs.renameSync(tmp, this.path);
  }
}

// Consumers depend on this — never on raw path joins.
export class StorageResolver {
  constructor(private readonly registry: AssetRegistry, private readonly repoRoot: string) {}

  resolve(assetId: string): { asset: Asset; absolutePath: string; exists: boolean } | null {
    const asset = this.registry.get(assetId);
    if (!asset) return null;
    const absolutePath = this.registry.resolveAbsolutePath(asset, this.repoRoot);
    return { asset, absolutePath, exists: fs.existsSync(absolutePath) };
  }

  readText(assetId: string): string | null {
    const r = this.resolve(assetId);
    return r && r.exists ? fs.readFileSync(r.absolutePath, "utf-8") : null;
  }

  readJSON<T>(assetId: string): T | null {
    const text = this.readText(assetId);
    return text ? (JSON.parse(text) as T) : null;
  }
}

Design decisions

  • <domain>:<type>:<source_id> is the entire ID grammar. Three colon-separated segments. Globally unique, parse-trivial, sort-stable, grep-friendly. The id IS the natural-language description of the artifact.
  • Storage path is data, not code. The registry stores storage_path as a relative string; consumers never compute it. Moving an artifact from hot to cold storage is a registry edit, not a code change.
  • The resolver is the only place that joins paths. Every consumer asks the resolver "give me the absolute path for this id" — there's exactly one path-construction routine in the codebase. When the storage layout changes, you change one function.
  • Status enum tracks lifecycle. registered → text_extracted → metadata_extracted → fully_processed is the canonical lifecycle; consumers can filter "give me everything that's reached fully_processed" without inspecting individual artifacts.
  • Atomic write on flush. The registry is the load-bearing identity map; a half-written asset-registry.json would brick the entire system. Temp + rename gives all-or-nothing semantics.
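The ID grammar and the lifecycle order can each be made executable with a small helper. This is a sketch, not the cited implementation. parseAssetId and hasReached are hypothetical names. Splitting on the first two colons is an assumption: it lets a source_id that itself contains a colon still round-trip through buildId.

```typescript
// Hypothetical helpers; the source specifies the grammar and lifecycle, not these functions.
export type AssetStatus = "registered" | "text_extracted" | "metadata_extracted" | "fully_processed";

export interface ParsedAssetId {
  domain: string;
  assetType: string;
  sourceId: string;
}

// Parses <domain>:<asset_type>:<source_id>, splitting on the first two colons only.
export function parseAssetId(id: string): ParsedAssetId | null {
  const first = id.indexOf(":");
  const second = first < 0 ? -1 : id.indexOf(":", first + 1);
  // Reject missing separators and empty segments.
  if (first <= 0 || second <= first + 1 || second === id.length - 1) return null;
  return {
    domain: id.slice(0, first),
    assetType: id.slice(first + 1, second),
    sourceId: id.slice(second + 1),
  };
}

// Canonical lifecycle order, earliest first.
const LIFECYCLE: readonly AssetStatus[] = ["registered", "text_extracted", "metadata_extracted", "fully_processed"];

// True when `status` is at or past `target` in the lifecycle.
export function hasReached(status: AssetStatus, target: AssetStatus): boolean {
  return LIFECYCLE.indexOf(status) >= LIFECYCLE.indexOf(target);
}
```

With these helpers, "everything that has reached fully_processed" becomes a filter over registry entries with hasReached(asset.status, "fully_processed").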

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| One id grammar across all domains | Registry file grows linearly with artifacts (10k+ entries are still fine, 1M are not) |
| Path construction lives in one file | Adding a new domain or asset type requires a TypeScript type bump |
| Status enum gates downstream consumers cleanly | No multi-writer support — pipeline must serialize registry writes |
| Self-describing: cat asset-registry.json reveals the whole inventory | Lookup is O(1) but full-table scans (find all domain=books) are O(n) — needs an index for large stores |

Citations

  • OLD-legacy: packages/core/src/asset-registry.ts — full AssetRegistry class with id-construction helper, persistence, status enum, derived-from links.
  • OLD-legacy: packages/core/src/storage-resolver.ts — StorageResolver with readText / readJSON / per-domain ergonomics so consumers never call fs.readFileSync against raw paths.
  • lib/v1/types.ts — the host-side V1RegistryAsset mirror of the pipeline-side Asset shape (necessary because the host doesn't import from the pipeline; see P03).
  • See also: Principia P04 (Deterministic Idempotent IDs from Content Fingerprints) for the related-but-distinct pattern of deriving ids from content rather than from declaration.

P07. Multi-Source Reconciler with Quality-Flag Gates

Problem: Your view of "what content exists" is assembled from multiple backends — a primary asset registry, a snapshot bundle, on-disk pipeline outputs, a cloud storage listing. Each backend knows part of the picture; none knows the whole. You want one merged view per item, with explicit gates that block known failure modes (mislabeled files, format mismatches, corrupted extractions) from masquerading as healthy state.

The Pattern

type Backend = "registry" | "snapshot" | "filesystem" | "cloud";
type MountStatus = "online" | "offline" | "unknown";

export interface SourceLocation {
  backend: Backend;
  path: string;
  mount_status: MountStatus;
  last_seen_at?: string;
}

export interface ExtractionState {
  status: "unprocessed" | "running" | "done" | "failed";
  populated_count?: number;
  generated_at?: string;
}

export interface QualityFlag {
  validator: string;
  severity: "block" | "warn";
  message: string;
  evidence?: Record<string, unknown>;
}

export interface ContentItem {
  id: string;
  source_locations: SourceLocation[];
  extraction?: ExtractionState;
  quality_flags: QualityFlag[];
  state: "complete" | "in_progress" | "failed" | "unprocessed" | "source_missing" | "source_known_offline" | "quality_flagged";
}

// Quality validators — each runs against a partially-merged item and adds flags.
type Validator = (item: Omit<ContentItem, "state">) => QualityFlag[];

const validators: Validator[] = [
  // Example: detect when the on-disk text contradicts the declared title.
  (item) => {
    const flags: QualityFlag[] = [];
    const txt = item.source_locations.find((l) => l.path.endsWith(".txt") && l.mount_status === "online");
    if (!txt) return flags;
    // (read first 2KB; compare detected title to item.id-derived expectation; flag if disagreement)
    return flags;
  },
];

// Quality-flag-first state derivation: any block-severity flag wins, period.
function deriveState(item: Omit<ContentItem, "state">): ContentItem["state"] {
  if (item.quality_flags.some((f) => f.severity === "block")) return "quality_flagged";

  const hasOnline = item.source_locations.some((l) => l.mount_status === "online" && l.last_seen_at);
  const hasOffline = item.source_locations.some((l) => l.mount_status === "offline");
  if (!hasOnline && !hasOffline) return "source_missing";
  if (!hasOnline && hasOffline) return "source_known_offline";

  const ext = item.extraction;
  if (ext?.status === "running") return "in_progress";
  if (ext?.status === "failed") return "failed";
  if (ext?.status === "done" && (ext.populated_count ?? 0) > 0) return "complete";
  return "unprocessed";
}

// Reconciler — pure function over backend snapshots.
export function reconcile(
  registry: Map<string, { id: string; locations: SourceLocation[] }>,
  filesystem: Map<string, { id: string; extraction: ExtractionState; locations: SourceLocation[] }>,
): ContentItem[] {
  const idsAll = new Set([...registry.keys(), ...filesystem.keys()]);
  const out: ContentItem[] = [];
  for (const id of idsAll) {
    const r = registry.get(id);
    const f = filesystem.get(id);
    const merged: Omit<ContentItem, "state"> = {
      id,
      source_locations: [...(r?.locations ?? []), ...(f?.locations ?? [])],
      extraction: f?.extraction,
      quality_flags: [],
    };
    for (const v of validators) merged.quality_flags.push(...v(merged));
    out.push({ ...merged, state: deriveState(merged) });
  }
  return out;
}

Design decisions

  • Reconciler is a pure function over snapshots. Inputs are immutable maps from each backend; output is the merged view. No backends touched during merge, no side effects — trivial to test.
  • Quality flags trump everything. A block-severity flag overrides any extraction state. "Looks done but the title doesn't match the content" must not display as healthy.
  • State enum encodes the full reasoning. source_missing, source_known_offline, quality_flagged, and unprocessed are kept as four distinct states. Operators reading a dashboard understand exactly what state each item is in without inspecting the raw data.
  • Validators are pluggable but called in-line. A new validator is one function added to the list. No async ordering, no priority resolution — quality flags just accumulate.
  • Evidence travels with each flag. A flag without evidence is just an opinion; with evidence (matching scores, file paths, first-2KB samples), an operator can audit the call.
  • Locations from multiple backends are concatenated, not merged. Two backends reporting the same path produce two entries — preserves provenance over dedup convenience.
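The placeholder validator in the listing above could be filled in roughly as follows. This is a hedged sketch, not the title-contradiction check in validators.ts. The word-overlap heuristic, the 0.5 threshold, and the injected readHead reader are all assumptions; readHead is meant to return the first 2KB of a file. The sketch shows a block flag carrying its evidence, so an operator can audit the call.

```typescript
// Minimal copies of the shapes above, trimmed to what this validator reads.
interface QualityFlag {
  validator: string;
  severity: "block" | "warn";
  message: string;
  evidence?: Record<string, unknown>;
}
interface SourceLocation { backend: string; path: string; mount_status: "online" | "offline" | "unknown" }
interface PartialItem { id: string; source_locations: SourceLocation[] }

// Hypothetical: "how_to_measure_anything" -> ["how", "measure", "anything"] (short words dropped).
function expectedTitleWords(id: string): string[] {
  return id.toLowerCase().split(/[_\-\s]+/).filter((w) => w.length > 2);
}

// The file reader is injected so the validator stays pure and testable.
export function makeTitleContradictionValidator(readHead: (p: string) => string | null, threshold = 0.5) {
  return (item: PartialItem): QualityFlag[] => {
    const txt = item.source_locations.find((l) => l.path.endsWith(".txt") && l.mount_status === "online");
    if (!txt) return [];
    const head = readHead(txt.path);
    const words = expectedTitleWords(item.id);
    if (head === null || words.length === 0) return [];
    const lowerHead = head.toLowerCase();
    const matched = words.filter((w) => lowerHead.includes(w));
    const score = matched.length / words.length;
    if (score >= threshold) return [];
    // Evidence lets an operator see why the call was made.
    return [{
      validator: "title_contradiction",
      severity: "block",
      message: `Opening text of ${txt.path} does not mention the expected title.`,
      evidence: { path: txt.path, expected_words: words, matched_words: matched, score, sample: head.slice(0, 200) },
    }];
  };
}
```

Substring matching is deliberately crude. A real validator would need a better similarity measure to keep its false-positive rate under control.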

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| Block-severity validators catch real failure modes before they pollute downstream | Validators with high false-positive rates poison the operator's attention |
| State enum is rich enough to drive a kanban-style operator UI directly | Adding a new state requires updating every consumer that pattern-matches on it |
| Pure function makes reconcile trivially testable + cache-friendly | Full re-reconcile on any backend change — no incremental updates |
| Locations from N backends preserved means provenance is auditable | Concatenated locations can grow unbounded for items in many backends |

Citations

  • lib/content-state/reconcile.ts — full reconcile() over OLD-registry / library-snapshot / canonical-outputs backends with deriveState() quality-flag-first logic.
  • lib/content-state/validators.ts — the quality validators (title contradiction, format mismatch, run-together HTML detection) wired into reconcile.
  • lib/content-state/backends.ts — per-backend readers that produce the Map<id, ...> snapshots the reconciler consumes.

P08. Multi-Backend Mount Status with Last-Seen Cache

Problem: Your content lives across multiple storage backends — external drives that come and go, cloud mounts that lag, local-disk caches. A backend being offline shouldn't make its content invisible; the system should remember "we saw this file on the external drive yesterday, the drive is offline now, but the file is still known to exist." You want per-backend mount-status reporting and a cache that remembers the last-seen listing.

The Pattern

import * as fs from "node:fs";
import * as path from "node:path";

export type Backend = "external-drive" | "local-cache" | "cloud" | "secondary-mount";
export type MountStatus = "online" | "offline" | "unknown";

const ROOTS: Record<Backend, string> = {
  "external-drive": "/Volumes/External/data",
  "local-cache": "/Users/me/data-cache",
  "cloud": "/Volumes/Cloud/data",
  "secondary-mount": "/Volumes/Secondary/data",
};

export function detectMountStatus(): Record<Backend, MountStatus> {
  const out = {} as Record<Backend, MountStatus>;
  for (const [b, root] of Object.entries(ROOTS) as [Backend, string][]) {
    out[b] = fs.existsSync(root) ? "online" : "offline";
  }
  return out;
}

interface BackendCache {
  version: "1.0";
  last_updated: string;
  listings: Partial<Record<Backend, { last_seen_at: string; paths: string[] }>>;
}

const CACHE_PATH = path.resolve(__dirname, "data", "backend-cache.json");

function loadCache(): BackendCache {
  try {
    return JSON.parse(fs.readFileSync(CACHE_PATH, "utf8"));
  } catch {
    return { version: "1.0", last_updated: new Date().toISOString(), listings: {} };
  }
}

function saveCache(cache: BackendCache): void {
  fs.mkdirSync(path.dirname(CACHE_PATH), { recursive: true });
  fs.writeFileSync(CACHE_PATH, JSON.stringify(cache, null, 2));
}

// When a backend is online: rescan and update its cache entry.
// When offline: return the last-seen listing so consumers can still reason about content.
export function listBackend(backend: Backend): { listing: string[]; status: MountStatus; last_seen_at: string } {
  const status = detectMountStatus()[backend];
  const cache = loadCache();

  if (status === "online") {
    const root = ROOTS[backend];
    const listing = walkRel(root);
    const now = new Date().toISOString();
    cache.listings[backend] = { last_seen_at: now, paths: listing };
    cache.last_updated = now;
    saveCache(cache);
    return { listing, status, last_seen_at: now };
  }

  // Offline: serve from cache, mark status honestly.
  const cached = cache.listings[backend];
  return {
    listing: cached?.paths ?? [],
    status,
    last_seen_at: cached?.last_seen_at ?? "never",
  };
}

function walkRel(root: string, sub = ""): string[] {
  const out: string[] = [];
  const dir = sub ? path.join(root, sub) : root;
  for (const entry of fs.readdirSync(dir, { withFileTypes: true })) {
    if (entry.name.startsWith(".")) continue; // dotfiles, including .DS_Store and ._* AppleDouble files
    const rel = sub ? path.join(sub, entry.name) : entry.name;
    if (entry.isDirectory()) out.push(...walkRel(root, rel));
    else if (entry.isFile()) out.push(rel);
  }
  return out;
}

Design decisions

  • fs.existsSync on the root is the mount probe. No need for mount parsing, no platform-specific calls. If the directory resolves, the backend is online.
  • Cache writes only on online detection. Offline backends never overwrite their cache — the last good listing stays authoritative until the drive comes back.
  • Caller gets {listing, status, last_seen_at} always. Status is part of the response, not a separate query. Consumers see "this listing is from yesterday because the drive is offline" in one read.
  • "Never seen" returns empty listing with last_seen_at: "never". Distinguishable from "seen once a long time ago" — useful for first-run UX.
  • macOS system junk filtered. Dotfiles, including .DS_Store and ._* AppleDouble files, are skipped; they pollute listings with churn that has no semantic meaning.
  • Cache is one file, one JSON blob. No per-backend file, no schema-per-backend complication. The version field exists for future migration but starts as "1.0".
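The "write only when online" rule is easiest to test when it is separated from the filesystem. This is a minimal sketch, assuming the cache shape above. refreshListing is a hypothetical pure core of listBackend(). It takes the probe result, a scanner, and a timestamp as arguments, and it returns the next cache state instead of writing it.

```typescript
type Backend = "external-drive" | "local-cache" | "cloud" | "secondary-mount";
type MountStatus = "online" | "offline" | "unknown";

interface Listing { last_seen_at: string; paths: string[] }

interface BackendCache {
  version: "1.0";
  last_updated: string;
  listings: Partial<Record<Backend, Listing>>;
}

interface ListingResult {
  cache: BackendCache; // next cache state; the same object when nothing changed
  listing: string[];
  status: MountStatus;
  last_seen_at: string;
}

// Hypothetical pure core: no fs access, no clock; callers inject both.
export function refreshListing(
  cache: BackendCache,
  backend: Backend,
  status: MountStatus,
  scan: () => string[],
  now: string,
): ListingResult {
  if (status === "online") {
    const paths = scan();
    const listings: Partial<Record<Backend, Listing>> = { ...cache.listings };
    listings[backend] = { last_seen_at: now, paths };
    return { cache: { ...cache, last_updated: now, listings }, listing: paths, status, last_seen_at: now };
  }
  // Offline or unknown: never scan, never overwrite; serve the last good listing, honestly labeled.
  const cached = cache.listings[backend];
  return { cache, listing: cached?.paths ?? [], status, last_seen_at: cached?.last_seen_at ?? "never" };
}
```

The side-effecting wrapper then reduces to three steps. It probes the mount, calls refreshListing with () => walkRel(root) as the scanner, and saves the returned cache only when it is a new object.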

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| Intermittent mounts don't blank out the catalog | Cache can become stale silently — a deleted file shows in listing until rescan |
| Mount status travels with every listing — no separate query | Walk cost scales with backend size; large drives need pagination or batched walks |
| Cache is portable, git-diffable JSON | No mtime/size tracking — just paths; richer cache needs schema bump |
| One probe, one file, no platform-specific code | fs.existsSync lies in edge cases (broken symlinks, permission errors) |

Citations

  • lib/content-state/backends.ts — full detectMountStatus() + per-backend cache for Seagate, local-storage, OLD-registry-path, iCloud, Dropbox, cloud-supabase. Cache lives at lib/content-state/data/backend-cache.json.

P09. Resilient Extraction Wrapper with Error-Class-Aware Retry

Problem: You wrap a third-party API call (an LLM extractor, a search service, an enrichment provider) that fails for distinct reasons: transient network blips, server-side 5xx, hit-the-rate-limit 429s, and "you've burned your quota" — and the right response is different for each. Naive exponential backoff treats them all the same and either over-retries on permanent failures (wasting budget) or under-retries on transient ones. You want one wrapper that classifies the error and adapts.

The Pattern

type ErrorClass = "transient" | "rate_limit" | "quota_exhausted" | "permanent";

function classify(err: unknown): ErrorClass {
  const e = err as { status?: number; code?: string; message?: string };
  if (e.status === 429) return "rate_limit";
  if (e.status && e.status >= 500) return "transient";
  if (e.code === "ETIMEDOUT" || e.code === "ECONNRESET") return "transient";
  if (typeof e.message === "string" && /quota|billing|exceeded/i.test(e.message)) {
    return "quota_exhausted";
  }
  return "permanent";
}

const delayMs = (attempt: number, base: number, cap: number) =>
  Math.min(cap, base * 2 ** attempt) + Math.floor(Math.random() * 250);

// Process-level quota state — shared across all extractors so one quota signal
// pauses every concurrent extractor, not just the one that saw the error.
let quotaPaused = false;
let consecutiveQuotaErrors = 0;

// A discriminated union must be a type alias; an interface cannot be a union.
export type ExtractionResult<T> =
  | { ok: true; value: T }
  | { ok: false; class: ErrorClass; errors: string[] };

export async function extractWithResilience<T>(
  fn: () => Promise<T>,
  opts: { maxRetries?: number; baseDelayMs?: number; maxDelayMs?: number } = {}
): Promise<ExtractionResult<T>> {
  const maxRetries = opts.maxRetries ?? 10;
  const baseDelayMs = opts.baseDelayMs ?? 3000;
  const maxDelayMs = opts.maxDelayMs ?? 5 * 60_000;
  const errors: string[] = [];

  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    if (quotaPaused) {
      return { ok: false, class: "quota_exhausted", errors: ["paused: prior quota exhaustion"] };
    }
    try {
      const value = await fn();
      // Fail-safe 1: null/undefined doesn't count as success.
      if (value === null || value === undefined) throw new Error("extraction returned null/undefined");
      // Fail-safe 2: empty object doesn't count as success (legitimate empty arrays do).
      if (typeof value === "object" && !Array.isArray(value) && Object.keys(value).length === 0) {
        throw new Error("extraction returned empty object");
      }
      consecutiveQuotaErrors = 0;
      return { ok: true, value };
    } catch (err) {
      const cls = classify(err);
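      // Thrown values need not be Error instances (or even objects), so don't dereference blindly.
      const msg = err instanceof Error ? err.message : String(err);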
      errors.push(`attempt ${attempt + 1} [${cls}]: ${msg}`);

      if (cls === "permanent") return { ok: false, class: "permanent", errors };
      if (cls === "quota_exhausted") {
        consecutiveQuotaErrors++;
        if (consecutiveQuotaErrors >= 3) quotaPaused = true; // trip the global pause
        return { ok: false, class: "quota_exhausted", errors };
      }

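      // Out of retries: give up now instead of sleeping once more before the loop exits.
      if (attempt === maxRetries) break;
      // Transient or rate_limit — backoff and retry. Rate-limit gets a longer base.
      const base = cls === "rate_limit" ? baseDelayMs * 4 : baseDelayMs;
      await new Promise((r) => setTimeout(r, delayMs(attempt, base, maxDelayMs)));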
    }
  }
  return { ok: false, class: "transient", errors };
}

Design decisions

  • Classify before deciding. One classifier function maps the error to one of four classes; the loop body branches on the class. No regex-spaghetti in the loop body.
  • Permanent errors fail fast. A 404 won't fix itself with retries; no point burning budget waiting for it.
  • Quota exhaustion is sticky and process-global. One quota signal pauses every concurrent extractor (via a module-level quotaPaused flag). Otherwise concurrent extractors would each rediscover the quota wall and waste a retry round each.
  • Three consecutive quota errors trips the global pause. Single quota error might be transient (rate-limit edge case mis-classified); three in a row is the wall. The consecutiveQuotaErrors counter resets on first success.
  • Empty object ≠ success. A common LLM failure mode is returning {} — semantically empty but technically valid JSON. The fail-safe catches this; empty arrays (legitimately "no items found") still count as success.
  • Rate-limit gets a longer base backoff (4× the transient base). The server is explicitly telling you to slow down; respect it.
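With the default options (baseDelayMs = 3000, maxDelayMs = 5 minutes), the 4× rate-limit base changes the wait schedule noticeably. The sketch below recomputes the first ten waits using the same formula as delayMs(). It leaves out the random 0 to 249 ms jitter so the numbers are deterministic. The helper names are illustrative only.

```typescript
// Same formula as delayMs() above, minus the random jitter.
function backoffWithoutJitter(attempt: number, base: number, cap: number): number {
  return Math.min(cap, base * 2 ** attempt);
}

// Waits for attempt indices 0..attempts-1.
export function schedule(base: number, cap: number, attempts: number): number[] {
  return Array.from({ length: attempts }, (_, a) => backoffWithoutJitter(a, base, cap));
}

const BASE_MS = 3000; // default baseDelayMs
const CAP_MS = 5 * 60_000; // default maxDelayMs

export const transient = schedule(BASE_MS, CAP_MS, 10);
// [3000, 6000, 12000, 24000, 48000, 96000, 192000, 300000, 300000, 300000]
export const rateLimit = schedule(BASE_MS * 4, CAP_MS, 10);
// [12000, 24000, 48000, 96000, 192000, 300000, 300000, 300000, 300000, 300000]
```

Transient waits reach the cap at index 7 and total 1,281,000 ms (about 21 minutes) over ten waits. Rate-limit waits reach the cap at index 5 and total 1,872,000 ms (about 31 minutes), before jitter.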

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| Quota-aware: one extractor's wall pauses all extractors | Process-global quota state doesn't survive restart — pause re-evaluates from zero |
| Permanent failures don't waste retry budget | Misclassification of a transient error as permanent stops the work too early |
| Empty-object guard catches a real LLM failure mode | Adds latency to every successful extraction (the structural check) |
| Class travels with the failure — consumers can adapt | Classifier is heuristic; adding a new provider means adding a new branch |

Citations

  • OLD-legacy: packages/research_agent/src/extractors/resilient-extraction-wrapper.ts — extractWithResilience() with 10-retry max, quota-error monitor, fail-safes for null/empty extractions.
  • OLD-legacy: packages/research_agent/src/extractors/error-diagnosis.ts — diagnoseError() + calculateRetryDelay() + saveExtractionDiagnostic() for the per-failure diagnostic trail.
  • OLD-legacy: packages/core/src/quota-monitor.ts — process-level isQuotaError / recordQuotaError / shouldPauseProcessing / resetQuotaErrorCount that the wrapper consults.
  • OLD-legacy: packages/core/src/retry.ts — lower-level retry-with-jitter utility for direct OpenAI SDK calls (used by the per-step LLM helpers in P01).

P10. Canonical Anchor Dereference with Result-Typed Errors

Problem: A consumer wants to reference a single leaf inside a nested document — "chapter 4 of book X, the third takeaway" — using one opaque string token, and have your system resolve it back to the exact value. The token must round-trip (you can hand it out, the consumer hands it back, you get the value). Bad tokens must fail cleanly with a typed error, not throw — the calling code shouldn't need try/catch to handle the routine "this anchor no longer resolves."

The Pattern

export type AnchorSection = "takeaways" | "key_points" | "stories" | "questions";

export interface AnchorPath {
  documentId: string;
  scope: "root" | { groupId: string };
  section: AnchorSection;
  itemIndex: number;
}

// Format: <documentId>#<scope>.<section>.<itemIndex>
// Examples:
//   how_to_measure_anything#root.takeaways.3
//   how_to_measure_anything#ch4.takeaways.0
export function parseAnchor(anchor: string): AnchorPath | null {
  // Split on the first "#" only; split("#", 2) would silently drop anything after a second "#".
  const hash = anchor.indexOf("#");
  if (hash <= 0) return null;
  const documentId = anchor.slice(0, hash);
  const parts = anchor.slice(hash + 1).split(".");
  if (parts.length !== 3) return null;
  const [scopeStr, section, indexStr] = parts;
  if (!scopeStr || !isSection(section)) return null;
  // Canonical decimal only ("3", not "03", "", or "3e0"), so parsing and serializing stay inverse.
  if (!/^(0|[1-9]\d*)$/.test(indexStr)) return null;
  const itemIndex = Number(indexStr);
  const scope: AnchorPath["scope"] = scopeStr === "root" ? "root" : { groupId: scopeStr };
  return { documentId, scope, section, itemIndex };
}

// Type guard: narrows `section` to AnchorSection, so no cast is needed above.
function isSection(s: string): s is AnchorSection {
  return ["takeaways", "key_points", "stories", "questions"].includes(s);
}

interface DocumentShape {
  root?: Partial<Record<AnchorSection, string[]>>;
  groups?: Array<{ id?: string } & Partial<Record<AnchorSection, string[]>>>;
}

export interface DereferencedBlock {
  anchor: string;
  documentId: string;
  scope: string;
  section: AnchorSection;
  itemIndex: number;
  text: string;
}

export type DereferenceResult =
  | { ok: true; block: DereferencedBlock }
  | { ok: false; code: "invalid_anchor"; message: string }
  | { ok: false; code: "not_found"; message: string };

export function dereferenceAnchor(anchor: string, document: unknown): DereferenceResult {
  const parsed = parseAnchor(anchor);
  if (!parsed) return { ok: false, code: "invalid_anchor", message: `Anchor '${anchor}' is not in the expected shape.` };

  const doc = (document ?? {}) as DocumentShape;
  const sectionItems = readSectionItems(parsed, doc);
  if (!sectionItems) return { ok: false, code: "not_found", message: `Anchor '${anchor}' did not resolve.` };
  if (parsed.itemIndex >= sectionItems.length) {
    return { ok: false, code: "not_found", message: `itemIndex ${parsed.itemIndex} out of range; section has ${sectionItems.length} items.` };
  }

  return {
    ok: true,
    block: {
      anchor,
      documentId: parsed.documentId,
      scope: parsed.scope === "root" ? "root" : parsed.scope.groupId,
      section: parsed.section,
      itemIndex: parsed.itemIndex,
      text: sectionItems[parsed.itemIndex],
    },
  };
}

function readSectionItems(p: AnchorPath, doc: DocumentShape): string[] | null {
  if (p.scope === "root") {
    const items = doc.root?.[p.section];
    return Array.isArray(items) ? items : null;
  }
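  // Bind before the callback: narrowing of p.scope does not carry into the arrow function.
  const groupId = p.scope.groupId;
  const group = doc.groups?.find((g) => g.id === groupId);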
  if (!group) return null;
  const items = group[p.section];
  return Array.isArray(items) ? items : null;
}

Design decisions

  • String-encoded path with explicit grammar. <documentId>#<scope>.<section>.<itemIndex> is greppable, copy-pasteable, log-friendly. Hash sign separates identity from location.
  • Two error codes, three result variants. invalid_anchor means the token is malformed; not_found means the token is well-formed but the underlying data has moved. Consumers can adapt differently — invalid_anchor is probably a bug; not_found is probably a stale citation.
  • Returns a Result envelope, never throws. Dereferencing a missing anchor is a normal state (citations go stale, documents get re-extracted). Throwing would force try/catch at every call site.
  • The parser is reversible. Same AnchorPath always serializes back to the same anchor string. This is what makes citations stable across re-extractions.
  • Section is a closed enum. takeaways | key_points | stories | questions — new sections require an explicit type bump. Prevents accidental support for sections that nothing else in the system understands.
  • Out-of-range index is not_found, not invalid_anchor. The token is structurally valid; the data just doesn't have an item at that position.
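The listing does not show the serializer that the reversibility claim relies on. Below is a minimal sketch of one, assuming the anchor grammar above. formatAnchor is a hypothetical name. The accompanying parser is a self-contained variant that also requires a canonical decimal index, because inputs like "03" or an empty index would otherwise parse but not serialize back to the same string.

```typescript
type AnchorSection = "takeaways" | "key_points" | "stories" | "questions";

interface AnchorPath {
  documentId: string;
  scope: "root" | { groupId: string };
  section: AnchorSection;
  itemIndex: number;
}

const SECTIONS: readonly string[] = ["takeaways", "key_points", "stories", "questions"];
const isSection = (s: string): s is AnchorSection => SECTIONS.includes(s);

// Hypothetical inverse of parseAnchor(): <documentId>#<scope>.<section>.<itemIndex>
export function formatAnchor(p: AnchorPath): string {
  const scope = p.scope === "root" ? "root" : p.scope.groupId;
  return `${p.documentId}#${scope}.${p.section}.${p.itemIndex}`;
}

// Strict parser: accepts only strings that formatAnchor() could have produced.
export function parseAnchor(anchor: string): AnchorPath | null {
  const hash = anchor.indexOf("#");
  if (hash <= 0) return null;
  const parts = anchor.slice(hash + 1).split(".");
  if (parts.length !== 3) return null;
  const [scopeStr, section, indexStr] = parts;
  if (!scopeStr || !isSection(section) || !/^(0|[1-9]\d*)$/.test(indexStr)) return null;
  return {
    documentId: anchor.slice(0, hash),
    scope: scopeStr === "root" ? "root" : { groupId: scopeStr },
    section,
    itemIndex: Number(indexStr),
  };
}
```

The scope segment still has limits. A group whose id is literally "root", or contains a ".", cannot be represented, and a documentId cannot contain "#". Ids therefore need the same restrictions at extraction time.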

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| Citations round-trip stably across consumers | Document re-extraction can renumber items, breaking citations silently |
| Result envelope rules out an entire class of try/catch noise | Adding a new addressable section is a type change, not a config change |
| One opaque token replaces a path-walking API | Tokens leak document-structure assumptions to the consumer |
| Greppable in logs and citation databases | Index-based addressing is fragile; named addressing (e.g. takeaway-ids) is more robust |

Citations

  • lib/v1/canonical-anchors.ts — full dereferenceAnchor() with DereferenceResult discriminated union, scope-vs-chapter handling, out-of-range detection.
  • lib/v1/annotations-store.ts — parseAnchor / AnchorPath / AnchorSection definitions consumed by canonical-anchors.ts. Same module also persists annotations keyed by anchor (one annotation per addressable leaf).

P11. REST + MCP Co-Located Shape Helpers

Problem: The same capability is exposed as a REST endpoint AND an MCP tool — a browser hits /api/v1/library/items?kind=book&limit=20, an LLM agent calls library_list_items({kind: "book", limit: 20}). Both must return identical payloads (same field names, same density, same sort). Implementing the shaping logic twice means inevitable drift: the REST handler gets a new field, the MCP handler doesn't, and a year later they're producing different objects from the same underlying data.

The Pattern

// shared shape helpers — both REST and MCP import from here.
import type { Item } from "./types";

export type OutputDensity = "rich" | "summary";

export interface ItemSummary {
  id: string;
  title: string;
  status: string;
  updated_at: string;
}

export function projectSummary(item: Item): ItemSummary {
  return {
    id: item.id,
    title: item.title,
    status: item.status,
    updated_at: item.updated_at,
  };
}

export function projectItem(item: Item, density: OutputDensity): Item | ItemSummary {
  return density === "summary" ? projectSummary(item) : item;
}

export interface ListFilter { kind?: string; q?: string; }
export interface ListOptions { limit?: number; offset?: number; density?: OutputDensity; }
export interface ListResult { items: Array<Item | ItemSummary>; total: number; limit: number; offset: number; }

export function listItems(all: Item[], filter: ListFilter, options: ListOptions): ListResult {
  const filtered = all.filter((i) => matches(i, filter));
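  // Newest first; returns 0 on ties so the comparator stays consistent.
  filtered.sort((a, b) => (a.updated_at < b.updated_at ? 1 : a.updated_at > b.updated_at ? -1 : 0));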
  const limit = clamp(options.limit ?? 50, 1, 500);
  const offset = Math.max(0, options.offset ?? 0);
  const density = options.density ?? "rich";
  return {
    items: filtered.slice(offset, offset + limit).map((i) => projectItem(i, density)),
    total: filtered.length,
    limit,
    offset,
  };
}

function matches(item: Item, f: ListFilter): boolean {
  if (f.kind && item.kind !== f.kind) return false;
  if (f.q && !item.title.toLowerCase().includes(f.q.toLowerCase())) return false;
  return true;
}

function clamp(n: number, lo: number, hi: number): number { return Math.max(lo, Math.min(hi, n)); }

// rest-handler.ts — thin parser around the shared shape helper.
// loadAll() is assumed to be the host's loader returning Item[]; it is not shown here.
import { listItems, type OutputDensity } from "./shape";
export async function GET(req: Request): Promise<Response> {
  const url = new URL(req.url);
  const filter = { kind: url.searchParams.get("kind") ?? undefined, q: url.searchParams.get("q") ?? undefined };
  const options = {
    limit: Number(url.searchParams.get("limit")) || undefined,
    offset: Number(url.searchParams.get("offset")) || 0,
    density: (url.searchParams.get("output_density") as OutputDensity) || "rich",
  };
  const result = listItems(loadAll(), filter, options);
  return Response.json(result);
}

// mcp-handler.ts — thin parser around the same helper.
// Uses the same assumed loadAll() loader as the REST handler.
import { listItems, type OutputDensity } from "./shape";
export const tool = {
  name: "list_items",
  inputSchema: { /* JSON Schema */ },
  handler: async (input: { kind?: string; q?: string; limit?: number; offset?: number; output_density?: OutputDensity }) => {
    const result = listItems(loadAll(), { kind: input.kind, q: input.q }, {
      limit: input.limit, offset: input.offset, density: input.output_density ?? "rich",
    });
    return { content: [{ type: "text", text: JSON.stringify(result) }] };
  },
};

Design decisions

  • Shape helper is the contract. REST and MCP are thin adapters — they parse their transport-specific input, hand it to the shape helper, and return the shape helper's output. Behavior changes happen in one file.
  • Density flag travels through the helper, not around it. Both surfaces accept output_density (REST as a query param, MCP as a tool input field). The helper decides what rich vs summary actually contains; the adapters don't.
  • Filters and options have explicit types. ListFilter and ListOptions are exported; the test suite tests the helper alone. REST and MCP adapter tests verify only the parsing layer.
  • Clamping in the helper, not the adapters. Limits, offsets, density defaults all live in the helper. An adapter that parses limit=99999 doesn't need to know the cap — the helper enforces it.
  • No code generation. The helper is hand-written TypeScript that both surfaces import. Codegen-from-OpenAPI is heavier and brittler for the small set of endpoints; manual co-location is enough discipline.
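The identical-payload rule can also be enforced with a parity test. The test sends the same logical request through both adapters and compares the serialized payloads. The sketch below is self-contained and hypothetical. Item, listItems, restAdapter, and mcpAdapter are reduced stand-ins for the shape, REST, and MCP files above, not the cited library-routes-helpers.ts.

```typescript
// Hypothetical reduced shapes; only the fields the sketch needs.
interface Item { id: string; kind: string; title: string; status: string; updated_at: string }
type OutputDensity = "rich" | "summary";

// Shared helper: the only place filtering, sorting, clamping, and projection happen.
function listItems(all: Item[], f: { kind?: string }, o: { limit?: number; density?: OutputDensity }) {
  const limit = Math.max(1, Math.min(500, o.limit ?? 50));
  return all
    .filter((i) => !f.kind || i.kind === f.kind) // filter() copies, so sort() won't mutate `all`
    .sort((a, b) => (a.updated_at < b.updated_at ? 1 : a.updated_at > b.updated_at ? -1 : 0))
    .slice(0, limit)
    .map((i) => (o.density === "summary" ? { id: i.id, title: i.title, status: i.status, updated_at: i.updated_at } : i));
}

// REST adapter: parses query-string input.
function restAdapter(all: Item[], query: string) {
  const sp = new URLSearchParams(query);
  return listItems(all, { kind: sp.get("kind") ?? undefined }, {
    limit: Number(sp.get("limit")) || undefined,
    density: sp.get("output_density") === "summary" ? "summary" : "rich",
  });
}

// MCP adapter: parses structured tool input.
function mcpAdapter(all: Item[], input: { kind?: string; limit?: number; output_density?: OutputDensity }) {
  return listItems(all, { kind: input.kind }, { limit: input.limit, density: input.output_density ?? "rich" });
}

// Parity check: one logical request, two transports, byte-identical JSON.
export function payloadsMatch(all: Item[]): boolean {
  const rest = restAdapter(all, "kind=book&limit=2&output_density=summary");
  const mcp = mcpAdapter(all, { kind: "book", limit: 2, output_density: "summary" });
  return JSON.stringify(rest) === JSON.stringify(mcp);
}
```

One such check per endpoint turns accidental REST/MCP drift into a failing test. That complements the compile-time protection described in the tradeoffs.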

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| REST and MCP drift becomes a TypeScript error, not a runtime divergence | Adapter-specific behavior (e.g., HTTP cache headers) has to be expressed outside the helper |
| Tests target the helper, not the adapters | If REST and MCP eventually need different shapes, the abstraction has to break |
| New surfaces (CLI, gRPC) plug in by writing one new adapter | Helper output must stay serializable — no class instances, no functions in the output |
| Density flag is a clean opt-in, not a separate endpoint | Three densities (rich/summary/state) start to clutter the projection module |

Citations

  • lib/v1/library-routes-helpers.ts (projectSummary / projectState / projectItem): density projections shared between /api/v1/library/items (REST) and the MCP library_* tools.
  • lib/v1/passages-search-helpers.ts (parseSearchRequest + loadChunkIndex + scoring): shared between /api/v1/passages/search and MCP passages_search.
  • docs/CONTRACT-CHANGELOG.md (v1.2.0 entry) — the explicit rule "REST and MCP must serve identical payloads" that motivated the co-location pattern.
  • See also: DevPlane P14 (Parallel API Surfaces over a Shared Core) for the generalized form spanning REST + MCP + CLI.

P12. Vendored Consumer Contract with Explicit Semver Ratification

Problem A capability you maintain (a typed schema, a request/response surface, an SDK contract) is consumed by several downstream apps in your portfolio. You don't want to publish to npm: the producer and consumers live in different repos on one developer's filesystem, iteration is fast, and an npm publish cycle would slow everything down. You want consumers to vendor (copy) the contract verbatim and re-vendor on each bump, with explicit ratification of every breaking decision so the diff stays tiny and reviewable.

The Pattern

/**
 * <capability> — Canonical Zod contract (v1.0.0)
 * ===================================================
 *
 * This file is the **canonical home** of the contract surface. It is NOT
 * vendored from anywhere — consumers vendor FROM here.
 *
 * Consumer vendoring pattern: copy this file verbatim into a consumer repo
 * at a stable location (e.g. `src/lib/<capability>/contract.ts`), pin the
 * `CONTRACT_VERSION` constant, and add a header comment of the form:
 *
 *   Vendored from <producer-repo> @ <SHA>
 *   packages/<capability>/contracts/types.ts
 *
 * On contract bumps, re-vendor verbatim and bump the consumer's recorded
 * `CONTRACT_VERSION` to match. See ./README.md for the explicit decisions
 * encoded in this surface and the semver policy.
 */

import { z } from "zod";

export const CONTRACT_VERSION = "1.0.0" as const;

export const ItemSchema = z.object({
  id: z.string().min(1),
  title: z.string().min(1),
  description: z.string().optional(),
  // ...
});
export type Item = z.infer<typeof ItemSchema>;

// Endpoint registry — consumers wire their HTTP adapter from this map.
export const DATA_CONTRACTS = {
  list_items: { method: "GET", path: "/v1/items", response: z.array(ItemSchema) },
  get_item:   { method: "GET", path: "/v1/items/:id", response: ItemSchema },
} as const;

And the companion README documents what was decided and why:

# `<capability>` contracts — canonical Zod surface

**Status:** v1.0.0 — first stable contract surface.
**Canonical home:** this directory. Consumers vendor FROM here.

## Decisions encoded in this surface

These are called out so re-vendor reviewers know what changed.

### 1. Casing: camelCase (not snake_case)
The donor schema used snake_case. v1.0.0 ratifies camelCase as canonical.
Why: TS dominance in the consumer set; eliminates per-consumer transforms.

### 2. Field name: `brief` (not `brief_statement`)
Shorter; same semantics.

### 3. `metadata.source` enum has FOUR values
The donor had three; we add `ai_drafted` because LLM-authoring is real.
Future: enum is additive-only across minor bumps; removal = major bump.

### 4. Version: 1.0.0 (first stable, not pre-release)
Earlier consumer integrations pinned 0.1.0 as bootstrap. v1.0.0 declares
this surface partner-ready.

## Semver policy

| Bump | Triggers |
|---|---|
| Patch (1.0.x) | Doc-only; defaults; non-rejecting refinements. |
| Minor (1.x.0) | Additive optional fields. Additive enum values. New endpoints. |
| Major (x.0.0) | Field rename. Field removal. Type/required change. Enum reordering. |

Consumers should re-vendor on every minor; majors trigger explicit
coordination filed as cards in each downstream queue.
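The semver table can also drive a consumer-side check. The sketch below is an assumption, not part of the source. A hypothetical `classifyBump` compares the consumer's vendored `CONTRACT_VERSION` with the producer's and reports which row of the policy applies. Only plain `MAJOR.MINOR.PATCH` strings are handled; pre-release tags are out of scope.

```typescript
type Bump = "none" | "patch" | "minor" | "major";

// Parse a plain MAJOR.MINOR.PATCH version string.
function parseSemver(v: string): [number, number, number] {
  const m = /^(\d+)\.(\d+)\.(\d+)$/.exec(v.trim());
  if (!m) throw new Error(`not a MAJOR.MINOR.PATCH version: ${v}`);
  return [Number(m[1]), Number(m[2]), Number(m[3])];
}

// Compare component by component; the first component that grows decides the bump.
export function classifyBump(vendored: string, producer: string): Bump {
  const a = parseSemver(vendored);
  const b = parseSemver(producer);
  const kinds = ["major", "minor", "patch"] as const;
  for (let i = 0; i < 3; i++) {
    if (b[i] > a[i]) return kinds[i];
    if (b[i] < a[i]) throw new Error(`producer ${producer} is older than vendored ${vendored}`);
  }
  return "none";
}

// Per the policy: minors and patches are routine re-vendors; majors need coordination.
export const requiresCoordination = (vendored: string, producer: string): boolean =>
  classifyBump(vendored, producer) === "major";
```

A CI job in each consumer could run this against the producer's current constant and open a coordination card only when `requiresCoordination` is true.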

Design decisions

  • Vendoring, not npm. The contract is one file. Publishing to npm adds a registry round-trip, a version-resolution layer, and CI dependencies that don't pay for themselves with so few consumers. Vendoring with an explicit CONTRACT_VERSION constant plus a SHA header gives precise provenance without infrastructure.
  • Producer-side documents the ratification decisions. When a downstream consumer predates the canonical contract and has introduced its own conventions, the canonical surface ratifies (or rejects) each one explicitly, so re-vendor reviewers don't need to re-litigate them.
  • Semver bumps have explicit triggers in the README. "Renaming a field is major" is not opinion; it's the policy. That removes argument time from PRs.
  • The contract file is declarative only: Zod schemas, their inferred types, the version constant, and the endpoint registry. No business logic, no helpers, no class instances. Vendoring stays clean; the consumer's bundle doesn't accidentally inherit producer-side implementation.
  • The endpoint registry (DATA_CONTRACTS) is part of the contract. Consumers wire their HTTP adapters from this map, so a path change in the producer shows up as a type-level change in every consumer after re-vendoring.
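One way a consumer might wire an HTTP adapter from the registry is sketched below, under stated assumptions. The registry is re-declared without Zod response schemas so the example runs standalone. `buildUrl` and its `:param` substitution are hypothetical, not the production adapter. Because the registry is declared `as const`, the endpoint names form a literal union, so code that references a removed endpoint stops typechecking after a re-vendor.

```typescript
// Stand-in for the vendored registry (response schemas omitted in this sketch).
const DATA_CONTRACTS = {
  list_items: { method: "GET", path: "/v1/items" },
  get_item: { method: "GET", path: "/v1/items/:id" },
} as const;

type Endpoint = keyof typeof DATA_CONTRACTS; // "list_items" | "get_item"

// Fill `:name` path segments from params; a missing param is an error, not "undefined".
export function buildUrl(
  base: string,
  endpoint: Endpoint,
  params: Record<string, string> = {},
): { method: string; url: string } {
  const { method, path } = DATA_CONTRACTS[endpoint];
  const filled = path.replace(/:([A-Za-z_]\w*)/g, (_match, name: string) => {
    const value = params[name];
    if (value === undefined) throw new Error(`${endpoint}: missing path param "${name}"`);
    return encodeURIComponent(value);
  });
  // Note: a leading "/" in the path replaces any path component already on `base`.
  return { method, url: new URL(filled, base).toString() };
}
```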

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| Bootstrap cost is one copy: no npm registry, no package publish | No transitive dependency tracking; consumers must self-update on bumps |
| Re-vendor diff is verbatim, so reviewers can verify it with diff -u | Drift is possible if consumers patch the vendored copy locally |
| Ratification doc captures hard-won decisions inline | Documentation work scales with consumer count |
| Contract changes surface as type errors in every consumer's typecheck after re-vendor | Consumers can lag a version with no enforcement (only social) |
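The drift weakness can be narrowed with a CI check. The minimal sketch below assumes a specific layout for the vendored file. It assumes the consumer's only change is a header of leading `//` lines (the first starting with `// Vendored from `), followed by one blank line and then the producer file byte-for-byte. It also assumes the producer file does not itself open with `//` lines. `stripVendorHeader` and `checkDrift` are hypothetical helpers. They strip the header, hash both bodies with SHA-256, and report a mismatch.

```typescript
import { createHash } from "node:crypto";

const sha256 = (s: string): string => createHash("sha256").update(s, "utf8").digest("hex");

// Assumed layout: "// Vendored from ..." header lines, one blank line, then the verbatim file.
export function stripVendorHeader(vendored: string): string {
  const lines = vendored.split("\n");
  if (!lines[0].startsWith("// Vendored from ")) throw new Error("missing Vendored-from header");
  let i = 0;
  while (i < lines.length && lines[i].startsWith("//")) i++;
  if (lines[i] === "") i++; // the single separator line
  return lines.slice(i).join("\n");
}

// ok === false means the consumer's copy no longer matches the producer byte-for-byte.
export function checkDrift(
  producerSource: string,
  vendoredSource: string,
): { ok: boolean; producer: string; vendored: string } {
  const producer = sha256(producerSource);
  const vendored = sha256(stripVendorHeader(vendoredSource));
  return { ok: producer === vendored, producer, vendored };
}
```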

Citations

  • packages/job-family-agent/contracts/types.ts — full canonical Zod surface for job-family-agent v1.0.0 with JOB_FAMILY_CONTRACT_VERSION constant.
  • packages/job-family-agent/contracts/README.md — ratification doc covering the four decisions (casing, brief, enum, version) carried forward from the Performix bootstrap vendor.
  • docs/AGENT-ASSIGNMENTS.md MF-063 — the assignment card that produced the contract and the explicit policy "consumers vendor FROM here; nobody vendors INTO here."

P13. Multi-Candidate Consolidation Pipeline (Score → Consolidate → Fallback)

Problem You run an extraction task that's hard for any single LLM call to nail — the document is long, the structure is irregular, the model hallucinates. Instead of one extraction, you fire N parallel candidate extractions with different prompts / models / temperatures, then need to produce one consolidated artifact. You want: a scoring pass that rates each candidate against the source, a merge pass that combines them, and a robust fallback when the merger itself fails.

The Pattern

import OpenAI from "openai";

export interface Candidate<T> { id: string; output: T; }
export interface CandidateScore { id: string; faithfulness: number; completeness: number; clarity: number; comment: string; }

const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

// PHASE 1 — score each candidate against the source.
async function scoreOne<T>(source: string, candidate: Candidate<T>, promptTemplate: string): Promise<CandidateScore> {
  const userPrompt = `${promptTemplate}\n\nSOURCE:\n${source}\n\nCANDIDATE:\n${JSON.stringify(candidate.output)}\n\nReturn JSON with faithfulness, completeness, clarity (1-10 each), and a comment.`;
  try {
    const completion = await openai.chat.completions.create({
      model: "gpt-4o",
      messages: [
        { role: "system", content: "You are an evaluator. Return ONLY a JSON object." },
        { role: "user", content: userPrompt },
      ],
      temperature: 0,
    });
    const text = completion.choices[0]?.message?.content ?? "";
    const json = stripMarkdownFences(text);
    const parsed = JSON.parse(json);
    return {
      id: candidate.id,
      faithfulness: clamp(parsed.faithfulness, 1, 10),
      completeness: clamp(parsed.completeness, 1, 10),
      clarity: clamp(parsed.clarity, 1, 10),
      comment: parsed.comment ?? "no comment",
    };
  } catch (err) {
    return { id: candidate.id, faithfulness: 1, completeness: 1, clarity: 1, comment: `scoring failed: ${(err as Error).message}` };
  }
}

export async function scoreCandidates<T>(source: string, candidates: Candidate<T>[], promptTemplate: string): Promise<CandidateScore[]> {
  // Sequential to avoid rate limits.
  const out: CandidateScore[] = [];
  for (const c of candidates) out.push(await scoreOne(source, c, promptTemplate));
  return out;
}

// PHASE 2 — consolidate the candidates (LLM-driven merge with fallback).
export async function consolidateSection<T>(source: string, sectionName: string, candidates: Candidate<T>[], promptTemplate: string): Promise<T | null> {
  const userPrompt = `${promptTemplate}\n\nSOURCE (truncated):\n${source.slice(0, 50000)}\n\nSECTION: ${sectionName}\n\nCANDIDATES:\n${JSON.stringify(candidates, null, 2)}\n\nReturn the consolidated section as JSON only.`;
  try {
    const completion = await openai.chat.completions.create({
      model: "gpt-4o",
      messages: [
        { role: "system", content: "You are a consolidation agent. Return ONLY JSON." },
        { role: "user", content: userPrompt },
      ],
      temperature: 0,
    });
    const text = completion.choices[0]?.message?.content ?? "";
    const json = stripMarkdownFences(text);
    return JSON.parse(json) as T;
  } catch (err) {
    // Fallback: return the FIRST candidate's section. Never crash the pipeline.
    console.error(`consolidation failed for ${sectionName}: ${(err as Error).message}`);
    return candidates[0]?.output ?? null;
  }
}

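function stripMarkdownFences(text: string): string {
  let s = text.trim();
  if (s.startsWith("```")) {
    const lines = s.split("\n");
    lines.shift(); // drop the opening fence (``` or ```json)
    // Drop the closing fence only if it is actually present, so content is never lost.
    if (lines.length > 0 && lines[lines.length - 1].trim().startsWith("```")) lines.pop();
    s = lines.join("\n").trim();
  }
  return s;
}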

function clamp(n: number, lo: number, hi: number): number {
  return Math.max(lo, Math.min(hi, Number(n) || lo));
}

Design decisions

  • Score is per-candidate, three axes, 1–10. Faithfulness (matches the source), completeness (covers what should be covered), clarity (readable). Three is enough to differentiate; more axes invites prompt drift.
  • Sequential scoring, not parallel. Rate limits matter more than wall-clock latency on these long-document calls. A 20-second penalty per call is preferable to half the calls failing on a 429.
  • Consolidation is per-section, not whole-document. Sections are independent — merging the "key takeaways" section doesn't help or hurt the "tools and frameworks" section. Per-section means a section-level failure doesn't poison the whole consolidation.
  • Markdown fence-stripping is mandatory. Every LLM provider occasionally wraps "pure JSON" output in ```json ... ``` despite explicit instructions. Strip fences before parsing; if the parse still fails, take the fallback path.
  • Score-failure returns lowest score with comment. Better to mark a candidate as "we couldn't score this" than to omit it from the rank. The consolidation can decide whether to drop low-scored or "scoring failed" candidates.
  • Consolidation-failure falls back to first candidate's section. Never let the pipeline crash on a merger hiccup — the first candidate is at least real extracted content, not nothing.
  • Temperature 0 for both scoring and consolidating. Determinism beats creativity at this layer.
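The score-then-consolidate hand-off leaves the filtering policy to the consolidation step. A minimal sketch of one such policy follows; it is an assumption, not the source's code. It averages the three axes and drops candidates whose scoring failed or whose mean falls below a threshold. Survivors are ordered best-first, so the existing first-candidate fallback lands on the highest-scored candidate. The interfaces mirror the pattern's; `rankCandidates` and `minMean` are illustrative names.

```typescript
interface Candidate<T> { id: string; output: T; }
interface CandidateScore { id: string; faithfulness: number; completeness: number; clarity: number; comment: string; }

const meanScore = (s: CandidateScore): number => (s.faithfulness + s.completeness + s.clarity) / 3;

// Keep successfully scored candidates that clear the bar, best first (stable for ties).
// If nothing survives, return the original list so consolidation still has input.
export function rankCandidates<T>(candidates: Candidate<T>[], scores: CandidateScore[], minMean = 5): Candidate<T>[] {
  const byId = new Map(scores.map((s) => [s.id, s] as const));
  const survivors = candidates
    .map((c) => ({ c, s: byId.get(c.id) }))
    .filter(
      (x): x is { c: Candidate<T>; s: CandidateScore } =>
        x.s !== undefined && !x.s.comment.startsWith("scoring failed") && meanScore(x.s) >= minMean,
    )
    .sort((x, y) => meanScore(y.s) - meanScore(x.s))
    .map((x) => x.c);
  return survivors.length > 0 ? survivors : candidates;
}
```

Passing `rankCandidates(candidates, scores)` into `consolidateSection` in place of the raw list would keep the merge prompt smaller and make the fallback pick the best-scored candidate rather than an arbitrary one.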

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| Per-section consolidation isolates failures | N calls per section per source, which is expensive |
| Fallback path means the pipeline never crashes on merger errors | "Fallback fired" can mask quality regressions if not surfaced |
| Score breakdown supports per-section quality tracking over time | Scoring is itself an LLM call, so the judges aren't independent of the candidates' models |
| Markdown-fence stripping defends against the most common LLM output bug | Doesn't catch every output-format mistake (e.g., extra prose before or after the JSON) |
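The last weakness, prose around the JSON, has a common mitigation, sketched here as an assumption rather than taken from the source. First try a strict parse. If that fails, slice from the first `{` or `[` to the last matching `}` or `]` and parse once more before giving up. `parseLenientJson` is a hypothetical name. The slice also skips Markdown fences, but the heuristic can still fail when the surrounding prose itself contains braces or brackets.

```typescript
// Strict parse first; on failure, retry on the outermost {...} or [...] span.
export function parseLenientJson(text: string): unknown {
  const trimmed = text.trim();
  try {
    return JSON.parse(trimmed);
  } catch {
    // fall through to the span heuristic
  }
  const starts = [trimmed.indexOf("{"), trimmed.indexOf("[")].filter((i) => i >= 0);
  if (starts.length === 0) throw new Error("no JSON object or array found");
  const start = Math.min(...starts);
  const close = trimmed[start] === "{" ? "}" : "]";
  const end = trimmed.lastIndexOf(close);
  if (end <= start) throw new Error("unterminated JSON span");
  return JSON.parse(trimmed.slice(start, end + 1)); // still throws if the span is not valid JSON
}
```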

Citations

  • OLD-legacy: packages/referee/src/score.ts (scoreCandidatesAgainstBook()): the three-axis 1–10 rubric, fence-stripping, and per-candidate failure fallback.
  • OLD-legacy: packages/referee/src/consolidate.ts (consolidateSummaries()): per-section LLM consolidation with first-candidate fallback on merger failure.
  • OLD-legacy: packages/referee/src/refine.ts — secondary refinement pass that applies the same shape to a single best-pick candidate when consolidation is overkill.
  • OLD-legacy: packages/research_agent/src/quality-control.ts — same shape applied to research-paper extractions.

P14. Cache-on-mtime Read-Through Snapshot

Problem You serve a frequently read JSON snapshot (a content-state report, a registry index, a precomputed view) from a file that is regenerated periodically: every minute, every hour, or every deploy. Reading and parsing it on every request is wasteful, because the parse cost adds up; caching it forever serves stale data once the regenerator writes a new version. You want a cache that is invalidated automatically when the underlying file changes, with no manual cache-busting and no separate refresh API.

The Pattern

import "server-only";
import * as fs from "node:fs";
import * as path from "node:path";

export interface Snapshot {
  version: string;
  generated_at: string;
  items: unknown[];
}

let cached: { mtimeMs: number; snapshot: Snapshot } | null = null;

function snapshotPath(): string {
  return path.resolve(process.cwd(), "data", "snapshot.json");
}

export function loadSnapshot(): Snapshot | null {
  const p = snapshotPath();
  let stat: fs.Stats;
  try {
    stat = fs.statSync(p);
  } catch {
    return null; // file missing — consumer treats as "no snapshot yet"
  }

  // mtime-keyed cache: if the regenerator wrote a new version, mtimeMs changes,
  // we drop the cached value and re-parse. No explicit invalidation needed.
  if (cached && cached.mtimeMs === stat.mtimeMs) {
    return cached.snapshot;
  }

  try {
    const raw = fs.readFileSync(p, "utf-8");
    const parsed = JSON.parse(raw) as Snapshot;
    cached = { mtimeMs: stat.mtimeMs, snapshot: parsed };
    return parsed;
  } catch {
    // parse error: don't blow up; return null so consumer can serve "no data" gracefully
    return null;
  }
}

// Optional test hook for forcing re-load (e.g., between integration tests).
export function resetSnapshotCache(): void {
  cached = null;
}

Design decisions

  • mtimeMs is the cache key. A regenerator that writes the snapshot updates the file's mtime; the next read sees the changed mtime, drops the cached parsed value, re-parses. No watchers, no inotify, no signals.
  • fs.statSync is cheap. Stat is a metadata-only call that never reads file contents, so it costs far less than reading and JSON-parsing the snapshot. Paying for one stat on every request is a good trade against re-parsing.
  • Missing file returns null, doesn't throw. The snapshot may not have been generated yet (fresh deploy, never-run regenerator). Consumers see null and serve the "no data" UI; they're not interrupted by an unhandled exception.
  • Parse errors return null too. A half-written snapshot from a racy regenerator surfaces as null instead of a thrown SyntaxError. The regenerator should write atomically (P02-style temp+rename); if it doesn't, this defends.
  • Module-level cache, no class. One cache per process. Multiple processes (multiple Vercel function instances) each maintain their own cache; that's the right granularity since each is independent.
  • server-only import. Marks the module as server-only: if a client component imports it, the build fails. Without it, the build could ship a bundle that tries to call fs.statSync in the browser.
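The parse-error bullet assumes the regenerator writes atomically. A minimal sketch of that writer side follows. It assumes a POSIX-style filesystem, where a `rename` within one directory atomically replaces the target. The writer puts the data in a temp file next to the snapshot and then renames it over the target. A reader's `statSync`/`readFileSync` therefore sees either the old file or the new one, never a partial write. `writeSnapshotAtomic` and its temp-file naming are hypothetical, not the production regenerator.

```typescript
import * as fs from "node:fs";
import * as path from "node:path";

// Write JSON to a temp file in the same directory, then rename it over the target.
// Staying in one directory keeps the rename on one filesystem, where it is atomic on POSIX.
export function writeSnapshotAtomic(target: string, data: unknown): void {
  const dir = path.dirname(target);
  fs.mkdirSync(dir, { recursive: true });
  const tmp = path.join(dir, `.${path.basename(target)}.${process.pid}.${Date.now()}.tmp`);
  try {
    fs.writeFileSync(tmp, JSON.stringify(data, null, 2), "utf-8");
    fs.renameSync(tmp, target);
  } catch (err) {
    fs.rmSync(tmp, { force: true }); // best-effort cleanup; never leave a stray temp file
    throw err;
  }
}
```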

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| No explicit invalidation: the regenerator just writes the file | Cache lives in process memory; multi-instance deployments don't share it |
| Cheap probe (statSync) on every read | Parse cost paid only once per mtime change |
| Tolerates missing and malformed snapshots gracefully | A constantly rewritten file defeats the cache; high-frequency regeneration removes the benefit |
| One function, no setup | Test isolation requires the resetSnapshotCache() hook |
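The test-isolation weakness comes from the module-global `cached` variable. A variant, sketched under assumptions and not the production module, moves the cache into a closure. A hypothetical `createMtimeCache` takes the file path and a parse function and returns a loader with its own private cache. Each test, or each snapshot file, then gets an independent cache, and no reset hook is needed. The missing-file and parse-error behavior (return `null`) is kept from the pattern.

```typescript
import * as fs from "node:fs";

// Each call returns a loader with its own private mtime-keyed cache.
export function createMtimeCache<T>(filePath: string, parse: (raw: string) => T): () => T | null {
  let cached: { mtimeMs: number; value: T } | null = null;
  return () => {
    let stat: fs.Stats;
    try {
      stat = fs.statSync(filePath);
    } catch {
      return null; // missing file: "no snapshot yet"
    }
    if (cached && cached.mtimeMs === stat.mtimeMs) return cached.value; // unchanged since last parse
    try {
      const value = parse(fs.readFileSync(filePath, "utf-8"));
      cached = { mtimeMs: stat.mtimeMs, value };
      return value;
    } catch {
      return null; // malformed file: degrade gracefully, keep the old cache entry
    }
  };
}
```

The production `loadSnapshot()` could then be a single module-level `const loadSnapshot = createMtimeCache(snapshotPath(), JSON.parse)`, while tests construct their own loaders against temp files.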

Citations

  • lib/content-state/snapshot.ts (loadContentStateSnapshot()): mtime-keyed cache, missing-file → null, parse-error → null. Backs the /api/v1/state/{health,items} routes and the Console's Integrity Dashboard loaders.

This catalog is the production-validated record of engineering shapes that have hardened in Meta Factory. The OLD-vs-PROD split (P03) is the load-bearing decision; the rest are local consequences of producing one durable artifact tree (P02) that a thin host can serve cheaply via shared shape helpers (P11) over typed contracts (P12).