
Principia — reusable patterns

A source-graded registry of organizational science exposed over a versioned REST + MCP contract. Pattern catalog is scaffolded upstream and will populate as the engagement-family ingest pipeline matures.

14 patterns · source: people-analyst/principia/docs/REUSABLE_PATTERNS.md

Principia — Reusable Engineering Patterns

Production-validated patterns from the Principia codebase, stripped of business context, written to be dropped into any new system.

Each pattern is copy-paste-ready TypeScript. Domain-specific terminology has been removed. Citations at the bottom of each entry point to the concrete implementation in this repo.


Pattern Index

| # | Pattern Name | Core Technology | Problem Solved |
|---|---|---|---|
| P01 | Single-Table JSONB-per-Row Registry with JSON Bootstrap Migration | Postgres JSONB + bundled JSON | Persist a heterogeneous, schema-evolving entity catalog without per-table DDL and migrate from a file-backed predecessor in one shot |
| P02 | Provenance-Merging Upsert Wrapper | Plain object map + JSON | Upsert records that accumulate "where did this come from" trails across many ingest passes without losing earlier sources |
| P03 | Pluggable Persistence Behind a Thin Async Interface | TypeScript interface + factory | Swap a JSON-file store for a database without rewriting ingest, resolve, MCP, and REST layers |
| P04 | Deterministic Idempotent IDs from Content Fingerprints | FNV-1a / SHA-256 hash | File the same logical work twice and get one row, not two, without a lookup-then-insert round-trip |
| P05 | Multi-Vendor LLM Consensus with Vote Adjudication | fetch + parallel Promise.all | Get a defensible "verified" verdict on extracted claims without trusting a single model |
| P06 | Vendored-Local Type Mirrors Awaiting Upstream Package Publish | TypeScript structural types | Ship code that depends on an unpublished package version by mirroring the not-yet-released shape locally |
| P07 | Idempotent Background Job Queue with Exponential-Backoff Retry | RegistryStore + Map<type, handler> | Drive long-running enrichment work that survives crashes, retries on transient failure, and dedupes concurrent enqueues |
| P08 | License-Gated Field Stripping at the Response Boundary | Allowlist set + handler-side strip | Return matching records to callers without leaking text whose source license forbids republication |
| P09 | Cron-Driven Watchlist Scheduler with Cadence-Based Next-Run | RegistryStore + interval map | Run persistent searches against external sources on a per-monitor cadence without a job-scheduler dependency |
| P10 | Read-Only Gap Detector with Stable Gap IDs | Pure function over store snapshot | Surface "what's missing" from a registry as actionable records without ever mutating the registry the curator is auditing |
| P11 | Process-Singleton Resource with Resolution-Order Fallback | globalThis cache + env-var precedence | Build a heavy resource (DB pool, ingest pipeline) once per process, with a deterministic resolution order across dev/preview/prod |
| P12 | Redis-Primary, In-Memory-Fallback Rate Limiter | Upstash REST + per-instance sliding window | Enforce global rate limits in production without failing in local dev or during a Redis blip |
| P13 | Curator-Mediated Promotion (Read-Only Detection, Approval-Only Mutation) | Job handlers + explicit policy boundary | Build an extraction / verification pipeline that proposes changes but never auto-promotes, so the curator stays in the loop |
| P14 | Opaque Base64 Cursor with Strict Decoder | base64url + JSON envelope | Paginate REST list endpoints in a way that lets you swap offset-based for keyset later without breaking clients |

Patterns

P01. Single-Table JSONB-per-Row Registry with JSON Bootstrap Migration

Problem You're building a registry of heterogeneous entity types (twenty kinds today, more next quarter). Each entity has a stable id + a payload whose shape will keep evolving. A column-per-field schema means a migration for every new field and every new entity type; a table per entity type means twenty tables with near-identical CRUD. You also have an existing JSON-file store with real production data that needs to migrate into the database on first deploy without a separate migration script.

The Pattern

import postgres, { type Sql } from "postgres";
import * as fs from "node:fs";
import * as path from "node:path";

type EntityType = string;

interface Row<T> {
  id: string;
  type: EntityType;
  entity: T;
  provenance: { source_path: string; ingested_at: string }[];
  created_at: string;
  updated_at: string;
}

export class JsonbRegistry {
  private readonly sql: Sql;
  private readonly table = "registry_rows";
  private readonly bundledRoot?: string;
  private inited = false;

  constructor(connectionString: string, opts: { bundledRoot?: string } = {}) {
    // `prepare: false` is safer behind PgBouncer transaction-pooling.
    this.sql = postgres(connectionString, { prepare: false });
    this.bundledRoot = opts.bundledRoot;
  }

  async init(): Promise<void> {
    if (this.inited) return;
    await this.sql.unsafe(`
      CREATE TABLE IF NOT EXISTS ${this.table} (
        type        text NOT NULL,
        id          text NOT NULL,
        entity      jsonb NOT NULL,
        provenance  jsonb NOT NULL DEFAULT '[]'::jsonb,
        created_at  timestamptz NOT NULL DEFAULT now(),
        updated_at  timestamptz NOT NULL DEFAULT now(),
        PRIMARY KEY (type, id)
      );
    `);
    await this.sql.unsafe(
      `CREATE INDEX IF NOT EXISTS ${this.table}_type_idx ON ${this.table} (type);`,
    );
    // One-shot bootstrap migration: if the table is empty AND a bundled
    // JSON corpus exists, ingest it. Subsequent boots see populated table
    // and skip — so this is safe on every redeploy.
    if (this.bundledRoot && (await this.isEmpty())) {
      await this.migrateBundled(this.bundledRoot);
    }
    this.inited = true;
  }

  private async isEmpty(): Promise<boolean> {
    const [{ count }] = await this
      .sql<{ count: number }[]>`SELECT COUNT(*)::int AS count FROM ${this.sql(this.table)}`;
    return count === 0;
  }

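  private async migrateBundled(root: string): Promise<void> {
    if (!fs.existsSync(root)) return;
    // postgres.js rejects undefined parameters, so rows without timestamps
    // fall back to the migration time instead of failing the insert.
    const now = new Date().toISOString();
    // Not transactional: if this dies halfway, the table is no longer empty and
    // the next boot skips the remainder. Wrap in this.sql.begin(...) if that matters.
    for (const file of fs.readdirSync(root).filter((f) => f.endsWith(".json"))) {
      // Each bundled file is read as { type, rows: [{ id, entity, provenance?, ... }] }.
      const parsed = JSON.parse(fs.readFileSync(path.join(root, file), "utf-8"));
      for (const r of parsed.rows ?? []) {
        await this.sql`
          INSERT INTO ${this.sql(this.table)}
            (type, id, entity, provenance, created_at, updated_at)
          VALUES (${parsed.type}, ${r.id},
                  ${this.sql.json(r.entity)},
                  ${this.sql.json(r.provenance ?? [])},
                  ${r.created_at ?? now}, ${r.updated_at ?? now})
          ON CONFLICT (type, id) DO NOTHING
        `;
      }
    }
  }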

  async get<T>(type: EntityType, id: string): Promise<Row<T> | null> {
    await this.init();
    const rows = await this.sql<Row<T>[]>`
      SELECT type, id, entity, provenance, created_at, updated_at
      FROM ${this.sql(this.table)} WHERE type = ${type} AND id = ${id} LIMIT 1
    `;
    return rows[0] ?? null;
  }
}

Design decisions

  • One table, not one per type. A new entity type lands without a migration. The cost is no per-type column constraints — moved to application-layer Zod schemas.
  • JSONB, not JSON. Indexable; GIN-index-eligible later when you find the queries you actually run. The cost is that JSONB normalizes on write: it does not preserve key order or duplicate keys, which is usually harmless for payloads that are read by key.
  • Composite primary key (type, id). Lets the same id repeat across types (a citation and a construct can both be "q12" without collision) and is the natural shard key if you ever scale out.
  • Bundled-JSON bootstrap is a one-shot. The "empty table → ingest bundled corpus" check runs every boot but only acts when truly empty. Means redeploys are no-ops; first-ever deploy seeds itself.
  • prepare: false for the connection. Required behind PgBouncer transaction-pooling (Neon's pooled endpoint, Supabase's pool). Prepared statements don't survive the transaction boundary there.
  • Findability tradeoff accepted. findBy(predicate) scans + filters in JS; not pushing predicates to SQL. Fine at registry scale (< 1M rows); revisit when one type crosses 100k.
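The first bullet moves per-type constraints out of the database and into application code. A minimal sketch of that gate, assuming two hypothetical entity types (citation, construct) and using hand-written type guards in place of the Zod schemas the original relies on, so the example has no dependencies:

```typescript
// Hypothetical entity payloads; real shapes come from the registry's schemas.
interface Citation { title: string; year: number }
interface Construct { name: string; definition: string }

type EntityType = string;
type Check = (value: unknown) => boolean;

const isObject = (v: unknown): v is Record<string, unknown> =>
  typeof v === "object" && v !== null && !Array.isArray(v);

const isCitation = (v: unknown): v is Citation =>
  isObject(v) && typeof v.title === "string" &&
  typeof v.year === "number" && Number.isInteger(v.year);

const isConstruct = (v: unknown): v is Construct =>
  isObject(v) && typeof v.name === "string" && typeof v.definition === "string";

// One check per entity type: adding a type is a map entry, not a migration.
const checks = new Map<EntityType, Check>([
  ["citation", isCitation],
  ["construct", isConstruct],
]);

// Call before put(): the JSONB column accepts any JSON, so this is the only gate.
function validateEntity(type: EntityType, entity: unknown): void {
  const check = checks.get(type);
  if (!check) throw new Error(`unknown entity type: ${type}`);
  if (!check(entity)) throw new Error(`invalid ${type} payload`);
}

validateEntity("citation", { title: "Engagement at Work", year: 2019 }); // passes
```

Registering a new entity type stays a one-line change, which keeps the "no DDL for new types" property while still rejecting malformed payloads before they reach the table.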

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| New entity types ship without DDL changes | No per-field type checking at the DB layer; relies on Zod / handler discipline |
| Bootstrap migration is automatic; no separate migration runner | Can't run SQL aggregations over entity fields without ->>/-> casting |
| Same code path works against any Postgres (Neon, Supabase, self-hosted) | Per-row JSONB rewrite on every update is more expensive than column-targeted UPDATE |
| Composite PK gives natural multi-tenancy via the type axis | Cross-entity-type joins require expanding into application code |

Citations

  • packages/registry/src/store/postgres-store.ts — full implementation, including init() + maybeMigrateBundled() + put with provenance merging.
  • packages/registry/src/store/types.ts: RegistryEntityType union (20+ entity types share the one table).

P02. Provenance-Merging Upsert Wrapper

Problem A record may be ingested from many different upstream sources over its lifetime — a citation discovered first via a meta-analysis, then re-encountered via a watchlist hit, then re-validated via a third-party API. You want every upsert to append to a provenance trail (which source files contributed, when), not overwrite the trail. The naive "last write wins" loses the audit history that makes the registry defensible.

The Pattern

interface Provenance {
  source_path: string;
  ingested_at: string;
  note?: string;
}

interface Row<T> {
  id: string;
  type: string;
  entity: T;
  provenance: Provenance[];
  created_at: string;
  updated_at: string;
}

// The merge: dedupe on `source_path`, keep the earliest existing entry.
function mergeProvenance(existing: Provenance[], incoming: Provenance[]): Provenance[] {
  const out = new Map<string, Provenance>();
  for (const p of existing) out.set(p.source_path, p);
  for (const p of incoming) {
    if (!out.has(p.source_path)) out.set(p.source_path, p);
  }
  return [...out.values()];
}

export async function upsert<T>(
  store: Map<string, Row<T>>,
  row: Row<T>,
): Promise<Row<T>> {
  const key = `${row.type}:${row.id}`;
  const now = new Date().toISOString();
  const existing = store.get(key);

  if (existing) {
    const merged: Row<T> = {
      ...row,
      provenance: mergeProvenance(existing.provenance, row.provenance),
      created_at: existing.created_at, // never moves
      updated_at: now,
    };
    store.set(key, merged);
    return merged;
  }

  const created: Row<T> = {
    ...row,
    created_at: row.created_at || now, // missing or "" means "stamp it now"
    updated_at: now,
  };
  store.set(key, created);
  return created;
}

Design decisions

  • Provenance is a first-class field, not metadata-tucked-into-entity. Means consumers can audit "where did this row come from" without parsing per-entity-type internals.
  • Dedupe key is source_path. Same source ingesting the same row twice doesn't bloat the trail. Different source paths (even pointing to the same paper) preserve both — that's the audit signal.
  • Earliest wins on dedupe. First-seen-at is more valuable than last-seen-at for provenance — "we've known this since T" is the discovery question.
  • created_at never moves on upsert. Once-and-only-once. updated_at carries the "last touched" semantic.
  • Same merge function in JSON store and Postgres store. Identical semantics across backends so consumers can't observe a difference.
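To see the "earliest wins" rule play out, here is the merge from the pattern applied across three ingest passes of one record. The source paths and timestamps are made up for illustration:

```typescript
interface Provenance { source_path: string; ingested_at: string; note?: string }

// The merge from the pattern: dedupe on source_path; the first-seen entry wins.
function mergeProvenance(existing: Provenance[], incoming: Provenance[]): Provenance[] {
  const out = new Map<string, Provenance>();
  for (const p of existing) out.set(p.source_path, p);
  for (const p of incoming) if (!out.has(p.source_path)) out.set(p.source_path, p);
  return [...out.values()];
}

// Three ingest passes over one record (paths and dates are illustrative).
const passes: Provenance[][] = [
  [{ source_path: "meta-analysis/a.json", ingested_at: "2024-01-01T00:00:00Z" }],
  [{ source_path: "watchlist/hit-17.json", ingested_at: "2024-02-01T00:00:00Z" }],
  [{ source_path: "meta-analysis/a.json", ingested_at: "2024-03-01T00:00:00Z" }],
];

const trail = passes.reduce<Provenance[]>((acc, incoming) => mergeProvenance(acc, incoming), []);

for (const p of trail) console.log(p.source_path, p.ingested_at);
// meta-analysis/a.json 2024-01-01T00:00:00Z
// watchlist/hit-17.json 2024-02-01T00:00:00Z
```

The third pass re-ingests the first source and changes nothing: the trail still records the January discovery, and the watchlist contribution survives alongside it.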

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| Full ingest history preserved across all passes | Provenance arrays grow unbounded and need periodic compaction at scale |
| Audit "why is this row here" trivially answerable | Every upsert pays the merge cost (linear in provenance length) |
| Same row can carry contributions from many domains | No structured query over provenance: it's an audit log, not an index |
| Mergeable across JSON ↔ DB backends without translation | Adding a new provenance field is a soft schema change with no enforcement |

Citations

  • packages/registry/src/store/json-store.ts: mergeProvenance() + put() flow.
  • packages/registry/src/store/postgres-store.ts — same merge inside SELECT FOR UPDATE transaction for concurrent-safety.

P03. Pluggable Persistence Behind a Thin Async Interface

Problem You start with a JSON-file store (zero dependencies, easy local dev), but you know Postgres is coming. If every ingest adapter, resolver, MCP handler, and REST route imports the JSON store directly, the eventual swap becomes a fan-out refactor across the whole codebase. You want the seam now, even though you have only one implementation.

The Pattern

import * as os from "node:os";
import * as path from "node:path";

export type EntityType = string;

export interface Row<T> {
  id: string;
  type: EntityType;
  entity: T;
  provenance: { source_path: string; ingested_at: string }[];
  created_at: string;
  updated_at: string;
}

// The seam. Every consumer depends on THIS, not on a concrete class.
export interface Store {
  get<T>(type: EntityType, id: string): Promise<Row<T> | null>;
  put<T>(row: Row<T>): Promise<Row<T>>;
  iterate<T>(type: EntityType): Promise<Row<T>[]>;
  count(type: EntityType): Promise<number>;
  delete(type: EntityType, id: string): Promise<boolean>;
  findBy<T>(type: EntityType, p: (r: Row<T>) => boolean): Promise<Row<T>[]>;
}

// All methods async, even when the JSON impl is sync. That's the discipline:
// callers can't accidentally tie themselves to sync semantics.
export class JsonStore implements Store {
  // ... fs.readFileSync calls exposed through async methods (bodies elided)
}

export class PostgresStore implements Store {
  // ... real async I/O
}

// Resolution at process boot.
export async function buildStore(env: NodeJS.ProcessEnv): Promise<Store> {
  if (env.DATABASE_URL) return new PostgresStore(env.DATABASE_URL);
  if (env.STORE_ROOT) return new JsonStore(env.STORE_ROOT);
  return new JsonStore(path.join(os.tmpdir(), "store"));
}

Design decisions

  • Every method returns a Promise, even when one implementation can answer synchronously. This forces callers to write code that survives the eventual swap.
  • findBy(predicate) is the escape hatch. A real indexed-query interface is hard to express generically across JSON and SQL backends. Predicate-over-iterate works against both; you accept the scan cost until you have evidence a real index helps.
  • Implementations are interchangeable at the constructor level. No factory framework — just new JsonStore(root) or new PostgresStore(url) and pass either one to consumers.
  • Resolution function is the only place that knows about both. Every other file imports the Store interface. The factory is the single bottleneck for "which backend am I on" decisions.
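Because every consumer depends only on the Store interface, a third backend is just another class. A minimal in-memory sketch, handy as a test double; MemoryStore is a hypothetical name, and the provenance merge from P02 is left out for brevity:

```typescript
type EntityType = string;

interface Row<T> {
  id: string;
  type: EntityType;
  entity: T;
  provenance: { source_path: string; ingested_at: string }[];
  created_at: string;
  updated_at: string;
}

interface Store {
  get<T>(type: EntityType, id: string): Promise<Row<T> | null>;
  put<T>(row: Row<T>): Promise<Row<T>>;
  iterate<T>(type: EntityType): Promise<Row<T>[]>;
  count(type: EntityType): Promise<number>;
  delete(type: EntityType, id: string): Promise<boolean>;
  findBy<T>(type: EntityType, p: (r: Row<T>) => boolean): Promise<Row<T>[]>;
}

// Hypothetical in-memory backend. Every method is async even though nothing
// here awaits I/O, which is exactly the discipline the interface asks for.
class MemoryStore implements Store {
  private readonly rows = new Map<string, Row<unknown>>();

  private key(type: EntityType, id: string): string {
    return `${type}:${id}`;
  }

  async get<T>(type: EntityType, id: string): Promise<Row<T> | null> {
    return (this.rows.get(this.key(type, id)) as Row<T> | undefined) ?? null;
  }

  async put<T>(row: Row<T>): Promise<Row<T>> {
    const now = new Date().toISOString();
    const existing = this.rows.get(this.key(row.type, row.id));
    // created_at never moves once set; updated_at always does.
    const stored: Row<T> = {
      ...row,
      created_at: existing?.created_at || row.created_at || now,
      updated_at: now,
    };
    this.rows.set(this.key(row.type, row.id), stored);
    return stored;
  }

  async iterate<T>(type: EntityType): Promise<Row<T>[]> {
    return [...this.rows.values()].filter((r) => r.type === type) as Row<T>[];
  }

  async count(type: EntityType): Promise<number> {
    return (await this.iterate(type)).length;
  }

  async delete(type: EntityType, id: string): Promise<boolean> {
    return this.rows.delete(this.key(type, id));
  }

  // Same scan-then-filter escape hatch as the JSON and Postgres backends.
  async findBy<T>(type: EntityType, p: (r: Row<T>) => boolean): Promise<Row<T>[]> {
    return (await this.iterate<T>(type)).filter(p);
  }
}
```

Consumers typed against Store (for example, a value returned from buildStore) accept this class unchanged, which is the point of the seam.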

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| Adding a new backend (SQLite, DuckDB, Cloudflare D1) is a new class with no consumer changes | Async-everywhere has a small ergonomic cost for sync-backed implementations |
| Tests use the JSON store; production uses Postgres; same code paths exercised | Predicate-based findBy punts the indexing question until later |
| Consumers can't accidentally depend on backend-specific behavior | Implementations must agree on subtle semantics (timestamp format, merge rules) by convention, not by interface |
| Production swap is one line in the factory, not a fan-out refactor | Some power moves (transactions, CTEs) don't survive the abstraction |

Citations

  • packages/registry/src/store/types.ts: Store (named RegistryStore there) interface definition.
  • packages/registry/src/store/json-store.ts + packages/registry/src/store/postgres-store.ts — two implementations.
  • apps/web/lib/store.ts: getStore() factory with DATABASE_URL-first resolution.

P04. Deterministic Idempotent IDs from Content Fingerprints

Problem You want to file work into a queue (a verification job, an acquisition request, a research task) from many call sites, such as schedulers, REST handlers, and manual scripts, and you want the same logical work to collapse to one row regardless of how often it's filed. The naive approach (random UUID + duplicate check) needs a query round-trip and a lookup index; you want enqueue to be a single write with no read in front of it.

The Pattern

import * as crypto from "node:crypto";

// FNV-1a 32-bit: fast and deterministic, but only 2^32 values, so collisions
// become likely past ~77k distinct inputs; the id prefix below keeps buckets small.
function hash32(s: string): string {
  let h = 0x811c9dc5;
  for (let i = 0; i < s.length; i++) {
    h ^= s.charCodeAt(i);
    h = Math.imul(h, 0x01000193);
  }
  return (h >>> 0).toString(16).padStart(8, "0");
}

// Strip everything that isn't load-bearing for identity.
function normalize(s: string): string {
  return s.toLowerCase().replace(/[^a-z0-9]+/g, "");
}

interface JobInput {
  job_type: string;
  subject_id: string;
}

// Derive a deterministic id from the inputs that define identity.
export function deriveJobId(input: JobInput): string {
  return `job.${input.job_type}.${input.subject_id}`;
}

// Composite shape — when there's no single canonical key.
interface RequestInput {
  doi?: string;
  title: string;
  first_author: string;
  year?: number;
}

export function deriveRequestId(req: RequestInput): string {
  if (req.doi) return `req:doi:${req.doi.toLowerCase()}`;
  const composite = [
    normalize(req.title),
    normalize(req.first_author),
    req.year ?? "noyr",
  ].join("|");
  // Truncated human-readable prefix + hash disambiguator.
  const prefix = normalize(req.title).slice(0, 24) || "untitled";
  return `req:${prefix}:${req.year ?? "noyr"}:${hash32(composite)}`;
}

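// SHA-256 for content-shaped ids: defined over UTF-8 bytes, so any language
// reproduces it exactly (hash32 above hashes UTF-16 code units), and 12 hex
// chars carry 48 bits instead of 32.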
export function deriveContentId(prefix: string, content: string): string {
  const h = crypto.createHash("sha256").update(content).digest("hex").slice(0, 12);
  return `${prefix}_${h}`;
}

Design decisions

  • DOI / canonical key wins when present. No need to fingerprint; the source already published one. The id-derivation function is a switch: external canonical id first, hash-based fallback second.
  • Normalization before hashing. Strip case, punctuation, whitespace variance. Two paths to the same logical entity must produce the same hash — otherwise you've made dedup a probabilistic feature.
  • Human-readable prefix on hash-based ids. req:engagementatwork:2019:8a3f9b1c is debuggable when you tail raw JSON files; req:8a3f9b1c is not.
  • Hash family by use case. FNV-1a is fast, but a 32-bit hash reaches even odds of at least one collision at roughly 77k distinct inputs, so it is only safe where a human-readable prefix (and year) already splits the id space into small buckets. Use SHA-256 (truncated to 12 hex chars, i.e. 48 bits) when the id might be recomputed in another runtime or you need a stronger guarantee.
  • Enqueue becomes UPSERT, not check-then-insert. With deterministic ids, the database's ON CONFLICT DO NOTHING is the dedup. No race; no lookup round-trip.
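The collision point in the hash-family bullet can be checked with the birthday approximation: for n distinct inputs hashed uniformly into 2^b values, P(at least one collision) ≈ 1 - exp(-n(n-1) / 2^(b+1)). A small calculator, with illustrative function names:

```typescript
// Birthday approximation for a uniform b-bit hash over n distinct inputs.
function collisionProbability(n: number, bits: number): number {
  return 1 - Math.exp(-(n * (n - 1)) / 2 ** (bits + 1));
}

// Expected number of colliding pairs: C(n, 2) / 2^b.
function expectedCollidingPairs(n: number, bits: number): number {
  return (n * (n - 1)) / 2 / 2 ** bits;
}

for (const n of [10_000, 77_163, 1_000_000]) {
  console.log(
    `n=${n}`,
    `fnv1a-32: p=${collisionProbability(n, 32).toFixed(3)}`,
    `sha256 12-hex (48 bits): p=${collisionProbability(n, 48).toFixed(4)}`,
  );
}
console.log(expectedCollidingPairs(1_000_000, 32).toFixed(1)); // 116.4
```

At 1M rows a bare 32-bit id would be expected to contain about 116 colliding pairs. Prefixing the hash with the normalized title and year means a collision only matters between two records that already share that prefix and year.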

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| Idempotency is a property of the id derivation, not the call path | Bad normalization rule means false collisions you can't easily unwind |
| Concurrent enqueues from N callers produce 1 row, deterministically | Re-deriving an id after a normalization change orphans the old rows |
| Ids are debuggable when read by hand | Human-readable prefixes leak source-field hints to anyone reading raw data |
| No "duplicate check before insert" code anywhere | Schema changes that should shift identity (e.g., versioning a job type) require explicit migration |
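The first two weaknesses are easy to hit with the ASCII-only normalizer from the pattern: every non-ASCII letter is deleted rather than folded, so distinct names can collapse to one key. A short demonstration, with a hypothetical diacritic-folding variant (normalizeFolded) for comparison:

```typescript
// The normalizer from the pattern: lowercase, then drop every run outside [a-z0-9].
function normalize(s: string): string {
  return s.toLowerCase().replace(/[^a-z0-9]+/g, "");
}

// Hypothetical variant: NFKD splits "ü" into "u" + U+0308 (combining diaeresis);
// stripping the U+0300..U+036F combining marks keeps the base letter.
function normalizeFolded(s: string): string {
  return s
    .normalize("NFKD")
    .replace(/[\u0300-\u036f]/g, "")
    .toLowerCase()
    .replace(/[^a-z0-9]+/g, "");
}

console.log(normalize("Müller"), normalize("Mller"));             // mller mller
console.log(normalizeFolded("Müller"), normalizeFolded("Mller")); // muller mller
// "Ø" has no Unicode decomposition, so folding does not rescue it.
console.log(normalizeFolded("Ødegaard"));                          // degaard
```

Switching from normalize to normalizeFolded would fix the first case, but it would also change the derived id of every existing row whose title or author contains a diacritic, which is exactly the orphaning problem in the second row of the table.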

Citations

  • packages/registry/src/loop/enrichment-queue.ts: deriveJobId (simple concatenation, reliable for typed subjects).
  • packages/registry/src/loop/acquisition-queue.ts: deriveAcquisitionRequestId (composite hash with human-readable prefix).
  • packages/registry/src/resolve/canonical-variable-resolver.ts: mintVariableId (FNV-1a over normalized name).
  • packages/registry/src/loop/canonical-survey-item-dedup.ts: canonicalItemId (SHA-256 prefix).

P05. Multi-Vendor LLM Consensus with Vote Adjudication

Problem You need to extract structured claims from unstructured text (a paper, a webpage, a transcript) and downstream consumers will act on those claims. A single LLM call has a non-trivial hallucination rate — too risky for "verified" output. You want a verdict whose confidence comes from independent agreement across vendors, not from one model's self-reported confidence.

The Pattern

interface Env {
  OPENAI_API_KEY?: string;
  ANTHROPIC_API_KEY?: string;
  GEMINI_API_KEY?: string;
  BUDGET_USD?: string;
}

interface VendorResponse {
  provider: string;
  doi?: string | null;
  title?: string | null;
  authors?: string[] | null;
  cost_usd: number;
  error?: string;
}

type Verdict =
  | "verified_strong_consensus"   // ≥2 agree on canonical id
  | "verified_metadata_consensus"  // ≥2 agree on title + authors
  | "disagreement"
  | "insufficient_providers"
  | "error";

// Vendor calls elided. Each wraps one fetch to the vendor's API and resolves
// to a VendorResponse, reporting failures in `error`.
declare function callOpenAI(prompt: string, key: string): Promise<VendorResponse>;
declare function callAnthropic(prompt: string, key: string): Promise<VendorResponse>;
declare function callGemini(prompt: string, key: string): Promise<VendorResponse>;

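// Illustrative matching rules; production name matching is usually fuzzier.
const norm = (s: string) => s.toLowerCase().replace(/[^a-z0-9]+/g, "");

function sameTitle(a?: string | null, b?: string | null): boolean {
  return !!a && !!b && norm(a) === norm(b);
}

// Share of the shorter author list that also appears in the other list.
function authorOverlap(a?: string[] | null, b?: string[] | null): number {
  if (!a?.length || !b?.length) return 0;
  const inB = new Set(b.map(norm));
  return a.filter((x) => inB.has(norm(x))).length / Math.min(a.length, b.length);
}

function consensus(responses: VendorResponse[]): { verdict: Verdict; result?: VendorResponse } {
  const ok = responses.filter((r) => !r.error && (r.title || r.doi));
  if (ok.length < 2) return { verdict: "insufficient_providers" };

  // Strong consensus: ≥2 vendors return the same canonical id (DOIs are case-insensitive).
  const counts = new Map<string, number>();
  for (const r of ok) {
    if (!r.doi) continue;
    const id = r.doi.toLowerCase();
    counts.set(id, (counts.get(id) ?? 0) + 1);
  }
  for (const [id, n] of counts) {
    if (n >= 2) {
      const match = ok.find((r) => r.doi?.toLowerCase() === id)!;
      return { verdict: "verified_strong_consensus", result: match };
    }
  }

  // Metadata consensus: same title + author overlap across ≥2. Every response
  // takes a turn as anchor, so an outlier in slot 0 can't mask agreement.
  for (const anchor of ok) {
    const agree = ok.filter((r) => r === anchor ||
      (sameTitle(anchor.title, r.title) && authorOverlap(anchor.authors, r.authors) >= 0.5)).length;
    if (agree >= 2) return { verdict: "verified_metadata_consensus", result: anchor };
  }

  return { verdict: "disagreement" };
}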

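export async function validate(prompt: string, env: Env) {
  const calls: [provider: string, call: Promise<VendorResponse>][] = [];
  if (env.OPENAI_API_KEY) calls.push(["openai", callOpenAI(prompt, env.OPENAI_API_KEY)]);
  if (env.ANTHROPIC_API_KEY) calls.push(["anthropic", callAnthropic(prompt, env.ANTHROPIC_API_KEY)]);
  if (env.GEMINI_API_KEY) calls.push(["gemini", callGemini(prompt, env.GEMINI_API_KEY)]);

  if (calls.length === 0) {
    return { verdict: "insufficient_providers" as const, cost_usd: 0 };
  }

  // allSettled: a vendor that throws becomes an errored response (and loses
  // its vote) instead of rejecting the whole fan-out.
  const settled = await Promise.allSettled(calls.map(([, call]) => call));
  const responses: VendorResponse[] = settled.map((s, i) =>
    s.status === "fulfilled"
      ? s.value
      : { provider: calls[i]![0], cost_usd: 0, error: String(s.reason) });
  const cost = responses.reduce((s, r) => s + r.cost_usd, 0);
  // Checked after the fan-out: this flags an overrun; it cannot prevent one.
  const budget = Number(env.BUDGET_USD ?? "2");
  if (cost > budget) return { verdict: "error" as const, cost_usd: cost };

  return { ...consensus(responses), cost_usd: cost, responses };
}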

Design decisions

  • Independent vendors, not multiple calls to one model. Cross-vendor agreement guards against shared training-data biases. Two calls to the same model are highly correlated, so their agreement adds little evidence.
  • Strong consensus uses canonical ids first. When two vendors both return the same DOI, the verdict is rock-solid. Fallback to metadata fuzzy-match only when ids are absent.
  • insufficient_providers is a first-class verdict. "Only one model available" is not "verified by one model" — the result is honest absence-of-consensus, not a fake yes.
  • Per-call budget guard. Multi-vendor fan-out racks up tokens fast. The check runs after the fan-out, so it cannot stop the current call's spend; it acts as a circuit breaker that returns error instead of a verdict when a call overruns, rather than letting the overrun pass silently.
  • Strict JSON output format. Force response_format: { type: "json_object" } (OpenAI), a system-prompt JSON requirement (Anthropic/Gemini), and parse defensively: models frequently wrap the JSON in markdown code fences or surrounding prose.
  • Errored vendors don't count. Drop them from the consensus pool; only successful structured outputs vote.
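The strict-JSON bullet says to parse defensively. One way to do that, as a sketch: accept a reply that is bare JSON, JSON inside a markdown code fence, or JSON surrounded by prose, and return null otherwise so the reply is treated like an errored vendor and dropped from the vote. The helper name parseModelJson is hypothetical:

```typescript
// Extract a single JSON object from a model reply. Returns null (never throws)
// so the caller can mark the vendor as errored.
function parseModelJson(text: string): Record<string, unknown> | null {
  let body = text.trim();
  // Prefer the contents of a markdown code fence (three backticks, optional "json").
  const fence = body.match(/`{3}(?:json)?\s*([\s\S]*?)`{3}/i);
  const inner = fence ? fence[1] : undefined;
  if (inner !== undefined) body = inner.trim();
  // Otherwise fall back to the outermost {...} span, ignoring surrounding prose.
  const start = body.indexOf("{");
  const end = body.lastIndexOf("}");
  if (start === -1 || end < start) return null;
  try {
    const parsed: unknown = JSON.parse(body.slice(start, end + 1));
    if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) return null;
    return parsed as Record<string, unknown>;
  } catch {
    return null;
  }
}

const tick = "`".repeat(3);
console.log(parseModelJson(`${tick}json\n{"doi": "10.1000/xyz"}\n${tick}`)); // { doi: '10.1000/xyz' }
```

Returning null rather than throwing keeps the "errored vendors don't count" rule a single check in the caller.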

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| Defensible "verified" verdict backed by cross-vendor agreement | 3× the per-call cost (and latency, if not parallelized) |
| Disagreement is itself a signal: it surfaces "uncertain" claims to the curator | Vendor pricing changes silently; cost estimates drift |
| Tolerates individual vendor outages without falling to zero confidence | Adding a 4th vendor is a new branch, not a config change |
| Honest "insufficient providers" prevents accidental single-model trust | Strict JSON enforcement breaks when a vendor changes their wrapper |

Citations

  • packages/literature/src/multimodel/validate-non-doi.ts — full implementation across OpenAI, Anthropic, Gemini with consensus + budget guard.
  • packages/registry/src/loop/handlers/verify-effect-size-claim.ts — same shape for verifying extracted claims against cited sources (PRN-032 novelty verification).

P06. Vendored-Local Type Mirrors Awaiting Upstream Package Publish

Problem You're building against a shared package whose new version isn't published yet. The team has agreed on the new types — they live in a draft PR, a coordinator brief, a planning doc — but the package registry won't resolve them until the upstream owner pushes. You don't want to wait; you also don't want a local fork that diverges.

The Pattern

// File: src/types/upstream-mirror.ts
//
// MIRRORS: @canonical/shared-types@0.11.0 (not yet published).
// When v0.11.0 lands on the registry, swap this file for:
//   export type { Intervention, DeliveryModality } from "@canonical/shared-types";
// The shapes below are STRUCTURALLY IDENTICAL — no consumer changes needed.

/** Mirrors `DeliveryModality` from @canonical/shared-types@0.11.0. */
export type DeliveryModality =
  | "training" | "coaching" | "tech" | "policy" | "other";

/** Mirrors `CostBand` from @canonical/shared-types@0.11.0. */
export type CostBand = "low" | "medium" | "high" | "enterprise";

/**
 * Mirrors `Intervention` from @canonical/shared-types@0.11.0.
 * Structurally identical. Consumers `import type { Intervention }` from the
 * package barrel; post-publish, only the barrel's source changes to
 * "@canonical/shared-types", and no call site is touched.
 */
export interface Intervention {
  intervention_id: string;
  name: string;
  targets: string[];
  mechanism: string;
  delivery_modality: DeliveryModality[];
  cost_band?: CostBand;
}

// Re-export at the package boundary so internal callers always import from
// here, never directly from `@canonical/shared-types`. The swap becomes
// a one-file edit.
//
// File: packages/registry/src/index.ts
export type { Intervention, DeliveryModality } from "./types/upstream-mirror";

Design decisions

  • Header comment is mandatory. Readers have to know it's a mirror, what version it tracks, and what triggers the swap. Without that, future contributors edit the local copy and create real drift.
  • Structural identity is the contract. TypeScript's structural typing means a local Intervention and the eventual published Intervention are interchangeable as long as the field-by-field shape matches. Because consumers import through the barrel, the swap is one line: the barrel's re-export changes from "./types/upstream-mirror" to "@canonical/shared-types" (or the mirror file's body is replaced by the re-export given in its header).
  • Internal callers always import from the package barrel. Never from the mirror file directly, never from the not-yet-published package. The barrel is the single bottleneck.
  • Mirrors live in dedicated modules. Not scattered in types.ts for each capability. The mandatory header keeps them grep-able: grep -rn "MIRRORS:" src lists every mirror still awaiting its swap.
  • Bounded lifetime. Mirrors should ship with an explicit swap ticket (track in your assignment queue, your TODO list, somewhere watched). Mirrors that persist beyond their upstream publish become real forks.

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| Unblocks parallel work: consumers don't wait for upstream to publish | Mirror can drift if upstream changes spec before publishing |
| Swap is a one-file edit, not a fan-out refactor | Discipline-only: nothing enforces the structural-identity claim |
| Header comment makes intent legible | Long-lived mirrors are an architectural smell |
| TypeScript catches divergence at the consumer boundary on swap | Reviewing a mirror requires diffing against an unpublished spec |
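The "discipline-only" weakness can be narrowed once the package is published and both shapes are importable: a compile-time equality check fails the build if the mirror and the published type differ in any field, including an extra optional one that ordinary assignability would let through. A sketch using the common conditional-type equality idiom, with local stand-ins for both types (in a real repo one would come from the mirror and one from the package):

```typescript
// Resolves to true only when A and B are identical types. Mutual
// assignability is weaker: it tolerates extra optional properties.
type Equals<A, B> =
  (<T>() => T extends A ? 1 : 2) extends (<T>() => T extends B ? 1 : 2) ? true : false;

type Modality = "training" | "coaching" | "tech" | "policy" | "other";
type Band = "low" | "medium" | "high" | "enterprise";

// Stand-in for the local mirror, declared as an interface...
interface MirrorIntervention {
  intervention_id: string;
  name: string;
  targets: string[];
  mechanism: string;
  delivery_modality: Modality[];
  cost_band?: Band;
}

// ...and for the published type, declared differently but with the same shape.
type PublishedIntervention = {
  intervention_id: string;
  name: string;
  targets: string[];
  mechanism: string;
  delivery_modality: Modality[];
  cost_band?: Band;
};

// Compiles only while the shapes match exactly; any drift becomes a type error.
const mirrorMatches: Equals<MirrorIntervention, PublishedIntervention> = true;

// An extra optional field is assignable in both directions, yet Equals flags it.
type Drifted = { id: string; notes?: string };
const driftCaught: Equals<{ id: string }, Drifted> = false;
```

Kept next to the barrel until the mirror is deleted, a check like this turns the structural-identity claim from convention into a build failure.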

Citations

  • packages/registry/src/loop/intervention-types.ts — mirrors Intervention from @people-analyst/measurement-core@0.11.0 ahead of publish.
  • packages/registry/src/loop/equivalence-types.ts, critique-types.ts, verification-types.ts — same pattern across four other deltas.
  • packages/registry/src/index.ts — barrel re-export so consumers never see the mirror.

P07. Idempotent Background Job Queue with Exponential-Backoff Retry

Problem You have background work (refresh metadata from an external API, run a slow LLM extraction, recompute a derived view) that needs to survive process restarts, retry on transient failure, and not double-run when two callers enqueue the same job concurrently. You don't want to operate a separate job-runner service.

The Pattern

type JobStatus = "pending" | "running" | "complete" | "failed" | "skipped";
type Priority = "high" | "normal" | "low";

interface Job {
  job_id: string;            // deterministic — same logical work → same id
  job_type: string;
  subject_id: string;
  status: JobStatus;
  priority: Priority;
  scheduled_at: string;
  started_at?: string;
  completed_at?: string;
  error?: string;
  retries: number;
  next_retry_at?: string;
}

interface Handler {
  job_type: string;
  handle(job: Job, ctx: { now: Date }): Promise<{ produced?: string[] }>;
}

interface RetryPolicy {
  max_retries: number;
  base_delay_ms: number;
  max_delay_ms: number;
}

const DEFAULT_RETRY: RetryPolicy = {
  max_retries: 3,
  base_delay_ms: 5_000,
  max_delay_ms: 5 * 60_000,
};

function backoff(retries: number, p: RetryPolicy): number {
  return Math.min(p.base_delay_ms * 2 ** retries, p.max_delay_ms);
}

// Store is the async persistence interface from P03.
export class Worker {
  private readonly handlers: Map<string, Handler>;
  constructor(
    private readonly store: Store,
    handlers: Handler[],
    private readonly policy = DEFAULT_RETRY,
  ) {
    this.handlers = new Map(handlers.map((h) => [h.job_type, h]));
  }

  /** Idempotent enqueue. Same (job_type, subject_id) twice = one row. */
  async enqueue(input: { job_type: string; subject_id: string; priority?: Priority }) {
    const job_id = `job.${input.job_type}.${input.subject_id}`;
    const existing = await this.store.get<Job>("job", job_id);
    if (existing) {
      const s = existing.entity.status;
      if (s === "pending" || s === "running") return { job_id, created: false };
      // Terminal status: caller wants another pass. Re-file to pending, due now.
      await this.persist({ ...existing.entity, status: "pending", retries: 0,
                            scheduled_at: new Date().toISOString(),
                            error: undefined, next_retry_at: undefined });
      return { job_id, created: false };
    }
    await this.persist({
      job_id, job_type: input.job_type, subject_id: input.subject_id,
      status: "pending", priority: input.priority ?? "normal",
      scheduled_at: new Date().toISOString(), retries: 0,
    });
    return { job_id, created: true };
  }

  /** Run every pending + due job. */
  async tick(now = new Date()) {
    const pending = await this.store.findBy<Job>("job", (r) => r.entity.status === "pending");
    const due = pending.filter((r) => this.isDue(r.entity, now));
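    // high > normal > low; oldest scheduled_at first within a tier.
    const rank: Record<Priority, number> = { high: 0, normal: 1, low: 2 };
    due.sort((a, b) =>
      rank[a.entity.priority] - rank[b.entity.priority] ||
      Date.parse(a.entity.scheduled_at) - Date.parse(b.entity.scheduled_at));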

    for (const row of due) {
      const handler = this.handlers.get(row.entity.job_type);
      if (!handler) {
        await this.persist({ ...row.entity, status: "skipped",
                              error: `no handler for ${row.entity.job_type}` });
        continue;
      }
      await this.runOne(row.entity, handler, now);
    }
  }

  private async runOne(job: Job, handler: Handler, now: Date) {
    // Keep the running snapshot so started_at survives into the final state.
    const running: Job = { ...job, status: "running", started_at: now.toISOString() };
    await this.persist(running);
    try {
      await handler.handle(running, { now });
      await this.persist({ ...running, status: "complete",
                           completed_at: new Date().toISOString() });
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      const retries = running.retries + 1;
      if (retries > this.policy.max_retries) {
        await this.persist({ ...running, status: "failed", error: message, retries });
      } else {
        // Delay uses the pre-increment count: 5 s, 10 s, 20 s, ... up to the cap.
        const nextRetryAt = new Date(Date.now() + backoff(running.retries, this.policy));
        await this.persist({ ...running, status: "pending", retries,
                             next_retry_at: nextRetryAt.toISOString(), error: message });
      }
    }
  }

  private isDue(job: Job, now: Date): boolean {
    const scheduledOk = Date.parse(job.scheduled_at) <= now.getTime();
    const retryOk = !job.next_retry_at || Date.parse(job.next_retry_at) <= now.getTime();
    return scheduledOk && retryOk;
  }

  private async persist(job: Job) {
    await this.store.put({ id: job.job_id, type: "job", entity: job,
                           provenance: [], created_at: "", updated_at: "" });
  }
}

Design decisions

  • Deterministic job_id is the dedup primitive. Same (job_type, subject_id) enqueued from N callers produces 1 row. No lookup-then-insert race.
  • Re-enqueue of a terminal job is "refile". A complete or failed job, re-enqueued, is the caller saying "do it again." Reset retries and scheduled_at; status → pending.
  • Status transitions are append-only by convention. pending → running → complete (or failed, or back to pending for retry). The store retains the latest state; the provenance trail carries the history.
  • Exponential backoff capped at max_delay_ms. Otherwise a job that keeps failing under max_retries=10 could schedule its last retry days out, long after anyone is watching the queue.
  • No next_retry_at clearing on success. Failed-then-succeeded jobs retain the trail of attempts in their provenance.
  • Worker is a function, not a service. tick() is called by cron, by a script, by an admin button. The worker doesn't own its scheduling cadence.
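The `backoff()` helper that `runOne` calls is not shown above. A minimal sketch of one capped exponential schedule, assuming a hypothetical `RetryPolicy` shape in which `base_delay_ms` is an illustrative field (only `max_retries` and `max_delay_ms` appear in the text):

```typescript
// Hypothetical policy shape; only max_retries and max_delay_ms are named in the text.
interface RetryPolicy {
  max_retries: number;
  base_delay_ms: number;
  max_delay_ms: number;
}

// Delay before the next attempt, given how many attempts have already failed
// (0 for the first failure). Doubles each time, then flattens at the cap.
function backoff(attempt: number, policy: RetryPolicy): number {
  const uncapped = policy.base_delay_ms * 2 ** attempt;
  return Math.min(uncapped, policy.max_delay_ms);
}

const policy: RetryPolicy = { max_retries: 10, base_delay_ms: 1_000, max_delay_ms: 60_000 };
const schedule = Array.from({ length: policy.max_retries }, (_, i) => backoff(i, policy));
// schedule: 1s, 2s, 4s, 8s, 16s, 32s, then 60s for every remaining attempt
```

Under this example policy the uncapped tenth delay would be 2^9 × 1 s = 512 s; with a one-minute base it would be 512 minutes, about 8.5 hours, which is the kind of schedule the cap exists to prevent.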

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| Survives restarts — state lives in the store, not in process memory | Single-process by default; needs locking semantics for multi-worker |
| Idempotent at enqueue and at handler level (the handlers must be idempotent too) | Polling overhead per tick — not push-driven |
| New job types are new handlers — no schema changes | No native cron expression — cadence lives outside the worker |
| Retry policy is per-worker, not per-job — keeps reasoning simple | Backoff is wall-clock, not service-specific — a flaky upstream can starve other jobs |

Citations

  • packages/registry/src/loop/enrichment-queue.ts — full EnrichmentQueue + EnrichmentWorker with retry policy, priority ordering, no-handler-skip.
  • packages/registry/src/loop/handlers/refresh-citation-metadata.ts — minimal handler example wrapping an upstream API call.
  • packages/registry/src/loop/verify-enqueue.ts — composing the queue with a deterministic-id helper to file verification jobs idempotently from many call sites.

P08. License-Gated Field Stripping at the Response Boundary

Problem Your catalog includes records whose source carries usage restrictions (proprietary text, attribution-required licenses, copyrighted item wording). You're building a public-facing API. Some fields can be returned safely; others can't. If even one handler forgets the check, you've published copyrighted text under your name. You want the gate to live at the response boundary, not threaded through every reader.

The Pattern

import { z } from "zod";

// Closed allowlist — defaults to "deny" for anything not explicitly listed.
const PUBLISHABLE_LICENSES = new Set([
  "public_domain",
  "open",
  "attribution_required", // if attribution is rendered alongside
]);

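interface Parent {
  parent_id: string;
  usage_restrictions?: {
    field_license?: string;   // current schema
    publisher?: string;
  };
  permissions?: {
    usage_type?: string;      // legacy schema, still present on older data
  };
}

interface Item {
  item_id: string;
  parent_id: string;
  text?: string;   // license-gated
  ordinal?: number; // safe
}

// Prefer the current field, fall back to the legacy one. Both feed the same allowlist.
function resolveLicense(parent: Parent | null): string | null {
  return parent?.usage_restrictions?.field_license
      ?? parent?.permissions?.usage_type
      ?? null;
}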

function isPublishable(license: string | null): boolean {
  if (!license) return false; // unknown → restricted by default
  return PUBLISHABLE_LICENSES.has(license);
}

function stripGatedFields(item: Item): Omit<Item, "text"> {
  const { text: _t, ...rest } = item;
  return rest;
}

// Discriminated union response — caller pattern-matches on status.
export const LookupOutput = z.discriminatedUnion("status", [
  z.object({ status: z.literal("ok"), item: z.object({}).passthrough().nullable() }),
  z.object({ status: z.literal("restricted"), reason: z.string(), parent_id: z.string() }),
]);

export async function lookupItem(store: Store, id: string) {
  const itemRow = await store.get<Item>("item", id);
  if (!itemRow) return { status: "ok" as const, item: null };

  const parentRow = await store.get<Parent>("parent", itemRow.entity.parent_id);
  const license = resolveLicense(parentRow?.entity ?? null);

  if (!isPublishable(license)) {
    return {
      status: "restricted" as const,
      reason: `parent license="${license ?? "unknown"}" — field rendering forbidden`,
      parent_id: itemRow.entity.parent_id,
    };
  }
  return { status: "ok" as const, item: itemRow.entity };
}

// Bulk search: never elide silently — surface the restricted count.
export async function searchItems(store: Store, q: string) {
  const all = await store.findBy<Item>("item", (r) => /* ... */ true);
  let restricted = 0;
  const matches = await Promise.all(all.map(async (row) => {
    const parent = await store.get<Parent>("parent", row.entity.parent_id);
    if (!isPublishable(resolveLicense(parent?.entity ?? null))) {
      restricted++;
      return stripGatedFields(row.entity);
    }
    return row.entity;
  }));
  return { matches, restricted_count: restricted };
}

Design decisions

  • Default-deny allowlist, not denylist. Unknown license → restricted. A license value that shows up in the data before anyone updates the code is treated as restricted, so nothing leaks by accident.
  • Gate at the response boundary, not in the store. The store carries everything; the public API strips. Internal consumers (research scripts, admin UI) get the unfiltered view; only public-facing handlers run the gate.
  • { status: "restricted", reason } is a distinct response shape. Not a null, not a stripped object pretending to be complete. The caller has to acknowledge the gate happened — discriminated union forces it.
  • Search surfaces restricted_count explicitly. Silent elision is worse than honest "we found 14 matches but can't show you the text of 9 of them."
  • License resolution has a legacy/back-compat fallback. Older data uses permissions.usage_type; newer data uses usage_restrictions.field_license. The resolver checks both — but the allowlist is one set.
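On the consumer side, the union is what makes the gate hard to ignore. A sketch, assuming a hypothetical `describe` renderer and a simplified item shape; the `never` assignment turns a forgotten branch into a compile error:

```typescript
// Mirrors the two branches of LookupOutput; the item payload is simplified.
type LookupResult =
  | { status: "ok"; item: { item_id: string; text?: string } | null }
  | { status: "restricted"; reason: string; parent_id: string };

// Hypothetical renderer: every branch must be handled, or the `never`
// assignment in the default case fails to compile.
function describe(result: LookupResult): string {
  switch (result.status) {
    case "ok":
      return result.item ? `item ${result.item.item_id}` : "not found";
    case "restricted":
      return `text withheld (parent ${result.parent_id}): ${result.reason}`;
    default: {
      const unreachable: never = result;
      return unreachable;
    }
  }
}
```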

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| Default-deny prevents accidental leaks on new license values | Public handlers pay a parent-lookup per item — N+1 if not joined |
| Gate logic lives in one module — auditable in one place | Internal vs public boundary must be enforced by handler hygiene |
| Discriminated union forces callers to handle the restricted case | Adding a new gateable field requires editing the strip function |
| Restricted-count gives honest "we have more, but" signal in search | License vocabulary drift across sources is a real maintenance cost |

Citations

  • packages/registry/src/mcp/handlers/items.ts — full implementation with PUBLISHABLE_LICENSES allowlist, resolveLicense legacy-fallback, lookup discriminated union, and bulk-search restricted_count.

P09. Cron-Driven Watchlist Scheduler with Cadence-Based Next-Run

Problem You want persistent searches against external sources (a search query, an RSS feed, a third-party API monitor) that run on their own cadence and route matches into downstream processing. You don't want every search to be a separate cron line; you don't want one giant "search everything" job either. Each search should declare its own cadence and route, and the scheduler should fire only the ones that are due.

The Pattern

type Cadence = "hourly" | "daily" | "weekly" | "monthly";
type SourceName = "scholar" | "openalex" | "rss" | "manual";

const CADENCE_MS: Record<Cadence, number> = {
  hourly:  60 * 60 * 1000,
  daily:   24 * 60 * 60 * 1000,
  weekly:   7 * 24 * 60 * 60 * 1000,
  monthly: 30 * 24 * 60 * 60 * 1000,
};

interface Watchlist {
  watchlist_id: string;
  description: string;
  query: { sources: SourceName[]; keywords?: string[] };
  cadence: Cadence;
  last_run_at?: string;
  next_run_at: string;
  matches_to_date: number;
  routes_to: "downstream_queue" | "review_queue";
  active: boolean;
}

interface Match { external_id: string; title: string; url?: string; }
interface Source { name: SourceName; search(w: Watchlist): Promise<Match[]>; }

export class Scheduler {
  private readonly sources: Map<SourceName, Source>;
  constructor(private readonly store: Store, sources: Source[]) {
    this.sources = new Map(sources.map((s) => [s.name, s]));
  }

  /** Fire every active watchlist whose next_run_at ≤ now. */
  async runDue(now = new Date()) {
    const watchlists = await this.store.iterate<Watchlist>("watchlist");
    const stats = { considered: 0, run: 0, not_due: 0, inactive: 0 };

    for (const row of watchlists) {
      stats.considered++;
      const w = row.entity;
      if (!w.active) { stats.inactive++; continue; }
      if (Date.parse(w.next_run_at) > now.getTime()) { stats.not_due++; continue; }
      await this.runOne(w, now);
      stats.run++;
    }
    return stats;
  }

  private async runOne(w: Watchlist, now: Date) {
    let routed = 0;
    const errors: string[] = [];
    for (const sourceName of w.query.sources) {
      const source = this.sources.get(sourceName);
      if (!source) { errors.push(`no adapter: ${sourceName}`); continue; }
      try {
        const matches = await source.search(w);
        for (const m of matches) {
          const stored = await this.routeMatch(sourceName, m);
          if (stored === "new") routed++;
        }
      } catch (err) {
        errors.push(`${sourceName}: ${(err as Error).message}`);
        // Continue with remaining sources — one bad source ≠ whole tick fails.
      }
    }
    if (errors.length > 0) {
      console.warn(`[watchlist ${w.watchlist_id}] partial failure:`, errors);
    }
    // Always advance next_run_at — even on partial failure — so we don't busy-loop.
    const next = new Date(now.getTime() + CADENCE_MS[w.cadence]).toISOString();
    await this.store.put({
      id: w.watchlist_id,
      type: "watchlist",
      entity: {
        ...w,
        last_run_at: now.toISOString(),
        next_run_at: next,
        matches_to_date: w.matches_to_date + routed,
      },
      provenance: [], created_at: "", updated_at: "",
    });
  }

  /** Idempotent on a deterministic match-id derived from source + external_id. */
  private async routeMatch(source: SourceName, m: Match): Promise<"new" | "exists"> {
    // Key on the source that actually returned the match, so two sources with
    // overlapping external_id spaces cannot collide.
    const id = `${source}:${m.external_id}`;
    const existing = await this.store.get("downstream_item", id);
    if (existing) return "exists";
    await this.store.put({ id, type: "downstream_item", entity: m, /* ... */ });
    return "new";
  }
}

Design decisions

  • Watchlists are data, not code. A new search isn't a new code path — it's a new row in the watchlist table. Add / pause / tune from an admin UI; no deploy.
  • next_run_at advances unconditionally per tick. Even on source error, we move the cursor forward. Otherwise a chronically-failing source busy-loops the scheduler.
  • Per-watchlist source loop is independent. One bad source within a watchlist's source list doesn't kill the whole watchlist — we collect errors and continue.
  • active: boolean separates "paused" from "deleted". Deletion loses history; pausing keeps the row + matches_to_date + last_run_at for forensics.
  • Match routing is idempotent at the downstream id. The scheduler's job is "deliver matches at least once"; downstream deduplication catches duplicates from re-runs.
  • Cadence is enum, not duration. "Daily" is more legible than 86400000 in audit logs and admin UIs. The enum-to-ms table is one place; adding "quarterly" is two lines.

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| New watchlists ship without code changes | No "run after X event" cadence — purely time-driven |
| Sources are pluggable (mock for tests, real for prod) | Source adapter quality varies; failure modes are heterogeneous |
| Per-watchlist failure isolation | Watchlists with overlapping queries waste API calls |
| Pause-vs-delete semantics preserve audit trail | Cadence drift over long runs — next_run_at is wall-clock, not aligned |
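The drift weakness comes from computing next_run_at as `now + cadence`: a tick that fires late pushes every later run later too. One common alternative, not what the scheduler above does, is to snap each run onto a grid anchored at a fixed time. A sketch, with `anchorMs` as a hypothetical per-watchlist anchor:

```typescript
// Alternative to `now + CADENCE_MS[cadence]`: place each run on a fixed grid
// (anchorMs + k * cadenceMs), so a late tick does not shift later runs.
// `anchorMs` is a hypothetical per-watchlist field, e.g. its creation time.
function nextAlignedRun(anchorMs: number, cadenceMs: number, nowMs: number): number {
  if (nowMs < anchorMs) return anchorMs;
  // Whole periods elapsed since the anchor, plus one: the first grid point after now.
  const periods = Math.floor((nowMs - anchorMs) / cadenceMs) + 1;
  return anchorMs + periods * cadenceMs;
}

const HOUR = 3_600_000;
const DAY = 24 * HOUR;

// A daily watchlist anchored at t = 0 whose tick fires 5 minutes late.
const late = DAY + 5 * 60_000;
const naive = late + DAY;                      // 2 days + 5 min: the delay carries forward
const aligned = nextAlignedRun(0, DAY, late);  // exactly 2 days: back on the grid
```

The tradeoff: after an outage, the aligned version fires once and rejoins the grid instead of replaying missed periods, and a 30-day "monthly" still will not line up with calendar months.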

Citations

  • packages/registry/src/loop/literature-monitor.ts — full LiteratureMonitorScheduler with cadence table, per-source loop, idempotent routing, and pluggable LiteratureSource adapters.
  • packages/registry/src/loop/sources/index.ts — adapter-factory that wires real sources from env vars.

P10. Read-Only Gap Detector with Stable Gap IDs

Problem You want to surface "what's missing" from a registry — incomplete records, under-evidenced claims, stale watchlists, unverified citations — as actionable items. You want curators to act on them (manually, or by filing follow-up work). You explicitly don't want the detector to fix anything itself, because the registry's value depends on every change being curator-approved.

The Pattern

type GapType =
  | "thin_evidence"        // record has fewer than N supporting sources
  | "unverified_claim"     // extraction status: needs review
  | "no_linked_evidence"   // verified record but no downstream linkage
  | "stale_watchlist";     // not run in 2× cadence

interface Gap {
  gap_id: string;             // deterministic — stable across runs
  gap_type: GapType;
  subjects: string[];          // entity ids the gap concerns
  detected_at: string;
  severity: "low" | "med" | "high";
  suggested_action: string;    // human-readable next step
  context: Record<string, unknown>;
}

function hash32(s: string): string {
  let h = 0x811c9dc5;
  for (let i = 0; i < s.length; i++) {
    h ^= s.charCodeAt(i);
    h = Math.imul(h, 0x01000193);
  }
  return (h >>> 0).toString(16).padStart(8, "0");
}

function gapId(type: GapType, subjects: string[]): string {
  const sorted = [...subjects].sort().join("|");
  return `gap:${type}:${hash32(sorted)}`;
}

// PURE FUNCTION over a store snapshot. No store mutations, no side effects.
// The clock is a parameter (defaulting to now) so tests can pin it.
export async function detectGaps(store: Store, now: Date = new Date()): Promise<Gap[]> {
  const out: Gap[] = [];
  const detectedAt = now.toISOString();

  // Thin evidence: records with < 3 supporting sources.
  const records = await store.iterate<{ supports?: string[] }>("record");
  for (const row of records) {
    const supportCount = (row.entity.supports ?? []).length;
    if (supportCount < 3) {
      out.push({
        gap_id: gapId("thin_evidence", [row.id]),
        gap_type: "thin_evidence",
        subjects: [row.id],
        detected_at: detectedAt,
        severity: "med",
        suggested_action: `find ≥${3 - supportCount} more sources`,
        context: { current_support_count: supportCount },
      });
    }
  }

  // Stale watchlists: not run in 2× cadence. Every cadence the scheduler can
  // store needs an entry here; a missing one yields NaN and hides the gap.
  const cadenceMs = {
    hourly: 3_600_000,
    daily: 86_400_000,
    weekly: 7 * 86_400_000,
    monthly: 30 * 86_400_000,
  };
  const watchlists = await store.iterate<{
    cadence: keyof typeof cadenceMs;
    last_run_at?: string;
  }>("watchlist");
  for (const row of watchlists) {
    const cadence = cadenceMs[row.entity.cadence];
    // A watchlist that has never run (no last_run_at) is treated as stale.
    const last = row.entity.last_run_at ? Date.parse(row.entity.last_run_at) : 0;
    if (now.getTime() - last > 2 * cadence) {
      out.push({
        gap_id: gapId("stale_watchlist", [row.id]),
        gap_type: "stale_watchlist",
        subjects: [row.id],
        detected_at: detectedAt,
        severity: "low",
        suggested_action: "investigate scheduler / source health",
        context: { last_run_at: row.entity.last_run_at, cadence: row.entity.cadence },
      });
    }
  }

  return out;
}

Design decisions

  • Read-only. The detector reads; it never writes. A separate consumer (a digest script, a follow-up task creator, a notification) can act on the output. This separation is the load-bearing discipline — auto-mutation in the detector path would betray the audit promise.
  • Deterministic gap_id from sorted subjects. Re-running detection produces the same id for the same gap. Consumers persisting gaps (or filing follow-up work from them) can dedupe.
  • Gap types are a closed enum. Each one carries a suggested_action template so the consumer renders human-readable guidance, not raw gap-type strings.
  • Severity is bucketed, not raw. "Med" reads as actionable; "score: 0.42" reads as artifact. Severity buckets are stable across small data shifts.
  • subjects[] is a list, not a single id. Some gaps involve multiple entities (a missing edge between two nodes; a triple that's missing one corner). Singleton case is the array of length 1.
  • Pure function signature. No environment access, no env-var reads, no Date.now() outside the explicit detected_at stamp — trivially unit-testable.
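Deterministic ids are what let a consumer tell a new gap from one it has already seen. A sketch of a hypothetical `diffGaps` helper that a digest script might run over two detection results:

```typescript
// Minimal shape; only the stable id matters for the diff.
interface GapLike { gap_id: string }

// Compare two detection runs by gap_id. Because ids are deterministic, the same
// underlying problem keeps the same id across runs and lands in `persisting`.
function diffGaps<T extends GapLike>(previous: T[], current: T[]) {
  const prevIds = new Set(previous.map((g) => g.gap_id));
  const currIds = new Set(current.map((g) => g.gap_id));
  return {
    opened: current.filter((g) => !prevIds.has(g.gap_id)),
    persisting: current.filter((g) => prevIds.has(g.gap_id)),
    resolved: previous.filter((g) => !currIds.has(g.gap_id)),
  };
}
```

A follow-up-task creator could file work only for `opened` and close tasks whose gaps land in `resolved`, while the detector itself still writes nothing.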

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| Curator stays in the loop — nothing auto-fixed | Detection latency = how often you call it; gaps linger until next run |
| Same gap across runs produces the same id (idempotent downstream) | Adding a new gap type means a new code path, not a new row |
| Trivially unit-testable | Long iteration over large stores is O(N) per type |
| Severity + suggested_action make output renderable directly | Hard-coded thresholds (< 3 sources) drift from true policy over time |

Citations

  • packages/registry/src/loop/gap-detector.ts — full detectRegistryGaps() with five gap types, deterministic gap_id, suggested-task-type mapping.
  • packages/registry/src/loop/research-task.ts — upsertResearchTaskFromGap(), the curator-policy-compliant consumer that turns gaps into trackable follow-up items.

P11. Process-Singleton Resource with Resolution-Order Fallback

Problem A heavy resource (DB connection pool, ingest pipeline, model client, secret-resolver) needs to be built once per process and shared across every request handler. You also want a deterministic order for which backend to use: production env-var first, dev override second, bundled default third, tmp fallback fourth. Lazy build, no race on first request, no rebuild on hot-reload.

The Pattern

import * as fs from "node:fs";
import * as os from "node:os";
import * as path from "node:path";

// Persist across HMR / module-graph re-imports in dev.
declare global {
  var __sharedResource: Resource | undefined;
  var __sharedResourceReady: Promise<Resource> | undefined;
}

interface Resource { /* whatever shape */ }

function resolveBackend(env: NodeJS.ProcessEnv): {
  kind: "primary" | "env-override" | "bundled" | "tmp";
  config: string;
} {
  // 1. Primary: a real backend if its env var is set.
  if (env.DATABASE_URL) return { kind: "primary", config: env.DATABASE_URL };

  // 2. Env override for local dev (e.g., explicit JSON store path).
  if (env.RESOURCE_ROOT) return { kind: "env-override", config: env.RESOURCE_ROOT };

  // 3. Bundled default — what production uses when no env is set.
  const bundled = path.join(process.cwd(), "data/bundled");
  if (fs.existsSync(bundled)) return { kind: "bundled", config: bundled };

  // 4. Tmp fallback — last resort for local dev with no setup.
  return { kind: "tmp", config: path.join(os.tmpdir(), "shared-resource") };
}

function redact(s: string): string {
  try {
    const u = new URL(s);
    if (u.password) u.password = "***";
    return u.toString();
  } catch { return "<unparseable>"; }
}

async function build(): Promise<Resource> {
  const { kind, config } = resolveBackend(process.env);
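  // Only the primary backend is a connection string that may carry credentials;
  // the other branches are paths, which redact() would render as "<unparseable>".
  console.log(`[shared-resource] using ${kind} backend at ${kind === "primary" ? redact(config) : config}`);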
  // ... build the resource based on `kind`
  return {} as Resource;
}

/** Lazy, race-safe, idempotent. */
export function getResource(): Promise<Resource> {
  if (globalThis.__sharedResource) {
    return Promise.resolve(globalThis.__sharedResource);
  }
  if (!globalThis.__sharedResourceReady) {
    globalThis.__sharedResourceReady = build().then((r) => {
      globalThis.__sharedResource = r;
      return r;
    });
  }
  return globalThis.__sharedResourceReady;
}

Design decisions

  • Cache the promise, not the resolved value. Two concurrent first-callers both await the same in-flight build; we don't kick off two parallel builds. (Caching only the resolved value is a classic race-condition footgun.)
  • globalThis cache survives HMR. Module re-imports during dev hot-reload would re-build the resource if it lived in module scope. globalThis lives across the boundary.
  • Resolution order is explicit and code-encoded. Not a config file. Reading the function tells you everything about backend selection.
  • Each branch announces itself. console.log at boot tells you which backend resolved. Without this, "why is my dev pulling from production?" debugging is brutal.
  • Sensitive config is redacted in logs. Connection strings often include credentials; strip them before logging.
  • Fallback isn't a silent default. Logging kind: "tmp" makes "I'm running on an ephemeral local backend" visible — so nobody ships against the wrong env.
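The difference between caching the value and caching the promise is easiest to see side by side. A generic sketch (the `cacheValue` and `cachePromise` names are hypothetical, not from the codebase) in which both callers arrive before the first build settles:

```typescript
// Caches only the resolved value. A second caller that arrives before the first
// build settles still sees an empty cache and starts its own build.
function cacheValue<T>(build: () => Promise<T>): () => Promise<T> {
  let cache: { value: T } | undefined;
  return async () => {
    if (!cache) cache = { value: await build() };
    return cache.value;
  };
}

// Caches the in-flight promise. Every caller, early or late, shares one build.
function cachePromise<T>(build: () => Promise<T>): () => Promise<T> {
  let pending: Promise<T> | undefined;
  return () => {
    if (!pending) pending = build();
    return pending;
  };
}

let valueBuilds = 0;
const getByValue = cacheValue(async () => ({ id: ++valueBuilds }));
void getByValue();
void getByValue(); // starts a second build: valueBuilds is now 2

let promiseBuilds = 0;
const getByPromise = cachePromise(async () => ({ id: ++promiseBuilds }));
const first = getByPromise();
const second = getByPromise(); // same promise object: promiseBuilds stays 1
```

Both counters are checked synchronously: an async function runs up to its first `await`, so `build()` is invoked at call time in each case.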

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| Build cost paid once per process | First request pays the build latency |
| Concurrent first-callers don't race | Build failures fail every caller — no per-call retry |
| Same code works for dev / preview / prod via resolution order | Resolution order changes are easy to get wrong; needs tests |
| Surviving HMR avoids dev churn | globalThis is a typed-loophole; needs declare global boilerplate |
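The "needs tests" weakness is easier to pay down if resolution is a pure function of its inputs. A sketch, assuming a hypothetical `resolveBackendWith` variant that receives the filesystem check, working directory, and tmp dir as parameters instead of calling `fs`, `process.cwd()`, and `os.tmpdir()` directly (paths are joined with `/` for brevity):

```typescript
type BackendKind = "primary" | "env-override" | "bundled" | "tmp";
type Deps = { exists: (p: string) => boolean; cwd: string; tmpdir: string };

// Same order as resolveBackend above; only the I/O is injected.
function resolveBackendWith(
  env: Record<string, string | undefined>,
  deps: Deps,
): { kind: BackendKind; config: string } {
  const db = env["DATABASE_URL"];
  if (db) return { kind: "primary", config: db };
  const root = env["RESOURCE_ROOT"];
  if (root) return { kind: "env-override", config: root };
  const bundled = `${deps.cwd}/data/bundled`;
  if (deps.exists(bundled)) return { kind: "bundled", config: bundled };
  return { kind: "tmp", config: `${deps.tmpdir}/shared-resource` };
}

// One case per branch, plus a precedence check (DATABASE_URL beats everything).
const noDisk: Deps = { exists: () => false, cwd: "/app", tmpdir: "/tmp" };
const withBundle: Deps = { ...noDisk, exists: (p) => p === "/app/data/bundled" };
const cases: Array<[Record<string, string | undefined>, Deps, BackendKind]> = [
  [{ DATABASE_URL: "postgres://u:p@db/x", RESOURCE_ROOT: "/data" }, withBundle, "primary"],
  [{ RESOURCE_ROOT: "/data" }, withBundle, "env-override"],
  [{}, withBundle, "bundled"],
  [{}, noDisk, "tmp"],
];
const failures = cases.filter(([env, deps, want]) => resolveBackendWith(env, deps).kind !== want);
```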

Citations

  • apps/web/lib/store.ts — getStore() with DATABASE_URL → STORE_ROOT → bundled → tmp fallback, promise-caching on globalThis, redacted-URL logging.

P12. Redis-Primary, In-Memory-Fallback Rate Limiter

Problem You want per-consumer rate limits in production where many serverless instances share traffic — a per-instance counter is useless because each instance only sees a fraction of the load. But local dev shouldn't require Redis, and a Redis outage shouldn't 500 your whole API.

The Pattern

interface RateLimitResult {
  allowed: boolean;
  limit: number;
  remaining: number;
  reset_at: string;
  retry_after_seconds: number;
}

interface RateSpec { count: number; window_ms: number; }

const buckets: Map<string, { hits: number[]; }> = new Map();

function getRedis(): { incr: (k: string) => Promise<number>;
                       expire: (k: string, s: number) => Promise<unknown>; } | null {
  if (!process.env.REDIS_URL) return null;
  // Lazy build a singleton client; return null when unavailable.
  return /* upstash redis instance or similar */ null;
}

export async function check(consumer: string, spec: RateSpec, now = Date.now()): Promise<RateLimitResult> {
  const redis = getRedis();
  if (redis) {
    try {
      return await checkRedis(redis, consumer, spec, now);
    } catch (err) {
      // Fail-open to in-memory so a Redis blip doesn't 500 the API.
      console.warn("[rate-limit] Redis check failed; using in-memory", err);
      // fall through
    }
  }
  return checkInMemory(consumer, spec, now);
}

async function checkRedis(redis: NonNullable<ReturnType<typeof getRedis>>,
                          consumer: string, spec: RateSpec, now: number): Promise<RateLimitResult> {
  // Fixed-window counter. Key includes the window index so we get
  // automatic per-window isolation without explicit cleanup.
  const windowIndex = Math.floor(now / spec.window_ms);
  const key = `ratelimit:${consumer}:${windowIndex}`;
  const windowSec = Math.max(1, Math.floor(spec.window_ms / 1000));
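  const count = await redis.incr(key);
  // EXPIRE on every call: the key embeds the window index, so re-setting its TTL
  // is harmless, and if one EXPIRE call fails the next request still sets it.
  await redis.expire(key, windowSec + 1);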

  const resetMs = (windowIndex + 1) * spec.window_ms;
  if (count > spec.count) {
    return {
      allowed: false, limit: spec.count, remaining: 0,
      reset_at: new Date(resetMs).toISOString(),
      retry_after_seconds: Math.max(1, Math.ceil((resetMs - now) / 1000)),
    };
  }
  return {
    allowed: true, limit: spec.count,
    remaining: Math.max(0, spec.count - count),
    reset_at: new Date(resetMs).toISOString(),
    retry_after_seconds: 0,
  };
}

function checkInMemory(consumer: string, spec: RateSpec, now: number): RateLimitResult {
  // True sliding window — per-instance only. OK for dev; not a substitute
  // for Redis in production with multiple instances.
  const bucket = buckets.get(consumer) ?? { hits: [] };
  buckets.set(consumer, bucket);
  const cutoff = now - spec.window_ms;
  while (bucket.hits.length > 0 && bucket.hits[0]! <= cutoff) bucket.hits.shift();

  if (bucket.hits.length >= spec.count) {
    const resetMs = bucket.hits[0]! + spec.window_ms;
    return {
      allowed: false, limit: spec.count, remaining: 0,
      reset_at: new Date(resetMs).toISOString(),
      retry_after_seconds: Math.max(1, Math.ceil((resetMs - now) / 1000)),
    };
  }
  bucket.hits.push(now);
  const resetMs = bucket.hits[0]! + spec.window_ms;
  return {
    allowed: true, limit: spec.count,
    remaining: Math.max(0, spec.count - bucket.hits.length),
    reset_at: new Date(resetMs).toISOString(),
    retry_after_seconds: 0,
  };
}

Design decisions

  • Redis fixed-window, not sliding-log. A sliding log needs more work and storage per check (one entry per request); fixed-window is one INCR + one EXPIRE. Acceptable edge case: boundary bursts can admit 2× the limit briefly. Document it; don't hide it.
  • EXPIRE unconditionally each call. NX-conditional EXPIRE requires a Lua script on Upstash REST. Re-setting the TTL on every INCR is harmless and keeps the pipeline at two ops.
  • Fail-open on Redis error. A Redis outage should not take down rate limiting (and thus the API). Log it loudly, fall back to in-memory, and surface the degradation in metrics.
  • In-memory is per-instance. Document this explicitly — the effective ceiling becomes instances × limit. Production must run with Redis to get a meaningful global ceiling.
  • Retry-After semantics in the response. When allowed: false, populate retry_after_seconds so the API can forward it as an HTTP header.
  • Reset timestamp in result, even when allowed. Callers can show "X requests remaining, resets in N seconds" without a second probe.
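Forwarding the result to HTTP is a small mapping. A sketch, assuming a hypothetical `rateLimitHeaders` helper; `Retry-After` is a standard HTTP header, while the `X-RateLimit-*` names are a widespread convention rather than a standard, so adjust them to whatever your API documents:

```typescript
// Same shape as RateLimitResult above.
interface RateLimitResult {
  allowed: boolean;
  limit: number;
  remaining: number;
  reset_at: string;
  retry_after_seconds: number;
}

// Map a limiter result to response headers. Retry-After is only sent on denial.
function rateLimitHeaders(r: RateLimitResult): Record<string, string> {
  const headers: Record<string, string> = {
    "X-RateLimit-Limit": String(r.limit),
    "X-RateLimit-Remaining": String(r.remaining),
    "X-RateLimit-Reset": r.reset_at,
  };
  if (!r.allowed) headers["Retry-After"] = String(r.retry_after_seconds);
  return headers;
}
```

A handler would send these alongside a 429 Too Many Requests when `allowed` is false, and alongside the normal response otherwise.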

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| Production gets globally-correct limits via Redis | Dual implementations — two code paths to keep in sync |
| Local dev works out of the box without Redis | Boundary-burst edge case admits 2× briefly |
| Redis outages degrade gracefully (in-memory fallback) | In-memory fallback in production with multiple instances is incorrect |
| Single check() API across both paths | Failover detection is per-process — concurrent processes can disagree |
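The boundary-burst weakness is easy to reproduce. A sketch of a hypothetical in-memory fixed-window counter that uses the same consumer-plus-window-index keying as checkRedis:

```typescript
// Fixed-window counter keyed like checkRedis: consumer + window index.
// Demo only: old keys are never cleaned up.
function makeFixedWindow(limit: number, windowMs: number) {
  const counts = new Map<string, number>();
  return (consumer: string, nowMs: number): boolean => {
    const key = `${consumer}:${Math.floor(nowMs / windowMs)}`;
    const count = (counts.get(key) ?? 0) + 1;
    counts.set(key, count);
    return count <= limit;
  };
}

// Limit 10 per 60 s. Ten requests just before a boundary, ten just after.
const allow = makeFixedWindow(10, 60_000);
let admitted = 0;
for (let i = 0; i < 10; i++) if (allow("c1", 59_000 + i)) admitted++;
for (let i = 0; i < 10; i++) if (allow("c1", 60_000 + i)) admitted++;
// admitted === 20: twice the limit within about one second of wall-clock time.
```

The sliding-window checkInMemory would deny the second batch, since all twenty hits fall inside one 60 s span.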

Citations

  • packages/registry/src/rest/rate-limit.ts — full checkRestRateLimit with parse-rate, env-var-driven per-consumer specs, fail-open fallback, test-only state resetters.
  • packages/registry/src/rest/upstash-client.ts — client lazy-init that returns null when env is absent.

P13. Curator-Mediated Promotion (Read-Only Detection, Approval-Only Mutation)

Problem Your pipeline extracts structured claims from messy sources (LLM extractors, automated validators, third-party APIs). Each step produces "candidate" output — best-effort, frequently wrong, sometimes hallucinated. You want a system that surfaces candidates clearly but never lets an unsupervised extractor write to the canonical store. The promotion from "candidate" to "canonical" must be an explicit human action.

The Pattern

// Verification statuses:
//   - "unverified" — extractor output; lives in the store but tagged
//   - "verified" — multi-vendor consensus (P05); machine-promoted, machine-trusted
//   - "disputed" — adjudication flagged disagreement; machine-set, awaits a curator
//   - "rejected" — curator marked as wrong; never re-enqueue
// Canonical truth is not a status: curatorPromote() writes a separate record.
type VerificationStatus = "unverified" | "verified" | "disputed" | "rejected";

interface Claim {
  claim_id: string;
  payload: { /* the actual claim */ };
  verification_status: VerificationStatus;
  verification_log: VerificationEntry[];
}

interface VerificationEntry {
  pass_id: string;        // deterministic from (verifier + timestamp + subject)
  verifier: string;
  verified_at: string;
  verdict: "confirmed" | "rejected" | "uncertain";
  evidence?: string;
  note: string;
}

// Verification handler: writes to the LOG, may flip status to verified/disputed.
// NEVER promotes to "canonical-truth" outside its narrow lane.
export class VerificationHandler {
  readonly job_type = "verify_claim";

  async handle(job: Job, ctx: { store: Store; now: Date }) {
    const row = await ctx.store.get<Claim>("claim", job.subject_id);
    if (!row) throw new Error(`claim not found: ${job.subject_id}`);

    // Curator-policy: rejected rows never re-enter the pipeline.
    if (row.entity.verification_status === "rejected") {
      return { note: "skipped: previously curator-rejected" };
    }

    const responses = await runMultiVendor(row.entity);
    const adjudication = adjudicate(responses);

    const newEntry: VerificationEntry = {
      pass_id: derivePassId(job, ctx.now),
      verifier: `multi-vendor:${responses.map((r) => r.provider).join("+")}`,
      verified_at: ctx.now.toISOString(),
      verdict: adjudication.verdict,
      evidence: adjudication.evidence,
      note: adjudication.note,
    };

    // Append to the log and set status from the adjudication. The pipeline may
    // move a claim among unverified / verified / disputed, but it NEVER writes
    // canonical truth; that is curator-only, via curatorPromote() below.
    await ctx.store.put({
      ...row,
      entity: {
        ...row.entity,
        verification_status: adjudication.status, // verified | disputed | unverified
        verification_log: [...row.entity.verification_log, newEntry],
      },
    });
    return { produced: [job.subject_id] };
  }
}

// Curator-side: explicit action. Lives in admin UI / CLI, not the pipeline.
export async function curatorPromote(store: Store, claim_id: string, note: string) {
  const row = await store.get<Claim>("claim", claim_id);
  if (!row) throw new Error("not found");
  if (row.entity.verification_status !== "verified") {
    throw new Error("only verified claims can be curator-promoted");
  }
  // Now we write to the canonical-truth table.
  await store.put({ id: claim_id, type: "canonical_claim", entity: row.entity.payload,
                    provenance: [{ source_path: `curator:${note}`,
                                    ingested_at: new Date().toISOString() }] });
}

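export async function curatorReject(store: Store, claim_id: string, reason: string) {
  const row = await store.get<Claim>("claim", claim_id);
  if (!row) throw new Error("not found");
  const at = new Date().toISOString();
  // Rejection is recorded, with the curator's reason in the log, but the row
  // stays — for forensics + dedup. (The curator pass_id format is illustrative.)
  await store.put({
    ...row,
    entity: {
      ...row.entity,
      verification_status: "rejected",
      verification_log: [...row.entity.verification_log, {
        pass_id: `curator:${claim_id}:${at}`,
        verifier: "curator",
        verified_at: at,
        verdict: "rejected",
        note: reason,
      }],
    },
  });
}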

Design decisions

  • Three jurisdictions. unverified = extractor output. verified = machine-promoted via cross-vendor consensus. canonical = curator-promoted, written as a separate canonical_claim record rather than a status value. The pipeline owns the first two; the curator owns the third.
  • rejected is sticky. Once a curator says "no," the pipeline never re-attempts. The defensive short-circuit in every handler enforces this even if a downstream dispatcher tries to re-enqueue.
  • Verification log is append-only. Every pass leaves a stamp — verifier id, timestamp, verdict, evidence. The store carries the audit trail, not just the latest verdict.
  • Promotion is a separate code path. curatorPromote() lives in admin tooling. It's never called from a handler. The handlers can only flip status within the verification jurisdiction; canonical promotion is out-of-band.
  • Defensive guards at every entry point. Even the verification handler checks if (status === "rejected") skip — the queue's enqueue-side dedup catches most, but the handler trusts no caller.
  • No silent fail-to-verify. When zero verification providers are configured, the row stays unverified with a "no providers" note — not a fake verified.
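The "structural, not policy" claim can be made concrete with a transition table that every status write goes through. A sketch, assuming a hypothetical `assertTransition` guard called by both the verification handler and the curator tools; canonical promotion stays outside it, because curatorPromote() writes a separate record rather than changing status:

```typescript
type Status = "unverified" | "verified" | "disputed" | "rejected";
type Actor = "pipeline" | "curator";

// Hypothetical transition table. The pipeline may only move claims among the
// machine statuses; only a curator may reject; nobody leaves "rejected" here.
const ALLOWED: Record<Actor, Record<Status, readonly Status[]>> = {
  pipeline: {
    unverified: ["unverified", "verified", "disputed"],
    verified: ["unverified", "verified", "disputed"],
    disputed: ["unverified", "verified", "disputed"],
    rejected: [],
  },
  curator: {
    unverified: ["rejected"],
    verified: ["rejected"],
    disputed: ["rejected"],
    rejected: [],
  },
};

// Throws instead of writing when the actor lacks jurisdiction over the move.
function assertTransition(actor: Actor, from: Status, to: Status): void {
  if (!ALLOWED[actor][from].includes(to)) {
    throw new Error(`${actor} may not move a claim from ${from} to ${to}`);
  }
}
```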

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| Curator authority is structural, not policy — pipeline can't betray it | Curator becomes the bottleneck for canonical-truth growth |
| Sticky rejection prevents repeated wasted extraction passes | Curators must record a rejection reason; otherwise the rationale is lost |
| Three-status model maps cleanly to UI (queue / inbox / archive) | Schema must thread verification_status through every claim type |
| Audit log answers "why is this canonical" / "why was this rejected" | Verification log can grow large per row; needs archival at scale |

Citations

  • packages/registry/src/loop/handlers/verify-effect-size-claim.ts — full multi-vendor verification handler with append-only log, deterministic pass_id, and the "no providers ⇒ unverified" honest-absence path.
  • packages/registry/src/loop/verify-enqueue.ts — short-circuits on extraction_status === "rejected" regardless of which caller filed the job.
  • packages/registry/src/loop/research-task.ts — curator-policy D4: "downstream promotion is a SEPARATE, curator-approved action — never written by this module."

P14. Opaque Base64 Cursor with Strict Decoder

Problem. Your REST API needs paginated list endpoints. Today offset/limit is fine; tomorrow you'll want keyset cursors for large tables. If clients ever see ?offset=N in URLs, swapping to keyset becomes a breaking change. You want pagination to be opaque from day one, but you also want bad cursors to fail loudly with a clean 400 rather than silently degrade.

The Pattern

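import { Buffer } from "node:buffer";

const DEFAULT_LIMIT = 50;
const MAX_LIMIT = 500;

// Private envelope shape; clients only ever see the base64url-encoded string.
interface CursorEnvelope { o: number; }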

export interface Page<T> {
  items: T[];
  next_cursor: string | null;
}

export function encodeCursor(offset: number): string {
  const envelope: CursorEnvelope = { o: offset };
  return Buffer.from(JSON.stringify(envelope), "utf-8").toString("base64url");
}

export function decodeCursor(cursor: string | null | undefined): number {
  if (!cursor) return 0; // absent cursor = first page
  // Buffer.from(..., "base64url") does not throw on bad input; it silently skips
  // characters outside the alphabet. Check the alphabet so garbage fails here.
  if (!/^[A-Za-z0-9_-]+$/.test(cursor)) {
    throw new Error("bad_cursor: invalid base64");
  }
  const json = Buffer.from(cursor, "base64url").toString("utf-8");
  let parsed: unknown;
  try {
    parsed = JSON.parse(json);
  } catch {
    throw new Error("bad_cursor: invalid json");
  }
  // STRICT: reject anything that isn't exactly { o: <non-negative integer> }.
  if (
    parsed === null || typeof parsed !== "object" || Array.isArray(parsed) ||
    Object.keys(parsed).length !== 1 ||
    !Number.isSafeInteger((parsed as CursorEnvelope).o) ||
    (parsed as CursorEnvelope).o < 0
  ) {
    throw new Error("bad_cursor: not a non-negative-integer offset envelope");
  }
  return (parsed as CursorEnvelope).o;
}

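export function paginate<T>(
  rows: readonly T[],
  opts: { cursor?: string | null; limit?: number | null } = {},
): Page<T> {
  const offset = decodeCursor(opts.cursor);
  // Clamp to [1, MAX_LIMIT]. NaN/Infinity (e.g. Number("abc")) fall back to the
  // default, so a bad limit can't yield an empty page that re-issues the same cursor.
  const requested = opts.limit ?? DEFAULT_LIMIT;
  const limit = Number.isFinite(requested)
    ? Math.max(1, Math.min(MAX_LIMIT, Math.floor(requested)))
    : DEFAULT_LIMIT;
  const items = rows.slice(offset, offset + limit);
  const nextOffset = offset + items.length;
  return {
    items,
    // null = last page; an offset past the end yields [] and null.
    next_cursor: nextOffset < rows.length ? encodeCursor(nextOffset) : null,
  };
}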

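// In a handler (query params arrive as strings, so coerce limit explicitly):
// try {
//   const limit = req.query.limit != null ? Number(req.query.limit) : undefined;
//   const page = paginate(rows, { cursor: req.query.cursor, limit });
//   return Response.json(page);
// } catch (err) {
//   const message = err instanceof Error ? err.message : String(err);
//   if (!message.startsWith("bad_cursor")) throw err; // not a cursor error: let it 500
//   return Response.json({ error: { code: "bad_cursor", message } }, { status: 400 });
// }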

Design decisions

  • Cursor is opaque base64url JSON. Clients can't parse it; they treat it as a token. When you swap from offset to keyset, you change the envelope shape — clients keep working as long as they use the cursor we hand back.
  • Strict decoder. Anything that's not exactly the expected shape throws. No "best-effort" parsing, no fallback offset 0 on bad cursor — that would mask client bugs (stale cursor → silent re-pagination from start).
  • Throws rather than returning a Result. The handler wraps paginate() (which calls decodeCursor) in a single try/catch and turns any bad_cursor error into a clean 400; that caller pattern is simpler than unpacking a Result-typed return.
  • Hard cap on limit. Clients passing ?limit=1000000 get truncated to MAX_LIMIT. Prevents accidental (or malicious) memory blowups.
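  • next_cursor: null means last page. Client logic is "while the cursor is non-null, keep paging." That is more robust than "while items.length === limit", which costs an extra empty request when the total is an exact multiple of the limit, and stops early when the server clamps a requested limit above MAX_LIMIT.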
  • Same return shape regardless of backend. When the backend changes (in-memory slice today, indexed query tomorrow), paginate()'s signature doesn't.
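To make the offset-to-keyset swap concrete, here is a hedged sketch of what a keyset envelope might look like. The names (`KeysetEnvelope`, `encodeKeyset`, `decodeKeyset`, `paginateKeyset`) are hypothetical, and the sketch assumes rows are pre-sorted ascending by a unique string `id`. The return shape is still `Page<T>`, so a client that just follows `next_cursor` does not change. One consequence of the strict decoder: offset cursors minted before the swap fail with `bad_cursor`, so in-flight pagination gets a 400 and has to restart from the first page.

```typescript
import { Buffer } from "node:buffer";

// Hypothetical keyset variant of the helpers above. Assumes rows are sorted
// ascending by a unique string `id` (plain JS string comparison).
interface KeysetEnvelope { k: string; }

export interface Page<T> {
  items: T[];
  next_cursor: string | null;
}

const DEFAULT_LIMIT = 50;
const MAX_LIMIT = 500;

function encodeKeyset(lastKey: string): string {
  const envelope: KeysetEnvelope = { k: lastKey };
  return Buffer.from(JSON.stringify(envelope), "utf-8").toString("base64url");
}

function decodeKeyset(cursor: string | null | undefined): string | null {
  if (!cursor) return null; // no cursor = first page
  if (!/^[A-Za-z0-9_-]+$/.test(cursor)) throw new Error("bad_cursor: invalid base64");
  let parsed: unknown;
  try {
    parsed = JSON.parse(Buffer.from(cursor, "base64url").toString("utf-8"));
  } catch {
    throw new Error("bad_cursor: invalid json");
  }
  // Strict: exactly { k: string }. An old offset envelope { o: n } fails here.
  if (parsed === null || typeof parsed !== "object" || Array.isArray(parsed) ||
      Object.keys(parsed).length !== 1 || typeof (parsed as KeysetEnvelope).k !== "string") {
    throw new Error("bad_cursor: not a keyset envelope");
  }
  return (parsed as KeysetEnvelope).k;
}

export function paginateKeyset<T extends { id: string }>(
  sortedRows: readonly T[],
  opts: { cursor?: string | null; limit?: number | null } = {},
): Page<T> {
  const after = decodeKeyset(opts.cursor);
  const requested = opts.limit ?? DEFAULT_LIMIT;
  const limit = Number.isFinite(requested)
    ? Math.max(1, Math.min(MAX_LIMIT, Math.floor(requested)))
    : DEFAULT_LIMIT;
  // In SQL this is roughly: WHERE id > $after ORDER BY id LIMIT $limit.
  const start = after === null ? 0 : sortedRows.findIndex((r) => r.id > after);
  if (start === -1) return { items: [], next_cursor: null };
  const items = sortedRows.slice(start, start + limit);
  const last = items[items.length - 1];
  const hasMore = start + items.length < sortedRows.length;
  return { items, next_cursor: hasMore && last ? encodeKeyset(last.id) : null };
}
```

The client loop is identical for either envelope: call again with the last `next_cursor` until it comes back `null`.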

Tradeoffs

| Strengths | Weaknesses |
|---|---|
| Migration to keyset is non-breaking for clients | Opaque cursors are harder to debug from raw requests |
| Strict decoder catches client bugs early | Forces a try/catch at every list-endpoint handler |
| Hard cap on limit prevents resource abuse | The 500 max is arbitrary; needs revisiting per endpoint shape |
| Same code path against any backend | Offset-based today; will need a different envelope when keyset lands |
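The "try/catch at every handler" weakness can be contained by hoisting the catch into one wrapper. A sketch, assuming a hypothetical framework-agnostic `JsonResponse` shape (`status` + `body`) rather than any particular HTTP library; `withBadCursor` is an illustrative name. Only errors whose message starts with `bad_cursor` become a 400, so unrelated failures still propagate to the framework's normal error handling.

```typescript
// Hypothetical, framework-agnostic response shape; adapt to your HTTP layer.
export interface JsonResponse {
  status: number;
  body: unknown;
}

type Handler<A extends unknown[]> = (...args: A) => JsonResponse | Promise<JsonResponse>;

// Wraps a list handler so any `bad_cursor:` error becomes a 400 with the
// { error: { code, message } } body; every other error is re-thrown unchanged.
export function withBadCursor<A extends unknown[]>(
  handler: Handler<A>,
): (...args: A) => Promise<JsonResponse> {
  return async (...args: A): Promise<JsonResponse> => {
    try {
      return await handler(...args);
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      if (!message.startsWith("bad_cursor")) throw err;
      return { status: 400, body: { error: { code: "bad_cursor", message } } };
    }
  };
}
```

A list endpoint then becomes something like `withBadCursor(async (cursor: string | null) => ({ status: 200, body: paginate(rows, { cursor }) }))`, with no local try/catch (here `rows` stands in for whatever the endpoint loads).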

Citations

  • packages/registry/src/rest/cursor.ts — full encodeCursor / decodeCursor / paginate with strict validation and the bad_cursor error convention.

This catalog is the source-graded record of engineering patterns that have hardened in production code. New patterns join the list when they crystallize in actual implementation; speculative patterns belong in design docs, not here.