Principia — reusable patterns
A source-graded registry of organizational science exposed over a versioned REST + MCP contract. Pattern catalog is scaffolded upstream and will populate as the engagement-family ingest pipeline matures.
Principia — Reusable Engineering Patterns
Production-validated patterns from the Principia codebase, stripped of business context, written to be dropped into any new system.
Each pattern is copy-paste-ready TypeScript. Domain-specific terminology has been removed. Citations at the bottom of each entry point to the concrete implementation in this repo.
Pattern Index
| # | Pattern Name | Core Technology | Problem Solved |
|---|---|---|---|
| P01 | Single-Table JSONB-per-Row Registry with JSON Bootstrap Migration | Postgres JSONB + bundled JSON | Persist a heterogeneous, schema-evolving entity catalog without per-table DDL and migrate from a file-backed predecessor in one shot |
| P02 | Provenance-Merging Upsert Wrapper | Plain object map + JSON | Upsert records that accumulate "where did this come from" trails across many ingest passes without losing earlier sources |
| P03 | Pluggable Persistence Behind a Thin Async Interface | TypeScript interface + factory | Swap a JSON-file store for a database without rewriting ingest, resolve, MCP, and REST layers |
| P04 | Deterministic Idempotent IDs from Content Fingerprints | FNV-1a / SHA-256 hash | File the same logical work twice and get one row, not two — without needing a lookup-then-insert round-trip |
| P05 | Multi-Vendor LLM Consensus with Vote Adjudication | fetch + parallel Promise.all | Get a defensible "verified" verdict on extracted claims without trusting a single model |
| P06 | Vendored-Local Type Mirrors Awaiting Upstream Package Publish | TypeScript structural types | Ship code that depends on an unpublished package version by mirroring the not-yet-released shape locally |
| P07 | Idempotent Background Job Queue with Exponential-Backoff Retry | RegistryStore + Map<type, handler> | Drive long-running enrichment work that survives crashes, retries on transient failure, and dedupes concurrent enqueues |
| P08 | License-Gated Field Stripping at the Response Boundary | Allowlist set + handler-side strip | Return matching records to callers without leaking text whose source license forbids republication |
| P09 | Cron-Driven Watchlist Scheduler with Cadence-Based Next-Run | RegistryStore + interval map | Run persistent searches against external sources on a per-monitor cadence without a job-scheduler dependency |
| P10 | Read-Only Gap Detector with Stable Gap IDs | Pure function over store snapshot | Surface "what's missing" from a registry as actionable records without ever mutating the registry the curator is auditing |
| P11 | Process-Singleton Resource with Resolution-Order Fallback | globalThis cache + env-var precedence | Build a heavy resource (DB pool, ingest pipeline) once per process, with a deterministic resolution order across dev/preview/prod |
| P12 | Redis-Primary, In-Memory-Fallback Rate Limiter | Upstash REST + per-instance sliding window | Enforce global rate limits in production while never failing local dev or a Redis blip |
| P13 | Curator-Mediated Promotion (Read-Only Detection, Approval-Only Mutation) | Job handlers + explicit policy boundary | Build an extraction / verification pipeline that proposes changes but never auto-promotes — the curator stays in the loop |
| P14 | Opaque Base64 Cursor with Strict Decoder | base64url + JSON envelope | Paginate REST list endpoints in a way that lets you swap offset-based for keyset later without breaking clients |
Patterns
P01. Single-Table JSONB-per-Row Registry with JSON Bootstrap Migration
Problem: You're building a registry of heterogeneous entity types (twenty kinds today, more next quarter). Each entity has a stable id + a payload whose shape will keep evolving. A column-per-field schema means a migration on every new entity type; a table-per-entity-type schema means twenty tables with near-identical CRUD. You also have an existing JSON-file store with real production data that needs to migrate into the database on first deploy without a separate migration script.
The Pattern
import postgres, { type Sql } from "postgres";
import * as fs from "node:fs";
import * as path from "node:path";
type EntityType = string;
interface Row<T> {
id: string;
type: EntityType;
entity: T;
provenance: { source: string; at: string }[];
created_at: string;
updated_at: string;
}
export class JsonbRegistry {
private readonly sql: Sql;
private readonly table = "registry_rows";
private readonly bundledRoot?: string;
private inited = false;
constructor(connectionString: string, opts: { bundledRoot?: string } = {}) {
// `prepare: false` is safer behind PgBouncer transaction-pooling.
this.sql = postgres(connectionString, { prepare: false });
this.bundledRoot = opts.bundledRoot;
}
async init(): Promise<void> {
if (this.inited) return;
await this.sql.unsafe(`
CREATE TABLE IF NOT EXISTS ${this.table} (
type text NOT NULL,
id text NOT NULL,
entity jsonb NOT NULL,
provenance jsonb NOT NULL DEFAULT '[]'::jsonb,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (type, id)
);
`);
await this.sql.unsafe(
`CREATE INDEX IF NOT EXISTS ${this.table}_type_idx ON ${this.table} (type);`,
);
// One-shot bootstrap migration: if the table is empty AND a bundled
// JSON corpus exists, ingest it. Subsequent boots see populated table
// and skip — so this is safe on every redeploy.
if (this.bundledRoot && (await this.isEmpty())) {
await this.migrateBundled(this.bundledRoot);
}
this.inited = true;
}
private async isEmpty(): Promise<boolean> {
const [{ count }] = await this
.sql<{ count: number }[]>`SELECT COUNT(*)::int AS count FROM ${this.sql(this.table)}`;
return count === 0;
}
private async migrateBundled(root: string): Promise<void> {
if (!fs.existsSync(root)) return;
for (const file of fs.readdirSync(root).filter((f) => f.endsWith(".json"))) {
const parsed = JSON.parse(fs.readFileSync(path.join(root, file), "utf-8"));
for (const r of parsed.rows ?? []) {
await this.sql`
INSERT INTO ${this.sql(this.table)}
(type, id, entity, provenance, created_at, updated_at)
VALUES (${parsed.type}, ${r.id},
${this.sql.json(r.entity)},
${this.sql.json(r.provenance ?? [])},
${r.created_at}, ${r.updated_at})
ON CONFLICT (type, id) DO NOTHING
`;
}
}
}
async get<T>(type: EntityType, id: string): Promise<Row<T> | null> {
await this.init();
const rows = await this.sql<Row<T>[]>`
SELECT type, id, entity, provenance, created_at, updated_at
FROM ${this.sql(this.table)} WHERE type = ${type} AND id = ${id} LIMIT 1
`;
return rows[0] ?? null;
}
}
Design decisions
- One table, not one per type. A new entity type lands without a migration. The cost is no per-type column constraints — moved to application-layer Zod schemas.
- JSONB, not JSON. Indexable, and GIN-index-eligible later once you know which queries you actually run. JSONB does not preserve key order or duplicate keys on read, so nothing downstream should depend on either.
- Composite primary key `(type, id)`. Lets the same id be reused across types (a citation and a construct can both be `"q12"` without collision) and is the natural shard key if you ever scale out.
- Bundled-JSON bootstrap is a one-shot. The "empty table → ingest bundled corpus" check runs on every boot but only acts when the table is truly empty, so redeploys are no-ops and the first-ever deploy seeds itself.
- `prepare: false` for the connection. Safer behind PgBouncer transaction-pooling (Neon's pooled endpoint, Supabase's pool): unless the pooler tracks prepared statements itself, they don't survive the transaction boundary there.
- Findability tradeoff accepted. `findBy(predicate)` scans and filters in JS rather than pushing predicates to SQL. Fine at registry scale (< 1M rows); revisit when one type crosses 100k.
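The first bullet above moves per-type constraints out of the database and into the application layer. A minimal sketch of that gate, assuming hand-written type guards in place of Zod schemas so the example has no dependencies; the `citation` and `construct` shapes and the `assertValidEntity` helper are hypothetical, not taken from the repo:

```typescript
// Hypothetical entity shapes; a real registry defines its own.
interface Citation { title: string; year: number }
interface Construct { name: string }

function isRecord(v: unknown): v is Record<string, unknown> {
  return typeof v === "object" && v !== null && !Array.isArray(v);
}

function isCitation(v: unknown): v is Citation {
  return isRecord(v) && typeof v["title"] === "string" && typeof v["year"] === "number";
}

function isConstruct(v: unknown): v is Construct {
  return isRecord(v) && typeof v["name"] === "string";
}

// One validator per entity type. Adding a type means adding an entry here,
// not running DDL against the single table.
const validators: Record<string, (v: unknown) => boolean> = {
  citation: isCitation,
  construct: isConstruct,
};

// Call before every write. Unknown types are rejected rather than waved through.
function assertValidEntity(type: string, entity: unknown): void {
  const check = validators[type];
  if (!check) throw new Error(`no validator registered for type "${type}"`);
  if (!check(entity)) throw new Error(`invalid ${type} payload`);
}

assertValidEntity("citation", { title: "Example title", year: 2019 }); // passes
```

The table stays schema-free; the validator map is the only place that knows what a well-formed payload looks like.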
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| New entity types ship without DDL changes | No per-field type checking at the DB layer; relies on Zod / handler discipline |
| Bootstrap migration is automatic — no separate migration runner | Can't run SQL aggregations over entity fields without ->>/-> casting |
| Same code path works against any Postgres (Neon, Supabase, self-hosted) | Per-row JSONB rewrite on every update is more expensive than column-targeted UPDATE |
| Composite PK gives natural multi-tenancy via the type axis | Cross-entity-type joins require expanding into application code |
Citations
- `packages/registry/src/store/postgres-store.ts`: full implementation, including `init()` + `maybeMigrateBundled()` + `put` with provenance merging.
- `packages/registry/src/store/types.ts`: `RegistryEntityType` union (20+ entity types share the one table).
P02. Provenance-Merging Upsert Wrapper
Problem: A record may be ingested from many different upstream sources over its lifetime: a citation discovered first via a meta-analysis, then re-encountered via a watchlist hit, then re-validated via a third-party API. You want every upsert to append to a provenance trail (which source files contributed, when), not overwrite the trail. The naive "last write wins" loses the audit history that makes the registry defensible.
The Pattern
interface Provenance {
source_path: string;
ingested_at: string;
note?: string;
}
interface Row<T> {
id: string;
type: string;
entity: T;
provenance: Provenance[];
created_at: string;
updated_at: string;
}
// The merge: dedupe on `source_path`, keep the earliest existing entry.
function mergeProvenance(existing: Provenance[], incoming: Provenance[]): Provenance[] {
const out = new Map<string, Provenance>();
for (const p of existing) out.set(p.source_path, p);
for (const p of incoming) {
if (!out.has(p.source_path)) out.set(p.source_path, p);
}
return [...out.values()];
}
export async function upsert<T>(
store: Map<string, Row<T>>,
row: Row<T>,
): Promise<Row<T>> {
const key = `${row.type}:${row.id}`;
const now = new Date().toISOString();
const existing = store.get(key);
if (existing) {
const merged: Row<T> = {
...row,
provenance: mergeProvenance(existing.provenance, row.provenance),
created_at: existing.created_at, // never moves
updated_at: now,
};
store.set(key, merged);
return merged;
}
const created: Row<T> = {
...row,
created_at: row.created_at || now, // `||` so an empty string also counts as unset
updated_at: now,
};
store.set(key, created);
return created;
}
Design decisions
- Provenance is a first-class field, not metadata-tucked-into-entity. Means consumers can audit "where did this row come from" without parsing per-entity-type internals.
- Dedupe key is `source_path`. Same source ingesting the same row twice doesn't bloat the trail. Different source paths (even pointing to the same paper) preserve both; that's the audit signal.
- Earliest wins on dedupe. First-seen-at is more valuable than last-seen-at for provenance: "we've known this since T" is the discovery question.
- `created_at` never moves on upsert. Once-and-only-once. `updated_at` carries the "last touched" semantic.
- Same merge function in JSON store and Postgres store. Identical semantics across backends so consumers can't observe a difference.
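The dedupe and earliest-wins rules above are easiest to see on a two-pass ingest. A small worked example, repeating `mergeProvenance` so the block runs on its own; the source paths and timestamps are made up for illustration:

```typescript
interface Provenance {
  source_path: string;
  ingested_at: string;
  note?: string;
}

// Same merge as in the pattern: dedupe on source_path, existing entry wins.
function mergeProvenance(existing: Provenance[], incoming: Provenance[]): Provenance[] {
  const out = new Map<string, Provenance>();
  for (const p of existing) out.set(p.source_path, p);
  for (const p of incoming) {
    if (!out.has(p.source_path)) out.set(p.source_path, p);
  }
  return Array.from(out.values());
}

// First pass: the row arrives from one source.
const firstPass: Provenance[] = [
  { source_path: "ingest/meta-analysis.json", ingested_at: "2024-01-05T00:00:00.000Z" },
];

// Second pass: the same source again, plus a new one.
const secondPass: Provenance[] = [
  { source_path: "ingest/meta-analysis.json", ingested_at: "2024-03-01T00:00:00.000Z" },
  { source_path: "ingest/watchlist-hit.json", ingested_at: "2024-03-01T00:00:00.000Z" },
];

const trail = mergeProvenance(firstPass, secondPass);
// trail has two entries. The meta-analysis entry keeps its January timestamp;
// the watchlist entry is appended after it.
console.log(trail.map((p) => `${p.source_path} @ ${p.ingested_at}`));
```

Re-running the second pass any number of times leaves `trail` unchanged, which is what keeps repeated ingests from bloating the audit log.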
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| Full ingest history preserved across all passes | Provenance arrays grow unbounded — needs periodic compaction at scale |
| Audit "why is this row here" trivially answerable | Every upsert pays the merge cost (linear in provenance length) |
| Same row can carry contributions from many domains | No structured query over provenance — it's an audit log, not an index |
| Mergeable across JSON ↔ DB backends without translation | Adding a new provenance field is a soft schema change with no enforcement |
Citations
- `packages/registry/src/store/json-store.ts`: `mergeProvenance()` + `put()` flow.
- `packages/registry/src/store/postgres-store.ts`: same merge inside a `SELECT FOR UPDATE` transaction for concurrent-safety.
P03. Pluggable Persistence Behind a Thin Async Interface
Problem You start with a JSON-file store (zero dependencies, easy local dev), but you know Postgres is coming. If every ingest adapter, resolver, MCP handler, and REST route imports the JSON store directly, the eventual swap becomes a fan-out refactor across the whole codebase. You want the seam now, even though you have only one implementation.
The Pattern
import * as os from "node:os";
import * as path from "node:path";
export type EntityType = string;
export interface Row<T> {
id: string;
type: EntityType;
entity: T;
provenance: { source_path: string; ingested_at: string }[];
created_at: string;
updated_at: string;
}
// The seam. Every consumer depends on THIS, not on a concrete class.
export interface Store {
get<T>(type: EntityType, id: string): Promise<Row<T> | null>;
put<T>(row: Row<T>): Promise<Row<T>>;
iterate<T>(type: EntityType): Promise<Row<T>[]>;
count(type: EntityType): Promise<number>;
delete(type: EntityType, id: string): Promise<boolean>;
findBy<T>(type: EntityType, p: (r: Row<T>) => boolean): Promise<Row<T>[]>;
}
// All methods async, even when the JSON impl is sync. That's the discipline:
// callers can't accidentally tie themselves to sync semantics.
export class JsonStore implements Store {
// ... fs.readFileSync wrapped in async wrappers
}
export class PostgresStore implements Store {
// ... real async I/O
}
// Resolution at process boot.
export async function buildStore(env: NodeJS.ProcessEnv): Promise<Store> {
if (env.DATABASE_URL) return new PostgresStore(env.DATABASE_URL);
if (env.STORE_ROOT) return new JsonStore(env.STORE_ROOT);
return new JsonStore(path.join(os.tmpdir(), "store"));
}
Design decisions
- Every method returns a Promise, even when one implementation can answer synchronously. This forces callers to write code that survives the eventual swap.
- `findBy(predicate)` is the escape hatch. A real indexed-query interface is hard to express generically across JSON and SQL backends. Predicate-over-iterate works against both; you accept the scan cost until you have evidence a real index helps.
- Implementations are interchangeable at the constructor level. No factory framework, just `new JsonStore(root)` or `new PostgresStore(url)`, and pass either one to consumers.
- Resolution function is the only place that knows about both. Every other file imports the `Store` interface. The factory is the single bottleneck for "which backend am I on" decisions.
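To show how little a new backend has to provide, here is a sketch of a third implementation held entirely in memory, the kind you might hand to unit tests. `MemoryStore` is hypothetical and not part of the repo; it repeats the `Store` interface so the block stands alone, and its `put` skips provenance merging for brevity:

```typescript
type EntityType = string;

interface Row<T> {
  id: string;
  type: EntityType;
  entity: T;
  provenance: { source_path: string; ingested_at: string }[];
  created_at: string;
  updated_at: string;
}

interface Store {
  get<T>(type: EntityType, id: string): Promise<Row<T> | null>;
  put<T>(row: Row<T>): Promise<Row<T>>;
  iterate<T>(type: EntityType): Promise<Row<T>[]>;
  count(type: EntityType): Promise<number>;
  delete(type: EntityType, id: string): Promise<boolean>;
  findBy<T>(type: EntityType, p: (r: Row<T>) => boolean): Promise<Row<T>[]>;
}

// Hypothetical in-memory backend. Every method is async even though the
// underlying Map is synchronous, matching the discipline described above.
class MemoryStore implements Store {
  private readonly rows = new Map<string, Row<unknown>>();

  // JSON-encode the pair so ("a:b", "c") and ("a", "b:c") cannot collide.
  private key(type: EntityType, id: string): string {
    return JSON.stringify([type, id]);
  }

  async get<T>(type: EntityType, id: string): Promise<Row<T> | null> {
    return (this.rows.get(this.key(type, id)) as Row<T> | undefined) ?? null;
  }
  async put<T>(row: Row<T>): Promise<Row<T>> {
    this.rows.set(this.key(row.type, row.id), row);
    return row;
  }
  async iterate<T>(type: EntityType): Promise<Row<T>[]> {
    return Array.from(this.rows.values()).filter((r) => r.type === type) as Row<T>[];
  }
  async count(type: EntityType): Promise<number> {
    return (await this.iterate(type)).length;
  }
  async delete(type: EntityType, id: string): Promise<boolean> {
    return this.rows.delete(this.key(type, id));
  }
  async findBy<T>(type: EntityType, p: (r: Row<T>) => boolean): Promise<Row<T>[]> {
    return (await this.iterate<T>(type)).filter(p);
  }
}

// Consumers are typed against the interface, never the class.
const store: Store = new MemoryStore();
```

Because consumers only see `Store`, adding this class requires no change anywhere except the resolution function.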
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| Adding a new backend (SQLite, DuckDB, Cloudflare D1) is a new class — no consumer changes | Async-everywhere has a small ergonomic cost for sync-backed implementations |
| Tests use the JSON store; production uses Postgres; same code paths exercised | Predicate-based findBy punts the indexing question until later |
| Consumers can't accidentally depend on backend-specific behavior | Implementations must agree on subtle semantics (timestamp format, merge rules) by convention not by interface |
| Production swap is one-line in the factory, not a fan-out refactor | Some power moves (transactions, CTEs) don't survive the abstraction |
Citations
- `packages/registry/src/store/types.ts`: `Store` (called `RegistryStore`) interface definition.
- `packages/registry/src/store/json-store.ts` + `packages/registry/src/store/postgres-store.ts`: two implementations.
- `apps/web/lib/store.ts`: `getStore()` factory with `DATABASE_URL`-first resolution.
P04. Deterministic Idempotent IDs from Content Fingerprints
Problem: You want to file work into a queue (a verification job, an acquisition request, a research task) from many call sites (schedulers, REST handlers, manual scripts), and you want the same logical work to collapse to one row regardless of how often it's filed. The naive approach (random UUID + duplicate check) needs a query round-trip and a lookup index; you want enqueue to be a single write-only call.
The Pattern
import * as crypto from "node:crypto";
// FNV-1a 32-bit over UTF-16 code units: fast and deterministic. Only 32 bits,
// so use it as a disambiguator behind a readable prefix, not as a sole key.
function hash32(s: string): string {
let h = 0x811c9dc5;
for (let i = 0; i < s.length; i++) {
h ^= s.charCodeAt(i);
h = Math.imul(h, 0x01000193);
}
return (h >>> 0).toString(16).padStart(8, "0");
}
// Strip everything that isn't load-bearing for identity.
function normalize(s: string): string {
return s.toLowerCase().replace(/[^a-z0-9]+/g, "").trim();
}
interface JobInput {
job_type: string;
subject_id: string;
}
// Derive a deterministic id from the inputs that define identity.
export function deriveJobId(input: JobInput): string {
return `job.${input.job_type}.${input.subject_id}`;
}
// Composite shape — when there's no single canonical key.
interface RequestInput {
doi?: string;
title: string;
first_author: string;
year?: number;
}
export function deriveRequestId(req: RequestInput): string {
if (req.doi) return `req:doi:${req.doi.toLowerCase()}`;
const composite = [
normalize(req.title),
normalize(req.first_author),
req.year ?? "noyr",
].join("|");
// Truncated human-readable prefix + hash disambiguator.
const prefix = normalize(req.title).slice(0, 24) || "untitled";
return `req:${prefix}:${req.year ?? "noyr"}:${hash32(composite)}`;
}
// SHA-256 for content-shaped ids: far stronger collision resistance than
// 32-bit FNV. Truncating to 12 hex chars keeps 48 bits.
export function deriveContentId(prefix: string, content: string): string {
const h = crypto.createHash("sha256").update(content).digest("hex").slice(0, 12);
return `${prefix}_${h}`;
}
Design decisions
- DOI / canonical key wins when present. No need to fingerprint; the source already published one. The id-derivation function is a switch: external canonical id first, hash-based fallback second.
- Normalization before hashing. Strip case, punctuation, whitespace variance. Two paths to the same logical entity must produce the same hash — otherwise you've made dedup a probabilistic feature.
- Human-readable prefix on hash-based ids. `acq:engagementatwork:2019:8a3f9b1c` is debuggable in raw-JSON tail-files; `acq:8a3f9b1c` is not.
- Hash family by use case. FNV-1a is fast, but a 32-bit hash reaches a 50% birthday-collision chance at roughly 77k distinct inputs, so at curator scale (<1M rows) it is only good enough as a disambiguator behind the readable prefix (title slug + year), not as a standalone key. Use SHA-256 (truncated to 12 hex chars, i.e. 48 bits) when the hash is the whole id, when the id might cross runtimes, or when you need a stronger guarantee.
- Enqueue becomes UPSERT, not check-then-insert. With deterministic ids, the database's `ON CONFLICT DO NOTHING` is the dedup. No race; no lookup round-trip.
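The normalization bullet is the one that makes dedup deterministic rather than probabilistic. A short check of that property, repeating the helpers from the pattern so the block runs alone; the titles and author below are made-up inputs:

```typescript
function hash32(s: string): string {
  let h = 0x811c9dc5;
  for (let i = 0; i < s.length; i++) {
    h ^= s.charCodeAt(i);
    h = Math.imul(h, 0x01000193);
  }
  return (h >>> 0).toString(16).padStart(8, "0");
}

function normalize(s: string): string {
  return s.toLowerCase().replace(/[^a-z0-9]+/g, "");
}

interface RequestInput {
  doi?: string;
  title: string;
  first_author: string;
  year?: number;
}

function deriveRequestId(req: RequestInput): string {
  if (req.doi) return `req:doi:${req.doi.toLowerCase()}`;
  const composite = [
    normalize(req.title),
    normalize(req.first_author),
    req.year ?? "noyr",
  ].join("|");
  const prefix = normalize(req.title).slice(0, 24) || "untitled";
  return `req:${prefix}:${req.year ?? "noyr"}:${hash32(composite)}`;
}

// Two spellings of the same logical request...
const idA = deriveRequestId({ title: "Engagement at Work", first_author: "Doe", year: 2019 });
const idB = deriveRequestId({ title: "  ENGAGEMENT, at work!", first_author: "doe", year: 2019 });
// ...and one that differs only in year.
const idC = deriveRequestId({ title: "Engagement at Work", first_author: "Doe", year: 2020 });

console.log(idA === idB); // same identity despite case, spacing, punctuation
console.log(idA === idC); // different year, different id
```

Both `idA` and `idB` start with `req:engagementatwork:2019:`, so the id stays readable while the trailing hash carries the author into the identity.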
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| Idempotency is a property of the id derivation, not the call path | Bad normalization rule means false collisions you can't easily unwind |
| Concurrent enqueues from N callers produce 1 row, deterministically | Re-deriving an id after a normalization change orphans the old rows |
| Ids are debuggable when read by hand | Hash prefixes leak source-field hints to anyone reading raw data |
| No "duplicate check before insert" code anywhere | Schema changes that should shift identity (e.g., versioning a job type) require explicit migration |
Citations
- `packages/registry/src/loop/enrichment-queue.ts`: `deriveJobId` (simple concatenation, reliable for typed subjects).
- `packages/registry/src/loop/acquisition-queue.ts`: `deriveAcquisitionRequestId` (composite hash with human-readable prefix).
- `packages/registry/src/resolve/canonical-variable-resolver.ts`: `mintVariableId` (FNV-1a over normalized name).
- `packages/registry/src/loop/canonical-survey-item-dedup.ts`: `canonicalItemId` (SHA-256 prefix).
P05. Multi-Vendor LLM Consensus with Vote Adjudication
Problem: You need to extract structured claims from unstructured text (a paper, a webpage, a transcript) and downstream consumers will act on those claims. A single LLM call has a non-trivial hallucination rate, too risky for "verified" output. You want a verdict whose confidence comes from independent agreement across vendors, not from one model's self-reported confidence.
The Pattern
interface Env {
OPENAI_API_KEY?: string;
ANTHROPIC_API_KEY?: string;
GEMINI_API_KEY?: string;
BUDGET_USD?: string;
}
interface VendorResponse {
provider: string;
doi?: string | null;
title?: string | null;
authors?: string[] | null;
cost_usd: number;
error?: string;
}
type Verdict =
| "verified_strong_consensus" // ≥2 agree on canonical id
| "verified_metadata_consensus" // ≥2 agree on title + authors
| "disagreement"
| "insufficient_providers"
| "error";
async function callOpenAI(prompt: string, key: string): Promise<VendorResponse> { /* ... */ }
async function callAnthropic(prompt: string, key: string): Promise<VendorResponse> { /* ... */ }
async function callGemini(prompt: string, key: string): Promise<VendorResponse> { /* ... */ }
function consensus(responses: VendorResponse[]): { verdict: Verdict; result?: object } {
const ok = responses.filter((r) => !r.error && (r.title || r.doi));
if (ok.length < 2) return { verdict: "insufficient_providers" };
// Strong consensus: ≥2 vendors return the same canonical id.
const counts = new Map<string, number>();
for (const r of ok) if (r.doi) counts.set(r.doi, (counts.get(r.doi) ?? 0) + 1);
for (const [id, n] of counts) {
if (n >= 2) {
const match = ok.find((r) => r.doi === id)!;
return { verdict: "verified_strong_consensus", result: match };
}
}
// Metadata consensus: same title + author overlap across ≥2.
// Try every response as the anchor, so agreement between two later vendors
// is not missed when the first response is the outlier.
for (const anchor of ok) {
const agrees = ok.some(
(r) => r !== anchor && sameTitle(anchor.title, r.title) &&
authorOverlap(anchor.authors, r.authors) >= 0.5,
);
if (agrees) return { verdict: "verified_metadata_consensus", result: anchor };
}
return { verdict: "disagreement" };
}
export async function validate(prompt: string, env: Env) {
const tasks: Promise<VendorResponse>[] = [];
if (env.OPENAI_API_KEY) tasks.push(callOpenAI(prompt, env.OPENAI_API_KEY));
if (env.ANTHROPIC_API_KEY) tasks.push(callAnthropic(prompt, env.ANTHROPIC_API_KEY));
if (env.GEMINI_API_KEY) tasks.push(callGemini(prompt, env.GEMINI_API_KEY));
if (tasks.length === 0) {
return { verdict: "insufficient_providers" as const, cost_usd: 0 };
}
const responses = await Promise.all(tasks);
const cost = responses.reduce((s, r) => s + r.cost_usd, 0);
const budget = Number(env.BUDGET_USD ?? "2");
if (cost > budget) return { verdict: "error" as const, cost_usd: cost };
return { ...consensus(responses), cost_usd: cost, responses };
}
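The `consensus` function above calls `sameTitle` and `authorOverlap` without defining them. One possible shape for the two helpers, offered as a sketch: the normalization rules, the surname heuristic, and the "fraction of the shorter list" definition of overlap are all assumptions, not the repo's implementation.

```typescript
// Lowercase and collapse every run of non-alphanumerics to a single space.
function normalizeText(s: string): string {
  return s.toLowerCase().replace(/[^a-z0-9]+/g, " ").trim();
}

// Titles match when they are equal after normalization. Missing titles never match.
function sameTitle(a?: string | null, b?: string | null): boolean {
  if (!a || !b) return false;
  return normalizeText(a) === normalizeText(b);
}

// Heuristic surname: text before a comma ("Doe, J."), else the last token ("Jane Doe").
function surname(name: string): string {
  const head = name.includes(",") ? name.split(",")[0] ?? name : name;
  const parts = normalizeText(head).split(" ");
  return parts[parts.length - 1] ?? "";
}

// Share of the shorter author list whose surnames also appear in the other list.
// Returns a value in [0, 1]; 0 when either list is missing or empty.
function authorOverlap(a?: string[] | null, b?: string[] | null): number {
  if (!a || !b || a.length === 0 || b.length === 0) return 0;
  const setA = new Set(a.map(surname));
  const setB = new Set(b.map(surname));
  let shared = 0;
  setA.forEach((s) => {
    if (setB.has(s)) shared++;
  });
  return shared / Math.min(setA.size, setB.size);
}

console.log(sameTitle("Engagement at Work", "engagement at work."));
console.log(authorOverlap(["Doe, J.", "Roe, R."], ["Jane Doe"]));
```

Dividing by the shorter list is deliberate in this sketch: vendors often truncate long author lists, and a truncated list that fully matches should not be penalized.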
Design decisions
- Independent vendors, not multiple calls to one model. Cross-vendor agreement guards against shared training-data biases. Two calls to the same model are nearly perfectly correlated.
- Strong consensus uses canonical ids first. When two vendors both return the same DOI, the verdict is rock-solid. Fallback to metadata fuzzy-match only when ids are absent.
- `insufficient_providers` is a first-class verdict. "Only one model available" is not "verified by one model": the result is honest absence-of-consensus, not a fake yes.
- Per-call budget guard. Multi-vendor fan-out racks up tokens fast. The budget check returns `error` when a fan-out's total cost exceeds the cap, so an overspend is surfaced rather than silently accepted. As written it runs after the responses arrive; to prevent the spend rather than flag it, estimate cost before dispatch.
- Strict JSON output format. Force `response_format: { type: "json_object" }` (OpenAI), a system-prompt JSON requirement (Anthropic/Gemini), and parse defensively: models often wrap the JSON in markdown code fences.
- Errored vendors don't count. Drop them from the consensus pool; only successful structured outputs vote.
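"Parse defensively" can be as small as slicing out the outermost JSON object before parsing, which handles code fences and stray prose in one step. A sketch under that assumption; `parseVendorJson` is a hypothetical helper name:

```typescript
// Extract and parse the outermost {...} object from a model reply.
// Returns null for anything that is not a JSON object, so the caller can
// treat the vendor as errored instead of letting it vote.
function parseVendorJson(raw: string): Record<string, unknown> | null {
  const start = raw.indexOf("{");
  const end = raw.lastIndexOf("}");
  if (start === -1 || end <= start) return null;
  try {
    const value: unknown = JSON.parse(raw.slice(start, end + 1));
    return typeof value === "object" && value !== null && !Array.isArray(value)
      ? (value as Record<string, unknown>)
      : null;
  } catch {
    return null; // malformed JSON: no vote
  }
}

// A reply with chatter around the payload still parses.
const reply = 'Here is the result:\n{"doi": "10.1000/example", "title": null}\nHope that helps!';
console.log(parseVendorJson(reply));
```

This trades strictness for resilience: a reply containing two separate objects would fail to parse and be dropped, which is the safe direction for a consensus pool.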
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| Defensible "verified" verdict backed by cross-vendor agreement | 3× the per-call cost (and latency, if not parallelized) |
| Disagreement is itself a signal — surfaces "uncertain" claims to curator | Vendor pricing changes silently; cost estimates drift |
| Tolerates individual vendor outages without falling to zero confidence | Adding a 4th vendor is a new branch, not a config change |
| Honest "insufficient providers" prevents accidental single-model trust | Strict JSON enforcement breaks when a vendor changes their wrapper |
Citations
- `packages/literature/src/multimodel/validate-non-doi.ts`: full implementation across OpenAI, Anthropic, Gemini with consensus + budget guard.
- `packages/registry/src/loop/handlers/verify-effect-size-claim.ts`: same shape for verifying extracted claims against cited sources (PRN-032 novelty verification).
P06. Vendored-Local Type Mirrors Awaiting Upstream Package Publish
Problem: You're building against a shared package whose new version isn't published yet. The team has agreed on the new types (they live in a draft PR, a coordinator brief, a planning doc), but the package registry won't resolve them until the upstream owner pushes. You don't want to wait; you also don't want a local fork that diverges.
The Pattern
// File: src/types/upstream-mirror.ts
//
// MIRRORS: @canonical/shared-types@0.11.0 (not yet published).
// When v0.11.0 lands on the registry, swap this file for:
// export type { Intervention, DeliveryModality } from "@canonical/shared-types";
// The shapes below are STRUCTURALLY IDENTICAL — no consumer changes needed.
/** Mirrors `DeliveryModality` from @canonical/shared-types@0.11.0. */
export type DeliveryModality =
| "training" | "coaching" | "tech" | "policy" | "other";
/** Mirrors `CostBand` from @canonical/shared-types@0.11.0. */
export type CostBand = "low" | "medium" | "high" | "enterprise";
/**
* Mirrors `Intervention` from @canonical/shared-types@0.11.0.
* Structurally identical — TypeScript's structural typing means consumers
* can `import type { Intervention } from "./upstream-mirror"` today and
* `from "@canonical/shared-types"` post-publish without touching call sites.
*/
export interface Intervention {
intervention_id: string;
name: string;
targets: string[];
mechanism: string;
delivery_modality: DeliveryModality[];
cost_band?: CostBand;
}
// Re-export at the package boundary so internal callers always import from
// here, never directly from `@canonical/shared-types`. The swap becomes
// a one-file edit.
//
// File: packages/registry/src/index.ts
export type { Intervention, DeliveryModality } from "./types/upstream-mirror";
Design decisions
- Header comment is mandatory. Reader has to know it's a mirror, what version it tracks, and what triggers the swap. Without that, future contributors edit the local copy and create real drift.
- Structural identity is the contract. TypeScript's structural typing means a local `Intervention` and the eventual published `Intervention` are interchangeable as long as the field-by-field shape matches. The swap is literally `s/from ".\/upstream-mirror"/from "@canonical\/shared-types"/g`.
- Internal callers always import from the package barrel. Never from the mirror file directly, never from the not-yet-published package. The barrel is the single bottleneck.
- Mirrors live in a dedicated module. Not scattered in `types.ts` for each capability. Future swap is grep-able: `find . -name "*-types.ts" -path "*mirror*"`.
- Bounded lifetime. Mirrors should ship with an explicit swap ticket (track in your assignment queue, your `TODO` list, somewhere watched). Mirrors that persist beyond their upstream publish become real forks.
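Structural identity is a claim the compiler can check once both shapes are visible. A sketch of a compile-time identity assertion, assuming a type-level `Equals` helper (a common community idiom, not something TypeScript ships) and two stand-in interfaces; in real use the second would be imported from the published package:

```typescript
// Resolves to `true` only when A and B are identical types, including
// optionality and readonly modifiers. Plain mutual assignability would
// miss a dropped optional field.
type Equals<A, B> =
  (<T>() => T extends A ? 1 : 2) extends (<T>() => T extends B ? 1 : 2) ? true : false;

// Stand-in for the local mirror.
interface MirrorIntervention {
  intervention_id: string;
  name: string;
  cost_band?: "low" | "medium" | "high" | "enterprise";
}

// Stand-in for the published shape (hypothetical; would come from the package).
interface PublishedIntervention {
  intervention_id: string;
  name: string;
  cost_band?: "low" | "medium" | "high" | "enterprise";
}

// Stand-in for an upstream that changed the spec before publishing.
interface DriftedIntervention {
  intervention_id: string;
  name: string;
  cost_band?: "low" | "medium" | "high";
}

// Compiles only while the mirror and the published type are identical.
const mirrorMatches: Equals<MirrorIntervention, PublishedIntervention> = true;

// The drifted shape is detected: this annotation only accepts `false`.
const driftedMatches: Equals<MirrorIntervention, DriftedIntervention> = false;

console.log(mirrorMatches, driftedMatches);
```

Kept in a test file during the overlap window, an assertion like `mirrorMatches` turns silent drift into a build failure on the day the package lands.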
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| Unblocks parallel work — consumers don't wait for upstream to publish | Mirror can drift if upstream changes spec before publishing |
| Swap is a one-file edit, not a fan-out refactor | Discipline-only — nothing enforces the structural-identity claim |
| Header comment makes intent legible | Long-lived mirrors are an architectural smell |
| TypeScript catches divergence at the consumer boundary on swap | Reviewing a mirror requires diffing against an unpublished spec |
Citations
- `packages/registry/src/loop/intervention-types.ts`: mirrors `Intervention` from `@people-analyst/measurement-core@0.11.0` ahead of publish.
- `packages/registry/src/loop/equivalence-types.ts`, `critique-types.ts`, `verification-types.ts`: same pattern across four other deltas.
- `packages/registry/src/index.ts`: barrel re-export so consumers never see the mirror.
P07. Idempotent Background Job Queue with Exponential-Backoff Retry
Problem: You have background work (refresh metadata from an external API, run a slow LLM extraction, recompute a derived view) that needs to survive process restarts, retry on transient failure, and not double-run when two callers enqueue the same job concurrently. You don't want to operate a separate job-runner service.
The Pattern
type JobStatus = "pending" | "running" | "complete" | "failed" | "skipped";
type Priority = "high" | "normal" | "low";
interface Job {
job_id: string; // deterministic — same logical work → same id
job_type: string;
subject_id: string;
status: JobStatus;
priority: Priority;
scheduled_at: string;
started_at?: string;
completed_at?: string;
error?: string;
retries: number;
next_retry_at?: string;
}
interface Handler {
job_type: string;
handle(job: Job, ctx: { now: Date }): Promise<{ produced?: string[] }>;
}
interface RetryPolicy {
max_retries: number;
base_delay_ms: number;
max_delay_ms: number;
}
const DEFAULT_RETRY: RetryPolicy = {
max_retries: 3,
base_delay_ms: 5_000,
max_delay_ms: 5 * 60_000,
};
function backoff(retries: number, p: RetryPolicy): number {
return Math.min(p.base_delay_ms * 2 ** retries, p.max_delay_ms);
}
export class Worker {
private readonly handlers: Map<string, Handler>;
constructor(
private readonly store: Store,
handlers: Handler[],
private readonly policy = DEFAULT_RETRY,
) {
this.handlers = new Map(handlers.map((h) => [h.job_type, h]));
}
/** Idempotent enqueue. Same (job_type, subject_id) twice = one row. */
async enqueue(input: { job_type: string; subject_id: string; priority?: Priority }) {
const job_id = `job.${input.job_type}.${input.subject_id}`;
const existing = await this.store.get<Job>("job", job_id);
if (existing) {
const s = existing.entity.status;
if (s === "pending" || s === "running") return { job_id, created: false };
// Terminal status: caller wants another pass. Re-file to pending,
// resetting retries and scheduled_at.
await this.persist({ ...existing.entity, status: "pending", retries: 0,
scheduled_at: new Date().toISOString(),
error: undefined, next_retry_at: undefined });
return { job_id, created: false };
}
await this.persist({
job_id, job_type: input.job_type, subject_id: input.subject_id,
status: "pending", priority: input.priority ?? "normal",
scheduled_at: new Date().toISOString(), retries: 0,
});
return { job_id, created: true };
}
/** Run every pending + due job. */
async tick(now = new Date()) {
const pending = await this.store.findBy<Job>("job", (r) => r.entity.status === "pending");
const due = pending.filter((r) => this.isDue(r.entity, now));
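// high > normal > low; oldest scheduled_at first within a tier.
const rank: Record<Priority, number> = { high: 0, normal: 1, low: 2 };
due.sort((a, b) =>
rank[a.entity.priority] - rank[b.entity.priority] ||
Date.parse(a.entity.scheduled_at) - Date.parse(b.entity.scheduled_at));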
for (const row of due) {
const handler = this.handlers.get(row.entity.job_type);
if (!handler) {
await this.persist({ ...row.entity, status: "skipped",
error: `no handler for ${row.entity.job_type}` });
continue;
}
await this.runOne(row.entity, handler, now);
}
}
private async runOne(job: Job, handler: Handler, now: Date) {
// Keep the running snapshot so started_at survives into the final row.
const running: Job = { ...job, status: "running", started_at: now.toISOString() };
await this.persist(running);
try {
await handler.handle(running, { now });
await this.persist({ ...running, status: "complete",
completed_at: new Date().toISOString() });
} catch (err) {
const retries = job.retries + 1;
if (retries > this.policy.max_retries) {
await this.persist({ ...running, status: "failed",
error: (err as Error).message, retries });
} else {
// First retry waits base_delay_ms, then doubles up to max_delay_ms.
const nextRetryAt = new Date(Date.now() + backoff(job.retries, this.policy));
await this.persist({ ...running, status: "pending", retries,
next_retry_at: nextRetryAt.toISOString(),
error: (err as Error).message });
}
}
}
private isDue(job: Job, now: Date): boolean {
const scheduledOk = Date.parse(job.scheduled_at) <= now.getTime();
const retryOk = !job.next_retry_at || Date.parse(job.next_retry_at) <= now.getTime();
return scheduledOk && retryOk;
}
private async persist(job: Job) {
await this.store.put({ id: job.job_id, type: "job", entity: job,
provenance: [], created_at: "", updated_at: "" });
}
}
Design decisions
- Deterministic job_id is the dedup primitive. Same (job_type, subject_id) enqueued from N callers produces 1 row. No lookup-then-insert race.
- Re-enqueue of a terminal job is "refile". A complete or failed job, re-enqueued, is the caller saying "do it again." Reset retries and scheduled_at; status → pending.
- Status transitions are append-only by convention. pending → running → complete (or failed, or back to pending for retry). The store retains the latest state; the provenance trail carries the history.
- Exponential backoff capped at max_delay_ms. Otherwise a job that keeps failing with max_retries=10 would schedule its 10th retry days out, which is useless for visibility.
- No next_retry_at clearing on success. Failed-then-succeeded jobs retain the trail of attempts in their provenance.
- Worker is a function, not a service. tick() is called by cron, by a script, by an admin button. The worker doesn't own its scheduling cadence.
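To make the capped-backoff decision concrete, here is a minimal sketch with worked numbers. It is an independent illustration and may differ from the backoff helper the worker actually calls. Only max_retries and max_delay_ms are named in the text; base_delay_ms and the doubling factor are assumptions.

```typescript
// Hypothetical policy shape. base_delay_ms is an assumed field name.
interface RetryPolicy {
  max_retries: number;
  base_delay_ms: number;
  max_delay_ms: number;
}

// Delay before the next attempt, given how many retries already happened.
// Doubles on each retry, then clamps at max_delay_ms.
function backoffDelayMs(retriesSoFar: number, policy: RetryPolicy): number {
  const uncapped = policy.base_delay_ms * Math.pow(2, retriesSoFar);
  return Math.min(uncapped, policy.max_delay_ms);
}

const examplePolicy: RetryPolicy = {
  max_retries: 10,
  base_delay_ms: 10 * 60_000,     // 10 minutes
  max_delay_ms: 6 * 60 * 60_000,  // 6 hours
};

// One delay per retry slot, index 0 is the first retry.
const delays = Array.from(
  { length: examplePolicy.max_retries },
  (_, i) => backoffDelayMs(i, examplePolicy),
);
// Uncapped, index 9 would be 10 min * 512 = 5120 min, roughly 3.6 days.
// With the cap, every delay from index 6 onward is held at 6 hours.
```

With these assumed numbers the first six delays grow from 10 minutes to 320 minutes, and everything after that is held at the cap.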
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| Survives restarts — state lives in the store, not in process memory | Single-process by default; needs locking semantics for multi-worker |
| Idempotent at enqueue and at handler level (the handlers must be idempotent too) | Polling overhead per tick — not push-driven |
| New job types are new handlers — no schema changes | No native cron expression — cadence lives outside the worker |
| Retry policy is per-worker, not per-job — keeps reasoning simple | Backoff is wall-clock, not service-specific — a flaky upstream can starve other jobs |
Citations
- packages/registry/src/loop/enrichment-queue.ts: full EnrichmentQueue + EnrichmentWorker with retry policy, priority ordering, no-handler-skip.
- packages/registry/src/loop/handlers/refresh-citation-metadata.ts: minimal handler example wrapping an upstream API call.
- packages/registry/src/loop/verify-enqueue.ts: composing the queue with a deterministic-id helper to file verification jobs idempotently from many call sites.
P08. License-Gated Field Stripping at the Response Boundary
Problem
Your catalog includes records whose source carries usage restrictions (proprietary text, attribution-required licenses, copyrighted item wording). You're building a public-facing API. Some fields can be returned safely; others can't. If even one handler forgets the check, you've published copyrighted text under your name. You want the gate to live at the response boundary, not threaded through every reader.
The Pattern
import { z } from "zod";
// Closed allowlist — defaults to "deny" for anything not explicitly listed.
const PUBLISHABLE_LICENSES = new Set([
"public_domain",
"open",
"attribution_required", // if attribution is rendered alongside
]);
interface Parent {
parent_id: string;
usage_restrictions?: {
field_license?: string;
publisher?: string;
};
}
interface Item {
item_id: string;
parent_id: string;
text?: string; // license-gated
ordinal?: number; // safe
}
function resolveLicense(parent: Parent | null): string | null {
return parent?.usage_restrictions?.field_license ?? null;
}
function isPublishable(license: string | null): boolean {
if (!license) return false; // unknown → restricted by default
return PUBLISHABLE_LICENSES.has(license);
}
function stripGatedFields(item: Item): Omit<Item, "text"> {
const { text: _t, ...rest } = item;
return rest;
}
// Discriminated union response — caller pattern-matches on status.
export const LookupOutput = z.discriminatedUnion("status", [
z.object({ status: z.literal("ok"), item: z.object({}).passthrough().nullable() }),
z.object({ status: z.literal("restricted"), reason: z.string(), parent_id: z.string() }),
]);
export async function lookupItem(store: Store, id: string) {
const itemRow = await store.get<Item>("item", id);
if (!itemRow) return { status: "ok" as const, item: null };
const parentRow = await store.get<Parent>("parent", itemRow.entity.parent_id);
const license = resolveLicense(parentRow?.entity ?? null);
if (!isPublishable(license)) {
return {
status: "restricted" as const,
reason: `parent license="${license ?? "unknown"}" — field rendering forbidden`,
parent_id: itemRow.entity.parent_id,
};
}
return { status: "ok" as const, item: itemRow.entity };
}
// Bulk search: never elide silently — surface the restricted count.
export async function searchItems(store: Store, q: string) {
const all = await store.findBy<Item>("item", (r) => /* ... */ true);
let restricted = 0;
const matches = await Promise.all(all.map(async (row) => {
const parent = await store.get<Parent>("parent", row.entity.parent_id);
if (!isPublishable(resolveLicense(parent?.entity ?? null))) {
restricted++;
return stripGatedFields(row.entity);
}
return row.entity;
}));
return { matches, restricted_count: restricted };
}
Design decisions
- Default-deny allowlist, not denylist. Unknown license → restricted. A new license value lands without anyone updating code; nothing accidentally leaks.
- Gate at the response boundary, not in the store. The store carries everything; the public API strips. Internal consumers (research scripts, admin UI) get the unfiltered view; only public-facing handlers run the gate.
- { status: "restricted", reason } is a distinct response shape. Not a null, not a stripped object pretending to be complete. The caller has to acknowledge the gate happened; the discriminated union forces it.
- Search surfaces restricted_count explicitly. Silent elision is worse than an honest "we found 14 matches but can't show you the text of 9 of them."
- License resolution has a legacy/back-compat fallback. Older data uses permissions.usage_type; newer data uses usage_restrictions.field_license. The resolver checks both, but the allowlist is one set.
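The simplified resolveLicense in the pattern reads only the newer field. A resolver with the legacy fallback described above might look like the sketch below. The field paths come from the text; the precedence (newer field wins) and the names ParentWithLegacy, resolveLicenseWithFallback and isPublishableLicense are assumptions made for illustration.

```typescript
// Parent shape extended with the legacy field named in the text.
interface ParentWithLegacy {
  parent_id: string;
  usage_restrictions?: { field_license?: string }; // newer records
  permissions?: { usage_type?: string };           // legacy records
}

// One allowlist, regardless of which field supplied the value.
const PUBLISHABLE = new Set(["public_domain", "open", "attribution_required"]);

// Assumed precedence: newer field first, then legacy, else unknown (null).
function resolveLicenseWithFallback(parent: ParentWithLegacy | null): string | null {
  return parent?.usage_restrictions?.field_license
    ?? parent?.permissions?.usage_type
    ?? null;
}

// Default-deny: null or any value outside the allowlist is restricted.
function isPublishableLicense(license: string | null): boolean {
  return license !== null && PUBLISHABLE.has(license);
}

const legacyOnly: ParentWithLegacy = {
  parent_id: "p1",
  permissions: { usage_type: "open" },
};
const newerWins: ParentWithLegacy = {
  parent_id: "p2",
  usage_restrictions: { field_license: "proprietary" },
  permissions: { usage_type: "open" },
};
```

Under this precedence, legacyOnly resolves to a publishable license while newerWins stays restricted, because the newer field overrides the legacy one.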
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| Default-deny prevents accidental leaks on new license values | Public handlers pay a parent-lookup per item — N+1 if not joined |
| Gate logic lives in one module — auditable in one place | Internal vs public boundary must be enforced by handler hygiene |
| Discriminated union forces callers to handle the restricted case | Adding a new gateable field requires editing the strip function |
| Restricted-count gives honest "we have more, but" signal in search | License vocabulary drift across sources is a real maintenance cost |
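One way to soften the N+1 cost noted in the table is to memoize parent lookups for the duration of a single request. The sketch below assumes a minimal store surface (ParentStore) and a hypothetical helper name (makeLicenseLookup); neither is taken from the cited implementation.

```typescript
// Minimal store surface assumed for this sketch.
interface ParentRow {
  entity: { usage_restrictions?: { field_license?: string } };
}
interface ParentStore {
  get(type: "parent", id: string): Promise<ParentRow | null>;
}

// Returns a lookup function that fetches each parent at most once.
// The cache holds promises, so concurrent callers share one in-flight read.
function makeLicenseLookup(store: ParentStore) {
  const cache = new Map<string, Promise<string | null>>();
  return (parentId: string): Promise<string | null> => {
    let hit = cache.get(parentId);
    if (!hit) {
      hit = store
        .get("parent", parentId)
        .then((row) => row?.entity.usage_restrictions?.field_license ?? null);
      cache.set(parentId, hit);
    }
    return hit;
  };
}
```

Create one lookup per request and discard it afterwards. A long-lived cache would serve stale licenses after a parent's restrictions change, and a failed read would stay cached.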
Citations
- packages/registry/src/mcp/handlers/items.ts: full implementation with PUBLISHABLE_LICENSES allowlist, resolveLicense legacy-fallback, lookup discriminated union, and bulk-search restricted_count.
P09. Cron-Driven Watchlist Scheduler with Cadence-Based Next-Run
Problem
You want persistent searches against external sources (a search query, an RSS feed, a third-party API monitor) that run on their own cadence and route matches into downstream processing. You don't want every search to be a separate cron line; you don't want one giant "search everything" job either. Each search should declare its own cadence and route, and the scheduler should fire only the ones that are due.
The Pattern
type Cadence = "hourly" | "daily" | "weekly" | "monthly";
type SourceName = "scholar" | "openalex" | "rss" | "manual";
const CADENCE_MS: Record<Cadence, number> = {
hourly: 60 * 60 * 1000,
daily: 24 * 60 * 60 * 1000,
weekly: 7 * 24 * 60 * 60 * 1000,
monthly: 30 * 24 * 60 * 60 * 1000,
};
interface Watchlist {
watchlist_id: string;
description: string;
query: { sources: SourceName[]; keywords?: string[] };
cadence: Cadence;
last_run_at?: string;
next_run_at: string;
matches_to_date: number;
routes_to: "downstream_queue" | "review_queue";
active: boolean;
}
interface Match { external_id: string; title: string; url?: string; }
interface Source { name: SourceName; search(w: Watchlist): Promise<Match[]>; }
export class Scheduler {
private readonly sources: Map<SourceName, Source>;
constructor(private readonly store: Store, sources: Source[]) {
this.sources = new Map(sources.map((s) => [s.name, s]));
}
/** Fire every active watchlist whose next_run_at ≤ now. */
async runDue(now = new Date()) {
const watchlists = await this.store.iterate<Watchlist>("watchlist");
const stats = { considered: 0, run: 0, not_due: 0, inactive: 0 };
for (const row of watchlists) {
stats.considered++;
const w = row.entity;
if (!w.active) { stats.inactive++; continue; }
if (Date.parse(w.next_run_at) > now.getTime()) { stats.not_due++; continue; }
await this.runOne(w, now);
stats.run++;
}
return stats;
}
private async runOne(w: Watchlist, now: Date) {
let routed = 0;
let errors: string[] = [];
for (const sourceName of w.query.sources) {
const source = this.sources.get(sourceName);
if (!source) { errors.push(`no adapter: ${sourceName}`); continue; }
try {
const matches = await source.search(w);
for (const m of matches) {
const stored = await this.routeMatch(sourceName, m);
if (stored === "new") routed++;
}
} catch (err) {
errors.push((err as Error).message);
// Continue with remaining sources: one bad source should not fail the whole tick.
}
}
// Always advance next_run_at, even on partial failure, so we don't busy-loop.
const next = new Date(now.getTime() + CADENCE_MS[w.cadence]).toISOString();
// Write back in the same row envelope used by the other store.put calls.
await this.store.put({
id: w.watchlist_id,
type: "watchlist",
entity: {
...w,
last_run_at: now.toISOString(),
next_run_at: next,
matches_to_date: w.matches_to_date + routed,
},
/* ... */
});
}
/** Idempotent on a deterministic match-id derived from source + external_id. */
private async routeMatch(source: SourceName, m: Match): Promise<"new" | "exists"> {
// Key on the source that actually produced the match, not the first configured one.
const id = `${source}:${m.external_id}`;
const existing = await this.store.get("downstream_item", id);
if (existing) return "exists";
await this.store.put({ id, type: "downstream_item", entity: m, /* ... */ });
return "new";
}
}
Design decisions
- Watchlists are data, not code. A new search isn't a new code path; it's a new row in the watchlist table. Add / pause / tune from an admin UI; no deploy.
- next_run_at advances unconditionally per tick. Even on source error, we move the cursor forward. Otherwise a chronically-failing source busy-loops the scheduler.
- Per-watchlist source loop is independent. One bad source within a watchlist's source list doesn't kill the whole watchlist; we collect errors and continue.
- active: boolean separates "paused" from "deleted". Deletion loses history; pausing keeps the row + matches_to_date + last_run_at for forensics.
- Match routing is idempotent at the downstream id. The scheduler's job is "deliver matches at least once"; downstream deduplication catches duplicates from re-runs.
- Cadence is enum, not duration. "Daily" is more legible than 86400000 in audit logs and admin UIs. The enum-to-ms table is one place; adding "quarterly" is two lines.
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| New watchlists ship without code changes | No "run after X event" cadence — purely time-driven |
| Sources are pluggable (mock for tests, real for prod) | Source adapter quality varies; failure modes are heterogeneous |
| Per-watchlist failure isolation | Watchlists with overlapping queries waste API calls |
| Pause-vs-delete semantics preserve audit trail | Cadence drift over long runs — next_run_at is wall-clock, not aligned |
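If the cadence drift noted in the table matters, one option is to advance from the previous schedule slot instead of from "now". The sketch below is an alternative to the pattern's now + cadence computation, not what the cited scheduler does; the function name nextAlignedRun is hypothetical, and it assumes prevNextRunAt is a valid ISO timestamp.

```typescript
// Advance from the previously scheduled slot rather than from "now",
// skipping any slots missed during downtime so the result is in the future.
function nextAlignedRun(prevNextRunAt: string, cadenceMs: number, now: Date): string {
  const prev = Date.parse(prevNextRunAt);
  const elapsed = now.getTime() - prev;
  // Not due yet: keep the existing slot. Otherwise jump past every missed slot.
  const slots = elapsed < 0 ? 0 : Math.floor(elapsed / cadenceMs) + 1;
  return new Date(prev + slots * cadenceMs).toISOString();
}

const DAY_MS = 24 * 60 * 60 * 1000;

// Tick fired 7.5 minutes late: the next run stays pinned to midnight.
const slightlyLate = nextAlignedRun(
  "2024-01-01T00:00:00.000Z", DAY_MS, new Date("2024-01-01T00:07:30.000Z"));

// Scheduler was down for two days: missed slots are skipped, not replayed.
const afterOutage = nextAlignedRun(
  "2024-01-01T00:00:00.000Z", DAY_MS, new Date("2024-01-03T05:00:00.000Z"));
```

The schedule stays anchored to the original slot no matter how late each tick fires. The unconditional-advance guarantee still holds, because a due watchlist always moves at least one slot forward.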
Citations
- packages/registry/src/loop/literature-monitor.ts: full LiteratureMonitorScheduler with cadence table, per-source loop, idempotent routing, and pluggable LiteratureSource adapters.
- packages/registry/src/loop/sources/index.ts: adapter-factory that wires real sources from env vars.
P10. Read-Only Gap Detector with Stable Gap IDs
Problem
You want to surface "what's missing" from a registry (incomplete records, under-evidenced claims, stale watchlists, unverified citations) as actionable items. You want curators to act on them (manually, or by filing follow-up work). You explicitly don't want the detector to fix anything itself, because the registry's value depends on every change being curator-approved.
The Pattern
type GapType =
| "thin_evidence" // record has fewer than N supporting sources
| "unverified_claim" // extraction status: needs review
| "no_linked_evidence" // verified record but no downstream linkage
| "stale_watchlist"; // not run in 2× cadence
interface Gap {
gap_id: string; // deterministic — stable across runs
gap_type: GapType;
subjects: string[]; // entity ids the gap concerns
detected_at: string;
severity: "low" | "med" | "high";
suggested_action: string; // human-readable next step
context: Record<string, unknown>;
}
function hash32(s: string): string {
let h = 0x811c9dc5;
for (let i = 0; i < s.length; i++) {
h ^= s.charCodeAt(i);
h = Math.imul(h, 0x01000193);
}
return (h >>> 0).toString(16).padStart(8, "0");
}
function gapId(type: GapType, subjects: string[]): string {
const sorted = [...subjects].sort().join("|");
return `gap:${type}:${hash32(sorted)}`;
}
// PURE FUNCTION over store snapshot. No store mutations. No side effects.
export async function detectGaps(store: Store): Promise<Gap[]> {
const out: Gap[] = [];
const now = new Date().toISOString();
// Thin evidence: records with < 3 supporting sources.
const records = await store.iterate<{ supports: string[] }>("record");
for (const row of records) {
if ((row.entity.supports ?? []).length < 3) {
out.push({
gap_id: gapId("thin_evidence", [row.id]),
gap_type: "thin_evidence",
subjects: [row.id],
detected_at: now,
severity: "med",
suggested_action: `find ≥${3 - (row.entity.supports ?? []).length} more sources`,
context: { current_support_count: (row.entity.supports ?? []).length },
});
}
}
// Stale watchlists: not run in 2× cadence.
const cadenceMs = { daily: 86_400_000, weekly: 7 * 86_400_000 };
const watchlists = await store.iterate<{
cadence: keyof typeof cadenceMs;
last_run_at?: string;
}>("watchlist");
for (const row of watchlists) {
const cadence = cadenceMs[row.entity.cadence];
const last = row.entity.last_run_at ? Date.parse(row.entity.last_run_at) : 0;
// Compare against the single timestamp captured at the top of the function.
if (Date.parse(now) - last > 2 * cadence) {
out.push({
gap_id: gapId("stale_watchlist", [row.id]),
gap_type: "stale_watchlist",
subjects: [row.id],
detected_at: now,
severity: "low",
suggested_action: "investigate scheduler / source health",
context: { last_run_at: row.entity.last_run_at, cadence: row.entity.cadence },
});
}
}
return out;
}
Design decisions
- Read-only. The detector reads; it never writes. A separate consumer (a digest script, a follow-up task creator, a notification) can act on the output. This separation is the load-bearing discipline: auto-mutation in the detector path would betray the audit promise.
- Deterministic gap_id from sorted subjects. Re-running detection produces the same id for the same gap. Consumers persisting gaps (or filing follow-up work from them) can dedupe.
- Gap types are a closed enum. Each one carries a suggested_action template so the consumer renders human-readable guidance, not raw gap-type strings.
- Severity is bucketed, not raw. "Med" reads as actionable; "score: 0.42" reads as artifact. Severity buckets are stable across small data shifts.
- subjects[] is a list, not a single id. Some gaps involve multiple entities (a missing edge between two nodes; a triple that's missing one corner). Singleton case is the array of length 1.
- Pure function signature. No environment access, no env-var reads, no Date.now() outside the explicit detected_at stamp, so it is trivially unit-testable.
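The stable-id property can be checked directly. The sketch below repeats hash32 and gapId from the pattern so it runs on its own, then shows a consumer merging two detection runs by gap_id. The consumer (mergeRuns) and the GapLike shape are hypothetical.

```typescript
// Same FNV-1a style hash and id derivation as in the pattern above.
function hash32(s: string): string {
  let h = 0x811c9dc5;
  for (let i = 0; i < s.length; i++) {
    h ^= s.charCodeAt(i);
    h = Math.imul(h, 0x01000193);
  }
  return (h >>> 0).toString(16).padStart(8, "0");
}

function gapId(type: string, subjects: string[]): string {
  const sorted = [...subjects].sort().join("|");
  return `gap:${type}:${hash32(sorted)}`;
}

// Hypothetical consumer: keep the first sighting of each gap across runs.
interface GapLike { gap_id: string; detected_at: string; }

function mergeRuns(previous: GapLike[], current: GapLike[]): GapLike[] {
  const byId = new Map<string, GapLike>();
  for (const g of previous) byId.set(g.gap_id, g);
  for (const g of current) {
    if (!byId.has(g.gap_id)) byId.set(g.gap_id, g);
  }
  return Array.from(byId.values());
}

// Subject order does not matter: both calls produce the same id.
const idA = gapId("thin_evidence", ["rec:2", "rec:1"]);
const idB = gapId("thin_evidence", ["rec:1", "rec:2"]);
```

One caveat follows from the join step: subject ids that themselves contain "|" can collide, since ["a|b"] and ["a", "b"] produce the same joined string.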
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| Curator stays in the loop — nothing auto-fixed | Detection latency = how often you call it; gaps linger until next run |
| Same gap across runs produces the same id (idempotent downstream) | Adding a new gap type means a new code path, not a new row |
| Trivially unit-testable | Long iteration over large stores is O(N) per type |
| Severity + suggested_action make output renderable directly | Hard-coded thresholds (< 3 sources) drift from true policy over time |
Citations
- packages/registry/src/loop/gap-detector.ts: full detectRegistryGaps() with five gap types, deterministic gap_id, suggested-task-type mapping.
- packages/registry/src/loop/research-task.ts: upsertResearchTaskFromGap(), the curator-policy-compliant consumer that turns gaps into trackable follow-up items.
P11. Process-Singleton Resource with Resolution-Order Fallback
Problem
A heavy resource (DB connection pool, ingest pipeline, model client, secret-resolver) needs to be built once per process and shared across every request handler. You also want a deterministic order for which backend to use: production env-var first, dev override second, bundled default third, tmp fallback fourth. Lazy build, no race on first request, no rebuild on hot-reload.
The Pattern
import * as fs from "node:fs";
import * as os from "node:os";
import * as path from "node:path";
// Persist across HMR / module-graph re-imports in dev.
declare global {
var __sharedResource: Resource | undefined;
var __sharedResourceReady: Promise<Resource> | undefined;
}
interface Resource { /* whatever shape */ }
function resolveBackend(env: NodeJS.ProcessEnv): {
kind: "primary" | "env-override" | "bundled" | "tmp";
config: string;
} {
// 1. Primary: a real backend if its env var is set.
if (env.DATABASE_URL) return { kind: "primary", config: env.DATABASE_URL };
// 2. Env override for local dev (e.g., explicit JSON store path).
if (env.RESOURCE_ROOT) return { kind: "env-override", config: env.RESOURCE_ROOT };
// 3. Bundled default — what production uses when no env is set.
const bundled = path.join(process.cwd(), "data/bundled");
if (fs.existsSync(bundled)) return { kind: "bundled", config: bundled };
// 4. Tmp fallback — last resort for local dev with no setup.
return { kind: "tmp", config: path.join(os.tmpdir(), "shared-resource") };
}
function redact(s: string): string {
try {
const u = new URL(s);
if (u.password) u.password = "***";
return u.toString();
} catch { return "<unparseable>"; }
}
async function build(): Promise<Resource> {
const { kind, config } = resolveBackend(process.env);
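// Only the primary backend is a connection URL; the others are filesystem
// paths, which redact() would otherwise log as "<unparseable>".
const where = kind === "primary" ? redact(config) : config;
console.log(`[shared-resource] using ${kind} backend at ${where}`);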
// ... build the resource based on `kind`
return {} as Resource;
}
/** Lazy, race-safe, idempotent. */
export function getResource(): Promise<Resource> {
if (globalThis.__sharedResource) {
return Promise.resolve(globalThis.__sharedResource);
}
if (!globalThis.__sharedResourceReady) {
globalThis.__sharedResourceReady = build().then((r) => {
globalThis.__sharedResource = r;
return r;
});
}
return globalThis.__sharedResourceReady;
}
Design decisions
- Cache the promise, not the resolved value. Two concurrent first-callers both await the same in-flight build; we don't kick off two parallel builds. (Caching only the resolved value is a classic race-condition footgun.)
- globalThis cache survives HMR. Module re-imports during dev hot-reload would re-build the resource if it lived in module scope. globalThis lives across the boundary.
- Resolution order is explicit and code-encoded. Not a config file. Reading the function tells you everything about backend selection.
- Each branch announces itself. console.log at boot tells you which backend resolved. Without this, "why is my dev pulling from production?" debugging is brutal.
- Sensitive config is redacted in logs. Connection strings often include credentials; strip them before logging.
- Fallback isn't a silent default. Logging kind: "tmp" makes "I'm running on an ephemeral local backend" visible, so nobody ships against the wrong env.
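Caching the promise means a rejected build stays cached: every later caller receives the same rejection. If a retry on the next call is preferable, one possible variant clears the cached promise when the build fails. The sketch below uses a closure rather than globalThis to stay self-contained; the name lazySingleton is hypothetical and this is not the cited getStore() implementation.

```typescript
// Wraps an async builder so it runs at most once at a time.
// Success is cached for good; failure clears the cache so the next call retries.
function lazySingleton<T>(build: () => Promise<T>): () => Promise<T> {
  let ready: Promise<T> | undefined;
  return () => {
    if (!ready) {
      ready = build().catch((err) => {
        ready = undefined; // drop the failed attempt
        throw err;         // current callers still see the failure
      });
    }
    return ready;
  };
}
```

Concurrent callers during one attempt still share a single in-flight build, so the race-safety property above is preserved. The difference is that the next call after a failure triggers a fresh build.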
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| Build cost paid once per process | First request pays the build latency |
| Concurrent first-callers don't race | Build failures fail every caller — no per-call retry |
| Same code works for dev / preview / prod via resolution order | Resolution order changes are easy to get wrong; needs tests |
| Surviving HMR avoids dev churn | globalThis is a typed-loophole; needs declare global boilerplate |
Citations
- apps/web/lib/store.ts: getStore() with DATABASE_URL → STORE_ROOT → bundled → tmp fallback, promise-caching on globalThis, redacted-URL logging.
P12. Redis-Primary, In-Memory-Fallback Rate Limiter
Problem
You want per-consumer rate limits in production where many serverless instances share traffic; a per-instance counter is useless because each instance only sees a fraction of the load. But local dev shouldn't require Redis, and a Redis outage shouldn't 500 your whole API.
The Pattern
interface RateLimitResult {
allowed: boolean;
limit: number;
remaining: number;
reset_at: string;
retry_after_seconds: number;
}
interface RateSpec { count: number; window_ms: number; }
const buckets: Map<string, { hits: number[]; }> = new Map();
function getRedis(): { incr: (k: string) => Promise<number>;
expire: (k: string, s: number) => Promise<unknown>; } | null {
if (!process.env.REDIS_URL) return null;
// Lazy build a singleton client; return null when unavailable.
return /* upstash redis instance or similar */ null;
}
export async function check(consumer: string, spec: RateSpec, now = Date.now()): Promise<RateLimitResult> {
const redis = getRedis();
if (redis) {
try {
return await checkRedis(redis, consumer, spec, now);
} catch (err) {
// Fail-open to in-memory so a Redis blip doesn't 500 the API.
console.warn("[rate-limit] Redis check failed; using in-memory", err);
// fall through
}
}
return checkInMemory(consumer, spec, now);
}
async function checkRedis(redis: NonNullable<ReturnType<typeof getRedis>>,
consumer: string, spec: RateSpec, now: number): Promise<RateLimitResult> {
// Fixed-window counter. Key includes the window index so we get
// automatic per-window isolation without explicit cleanup.
const windowIndex = Math.floor(now / spec.window_ms);
const key = `ratelimit:${consumer}:${windowIndex}`;
const windowSec = Math.max(1, Math.floor(spec.window_ms / 1000));
const count = await redis.incr(key);
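// Set the TTL on every call. The key is per-window, so refreshing it is
// harmless, and a crash between INCR and EXPIRE cannot leave a key without a TTL.
await redis.expire(key, windowSec + 1);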
const resetMs = (windowIndex + 1) * spec.window_ms;
if (count > spec.count) {
return {
allowed: false, limit: spec.count, remaining: 0,
reset_at: new Date(resetMs).toISOString(),
retry_after_seconds: Math.max(1, Math.ceil((resetMs - now) / 1000)),
};
}
return {
allowed: true, limit: spec.count,
remaining: Math.max(0, spec.count - count),
reset_at: new Date(resetMs).toISOString(),
retry_after_seconds: 0,
};
}
function checkInMemory(consumer: string, spec: RateSpec, now: number): RateLimitResult {
// True sliding window — per-instance only. OK for dev; not a substitute
// for Redis in production with multiple instances.
const bucket = buckets.get(consumer) ?? { hits: [] };
buckets.set(consumer, bucket);
const cutoff = now - spec.window_ms;
while (bucket.hits.length > 0 && bucket.hits[0]! <= cutoff) bucket.hits.shift();
if (bucket.hits.length >= spec.count) {
const resetMs = bucket.hits[0]! + spec.window_ms;
return {
allowed: false, limit: spec.count, remaining: 0,
reset_at: new Date(resetMs).toISOString(),
retry_after_seconds: Math.max(1, Math.ceil((resetMs - now) / 1000)),
};
}
bucket.hits.push(now);
const resetMs = bucket.hits[0]! + spec.window_ms;
return {
allowed: true, limit: spec.count,
remaining: Math.max(0, spec.count - bucket.hits.length),
reset_at: new Date(resetMs).toISOString(),
retry_after_seconds: 0,
};
}
Design decisions
- Redis fixed-window, not sliding-log. Sliding-log needs more keys per check; fixed-window is one INCR + one EXPIRE. Acceptable edge case: boundary bursts can admit 2× the limit briefly. Document it; don't hide it.
- EXPIRE unconditionally each call. NX-conditional EXPIRE requires a Lua script on Upstash REST. Re-setting the TTL on every INCR is harmless and keeps the pipeline at two ops.
- Fail-open on Redis error. A Redis outage should not take down rate limiting (and thus the API). Log it loudly, fall back to in-memory, and surface the degradation in metrics.
- In-memory is per-instance. Document this explicitly: the effective ceiling becomes instances × limit. Production must run with Redis to get a meaningful global ceiling.
- Retry-After semantics in the response. When allowed: false, populate retry_after_seconds so the API can forward it as an HTTP header.
- Reset timestamp in result, even when allowed. Callers can show "X requests remaining, resets in N seconds" without a second probe.
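Forwarding the result as HTTP headers could look like the sketch below. Retry-After is a standard HTTP header; the X-RateLimit-* names are a common convention rather than something the source specifies, so treat them as assumptions. The helper name rateLimitHeaders is hypothetical.

```typescript
// Same result shape as in the pattern above.
interface RateLimitResult {
  allowed: boolean;
  limit: number;
  remaining: number;
  reset_at: string;
  retry_after_seconds: number;
}

// Build response headers from a check() result.
function rateLimitHeaders(r: RateLimitResult): Record<string, string> {
  const headers: Record<string, string> = {
    "X-RateLimit-Limit": String(r.limit),
    "X-RateLimit-Remaining": String(r.remaining),
    // Reset expressed as Unix seconds; an assumed convention.
    "X-RateLimit-Reset": String(Math.floor(Date.parse(r.reset_at) / 1000)),
  };
  // Retry-After only makes sense on a rejected request.
  if (!r.allowed) headers["Retry-After"] = String(r.retry_after_seconds);
  return headers;
}
```

A handler would typically attach these to every response and pair a rejected check with a 429 status.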
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| Production gets globally-correct limits via Redis | Dual implementations — two code paths to keep in sync |
| Local dev works out of the box without Redis | Boundary-burst edge case admits 2× briefly |
| Redis outages degrade gracefully (in-memory fallback) | In-memory fallback in production with multiple instances is incorrect |
| Single check() API across both paths | Failover detection is per-process; concurrent processes can disagree |
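The boundary-burst weakness is easy to reproduce with a tiny in-process fixed-window counter. The sketch below is a stand-in for the Redis path, using the same window-index arithmetic but no Redis; makeFixedWindow is a hypothetical name.

```typescript
// Minimal fixed-window limiter keyed by window index, as in checkRedis.
function makeFixedWindow(limit: number, windowMs: number) {
  const counts = new Map<number, number>();
  return (now: number): boolean => {
    const windowIndex = Math.floor(now / windowMs);
    const count = (counts.get(windowIndex) ?? 0) + 1;
    counts.set(windowIndex, count);
    return count <= limit;
  };
}

// Limit of 3 per 1000 ms. Three requests land just before the boundary
// at t = 1000 and four land just after it.
const allow = makeFixedWindow(3, 1000);
const timestamps = [997, 998, 999, 1000, 1001, 1002, 1003];
const decisions = timestamps.map((t) => allow(t));
// The first six are admitted (3 in window 0, 3 in window 1);
// only the seventh is rejected.
```

Six requests pass within 6 ms against a nominal limit of 3 per second, which is the 2× burst named in the table.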
Citations
- packages/registry/src/rest/rate-limit.ts: full checkRestRateLimit with parse-rate, env-var-driven per-consumer specs, fail-open fallback, test-only state resetters.
- packages/registry/src/rest/upstash-client.ts: client lazy-init that returns null when env is absent.
P13. Curator-Mediated Promotion (Read-Only Detection, Approval-Only Mutation)
Problem
Your pipeline extracts structured claims from messy sources (LLM extractors, automated validators, third-party APIs). Each step produces "candidate" output: best-effort, frequently wrong, sometimes hallucinated. You want a system that surfaces candidates clearly but never lets an unsupervised extractor write to the canonical store. The promotion from "candidate" to "canonical" must be an explicit human action.
The Pattern
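// Four statuses on the claim row:
// - "unverified": extractor output; lives in the store but tagged
// - "verified": multi-vendor consensus (P05); machine-promoted, machine-trusted
// - "disputed": machine-set by adjudication (P05) when the verdict is contested
// - "rejected": curator marked as wrong; never re-enqueue
// Canonical truth is not a status; it is a separate, curator-written record.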
type VerificationStatus = "unverified" | "verified" | "disputed" | "rejected";
interface Claim {
claim_id: string;
payload: { /* the actual claim */ };
verification_status: VerificationStatus;
verification_log: VerificationEntry[];
}
interface VerificationEntry {
pass_id: string; // deterministic from (verifier + timestamp + subject)
verifier: string;
verified_at: string;
verdict: "confirmed" | "rejected" | "uncertain";
evidence?: string;
note: string;
}
// Verification handler: writes to the LOG, may flip status to verified/disputed.
// NEVER promotes to "canonical-truth" outside its narrow lane.
export class VerificationHandler {
readonly job_type = "verify_claim";
async handle(job: Job, ctx: { store: Store; now: Date }) {
const row = await ctx.store.get<Claim>("claim", job.subject_id);
if (!row) throw new Error(`claim not found: ${job.subject_id}`);
// Curator-policy: rejected rows never re-enter the pipeline.
if (row.entity.verification_status === "rejected") {
return { note: "skipped: previously curator-rejected" };
}
const responses = await runMultiVendor(row.entity);
const adjudication = adjudicate(responses);
const newEntry: VerificationEntry = {
pass_id: derivePassId(job, ctx.now),
verifier: `multi-vendor:${responses.map((r) => r.provider).join("+")}`,
verified_at: ctx.now.toISOString(),
verdict: adjudication.verdict,
evidence: adjudication.evidence,
note: adjudication.note,
};
// Append to log; flip status based on adjudication.
// The pipeline may move a claim among unverified / disputed / verified,
// but NEVER writes canonical truth; that step requires a curator.
await ctx.store.put({
...row,
entity: {
...row.entity,
verification_status: adjudication.status, // verified | disputed | unverified
verification_log: [...row.entity.verification_log, newEntry],
},
});
return { produced: [job.subject_id] };
}
}
// Curator-side: explicit action. Lives in admin UI / CLI, not the pipeline.
export async function curatorPromote(store: Store, claim_id: string, note: string) {
const row = await store.get<Claim>("claim", claim_id);
if (!row) throw new Error("not found");
if (row.entity.verification_status !== "verified") {
throw new Error("only verified claims can be curator-promoted");
}
// Now we write to the canonical-truth table.
await store.put({ id: claim_id, type: "canonical_claim", entity: row.entity.payload,
provenance: [{ source_path: `curator:${note}`,
ingested_at: new Date().toISOString() }] });
}
export async function curatorReject(store: Store, claim_id: string, reason: string) {
const row = await store.get<Claim>("claim", claim_id);
if (!row) throw new Error("not found");
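// Rejection is recorded but the row stays, for forensics + dedup.
// The reason goes into the append-only log so the rationale is not lost.
const rejectedAt = new Date().toISOString();
await store.put({
...row,
entity: {
...row.entity,
verification_status: "rejected",
verification_log: [...row.entity.verification_log, {
pass_id: `curator-reject:${claim_id}:${rejectedAt}`,
verifier: "curator",
verified_at: rejectedAt,
verdict: "rejected",
note: reason,
}],
},
});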
}
Design decisions
- Three jurisdictions encoded as status. unverified = extractor output. verified = machine-promoted via cross-vendor consensus. canonical = curator-promoted. The pipeline owns the first two; the curator owns the third.
- rejected is sticky. Once a curator says "no," the pipeline never re-attempts. The defensive short-circuit in every handler enforces this even if a downstream dispatcher tries to re-enqueue.
- Verification log is append-only. Every pass leaves a stamp: verifier id, timestamp, verdict, evidence. The store carries the audit trail, not just the latest verdict.
- Promotion is a separate code path. curatorPromote() lives in admin tooling. It's never called from a handler. The handlers can only flip status within the verification jurisdiction; canonical promotion is out-of-band.
- Defensive guards at every entry point. Even the verification handler checks if (status === "rejected") skip; the queue's enqueue-side dedup catches most, but the handler trusts no caller.
- No silent fail-to-verify. When zero verification providers are configured, the row stays unverified with a "no providers" note, not a fake verified.
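The status rules above can be expressed as one small pure function, which makes the "rejected is sticky" and "no fake verified" guarantees easy to unit-test. The verdict-to-status mapping below is an assumption made for illustration (the real adjudication lives in P05), and nextStatus is a hypothetical name.

```typescript
type VerificationStatus = "unverified" | "verified" | "disputed" | "rejected";
type Verdict = "confirmed" | "rejected" | "uncertain";

// Decide the machine-side status after a verification pass.
// verdict === null models "no providers configured": nothing changes.
function nextStatus(current: VerificationStatus, verdict: Verdict | null): VerificationStatus {
  if (current === "rejected") return "rejected"; // curator rejection is sticky
  if (verdict === null) return current;          // honest absence, no promotion
  switch (verdict) {
    case "confirmed": return "verified";
    case "rejected": return "disputed";          // machines dispute; only curators reject
    case "uncertain": return "unverified";
  }
}
```

Note that the function can never return "rejected" unless it was handed "rejected", and it has no way to express canonical promotion at all. That mirrors the jurisdiction split in the list above.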
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| Curator authority is structural, not policy — pipeline can't betray it | Curator becomes the bottleneck for canonical-truth growth |
| Rejected-sticky prevents repeated wasted extraction passes | Curators must supply a rejection reason; otherwise rejection rationale is lost |
| Three-status model maps cleanly to UI (queue / inbox / archive) | Schema must thread verification_status through every claim type |
| Audit log answers "why is this canonical" / "why was this rejected" | Verification log can grow large per row; needs archival at scale |
Citations
- packages/registry/src/loop/handlers/verify-effect-size-claim.ts: full multi-vendor verification handler with append-only log, deterministic pass_id, and the "no providers ⇒ unverified" honest-absence path.
- packages/registry/src/loop/verify-enqueue.ts: short-circuits on extraction_status === "rejected" regardless of which caller filed the job.
- packages/registry/src/loop/research-task.ts: curator-policy D4: "downstream promotion is a SEPARATE, curator-approved action — never written by this module."
P14. Opaque Base64 Cursor with Strict Decoder
Problem
Your REST API needs paginated list endpoints. Today you'd happily use offset/limit; tomorrow you'll want keyset cursors for large tables. If clients ever see ?offset=N in URLs, swapping to keyset is a breaking change. You want pagination to be opaque from day one — but you also want bad cursors to fail loudly with a clean 400, not silently degrade.
The Pattern
const DEFAULT_LIMIT = 50;
const MAX_LIMIT = 500;
interface CursorEnvelope { o: number; }
export interface Page<T> {
items: T[];
next_cursor: string | null;
}
export function encodeCursor(offset: number): string {
const envelope: CursorEnvelope = { o: offset };
return Buffer.from(JSON.stringify(envelope), "utf-8").toString("base64url");
}
export function decodeCursor(cursor: string | null | undefined): number {
if (!cursor) return 0;
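// Buffer.from() skips invalid characters instead of throwing, so validate
// the base64url alphabet explicitly before decoding.
if (!/^[A-Za-z0-9_-]+$/.test(cursor)) {
throw new Error("bad_cursor: invalid base64");
}
const json = Buffer.from(cursor, "base64url").toString("utf-8");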
let parsed: unknown;
try {
parsed = JSON.parse(json);
} catch {
throw new Error(`bad_cursor: invalid json`);
}
// STRICT — reject anything that isn't the exact expected shape.
if (!parsed || typeof parsed !== "object" ||
typeof (parsed as CursorEnvelope).o !== "number" ||
(parsed as CursorEnvelope).o < 0 ||
!Number.isInteger((parsed as CursorEnvelope).o)) {
throw new Error("bad_cursor: not a non-negative-integer offset envelope");
}
return (parsed as CursorEnvelope).o;
}
export function paginate<T>(
rows: readonly T[],
opts: { cursor?: string | null; limit?: number | null } = {},
): Page<T> {
const offset = decodeCursor(opts.cursor);
const limit = Math.max(1, Math.min(MAX_LIMIT, opts.limit ?? DEFAULT_LIMIT));
const slice = rows.slice(offset, offset + limit);
const nextOffset = offset + slice.length;
return {
items: slice as T[],
next_cursor: nextOffset < rows.length ? encodeCursor(nextOffset) : null,
};
}
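// In a handler (query values arrive as strings, so coerce limit before passing it in):
// try {
// const page = paginate(rows, { cursor: req.query.cursor, limit: Number(req.query.limit) || null });
// return Response.json(page);
// } catch (err) {
// const message = err instanceof Error ? err.message : String(err);
// return Response.json({ error: { code: "bad_cursor", message } }, { status: 400 });
// }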
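The handler comment above turns anything thrown inside the try block into a 400, including failures that have nothing to do with the cursor. A minimal sketch of one way to narrow that, assuming the `bad_cursor:` message prefix is the only signal available; `toErrorResponse` and the `internal` code are hypothetical names, not part of the cited implementation:

```typescript
// Hypothetical helper: maps a thrown value to a status and JSON body.
// Only errors that follow the "bad_cursor:" message convention become 400s.
interface ErrorResponse {
  status: number;
  body: { error: { code: string; message: string } };
}

export function toErrorResponse(err: unknown): ErrorResponse {
  if (err instanceof Error && err.message.startsWith("bad_cursor:")) {
    return {
      status: 400,
      body: { error: { code: "bad_cursor", message: err.message } },
    };
  }
  // Anything else is a server-side fault; do not leak its message to clients.
  return {
    status: 500,
    body: { error: { code: "internal", message: "unexpected error" } },
  };
}
```

In a handler, the catch block would then return `Response.json(r.body, { status: r.status })` with `r = toErrorResponse(err)`.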
Design decisions
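- Cursor is opaque base64url JSON. Clients could decode it, but the contract is that they don't: they treat it as a token and pass it back unchanged. When you swap from offset to keyset, you change the envelope shape, and clients keep working as long as they use the cursor we hand back. Cursors issued before the swap fail with bad_cursor unless the decoder keeps accepting the old shape.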
- Strict decoder. Anything that's not exactly the expected shape throws. No "best-effort" parsing, no fallback offset 0 on bad cursor — that would mask client bugs (stale cursor → silent re-pagination from start).
- Throws, not returns Result. Lets the handler do one try/catch around decodeCursor + paginate and turn it into a clean 400. Caller pattern is simpler than a Result-typed return.
- Hard cap on limit. Clients passing ?limit=1000000 get truncated to MAX_LIMIT. Prevents accidental (or malicious) memory blowups.
- next_cursor: null means last page. Client logic: "while cursor is non-null, keep paging." Simpler than "while items.length === limit" which has off-by-one cases at the boundary.
- Same return shape regardless of backend. When the backend changes (in-memory slice today, indexed query tomorrow), paginate()'s signature doesn't.
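The "while cursor is non-null, keep paging" rule can be sketched from the client side as follows. This is an illustration only: `fetchAll`, the `FetchPage` callback type, and the `maxPages` guard are hypothetical names, and only the `Page<T>` shape mirrors the pattern above.

```typescript
interface Page<T> {
  items: T[];
  next_cursor: string | null;
}

// Assumed callback: performs one list request and resolves to one page.
type FetchPage<T> = (cursor: string | null) => Promise<Page<T>>;

export async function fetchAll<T>(
  fetchPage: FetchPage<T>,
  maxPages = 1000,
): Promise<T[]> {
  const all: T[] = [];
  let cursor: string | null = null;
  for (let i = 0; i < maxPages; i++) {
    const page: Page<T> = await fetchPage(cursor);
    all.push(...page.items);
    // The cursor is passed back verbatim; the client never decodes it.
    cursor = page.next_cursor;
    if (cursor === null) return all;
  }
  // Guard against a server that keeps handing back cursors forever.
  throw new Error(`fetchAll: gave up after ${maxPages} pages`);
}
```

The loop never inspects `items.length`, so a final page that happens to be exactly `limit` items long needs no special handling.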
Tradeoffs
| Strengths | Weaknesses |
|---|---|
| Migration to keyset is non-breaking for clients | Opaque cursors are harder to debug from raw requests |
| Strict decoder catches client bugs early | Forces a try/catch at every list-endpoint handler |
| Hard cap on limit prevents resource abuse | The 500 max is arbitrary; needs revisiting per endpoint shape |
| Same code path against any backend | Offset-based today; will need different envelope when keyset lands |
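The last row of the table notes that keyset pagination will need a different envelope. A minimal sketch of a decoder that accepts both shapes during a migration, assuming a keyset envelope of the form `{ k: string }` holding the last-seen sort key; that shape and the names `encodeEnvelope` / `decodeEnvelope` are hypothetical and do not come from the cited implementation:

```typescript
// `o` is the offset envelope from the pattern above; `k` is an assumed keyset form.
type OffsetEnvelope = { o: number };
type KeysetEnvelope = { k: string };
type Envelope = OffsetEnvelope | KeysetEnvelope;

export function encodeEnvelope(envelope: Envelope): string {
  return Buffer.from(JSON.stringify(envelope), "utf-8").toString("base64url");
}

export function decodeEnvelope(cursor: string): Envelope {
  // Buffer.from does not throw on invalid base64, so check the alphabet first.
  if (!/^[A-Za-z0-9_-]+$/.test(cursor)) {
    throw new Error("bad_cursor: invalid base64");
  }
  let parsed: unknown;
  try {
    parsed = JSON.parse(Buffer.from(cursor, "base64url").toString("utf-8"));
  } catch {
    throw new Error("bad_cursor: invalid json");
  }
  if (parsed === null || typeof parsed !== "object") {
    throw new Error("bad_cursor: not an envelope");
  }
  const record = parsed as Record<string, unknown>;
  const keys = Object.keys(record);
  const o = record["o"];
  const k = record["k"];
  // Strict: exactly one known key, with exactly the expected value type.
  if (keys.length === 1 && typeof o === "number" && Number.isInteger(o) && o >= 0) {
    return { o };
  }
  if (keys.length === 1 && typeof k === "string" && k.length > 0) {
    return { k };
  }
  throw new Error("bad_cursor: unknown envelope shape");
}
```

Clients see the same token-in, token-out contract either way; only the server decides which branch a given cursor takes.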
Citations
packages/registry/src/rest/cursor.ts: full encodeCursor / decodeCursor / paginate with strict validation and the bad_cursor error convention.
This catalog is the source-graded record of engineering patterns that have hardened in production code. New patterns join the list when they crystallize in actual implementation; speculative patterns belong in design docs, not here.