
Event bus — guide for addon developers

The arka-deck event bus carries typed facts across all modules. This guide covers the API, conventions, patterns to follow and pitfalls to avoid.


The bus is a core module, not an addon. To use it from your addon, subscribe through the EventBus port:

import type { EventBus } from '../../core/ports/outbound/event-bus.js';

export function subscribeMyAddonToEventBus(bus: EventBus) {
  const unsub = bus.subscribe('chat.session.ended', async (event) => {
    // event is typed Extract<ArkaEvent, { type: 'chat.session.ended' }>
    console.log('Session ended:', event.sessionId, event.reason);
  });
  return { unsubscribe: unsub };
}

The bus is instantiated once at boot (composition/core-container.ts) and exposed on CoreContainer.eventBus. All modules subscribe from there.


Without a bus, each module that wants to observe another one creates direct coupling:

  • Memory knows about chat
  • Governance knows about chat
  • Telemetry knows about chat
  • Squad knows about agents

→ N modules × M dependencies. Impossible to maintain.

With the bus, chat publishes chat.session.ended and every interested module subscribes. No module knows about the others, so adding a module affects none of the existing ones.

Typical use cases:

  • Memory subscribes to session/turn events → captures milestones
  • Governance subscribes to tool usage → ledger
  • Telemetry subscribes to ended turns → token metrics
  • Squad subscribes to installed agents → recomposes

An event describes a fact that happened: chat.session.ended. Not a command (do this), not a question. The past tense matters.

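Naming convention: <domain>.<entity>.<action>, written as dot-separated segments, each in lowercase kebab-case, with the action in the past tense. The entity segment can be absent, as in agent.installed.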

Good                     | Bad
-------------------------|---------------------------
chat.session.ended       | endChatSession (command)
agent.installed          | installAgent (command)
provider.instance.tested | runProviderTest (command)
memory.entry.captured    | triggerCapture (command)
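The shape of the convention can be checked mechanically. The following is a minimal sketch, assuming two or three dot-separated kebab-case segments; isConventionalEventName is a hypothetical helper, not part of the arka-deck API. It only validates the shape: it cannot tell a past-tense fact from a disguised command such as memory.capture-now.

```typescript
// Hypothetical helper (not part of the bus API): validates the name shape only.
const SEGMENT = '[a-z][a-z0-9]*(?:-[a-z0-9]+)*'; // one lowercase kebab-case segment
const EVENT_NAME = new RegExp(`^${SEGMENT}(?:\\.${SEGMENT}){1,2}$`);

function isConventionalEventName(candidate: string): boolean {
  return EVENT_NAME.test(candidate);
}

console.log(isConventionalEventName('chat.session.ended')); // true
console.log(isConventionalEventName('agent.installed'));    // true
console.log(isConventionalEventName('endChatSession'));     // false
```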

The publisher does not know what subscribers do. It publishes a fact. Subscribers decide.

// ✅ Publish a fact
bus.publishAsync({ type: 'chat.session.ended', ...payload });
// ❌ Don't do this (disguised command)
bus.publish({ type: 'memory.capture-now', ...payload });

Mode            | Signature                  | When
----------------|----------------------------|------------------------------------------
publish(e)      | Promise<void> (allSettled) | When you must wait for resolution before continuing (e.g. project.purged → cleanup cascade)
publishAsync(e) | void (queueMicrotask)      | Frequent/observational events (turn ended, attachments)

By default, use publishAsync. It never blocks the publisher. Reserve publish for critical cascades.
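To make the difference between the two modes concrete, here is a minimal sketch of a bus with both methods. SketchBus, DemoEvent, and demo are illustrative names and the real InMemoryEventBus may be implemented differently; the sketch only relies on the documented behaviour (Promise.allSettled for publish, queueMicrotask for publishAsync).

```typescript
// Minimal stand-in for the bus (assumption: the real adapter may differ).
type DemoEvent = { type: 'chat.turn.ended'; sessionId: string };
type DemoHandler = (event: DemoEvent) => void | Promise<void>;

class SketchBus {
  private readonly handlers = new Set<DemoHandler>();

  subscribe(handler: DemoHandler): () => void {
    this.handlers.add(handler);
    return () => { this.handlers.delete(handler); };
  }

  // Blocking mode: resolves once every handler has settled, never rejects.
  async publish(event: DemoEvent): Promise<void> {
    await Promise.allSettled([...this.handlers].map(async (h) => h(event)));
  }

  // Fire-and-forget mode: handlers are deferred to a microtask.
  publishAsync(event: DemoEvent): void {
    queueMicrotask(() => { void this.publish(event); });
  }
}

async function demo(): Promise<string[]> {
  const log: string[] = [];
  const bus = new SketchBus();
  bus.subscribe((e) => { log.push(`handled ${e.sessionId}`); });

  bus.publishAsync({ type: 'chat.turn.ended', sessionId: 'a' });
  log.push('publishAsync returned'); // the handler has not run yet
  await new Promise<void>((resolve) => setTimeout(resolve, 0)); // let microtasks drain

  await bus.publish({ type: 'chat.turn.ended', sessionId: 'b' });
  log.push('publish resolved'); // the handler has already run
  return log;
}

const demoLog = demo();
demoLog.then((log) => console.log(log));
// ['publishAsync returned', 'handled a', 'handled b', 'publish resolved']
```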

  • Independent: a subscriber does not depend on another subscriber. Invocation order is not guaranteed.
  • Idempotent where possible: if you perform a persistent mutation (DB), plan for the event arriving twice.
  • Tolerant of loss: if the server crashes between publish and handler, the event is lost. There is no retry.
  • Errors absorbed: if your handler throws, the bus logs a warning and continues. Other subscribers still receive the event.
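The idempotence rule above can be sketched as a handler that remembers which events it has already applied. The event shape, the processed set (standing in for a unique key in a database), and the rowsWritten counter are all assumptions made for the illustration.

```typescript
// Hypothetical event shape and store, for illustration only.
type EntryCaptured = { type: 'memory.entry.captured'; entryId: string };

const processed = new Set<string>(); // stands in for a unique key in a database
let rowsWritten = 0;

function onEntryCaptured(event: EntryCaptured): void {
  if (processed.has(event.entryId)) return; // duplicate delivery: nothing to do
  processed.add(event.entryId);
  rowsWritten += 1; // the persistent mutation happens once per entryId
}

const sampleEntry: EntryCaptured = { type: 'memory.entry.captured', entryId: 'e1' };
onEntryCaptured(sampleEntry);
onEntryCaptured(sampleEntry); // the same event delivered twice
console.log(rowsWritten); // 1
```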

EventBus.publish(event) awaits the resolution of all handlers via Promise.allSettled. It returns a Promise<void> that always resolves; it never rejects because subscriber errors are caught.

Used for cascades where the publisher must know consequences are handled before continuing.

// Inside the use-case that purges a project:
await bus.publish({
  type: 'project.purged',
  projectId,
  projectPath,
  deletedPaths,
});
return { ok: true };
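Why a failing subscriber cannot break such a cascade can be illustrated with Promise.allSettled directly. This is a sketch under assumptions: publishToAll and PurgeHandler are hypothetical names, and the warnings array stands in for whatever the bus actually logs.

```typescript
type PurgeHandler = (projectId: string) => void | Promise<void>;

// Sketch of a blocking publish; the warning format is an assumption.
async function publishToAll(handlers: PurgeHandler[], projectId: string): Promise<string[]> {
  // Wrapping each call in an async arrow turns a synchronous throw into a rejection.
  const results = await Promise.allSettled(handlers.map(async (h) => h(projectId)));
  const warnings: string[] = [];
  for (const result of results) {
    if (result.status === 'rejected') warnings.push(String(result.reason));
  }
  return warnings; // the returned promise resolves even when handlers fail
}

const cleaned: string[] = [];
const outcome = publishToAll(
  [
    () => { throw new Error('index cleanup failed'); }, // failing subscriber
    async (id) => { cleaned.push(id); },                 // still invoked
  ],
  'p1',
);
outcome.then((warnings) => console.log(warnings, cleaned));
```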

EventBus.publishAsync(event) — async fire-and-forget


Returns immediately (void). Handlers run in a microtask scheduled with queueMicrotask, after the publisher's current synchronous code finishes. The publisher is never blocked.

Used for most events (chat lifecycle, telemetry, etc.).

deps.eventBus?.publishAsync({
  type: 'chat.turn.ended',
  sessionId,
  success: true,
  durationMs,
  usage: { tokensIn: 12, tokensOut: 8 },
});

EventBus.subscribe(type, handler) — strict typing


type is restricted to a literal from the ArkaEventType union. The handler receives a payload typed by discrimination:

bus.subscribe('chat.tool.called', async (event) => {
  // event is typed Extract<ArkaEvent, { type: 'chat.tool.called' }>
  // → event.tool: string, event.input: Record<string, unknown> | null, etc.
});
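The typing by discrimination can be reproduced with a small union and Extract. This is a sketch, not the real port: SketchEvent, subscribeTyped, and dispatch are illustrative names, and the two event shapes are reduced to one field each.

```typescript
// Two illustrative events; the real union lives in core/domain/events/arka-event.ts.
interface ToolCalled { readonly type: 'chat.tool.called'; readonly tool: string }
interface SessionStarted { readonly type: 'chat.session.started'; readonly sessionId: string }
type SketchEvent = ToolCalled | SessionStarted;
type SketchEventType = SketchEvent['type'];

// The handler parameter is narrowed to the variant whose `type` matches T.
type HandlerFor<T extends SketchEventType> =
  (event: Extract<SketchEvent, { type: T }>) => void;

const registry = new Map<SketchEventType, Array<(event: SketchEvent) => void>>();

function subscribeTyped<T extends SketchEventType>(type: T, handler: HandlerFor<T>): void {
  const list = registry.get(type) ?? [];
  // Safe by construction: handlers stored under `type` only receive that variant.
  list.push(handler as (event: SketchEvent) => void);
  registry.set(type, list);
}

function dispatch(event: SketchEvent): void {
  for (const handler of registry.get(event.type) ?? []) handler(event);
}

const toolsSeen: string[] = [];
subscribeTyped('chat.tool.called', (e) => { toolsSeen.push(e.tool); }); // e.tool is a string
dispatch({ type: 'chat.tool.called', tool: 'grep' });
dispatch({ type: 'chat.session.started', sessionId: 's1' }); // no matching handler
console.log(toolsSeen); // ['grep']
```

With this signature, subscribeTyped('chat.tool.called', (e) => e.sessionId) does not compile, because sessionId does not exist on the narrowed variant.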

Returns an Unsubscribe (= () => void). Idempotent (callable one or more times without error).

const unsub = bus.subscribe('chat.session.started', handler);
// ... later
unsub(); // OK
unsub(); // no-op, no error
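One simple way to obtain an idempotent unsubscribe is to keep handlers in a Set, as in the sketch below. Whether the real adapter does this is an assumption; subscribeSketch and listeners are illustrative names. Note that a Set also collapses the same function subscribed twice into a single entry.

```typescript
type Listener = () => void;
const listeners = new Set<Listener>();

function subscribeSketch(listener: Listener): () => void {
  listeners.add(listener);
  // Set.prototype.delete returns false for a missing entry instead of throwing,
  // so calling the returned function repeatedly is harmless.
  return () => { listeners.delete(listener); };
}

const unsubscribeDemo = subscribeSketch(() => {});
unsubscribeDemo();
unsubscribeDemo(); // second call is a no-op
console.log(listeners.size); // 0
```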

Pattern 1 — Subscribe at boot, in the composition root

// composition/core-container.ts
const eventBus = new InMemoryEventBus();
const forMemory = buildForMemory({ ... });
subscribeMemoryToEventBus({ forMemory, eventBus, clock });

Pattern 2 — Subscriber filtering inside its handler


The subscribe API does not support conditional filters. Filtering happens inside the handler:

bus.subscribe('chat.session.ended', async (event) => {
  if (event.reason === 'error') return;
  if (event.projectPath === undefined) return;
  if (!(await isMemoryEnabled(event.projectPath))) return;
  await forMemory.triggerCapture({ ... });
});
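If the same guard is repeated across several handlers, it can be factored into a small wrapper. The sketch below assumes a reduced SessionEnded shape; onlyWhen is a hypothetical helper and not part of the bus API.

```typescript
type SessionEnded = { type: 'chat.session.ended'; reason: string; projectPath?: string };

// Hypothetical helper (not part of the bus API): wraps a handler with a guard.
function onlyWhen<E>(
  predicate: (event: E) => boolean | Promise<boolean>,
  handler: (event: E) => void | Promise<void>,
): (event: E) => Promise<void> {
  return async (event) => {
    if (!(await predicate(event))) return; // filtered out, handler not called
    await handler(event);
  };
}

const capturedPaths: string[] = [];
const guarded = onlyWhen<SessionEnded>(
  (e) => e.reason !== 'error' && e.projectPath !== undefined,
  (e) => { capturedPaths.push(e.projectPath ?? ''); },
);
```

The wrapped function is then passed to the bus as usual, for example bus.subscribe('chat.session.ended', guarded).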

Pattern 3 — Pseudo state machine (event sequences)

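class SquadStateMachine {
  private pendingChange = false;

  constructor(bus: EventBus, squad: SquadOrchestrator) {
    bus.subscribe('agent.installed', () => { this.pendingChange = true; });
    bus.subscribe('agent.removed', () => { this.pendingChange = true; });
    bus.subscribe('chat.session.ended', async (e) => {
      if (!this.pendingChange) return;
      // Clear the flag before awaiting, so a change that arrives while
      // recompose() is running is not wiped out afterwards.
      this.pendingChange = false;
      try {
        await squad.recompose(e.projectPath);
      } catch (err) {
        this.pendingChange = true; // retry at the next session end
        throw err;
      }
    });
  }
}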

Pattern 4 — Publisher of “resultative” events


When your addon does something interesting for others, publish an event rather than expose a direct API:

bus.publishAsync({
  type: 'memory.entry.captured',
  projectPath,
  entryId,
  kind: entry.kind,
  health: entry.health,
  trigger,
});

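Publishing from inside a handler risks infinite cascades (handler A publishes an event that triggers handler B, which publishes the event A listens to). If you must republish, use publishAsync: deferring to a microtask breaks the synchronous call chain. It does not stop a cascade that never terminates, so make sure the chain has an end.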

Mutating global shared state without synchronization


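Several handlers of the same event run concurrently (Promise.allSettled) and interleave at each await. If several of them read shared global state, await, then write it back (for example a globalCounter), updates can be lost: a race condition.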
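The lost update is easy to reproduce, and one possible remedy is to serialize the critical section on a promise chain. The sketch below is illustrative: racyIncrement, serialized, and raceDemo are hypothetical names, and other remedies (a single owner of the state, an atomic database operation) are equally valid.

```typescript
let counter = 0;
const tick = (): Promise<void> => new Promise((resolve) => setTimeout(resolve, 0));

// Racy: both handlers read the same value before either writes it back.
async function racyIncrement(): Promise<void> {
  const current = counter;
  await tick(); // another handler can run here
  counter = current + 1;
}

// One possible fix: run critical sections one after another on a promise chain.
let chain: Promise<void> = Promise.resolve();
function serialized(task: () => Promise<void>): Promise<void> {
  const next = chain.then(task);
  chain = next.catch(() => undefined); // keep the chain alive after a failure
  return next;
}

async function raceDemo(): Promise<[number, number]> {
  counter = 0;
  await Promise.allSettled([racyIncrement(), racyIncrement()]);
  const racy = counter; // one update is lost
  counter = 0;
  await Promise.allSettled([serialized(racyIncrement), serialized(racyIncrement)]);
  return [racy, counter];
}

const raceResult = raceDemo();
raceResult.then((result) => console.log(result)); // [1, 2]
```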

Handlers must be thin. Delegate to a dedicated use-case:

// ✅
bus.subscribe('chat.session.ended', async (e) => {
  await myUseCase.handle(e);
});
// ❌
bus.subscribe('chat.session.ended', async (e) => {
  // 50 lines of business logic → unmaintainable
});

Order is not guaranteed. If A must execute before B, make a single subscriber that calls A then B internally.
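A minimal sketch of that single ordered subscriber follows. stepA and stepB are hypothetical stand-ins for the two pieces of work, and the event is reduced to a sessionId.

```typescript
const order: string[] = [];

// Hypothetical steps standing in for two pieces of work with a required order.
async function stepA(sessionId: string): Promise<void> { order.push(`A:${sessionId}`); }
async function stepB(sessionId: string): Promise<void> { order.push(`B:${sessionId}`); }

// One handler owns the sequence instead of two subscribers with unknown order.
async function onSessionEnded(e: { sessionId: string }): Promise<void> {
  await stepA(e.sessionId);
  await stepB(e.sessionId); // starts only after stepA has finished
}

const sequenceDone = onSessionEnded({ sessionId: 's1' });
sequenceDone.then(() => console.log(order)); // ['A:s1', 'B:s1']
```

The handler is then registered once, for example bus.subscribe('chat.session.ended', onSessionEnded).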

The bus is in-process, in-memory, single-user. If the server restarts, in-flight events are lost. Your subscribers must accept that (idempotence + loss tolerance).

// ❌
bus.subscribe('chat.session.ended', (e: any) => { ... });
// ❌
bus.subscribe('chat.session.ended', (e) => {
  const evt = e as ChatRuntimeFailureEvent;
});
// ✅
bus.subscribe('chat.session.ended', (e) => {
  // e is already discriminated-typed
});

The source of truth is core/domain/events/arka-event.ts (discriminated union).

To add a new event:

  1. Edit core/domain/events/arka-event.ts:
    • Add an interface MyNewEvent { readonly type: '...'; ... }
    • Add it to the ArkaEvent union
  2. The publisher publishes via bus.publishAsync(...); subscribers subscribe as usual
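The two editing steps can be sketched on a reduced union. ReportGenerated and its fields are hypothetical, and the sketch assumes the type-literal union (ArkaEventType in the real code) is derived from the event union rather than maintained by hand.

```typescript
// Existing union, reduced to one member for the sketch.
interface ChatSessionEnded { readonly type: 'chat.session.ended'; readonly sessionId: string }

// Step 1a: declare the new event (name and fields are hypothetical).
interface ReportGenerated {
  readonly type: 'report.generated';
  readonly reportId: string;
}

// Step 1b: add it to the union; the derived literal union follows automatically.
type SketchArkaEvent = ChatSessionEnded | ReportGenerated;
type SketchArkaEventType = SketchArkaEvent['type'];

const newType: SketchArkaEventType = 'report.generated'; // accepted after step 1b

// Exhaustive switch: a member added to the union without a case fails to compile.
function describeEvent(e: SketchArkaEvent): string {
  switch (e.type) {
    case 'chat.session.ended': return `session ${e.sessionId} ended`;
    case 'report.generated': return `report ${e.reportId} generated`;
    default: { const unreachable: never = e; return unreachable; }
  }
}

console.log(newType, describeEvent({ type: 'report.generated', reportId: 'r1' }));
```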

InMemoryEventBus is both the production and the test implementation (in-memory only, no I/O). No separate fake is needed.

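import { expect, it, vi } from 'vitest'; // assumed test runner, implied by vi.fn()
import { InMemoryEventBus } from 'adapters/outbound/events/in-memory-event-bus.js';
// subscribeMyAddon is your addon's own wiring function (import omitted here).

it("my addon reacts to chat.session.ended", async () => {
  const bus = new InMemoryEventBus();
  const myUseCase = vi.fn();
  subscribeMyAddon({ bus, myUseCase });

  // publish (not publishAsync) so the handler has settled before the assertion
  await bus.publish({
    type: 'chat.session.ended',
    sessionId: 'sid_1',
    projectPath: '/abs/path',
    transcript: [],
    reason: 'closed',
    endedAt: '2026-05-04T10:00:00.000Z',
  });

  expect(myUseCase).toHaveBeenCalledOnce();
});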

To test a publisher, use subscriberCount() and a spy handler:

const captured: ArkaEvent[] = [];
bus.subscribe('chat.session.started', (e) => { captured.push(e); });
await myPublisher.doSomething();
expect(captured).toHaveLength(1);

  • The bus lives in the CoreContainer — instantiated at boot, released at process.exit
  • No need to unsubscribe on shutdown — the process dies, everything is released
  • The unsubscribe returned by subscribe is useful for tests or for conditional subscribers
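A conditional subscriber can be as simple as one that removes itself after the first event. The sketch below assumes a subscribe function that returns its unsubscribe and never invokes the handler synchronously during subscription; subscribeOnce and the tiny number bus are hypothetical.

```typescript
type Unsub = () => void;
type SubscribeFn<E> = (handler: (event: E) => void) => Unsub;

// Hypothetical helper: a subscriber that removes itself after the first event.
function subscribeOnce<E>(subscribe: SubscribeFn<E>, handler: (event: E) => void): Unsub {
  const unsub = subscribe((event) => {
    unsub(); // idempotent, so calling it again later is harmless
    handler(event);
  });
  return unsub;
}

// Tiny stand-in bus for the demonstration.
const onceHandlers = new Set<(n: number) => void>();
const subscribeNumbers: SubscribeFn<number> = (h) => {
  onceHandlers.add(h);
  return () => { onceHandlers.delete(h); };
};

const received: number[] = [];
subscribeOnce(subscribeNumbers, (n) => { received.push(n); });
for (const value of [1, 2]) {
  for (const h of [...onceHandlers]) h(value); // publish to a snapshot of handlers
}
console.log(received); // [1]
```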

9. Future evolutions (out of current scope)

  • Wildcard subscribe (bus.subscribe('chat.*', handler))
  • Persistence / replay (event sourcing)
  • Multi-process (worker threads)
  • Subscriber order priorities
  • Conditional filters on subscribe

Action                    | Code
--------------------------|----------------------------------------------
Sync subscribe            | const unsub = bus.subscribe('type', handler);
Unsubscribe               | unsub();
Blocking publish          | await bus.publish({ type: '...', ... });
Async publish             | bus.publishAsync({ type: '...', ... });
Test subscriber count     | bus.subscriberCount('type') (test-only)
Reset bus (between tests) | bus.clear() (test-only)

Sources of truth:

  • Domain: core/domain/events/arka-event.ts
  • Port: core/ports/outbound/event-bus.ts
  • Adapter: adapters/outbound/events/in-memory-event-bus.ts