Listeners Reference
The listener substrate: waiting for a future typed update via wa.listen, with timeouts, cancellation, and registry lifecycle.
active · reviewed 2026-04-22
The listener substrate lets a handler (or any async flow) wait for a future typed update that matches a filter — conversational patterns without hand-rolling Promise wiring.
The substrate composes the TypedFilter surface (matching), the TypedRouter (dispatch), and the WhatsApp facade (composition root).
import {
WhatsApp,
createListenerRegistry,
ListenerAbortError,
ListenerTimeoutError,
type ListenerHandle
} from "@wats/core";
import { message } from "@wats/core/filtersTyped";
const wa = new WhatsApp({ graphClient });
// Facade-local listener substrate (lazy). Returns a frozen handle
// carrying { id, promise, cancel, cancelled, settled }.
const reply: ListenerHandle = wa.listen({
type: "message",
from: "15551234567",
timeoutMs: 30_000
});
// Somewhere else: the webhook adapter calls wa.dispatch(update) as
// updates arrive. The listener's promise resolves to the matching
// TypedMessageUpdate — listener evaluation runs BEFORE the handler
// loop, but the update still flows through normal handlers
// (additive, not short-circuit).
try {
const reply = await reply.promise;
console.log("got reply:", reply.message.text?.body);
} catch (err) {
if (err instanceof ListenerTimeoutError) {
// No matching update within 30_000 ms.
} else if (err instanceof ListenerAbortError) {
// reply.cancel() was called, or an AbortSignal aborted.
}
}createListenerRegistry
function createListenerRegistry(
options?: ListenerRegistryOptions
): ListenerRegistry;
interface ListenerRegistryOptions {
readonly maxActiveListeners?: number; // default: 10_000
}
interface ListenerRegistry {
readonly activeCount: number;
register<T extends TypedUpdate>(
filter: TypedFilter<T>,
options?: ListenerOptions
): ListenerHandle<T>;
evaluate(update: TypedUpdate): {
readonly matched: boolean;
readonly listenerId?: symbol;
};
clear(): void;
}createListenerRegistry() returns an in-memory registry. Use the facade-owned registry exposed via wa.listenerRegistry (lazily created on first .listen() call), or construct your own and pass it via new WhatsApp({ listenerRegistry }).
register(filter, options?)
filtermust be a brandedTypedFilter<T>(see the filters reference). Any non-filter throwsListenerOptionsError("invalid_filter").options.timeoutMs(optional positive integer): rejects the handle withListenerTimeoutErrorafter N milliseconds and removes the listener from the registry.0, negative, non-integer,NaN, andInfinityall throwListenerOptionsError("invalid_timeout")at construction.options.signal(optionalAbortSignal): rejects withListenerAbortError("listener_signal_aborted")on abort. An already-aborted signal at register time rejects synchronously and the listener never enters the registry (soactiveCountis unaffected).options.description(optional string): debug label. Non-string throwsListenerOptionsError("invalid_description").- If
activeCountis already atmaxActiveListeners,registerthrowsListenerOptionsError("max_listeners_exceeded")before any side-effects (no timer or signal listener is attached).
Returns a frozen ListenerHandle<T>:
interface ListenerHandle<T extends TypedUpdate = TypedUpdate> {
readonly id: symbol;
readonly promise: Promise<T>;
readonly cancelled: boolean;
readonly settled: boolean;
cancel(): void;
}id— uniqueSymbol; matchesevaluate(update).listenerIdwhen this listener wins a match.promise— resolves to the matchedTypedUpdateOR rejects withListenerTimeoutError/ListenerAbortError. Never resolves twice.cancelled— flipstrueonly when.cancel()is called.settled— flipstrueon any settlement path (match, timeout, abort, cancel, registry clear).cancel()— idempotent; rejects a pending promise withListenerAbortError("listener_cancelled"). No-op aftersettled.
Every settlement path runs through a single internal finalize() helper that clears the timer, removes the abort listener, flips settled, and removes the registry entry. No dangling setTimeout or AbortSignal listeners survive a settled listener.
evaluate(update)
The router (or any caller) passes an incoming TypedUpdate. The registry iterates listeners in registration order and resolves the first matching listener — first-match-wins:
- If 3 listeners all match the same update, only the first (by registration order) resolves; the other two stay pending.
- The winning listener is removed from the registry before its promise resolves, so chained
.then(...)code seesactiveCountalready decremented. - Predicate throws propagate unchanged — a
custom()filter that throws surfaces the throw to the caller ofevaluate(). The router (and the facade dispatch wrapper) isolate these throws so the dispatch resolution contract survives.
clear()
Rejects every pending listener with ListenerAbortError("listener_registry_cleared") and empties the registry. Typically invoked on graceful shutdown.
Listener BEFORE handler ordering
Listener resolution runs BEFORE the handler loop on every dispatch. The update still flows through normal handlers — listeners are additive, not short-circuit:
- Register a listener for
{ type: "message", from: USER }. - Register a handler that logs every incoming message.
- Dispatch an update matching both.
- The listener wins its match first (its promise resolves); the handler still fires.
const lh = wa.listen({ type: "message" });
wa.on(message, () => console.log("handler saw message"));
await wa.dispatch(update);
const u = await lh.promise; // resolves
// Console still shows "handler saw message".This mirrors pywa's listeners.py / handler interplay and keeps normal routing unaffected by the listener substrate.
Sent-result waiters
WhatsApp.startChat(...) returns a waitable send result whose helpers register one-shot listeners on the same registry:
waitForReply({ timeoutMs?, signal? })waitUntilDelivered({ timeoutMs?, signal? })waitUntilRead({ timeoutMs?, signal? })waitUntilFailed({ timeoutMs?, signal? })
Same timeoutMs and AbortSignal cleanup guarantees as wa.listen(...). They are convenience filters over observed future webhooks; they do not add persistence, replay, delivery guarantees, or read/delivered inference.
TypedRouter integration
TypedRouterOptions accepts an optional listenerRegistry hook:
import { TypedRouter, createListenerRegistry } from "@wats/core";
const registry = createListenerRegistry();
const router = new TypedRouter({ listenerRegistry: registry });When the router's dispatch(update) is invoked:
observer.onBeforeDispatch(dispatchId, update)fires.listenerRegistry.evaluate(update)runs — at most one listener wins (first-match-wins).- If a listener matched,
observer.onListenerMatch(dispatchId, listenerId, update)fires. - The normal handler loop runs unchanged — snapshot semantics, error collection,
"stop"return, etc. observer.onAfterDispatch(dispatchId, report)fires.
Listener-predicate throws during evaluation are isolated at the router boundary: the throw is swallowed so the dispatch still resolves normally. A throwing listener filter is a programmer bug in consumer code; observe it via your own logging.
WhatsApp.listen facade method
The facade is the ergonomic surface. It lazily creates a default ListenerRegistry on first call (unless the caller supplied one via new WhatsApp({ listenerRegistry })).
interface WhatsAppListenOptions<TKind extends TypedUpdate["kind"]> {
readonly type: TKind; // kind gate — narrows T
readonly from?: string; // optional sender wa_id narrower
readonly filter?: TypedFilter<...>; // optional extra constraint
readonly timeoutMs?: number;
readonly signal?: AbortSignal;
readonly description?: string;
}
wa.listen<TKind>(
options: WhatsAppListenOptions<TKind>
): ListenerHandle<Extract<TypedUpdate, { kind: TKind }>>;The facade composes the kind gate, the optional from narrower, and the optional user-supplied filter via and(...) from @wats/core/filtersTyped.
Additional facade surface:
wa.listenerRegistry— getter returning the current registry (orundefinedbefore the first.listen()call when the caller did not supply one at construction).wa.activeListenerCount— convenience getter equivalent towa.listenerRegistry?.activeCount ?? 0.WhatsAppListenOptionsError— thrown by.listen()for invalid options;.codetaxonomy:"invalid_listen_options"/"invalid_listen_type"/"invalid_listen_from"/"invalid_listen_filter".
Error taxonomy
All three error classes extend Error and carry a stable .code field.
| Class | Code | When |
|---|---|---|
ListenerTimeoutError | "listener_timeout" | options.timeoutMs elapsed |
ListenerAbortError | "listener_cancelled" | handle.cancel() |
ListenerAbortError | "listener_signal_aborted" | AbortSignal aborted |
ListenerAbortError | "listener_registry_cleared" | registry.clear() |
ListenerOptionsError | "invalid_filter" | Non-branded filter |
ListenerOptionsError | "invalid_options" | Non-object options |
ListenerOptionsError | "invalid_timeout" | timeoutMs not a positive integer |
ListenerOptionsError | "invalid_signal" | signal not AbortSignal-shaped |
ListenerOptionsError | "invalid_description" | description not a string |
ListenerOptionsError | "max_listeners_exceeded" | activeCount >= maxActiveListeners (default 10_000) |
ListenerOptionsError | "invalid_max_active_listeners" | maxActiveListeners not a positive integer |
The facade surfaces WhatsAppListenOptionsError for facade-scoped validation (unknown type, empty from, non-object filter); the underlying ListenerRegistry.register runs afterwards and may still throw ListenerOptionsError for options it validates directly.
Observer seam: onListenerMatch
interface RouterObserver {
// ... existing hooks ...
onListenerMatch?: (
dispatchId: string,
listenerId: symbol,
update: TypedUpdate
) => void;
}The hook fires once per dispatch only when a listener wins its match. Throws inside onListenerMatch are isolated (silent-swallow) per the router observer-throw policy — they never poison the dispatch report.
Parity notes
pywa/listeners.py exposes client.listen(...) with a similar kind + from-narrower surface. WATS matches the shape: kind discriminant, optional from narrower, optional extra filter, timeoutMs + signal. Differences:
- WATS returns a
ListenerHandlewith{ promise, cancel }instead of a raw Promise, so callers can cancel without wiring anAbortController. - WATS listener evaluation runs before handler dispatch. pywa handlers and listeners interleave on a thread-queue; WATS fires listener resolution synchronously in
dispatch()and then runs the handler loop. - WATS is single-process / in-memory; pywa's listener table is similarly in-process. No persistence layer exists in either.
Non-goals
- No persistence. Listener state lives in memory only; a process restart drops all pending listeners. Cross-process distribution is out of scope.
- No cross-instance distribution. A listener registered on one
WhatsAppinstance is invisible to another instance in the same process or a different process. - No listener deduplication across identical filters. Two
wa.listen({ type: "message" })calls create two independent handles. - No Clock abstraction. Timeouts use real
setTimeout; the contract is a numerictimeoutMs. - No @wats/http webhook wiring. The webhook adapter owns that integration — it calls
wa.dispatch(update)as envelopes arrive.
Not implemented yet: a sendAndWait combinator and FakeClock threading for deterministic timeout tests.