Aller au contenu

Event bus — guide pour développeurs d'addons

Le bus d’événements arka-deck transporte des faits typés entre tous les modules. Ce guide couvre l’API, les conventions, les patterns à suivre et les pièges à éviter.


Le bus est un module cœur (pas un addon). Il transporte des events typés entre tous les modules d’arka-deck. Pour ton addon :

import type { EventBus } from '../../core/ports/outbound/event-bus.js';
export function subscribeMyAddonToEventBus(bus: EventBus) {
const unsub = bus.subscribe('chat.session.ended', async (event) => {
// event est typé Extract<ArkaEvent, { type: 'chat.session.ended' }>
console.log('Session terminée:', event.sessionId, event.reason);
});
return { unsubscribe: unsub };
}

Le bus est instancié une fois au boot (composition/core-container.ts) et exposé sur CoreContainer.eventBus. Tous les modules s’abonnent depuis là.


Chaque module qui veut observer un autre module crée un couplage direct :

  • Mémoire connaît le chat
  • Gouvernance connaît le chat
  • Télémétrie connaît le chat
  • Squad connaît les agents

→ N modules × M dépendances. Impossible à maintenir.

Le chat publie chat.session.ended. Tous les modules intéressés s’abonnent. Aucun module ne connaît les autres. L’ajout d’un module n’impacte rien.

Cas d’usage typique :

  • Mémoire s’abonne aux events session/turn → capture des jalons
  • Gouvernance s’abonne aux tools utilisés → ledger
  • Télémétrie s’abonne aux turns terminés → métriques tokens
  • Squad s’abonne aux agents installés → recompose

Un event décrit un fait qui s’est produit : chat.session.ended. Pas une commande (do this), pas une question. Le temps passé est important.

Convention de nommage : <domain>.<entity>.<action> kebab-case.

BonMauvais
chat.session.endedendChatSession (commande)
agent.installedinstallAgent (commande)
provider.instance.testedrunProviderTest (commande)
memory.entry.capturedtriggerCapture (commande)

Le publisher ne sait pas ce que les subscribers font. Il publie un fait. Les subscribers décident.

// ✅ Publier un fait
bus.publishAsync({ type: 'chat.session.ended', ...payload });
// ❌ Ne pas faire ça (commande déguisée)
bus.publish({ type: 'memory.capture-now', ...payload });
ModeSignatureQuand
publish(e)Promise<void> (allSettled)Quand tu dois attendre la résolution avant de continuer (par exemple project.purged → cascade nettoyage)
publishAsync(e)void (queueMicrotask)Events fréquents/observatoires (turn ended, attachments)

Par défaut, utilise publishAsync. Il ne bloque jamais le publisher. Réserve publish aux cascades critiques.

  • Indépendants : un subscriber ne dépend pas d’un autre. L’ordre d’invocation n’est pas garanti.
  • Idempotents si possible : si tu fais une mutation persistante (DB), prévoir le cas où l’event arrive deux fois.
  • Tolérants aux pertes : si le serveur crash entre un publish et le handler, l’event est perdu — pas de retry.
  • Erreurs absorbées : si ton handler throw, le bus log un warning et continue. Les autres subscribers reçoivent quand même.

Attend la résolution de tous les handlers via Promise.allSettled. Renvoie Promise<void> qui résout TOUJOURS (jamais reject — les erreurs subscribers sont catchées).

Utilisé pour des cascades où le publisher doit savoir que les conséquences ont été traitées avant de continuer.

await bus.publish({
type: 'project.purged',
projectId,
projectPath,
deletedPaths,
});
return { ok: true };

EventBus.publishAsync(event) — async fire-and-forget

Section intitulée « EventBus.publishAsync(event) — async fire-and-forget »

Retourne immédiatement (void). Les handlers s’exécutent au tick suivant via queueMicrotask. Aucun blocage du publisher.

Utilisé pour la majorité des events (cycle de vie chat, télémétrie, etc.).

deps.eventBus?.publishAsync({
type: 'chat.turn.ended',
sessionId,
success: true,
durationMs,
usage: { tokensIn: 12, tokensOut: 8 },
});

EventBus.subscribe(type, handler) — typage strict

Section intitulée « EventBus.subscribe(type, handler) — typage strict »

type est restreint à un littéral du union ArkaEventType. Le handler reçoit un payload typé par discrimination :

bus.subscribe('chat.tool.called', async (event) => {
// event est typé Extract<ArkaEvent, { type: 'chat.tool.called' }>
// → event.tool: string, event.input: Record<string, unknown> | null, etc.
});

Retourne un Unsubscribe (= () => void). Idempotent (appelable une ou plusieurs fois sans erreur).

const unsub = bus.subscribe('chat.session.started', handler);
// ... plus tard
unsub(); // OK
unsub(); // no-op, pas d'erreur

Pattern 1 — Subscriber au boot, dans le composition root

Section intitulée « Pattern 1 — Subscriber au boot, dans le composition root »
composition/core-container.ts
const eventBus = new InMemoryEventBus();
const forMemory = buildForMemory({ ... });
subscribeMemoryToEventBus({ forMemory, eventBus, clock });

Pattern 2 — Subscriber qui filtre dans son handler

Section intitulée « Pattern 2 — Subscriber qui filtre dans son handler »

L’API subscribe ne supporte pas de filtre conditionnel. Le filtrage se fait dans le handler :

bus.subscribe('chat.session.ended', async (event) => {
if (event.reason === 'error') return;
if (event.projectPath === undefined) return;
if (!(await isMemoryEnabled(event.projectPath))) return;
await forMemory.triggerCapture({ ... });
});

Pattern 3 — Pseudo machine à états (séquences d’events)

Section intitulée « Pattern 3 — Pseudo machine à états (séquences d’events) »
class SquadStateMachine {
private pendingChange = false;
constructor(bus: EventBus, squad: SquadOrchestrator) {
bus.subscribe('agent.installed', () => { this.pendingChange = true; });
bus.subscribe('agent.removed', () => { this.pendingChange = true; });
bus.subscribe('chat.session.ended', async (e) => {
if (this.pendingChange) {
await squad.recompose(e.projectPath);
this.pendingChange = false;
}
});
}
}

Pattern 4 — Publisher d’events « résultatifs »

Section intitulée « Pattern 4 — Publisher d’events « résultatifs » »

Quand ton addon fait quelque chose d’intéressant pour les autres, publie un event plutôt que d’exposer une API directe :

bus.publishAsync({
type: 'memory.entry.captured',
projectPath,
entryId,
kind: entry.kind,
health: entry.health,
trigger,
});

Risque de cascades infinies. Si nécessaire, utilise publishAsync (le tick suivant casse la chaîne synchrone).

Mutation d’état partagé global sans synchronisation

Section intitulée « Mutation d’état partagé global sans synchronisation »

Plusieurs handlers du même event sont invoqués en parallèle (allSettled). Si plusieurs mutent globalCounter++, race condition.

Les handlers doivent être fins. Déléguez à un use-case dédié :

// ✅
bus.subscribe('chat.session.ended', async (e) => {
await myUseCase.handle(e);
});
// ❌
bus.subscribe('chat.session.ended', async (e) => {
// 50 lignes de logique métier → ingérable
});

L’ordre n’est pas garanti. Si A doit s’exécuter avant B, faites un seul subscriber qui appelle A puis B en interne.

Le bus est in-process, in-memory, single-user. Si le serveur redémarre, les events en vol sont perdus. Tes subscribers doivent accepter ça (idempotence + tolérance pertes).

// ❌
bus.subscribe('chat.session.ended', (e: any) => { ... });
// ❌
bus.subscribe('chat.session.ended', (e) => {
const evt = e as ChatRuntimeFailureEvent;
});
// ✅
bus.subscribe('chat.session.ended', (e) => {
// e est déjà typé par discrimination
});

La source de vérité est core/domain/events/arka-event.ts (union discriminée).

Pour ajouter un nouvel event :

  1. Modifier core/domain/events/arka-event.ts :
    • Ajouter une interface MyNewEvent { readonly type: '...'; ... }
    • Ajouter-la à l’union ArkaEvent
  2. Le publisher publie via bus.publishAsync(...) ; les subscribers s’abonnent comme d’habitude

InMemoryEventBus est l’implémentation prod et test (pas d’I/O, pure fonction). Pas de fake distinct nécessaire.

import { InMemoryEventBus } from 'adapters/outbound/events/in-memory-event-bus.js';
it("mon addon réagit à chat.session.ended", async () => {
const bus = new InMemoryEventBus();
const myUseCase = vi.fn();
subscribeMyAddon({ bus, myUseCase });
await bus.publish({
type: 'chat.session.ended',
sessionId: 'sid_1',
projectPath: '/abs/path',
transcript: [],
reason: 'closed',
endedAt: '2026-05-04T10:00:00.000Z',
});
expect(myUseCase).toHaveBeenCalledOnce();
});

Pour tester un publisher, utiliser subscriberCount() et un handler-spy :

const captured: ArkaEvent[] = [];
bus.subscribe('chat.session.started', (e) => { captured.push(e); });
await myPublisher.doSomething();
expect(captured).toHaveLength(1);

  • Le bus vit dans le CoreContainer — instancié au boot, libéré au process.exit
  • Pas besoin d’unsubscribe au shutdown — le process meurt, tout est libéré
  • L’unsubscribe retourné par subscribe est utile pour les tests ou pour des subscribers conditionnels

  • Wildcard subscribe (bus.subscribe('chat.*', handler))
  • Persistance / replay (event sourcing)
  • Multi-process (worker threads)
  • Priorités d’ordre subscribers
  • Filtres conditionnels côté subscribe

ActionCode
Subscribe syncconst unsub = bus.subscribe('type', handler);
Unsubscribeunsub();
Publish bloquantawait bus.publish({ type: '...', ... });
Publish asyncbus.publishAsync({ type: '...', ... });
Test count subscribersbus.subscriberCount('type') (test-only)
Reset bus (entre tests)bus.clear() (test-only)

Source de vérité :

  • Domaine : core/domain/events/arka-event.ts
  • Port : core/ports/outbound/event-bus.ts
  • Adapter : adapters/outbound/events/in-memory-event-bus.ts