Event bus — guide for addon developers
The arka-deck event bus carries typed facts across all modules. This guide covers the API, conventions, patterns to follow and pitfalls to avoid.
The bus is a core module (not an addon). It carries typed events across all arka-deck modules. For your addon:

    import type { EventBus } from '../../core/ports/outbound/event-bus.js';

    export function subscribeMyAddonToEventBus(bus: EventBus) {
      const unsub = bus.subscribe('chat.session.ended', async (event) => {
        // event is typed Extract<ArkaEvent, { type: 'chat.session.ended' }>
        console.log('Session ended:', event.sessionId, event.reason);
      });
      return { unsubscribe: unsub };
    }

The bus is instantiated once at boot (composition/core-container.ts) and exposed on CoreContainer.eventBus. All modules subscribe from there.
1. Why a bus
Without a bus
Each module that wants to observe another creates direct coupling:
- Memory knows about chat
- Governance knows about chat
- Telemetry knows about chat
- Squad knows about agents
→ N modules × M dependencies. Impossible to maintain.
With a bus
Chat publishes chat.session.ended. All interested modules subscribe. No module knows about the others. Adding a module impacts nothing.
Typical use cases:
- Memory subscribes to session/turn events → captures milestones
- Governance subscribes to tool usage → ledger
- Telemetry subscribes to ended turns → token metrics
- Squad subscribes to installed agents → recomposes
2. Concepts
Event = past fact
An event describes a fact that happened: chat.session.ended. Not a command (do this), not a question. The past tense matters.
Naming convention: <domain>.<entity>.<action>, dot-separated, with each segment in kebab-case and the action in the past tense.
| Good | Bad |
|---|---|
| chat.session.ended | endChatSession (command) |
| agent.installed | installAgent (command) |
| provider.instance.tested | runProviderTest (command) |
| memory.entry.captured | triggerCapture (command) |
Event = passive, not reactive
The publisher does not know what subscribers do. It publishes a fact. Subscribers decide.

    // ✅ Publish a fact
    bus.publishAsync({ type: 'chat.session.ended', ...payload });

    // ❌ Don't do this (disguised command)
    bus.publish({ type: 'memory.capture-now', ...payload });

Sync vs Async
| Mode | Signature | When |
|---|---|---|
| publish(e) | Promise<void> (allSettled) | When you must wait for resolution before continuing (e.g. project.purged → cleanup cascade) |
| publishAsync(e) | void (queueMicrotask) | Frequent/observational events (turn ended, attachments) |
By default, use publishAsync. It never blocks the publisher. Reserve publish for critical cascades.
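
The difference between the two modes can be sketched with a toy bus. This is not the arka-deck implementation: `SketchBus`, the untyped `SketchEvent`, and the handler map are assumptions made for illustration. Only the use of `Promise.allSettled` and `queueMicrotask` is taken from the table above.

```typescript
// Illustrative stand-in, NOT the real InMemoryEventBus.
type SketchEvent = { type: string; [key: string]: unknown };
type SketchHandler = (event: SketchEvent) => void | Promise<void>;

class SketchBus {
  private readonly handlers = new Map<string, Set<SketchHandler>>();

  subscribe(type: string, handler: SketchHandler): () => void {
    const set = this.handlers.get(type) ?? new Set<SketchHandler>();
    set.add(handler);
    this.handlers.set(type, set);
    return () => { set.delete(handler); };
  }

  // Blocking mode: resolves once every handler has settled; never rejects.
  async publish(event: SketchEvent): Promise<void> {
    const current = [...(this.handlers.get(event.type) ?? [])];
    const results = await Promise.allSettled(
      // The async wrapper turns synchronous throws into rejections.
      current.map(async (handler) => handler(event)),
    );
    for (const result of results) {
      if (result.status === 'rejected') console.warn('handler failed:', result.reason);
    }
  }

  // Fire-and-forget mode: returns immediately, handlers run in a microtask.
  publishAsync(event: SketchEvent): void {
    queueMicrotask(() => { void this.publish(event); });
  }
}

const sketchBus = new SketchBus();
const seen: string[] = [];
sketchBus.subscribe('chat.turn.ended', (e) => { seen.push(e.type); });

sketchBus.publishAsync({ type: 'chat.turn.ended' });
const seenRightAfterPublishAsync = seen.length; // handlers have not run yet
```

The last line captures the point of `publishAsync`: the publisher's code continues before any handler has been invoked, whereas `await bus.publish(...)` would only continue after all of them have settled.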
Subscribers
- Independent: a subscriber does not depend on another. Invocation order is not guaranteed.
- Idempotent if possible: if you perform a persistent mutation (DB), plan for the event arriving twice.
- Tolerant to losses: if the server crashes between publish and handler, the event is lost; there is no retry.
- Errors absorbed: if your handler throws, the bus logs a warning and continues. Other subscribers still receive the event.
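
A minimal sketch of the idempotence advice, assuming the event carries a stable identifier (a `sessionId` field appears in the examples in this guide). The `SessionEndedLike` type, the `makeIdempotent` helper, and the in-memory `Set` are hypothetical; a real addon would more likely rely on a unique constraint or an upsert in its own store.

```typescript
// Hypothetical, simplified event shape for the sketch.
interface SessionEndedLike {
  readonly type: 'chat.session.ended';
  readonly sessionId: string;
}

// Wraps a handler so that a repeated event with the same key is ignored.
function makeIdempotent<E>(
  keyOf: (event: E) => string,
  handler: (event: E) => Promise<void>,
): (event: E) => Promise<void> {
  const processed = new Set<string>();
  return async (event) => {
    const key = keyOf(event);
    if (processed.has(key)) return; // duplicate delivery: nothing to do
    processed.add(key); // mark before awaiting so concurrent duplicates are skipped too
    try {
      await handler(event);
    } catch (error) {
      processed.delete(key); // forget the key if the mutation failed
      throw error;
    }
  };
}

const savedSessions: string[] = [];
const onSessionEnded = makeIdempotent<SessionEndedLike>(
  (event) => event.sessionId,
  async (event) => { savedSessions.push(event.sessionId); },
);

const sessionEvent: SessionEndedLike = { type: 'chat.session.ended', sessionId: 'sid_1' };
const bothDeliveries = Promise.all([onSessionEnded(sessionEvent), onSessionEnded(sessionEvent)]);
```

Delivering the same event twice results in a single write. The wrapped function can be passed to `bus.subscribe` like any other handler.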
3. Detailed API
EventBus.publish(event): sync
Awaits resolution of all handlers via Promise.allSettled. Returns a Promise<void> that always resolves; it never rejects, because subscriber errors are caught.
Used for cascades where the publisher must know consequences are handled before continuing.

    await bus.publish({
      type: 'project.purged',
      projectId,
      projectPath,
      deletedPaths,
    });
    return { ok: true };

EventBus.publishAsync(event): async fire-and-forget
Returns immediately (void). Handlers run in a microtask scheduled via queueMicrotask, once the current synchronous code has finished. The publisher is never blocked.
Used for most events (chat lifecycle, telemetry, etc.).

    deps.eventBus?.publishAsync({
      type: 'chat.turn.ended',
      sessionId,
      success: true,
      durationMs,
      usage: { tokensIn: 12, tokensOut: 8 },
    });

EventBus.subscribe(type, handler): strict typing
type is restricted to a literal from the ArkaEventType union. The handler receives a payload typed by discrimination:

    bus.subscribe('chat.tool.called', async (event) => {
      // event is typed Extract<ArkaEvent, { type: 'chat.tool.called' }>
      // → event.tool: string, event.input: Record<string, unknown> | null, etc.
    });

Returns an Unsubscribe (= () => void). It is idempotent: calling it more than once is a no-op and raises no error.

    const unsub = bus.subscribe('chat.session.started', handler);
    // ... later
    unsub(); // OK
    unsub(); // no-op, no error

4. Recommended patterns
Pattern 1: Subscribe at boot, in the composition root

    const eventBus = new InMemoryEventBus();
    const forMemory = buildForMemory({ ... });

    subscribeMemoryToEventBus({ forMemory, eventBus, clock });

Pattern 2: Subscriber filtering inside its handler
The subscribe API does not support conditional filters. Filtering happens inside the handler:

    bus.subscribe('chat.session.ended', async (event) => {
      if (event.reason === 'error') return;
      if (event.projectPath === undefined) return;
      if (!(await isMemoryEnabled(event.projectPath))) return;
      await forMemory.triggerCapture({ ... });
    });

Pattern 3: Pseudo state machine (event sequences)

    class SquadStateMachine {
      private pendingChange = false;

      constructor(bus: EventBus, squad: SquadOrchestrator) {
        bus.subscribe('agent.installed', () => { this.pendingChange = true; });
        bus.subscribe('agent.removed', () => { this.pendingChange = true; });
        bus.subscribe('chat.session.ended', async (e) => {
          if (this.pendingChange) {
            await squad.recompose(e.projectPath);
            this.pendingChange = false;
          }
        });
      }
    }

Pattern 4: Publisher of “resultative” events
When your addon does something interesting for others, publish an event rather than expose a direct API:

    bus.publishAsync({
      type: 'memory.entry.captured',
      projectPath,
      entryId,
      kind: entry.kind,
      health: entry.health,
      trigger,
    });

5. Anti-patterns to avoid
await bus.publish inside a handler
Risk of infinite cascades. If a handler needs to publish, use publishAsync: the follow-up event is dispatched in a later microtask, which breaks the synchronous chain.
Mutating global shared state without synchronization
Several handlers of the same event are started in parallel (allSettled). If several of them update the same global state (e.g. a read-modify-write on globalCounter that spans an await), updates can be lost to a race condition.
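
In JavaScript a bare `counter++` is atomic, so the race typically appears when a read-modify-write spans an `await`. The sketch below reproduces that with two handlers run via `Promise.allSettled`, then serializes them through a promise chain. All names (`unsafeIncrement`, `serialized`, the simulated `load` and `store`) are illustrative assumptions, not arka-deck APIs.

```typescript
// Simulated shared state with asynchronous access (stand-in for a file or DB row).
let stored = 0;
const load = async (): Promise<number> => stored;
const store = async (value: number): Promise<void> => { stored = value; };

// Read-modify-write across an await: two concurrent calls can both read the same value.
async function unsafeIncrement(): Promise<void> {
  const current = await load();
  await store(current + 1);
}

// Serialize critical sections by chaining them on a single promise.
let queue: Promise<void> = Promise.resolve();
function serialized(task: () => Promise<void>): Promise<void> {
  const run = queue.then(task);
  queue = run.catch(() => {}); // keep the chain alive after a failure
  return run;
}

async function demo(): Promise<{ unsafe: number; safe: number }> {
  stored = 0;
  await Promise.allSettled([unsafeIncrement(), unsafeIncrement()]);
  const unsafe = stored; // both calls read 0, so one update is lost

  stored = 0;
  await Promise.allSettled([serialized(unsafeIncrement), serialized(unsafeIncrement)]);
  const safe = stored; // the second call starts only after the first has stored
  return { unsafe, safe };
}

const raceDemo = demo();
```

Serializing through a promise chain is only one option. Keeping state private to a single subscriber, or pushing the mutation into a store that offers atomic updates, avoids the problem without a queue.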
Complex business logic in the handler
Handlers must be thin. Delegate to a dedicated use-case:

    // ✅
    bus.subscribe('chat.session.ended', async (e) => {
      await myUseCase.handle(e);
    });

    // ❌
    bus.subscribe('chat.session.ended', async (e) => {
      // 50 lines of business logic → unmaintainable
    });

Relying on handler order
Order is not guaranteed. If A must execute before B, make a single subscriber that calls A then B internally.
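
One way to express "a single subscriber that calls A then B" is a small combinator that awaits each step in turn. The `inOrder` helper, the `Step` type, and the two steps below are hypothetical names used for illustration only.

```typescript
type Step<E> = (event: E) => void | Promise<void>;

// Builds one handler that awaits each step in the order given.
function inOrder<E>(...steps: Step<E>[]): (event: E) => Promise<void> {
  return async (event) => {
    for (const step of steps) {
      await step(event);
    }
  };
}

// Hypothetical steps A and B; B must run after A has completed.
const trace: string[] = [];
const stepA: Step<{ sessionId: string }> = async (e) => { trace.push(`A:${e.sessionId}`); };
const stepB: Step<{ sessionId: string }> = (e) => { trace.push(`B:${e.sessionId}`); };

// Registered once, e.g. bus.subscribe('chat.session.ended', orderedHandler).
const orderedHandler = inOrder(stepA, stepB);
const orderedRun = orderedHandler({ sessionId: 'sid_1' });
```

Because the ordering lives inside one handler, it does not depend on how the bus schedules its subscribers. A step that throws stops the sequence, and the bus absorbs the error as it would for any other handler.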
Relying on event persistence
The bus is in-process, in-memory, single-user. If the server restarts, in-flight events are lost. Your subscribers must accept that (idempotence + loss tolerance).
Type-casting events

    // ❌
    bus.subscribe('chat.session.ended', (e: any) => { ... });

    // ❌
    bus.subscribe('chat.session.ended', (e) => {
      const evt = e as ChatRuntimeFailureEvent;
    });

    // ✅
    bus.subscribe('chat.session.ended', (e) => {
      // e is already discriminated-typed
    });

6. Event catalogue
The source of truth is core/domain/events/arka-event.ts (discriminated union).
To add a new event:
- Edit core/domain/events/arka-event.ts:
  - Add an interface MyNewEvent { readonly type: '...'; ... }
  - Add it to the ArkaEvent union
- The publisher publishes via bus.publishAsync(...); subscribers subscribe as usual
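
The steps above can be sketched as follows. The real arka-event.ts is not reproduced here: the event name `my-addon.report.generated`, its fields, the `EventOf` alias, and the derivation of `ArkaEventType` from the union are all assumptions chosen for the example, and the existing event is reduced to the two fields this guide mentions.

```typescript
// Simplified stand-in for an existing entry in core/domain/events/arka-event.ts.
interface ChatSessionEndedEvent {
  readonly type: 'chat.session.ended';
  readonly sessionId: string;
}

// Step 1: the new event (hypothetical name and fields).
interface MyAddonReportGeneratedEvent {
  readonly type: 'my-addon.report.generated';
  readonly reportId: string;
  readonly generatedAt: string; // ISO 8601 timestamp
}

// Step 2: add it to the discriminated union.
type ArkaEvent = ChatSessionEndedEvent | MyAddonReportGeneratedEvent;
type ArkaEventType = ArkaEvent['type']; // assumed derivation

// Subscribers get the narrowed payload from the literal type alone.
type EventOf<T extends ArkaEventType> = Extract<ArkaEvent, { type: T }>;

function describeReport(event: EventOf<'my-addon.report.generated'>): string {
  return `${event.type}:${event.reportId}`;
}

const description = describeReport({
  type: 'my-addon.report.generated',
  reportId: 'r_1',
  generatedAt: '2026-05-04T10:00:00.000Z',
});
```

Once the interface is part of the union, a subscriber gets the narrowed payload from the `type` literal alone, with no casts.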
7. Testing an addon that consumes the bus
InMemoryEventBus is both the production and the test implementation (no I/O, purely in-memory). No separate fake is needed.

    // Vitest is assumed here, based on the use of vi.fn().
    import { expect, it, vi } from 'vitest';
    import { InMemoryEventBus } from 'adapters/outbound/events/in-memory-event-bus.js';

    it("my addon reacts to chat.session.ended", async () => {
      const bus = new InMemoryEventBus();
      const myUseCase = vi.fn();
      subscribeMyAddon({ bus, myUseCase });

      // publish (not publishAsync) so the handler has run before the assertion
      await bus.publish({
        type: 'chat.session.ended',
        sessionId: 'sid_1',
        projectPath: '/abs/path',
        transcript: [],
        reason: 'closed',
        endedAt: '2026-05-04T10:00:00.000Z',
      });

      expect(myUseCase).toHaveBeenCalledOnce();
    });

To test a publisher, use subscriberCount() and a spy handler:

    const captured: ArkaEvent[] = [];
    bus.subscribe('chat.session.started', (e) => { captured.push(e); });

    await myPublisher.doSomething();

    expect(captured).toHaveLength(1);

8. Lifecycle & shutdown
- The bus lives in the CoreContainer: instantiated at boot, released at process.exit
- No need to unsubscribe on shutdown: the process dies, everything is released
- The unsubscribe returned by subscribe is useful for tests or for conditional subscribers
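
As an illustration of a conditional subscriber, the sketch below removes itself after the first event it receives. `subscribeOnce`, the `SubscribableBus` interface, and the fake bus are hypothetical and untyped compared with the real port; the only property borrowed from this guide is that `subscribe` returns an idempotent unsubscribe function.

```typescript
type Unsubscribe = () => void;

// Minimal structural slice of the bus used by this sketch (assumption, untyped payload).
interface SubscribableBus<E> {
  subscribe(type: string, handler: (event: E) => void): Unsubscribe;
}

// Subscribes, then unsubscribes itself after the first matching event.
function subscribeOnce<E>(
  bus: SubscribableBus<E>,
  type: string,
  handler: (event: E) => void,
): Unsubscribe {
  const unsub = bus.subscribe(type, (event) => {
    unsub(); // safe even if the caller also unsubscribes: the call is idempotent
    handler(event);
  });
  return unsub;
}

// Tiny fake bus so the sketch runs on its own; it ignores the event type.
const listeners = new Set<(event: string) => void>();
const fakeBus: SubscribableBus<string> = {
  subscribe(_type, handler) {
    listeners.add(handler);
    return () => { listeners.delete(handler); };
  },
};
const emit = (event: string): void => { for (const l of [...listeners]) l(event); };

const received: string[] = [];
subscribeOnce(fakeBus, 'agent.installed', (e) => { received.push(e); });
emit('first');
emit('second');
```

Only the first emission reaches the handler; by the second one the subscription is already gone.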
9. Future evolutions (out of current scope)
- Wildcard subscribe (bus.subscribe('chat.*', handler))
- Persistence / replay (event sourcing)
- Multi-process (worker threads)
- Subscriber order priorities
- Conditional filters on subscribe
10. Quick reference
| Action | Code |
|---|---|
| Sync subscribe | const unsub = bus.subscribe('type', handler); |
| Unsubscribe | unsub(); |
| Blocking publish | await bus.publish({ type: '...', ... }); |
| Async publish | bus.publishAsync({ type: '...', ... }); |
| Test subscriber count | bus.subscriberCount('type') (test-only) |
| Reset bus (between tests) | bus.clear() (test-only) |
Sources of truth:
- Domain: core/domain/events/arka-event.ts
- Port: core/ports/outbound/event-bus.ts
- Adapter: adapters/outbound/events/in-memory-event-bus.ts