Persistence Reference

The experimental @wats/persistence package: durable runtime state, SQLite adapter, migrations, idempotency, and the outbox.

experimental · reviewed 2026-06-21

import {
  CURRENT_SCHEMA_VERSION,
  PersistenceError,
  runOutboxWorkerOnce,
  type MigrationReport,
  type OutboxItem,
  type PersistenceHealth,
  type PersistenceStore
} from "@wats/persistence";
import { createSqlitePersistence } from "@wats/persistence/sqlite";
import { createPostgresPersistence } from "@wats/persistence/postgres";

The SQLite adapter is for local and single-instance testing. The Postgres adapter is optional and lives behind @wats/persistence/postgres; install pg yourself when you use it. SQLite users do not need Postgres installed.

SQLite quickstart

const store = await createSqlitePersistence({ filename: "./wats.sqlite" });
await store.migrate();
const health = await store.health();
await store.close();

Use a local path or :memory:. Database paths are operationally sensitive; diagnostics report only [REDACTED_SQLITE_DATABASE].

Postgres adapter

import { createPostgresPersistence } from "@wats/persistence/postgres";

const store = await createPostgresPersistence({
  connectionString: process.env.DATABASE_URL ?? ""
});
await store.migrate();
await store.close();

pg is an optional peer dependency. The subpath is importable without pg; the driver is loaded only when createPostgresPersistence(...) runs. Postgres diagnostics report [REDACTED_POSTGRES_DATABASE] and never echo the connection string. The adapter implements the same PersistenceStore contract and schema version as SQLite, including the message projection added in schema v3.

Schema and migrations

The migration runner creates:

  • wats_schema_migrations
  • wats_persistence_lock
  • wats_webhook_events
  • wats_service_requests
  • wats_outbox
  • wats_messages
  • wats_message_status_events

Migrations are forward-only during alpha. The checksum of every already-applied migration must match the package's migration definition; checksum drift fails closed with PersistenceError. The shipped migrations are:

  • 001_initial keeps its originally shipped checksum;
  • 002_outbox_lease_id upgrades existing v1 databases with the leaseId outbox column;
  • 003_message_projection adds the message projection tables and indexes.

A held migration lock fails closed with PersistenceError code migration_lock_failed.
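The fail-closed checksum rule can be illustrated with a small, self-contained sketch. Everything here is hypothetical and only mirrors the behavior described above: the MigrationDefinition and AppliedMigration shapes, the planMigrations function, the SketchPersistenceError class, and the checksum_drift code are assumptions, not the package's internals.

```typescript
// Hypothetical shapes for illustration; not the @wats/persistence internals.
interface MigrationDefinition {
  id: string; // e.g. "001_initial"
  checksum: string; // checksum shipped with the package
}

interface AppliedMigration {
  id: string;
  checksum: string; // checksum recorded when the migration ran
}

// Local stand-in for PersistenceError so the sketch runs on its own.
class SketchPersistenceError extends Error {
  readonly code: string;
  constructor(code: string, message: string) {
    super(message);
    this.name = "SketchPersistenceError";
    this.code = code;
  }
}

// Returns the definitions that still need to run, or throws on drift.
function planMigrations(
  definitions: MigrationDefinition[],
  applied: AppliedMigration[]
): MigrationDefinition[] {
  const byId = new Map<string, MigrationDefinition>();
  for (const def of definitions) byId.set(def.id, def);

  for (const row of applied) {
    const def = byId.get(row.id);
    if (def === undefined || def.checksum !== row.checksum) {
      // Fail closed: never apply more migrations on top of drifted history.
      throw new SketchPersistenceError(
        "checksum_drift", // assumed code name
        `Checksum drift for migration ${row.id}`
      );
    }
  }

  const appliedIds = new Set(applied.map((row) => row.id));
  return definitions.filter((def) => !appliedIds.has(def.id));
}
```

In this sketch a v1 database (only 001_initial applied) would get 002 and 003 planned, while any recorded checksum that differs from the definition stops the run before a single statement executes.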

Runtime contract

PersistenceStore exposes:

  • backend
  • migrate(): Promise<MigrationReport>
  • health(): Promise<PersistenceHealth>
  • recordWebhookEvent(...)
  • getServiceRequest(...)
  • recordServiceRequest(...)
  • enqueueOutboxItem(...)
  • claimOutboxItems(...)
  • markOutboxItemFailed(...)
  • markOutboxItemSucceeded(...)
  • recordMessage(...)
  • appendMessageStatus(...)
  • getMessage(...)
  • listMessages(...)
  • close(): Promise<void>

Webhook event records store safe event keys and hashes, not raw webhook bodies. Duplicate event keys return "duplicate", so the service can acknowledge Meta retries without dispatching the same update twice.
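As a rough illustration of the duplicate handling, the sketch below uses an in-memory map in place of wats_webhook_events. Only the "duplicate" return value comes from this reference; the "recorded" value, the class, and the handleDelivery helper are assumptions made for the example.

```typescript
// Only "duplicate" is confirmed by this reference; "recorded" is assumed.
type RecordResult = "recorded" | "duplicate";

// In-memory stand-in for the wats_webhook_events table.
class InMemoryWebhookEvents {
  private readonly seen = new Map<string, string>(); // eventKey -> body hash

  record(eventKey: string, bodyHash: string): RecordResult {
    if (this.seen.has(eventKey)) return "duplicate";
    // Store the key and hash only, never the raw webhook body.
    this.seen.set(eventKey, bodyHash);
    return "recorded";
  }
}

// Acknowledge every delivery with 200, but dispatch only the first one.
function handleDelivery(
  events: InMemoryWebhookEvents,
  eventKey: string,
  bodyHash: string,
  dispatch: () => void
): number {
  if (events.record(eventKey, bodyHash) === "recorded") {
    dispatch();
  }
  return 200;
}
```

A retried delivery with the same event key still gets a 200, so the sender stops retrying, but the dispatch callback runs once.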

Service request idempotency stores an idempotency key, request hash, and response JSON. A matching key/body hash replays the stored response; the same key with a different body hash conflicts.
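The replay/conflict rule can be sketched with an in-memory table. The outcome type, class, and method names below are hypothetical; the real getServiceRequest(...) and recordServiceRequest(...) signatures are not shown in this reference.

```typescript
// Hypothetical outcome type for illustration.
type IdempotencyOutcome =
  | { kind: "miss" }
  | { kind: "replay"; responseJson: string }
  | { kind: "conflict" };

interface StoredRequest {
  requestHash: string;
  responseJson: string;
}

// In-memory stand-in for the wats_service_requests table.
class InMemoryServiceRequests {
  private readonly rows = new Map<string, StoredRequest>();

  check(idempotencyKey: string, requestHash: string): IdempotencyOutcome {
    const row = this.rows.get(idempotencyKey);
    if (row === undefined) return { kind: "miss" };
    if (row.requestHash === requestHash) {
      // Same key and same body hash: replay the stored response.
      return { kind: "replay", responseJson: row.responseJson };
    }
    // Same key with a different body hash: refuse rather than guess.
    return { kind: "conflict" };
  }

  record(idempotencyKey: string, requestHash: string, responseJson: string): void {
    // First write wins; later writes for the same key are ignored.
    if (!this.rows.has(idempotencyKey)) {
      this.rows.set(idempotencyKey, { requestHash, responseJson });
    }
  }
}
```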

Message projection

The wats_messages table records outbound send attempts (and, in future slices, inbound messages). Each row is keyed by a caller-generated local row_id, carries the WhatsApp message id (wa_message_id), direction, optional phone fields, Graph type, current status, and created/updated timestamps. wats_message_status_events is an append-only log of status transitions per wa_message_id.

await store.recordMessage({
  rowId: "wats-msg-...",
  waMessageId: "wamid.HBgM...",
  direction: "outbound",
  toPhone: "15551230000",
  type: "text",
  status: "sent",
  graphMessageId: "wamid.HBgM...",
  createdAt: new Date().toISOString(),
  updatedAt: new Date().toISOString()
});

await store.appendMessageStatus({
  waMessageId: "wamid.HBgM...",
  status: "delivered",
  timestamp: new Date().toISOString()
});

const record = await store.getMessage({ waMessageId: "wamid.HBgM..." });
const page = await store.listMessages({ limit: 50 });

Behavior of the projection methods:

  • recordMessage is idempotent on row_id (INSERT OR IGNORE): re-recording the same rowId is a no-op.
  • appendMessageStatus inserts an event row and, on a best-effort basis, updates the matching message's status and updated_at; the update is a no-op if no message matches.
  • listMessages returns rows newest-first (created_at DESC, row_id DESC), with limit (1..100) and an optional beforeRowId cursor for paging; nextCursor is the last item's rowId when more rows may exist, otherwise null.
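The paging rules can be made concrete with an in-memory sketch of the same ordering and cursor semantics. The MessagePage shape (in particular the items field name), the listMessagesSketch function, rejecting an out-of-range limit with RangeError, and returning an empty page for an unknown cursor are all assumptions; only the ordering, the 1..100 limit, beforeRowId, and nextCursor come from the description above.

```typescript
// Hypothetical shapes; only rowId, limit, beforeRowId and nextCursor are named in this reference.
interface MessageRow {
  rowId: string;
  createdAt: string; // ISO 8601 timestamp
}

interface MessagePage {
  items: MessageRow[]; // assumed field name
  nextCursor: string | null;
}

interface ListQuery {
  limit: number;
  beforeRowId?: string;
}

const compareStrings = (a: string, b: string): number => (a < b ? -1 : a > b ? 1 : 0);

function listMessagesSketch(rows: MessageRow[], query: ListQuery): MessagePage {
  if (!Number.isInteger(query.limit) || query.limit < 1 || query.limit > 100) {
    throw new RangeError("limit must be an integer in 1..100"); // assumed behavior
  }
  // Newest first: created_at DESC, then row_id DESC as the tie-breaker.
  const sorted = [...rows].sort((a, b) =>
    a.createdAt === b.createdAt
      ? compareStrings(b.rowId, a.rowId)
      : compareStrings(b.createdAt, a.createdAt)
  );

  let start = 0;
  if (query.beforeRowId !== undefined) {
    const cursorIndex = sorted.findIndex((row) => row.rowId === query.beforeRowId);
    // Unknown cursor yields an empty page in this sketch.
    start = cursorIndex === -1 ? sorted.length : cursorIndex + 1;
  }

  const items = sorted.slice(start, start + query.limit);
  const last = items[items.length - 1];
  const hasMore = start + items.length < sorted.length;
  return { items, nextCursor: hasMore && last !== undefined ? last.rowId : null };
}

// Walk every page by feeding nextCursor back in as beforeRowId.
function collectAllRowIds(rows: MessageRow[], limit: number): string[] {
  const ids: string[] = [];
  let cursor: string | null = null;
  do {
    const page: MessagePage = listMessagesSketch(
      rows,
      cursor === null ? { limit } : { limit, beforeRowId: cursor }
    );
    for (const row of page.items) ids.push(row.rowId);
    cursor = page.nextCursor;
  } while (cursor !== null);
  return ids;
}
```

A caller against the real store would follow the same loop shape: request a page, stop when nextCursor is null, otherwise pass it as beforeRowId on the next call.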

The service records outbound projections automatically after a successful Graph send and exposes them read-only through GET /messages and GET /messages/{messageId} (see the Service Reference). Projection failures are swallowed and never break the send response.

Outbox

At-least-once local work scheduling:

await store.enqueueOutboxItem({
  id: "send-1",
  payloadHash: "sha256:...",
  createdAt: new Date().toISOString()
});

const report = await runOutboxWorkerOnce(store, {
  now: new Date().toISOString(),
  limit: 10,
  retryDelayMs: 30_000,
  async handler(item: OutboxItem) {
    // Reconstruct/send from application-owned state keyed by item.id.
  }
});

The outbox table stores only payload hashes, status, attempt counts, leaseId, and retry timestamps. It never stores raw webhook bodies, message text, Graph request bodies, or contacts. runOutboxWorkerOnce(...) claims due pending items, calls the handler for each, marks successful items succeeded, and reschedules failed items with nextAttemptAt = now + retryDelayMs.
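The retry arithmetic is small enough to show directly. This is a sketch under stated assumptions: the RetryableItem shape and both helper functions are hypothetical, and incrementing the attempt count at failure time (rather than at claim time) is a guess about the real worker.

```typescript
// Hypothetical row shape; the real wats_outbox columns may differ.
interface RetryableItem {
  id: string;
  attempts: number;
  nextAttemptAt: string; // ISO 8601 timestamp
}

// An item is due when its nextAttemptAt is at or before `now`.
function isDue(item: RetryableItem, now: string): boolean {
  return Date.parse(item.nextAttemptAt) <= Date.parse(now);
}

// After a handler failure: count the attempt and push the item into the future.
function rescheduleAfterFailure(
  item: RetryableItem,
  now: string,
  retryDelayMs: number
): RetryableItem {
  return {
    ...item,
    attempts: item.attempts + 1, // assumed: counted on failure
    nextAttemptAt: new Date(Date.parse(now) + retryDelayMs).toISOString()
  };
}
```

With now = 2026-01-01T00:00:00.000Z and retryDelayMs = 30_000, a failed item becomes due again at 2026-01-01T00:00:30.000Z and is skipped by any worker pass that runs before then.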

Claimed items use a five-minute processing lease; stale processing rows become claimable again, so a killed worker does not strand records forever. The lease is fenced: markOutboxItemFailed(...) and markOutboxItemSucceeded(...) require the current leaseId, so a stale worker cannot complete a newer reclaimed lease.
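Lease fencing is easiest to see with two workers racing over one item. The sketch below is an in-memory model, not the adapter's SQL: the FencedOutboxSketch class, its method names, the counter-based lease ids, and returning false for a stale lease (the real store may throw instead) are all assumptions. Only the five-minute lease and the leaseId check come from the text above.

```typescript
const LEASE_MS = 5 * 60 * 1000; // five-minute processing lease

interface LeasedItem {
  id: string;
  status: "pending" | "processing" | "succeeded";
  leaseId: string | null;
  leaseExpiresAtMs: number;
}

// In-memory model of a fenced outbox; hypothetical API.
class FencedOutboxSketch {
  private readonly items = new Map<string, LeasedItem>();
  private leaseCounter = 0;

  enqueue(id: string): void {
    if (!this.items.has(id)) {
      this.items.set(id, { id, status: "pending", leaseId: null, leaseExpiresAtMs: 0 });
    }
  }

  // Claims one pending item, or one processing item whose lease has expired.
  claim(nowMs: number): { id: string; leaseId: string } | null {
    for (const item of this.items.values()) {
      const stale = item.status === "processing" && item.leaseExpiresAtMs <= nowMs;
      if (item.status === "pending" || stale) {
        this.leaseCounter += 1;
        const leaseId = `lease-${this.leaseCounter}`;
        item.status = "processing";
        item.leaseId = leaseId; // a reclaim replaces the previous lease
        item.leaseExpiresAtMs = nowMs + LEASE_MS;
        return { id: item.id, leaseId };
      }
    }
    return null;
  }

  // Succeeds only for the holder of the current lease.
  markSucceeded(id: string, leaseId: string): boolean {
    const item = this.items.get(id);
    if (item === undefined || item.status !== "processing" || item.leaseId !== leaseId) {
      return false; // stale or unknown lease: leave the row untouched
    }
    item.status = "succeeded";
    item.leaseId = null;
    return true;
  }
}
```

If worker A claims an item and stalls for six minutes, worker B can reclaim it under a new leaseId. When A finally calls markSucceeded with its old leaseId, the call is rejected and B's lease stays intact.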

Redaction boundary

Persistence diagnostics must not print: access tokens, app secrets, webhook verify tokens, service bearer tokens, authorization headers, database URLs or SQLite paths, raw webhook bodies, message text, or contact payloads.
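One way to enforce such a boundary is to pass every diagnostics object through a redaction step before it is logged. The sketch below is illustrative only: the key list, the generic "[REDACTED]" placeholder, and the redactDiagnostics function are assumptions, not the package's actual rules (which use specific placeholders such as [REDACTED_SQLITE_DATABASE]).

```typescript
// Hypothetical key list; the package's real redaction rules are not shown in this reference.
const SENSITIVE_KEYS = new Set<string>([
  "accesstoken",
  "appsecret",
  "verifytoken",
  "bearertoken",
  "authorization",
  "connectionstring",
  "databaseurl",
  "filename",
  "rawbody",
  "text",
  "contacts"
]);

// Returns a deep copy with the values of sensitive keys replaced.
function redactDiagnostics(value: unknown): unknown {
  if (Array.isArray(value)) {
    return value.map((entry) => redactDiagnostics(entry));
  }
  if (value !== null && typeof value === "object") {
    const out: Record<string, unknown> = {};
    for (const [key, inner] of Object.entries(value)) {
      // Match case-insensitively so "Authorization" and "authorization" both hit.
      out[key] = SENSITIVE_KEYS.has(key.toLowerCase())
        ? "[REDACTED]"
        : redactDiagnostics(inner);
    }
    return out;
  }
  return value;
}
```

Key-based redaction like this is a coarse filter; it does not catch secrets embedded in free-form strings such as error messages, so it complements rather than replaces careful message construction.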

Relationship to service and CLI

@wats/service accepts an injected PersistenceStore and does not read database environment variables directly. When persistence is injected:

  • signed webhook POSTs are recorded by event key/hash and duplicates are acknowledged without redispatch;
  • service send routes honor Idempotency-Key for replay/conflict behavior;
  • injected stores must expose the outbox methods as part of the accepted service persistence contract.

The CLI exposes read-only local message navigation through wats messages list and wats messages show. These call the running local service /api/messages endpoint; they do not call Meta and do not require live credentials. Deeper conversation/thread navigation is not built.

Non-goals

Not implemented yet: deeper CLI thread/conversation navigation, delivery/read status UI, raw webhook body storage, automatic service send enqueueing, production hosting guarantees, and live Meta validation. The Postgres adapter is shape-only and tested with a mock client; it is not live-validated against a running Postgres service in default CI.
