cross-service-events
messaging specs/messaging/cross-service-events.kmd
Cross-Service Event Delivery (Redis Streams)
Origin
services/foundation/id/engine/services/identity/internal/service/event_bus.go
documents itself as: "intentionally simple — a KDB-backed durable queue can
replace it later without changing the publisher interface." That "later"
has arrived: tickets #091, #092, and #093 need the auth, sso, and keys services
to consume identity.user.erased cross-process.
Ticket #097 raised three options (gRPC RPC, durable queue, webhook reuse). This spec ratifies Option B: a durable queue via Redis Streams.
Decision (2026-05-14)
Redis Streams is the chosen substrate for cross-service event delivery until the kdb-stream layer from RFC-001 ships, at which point migration is drop-in (same pub/sub semantics).
Rationale:
- Reuse-first: pkg/redisclient/redisclient.go already exists in the engine; no new infrastructure
- Self-hosted-first: Redis is OSS and self-hostable, and its deployment is already established in ops
- Consumer groups: per-group offsets, pending-entry tracking, and delivery counts are built in, so each service keeps its own progress; retries (XCLAIM) and the DLQ are built on top of these primitives (see R4)
- Migration path: the future kdb-stream layer uses semantics parallel to XADD/XREADGROUP
- Simpler than NATS JetStream: Redis is already in the stack; NATS would require new infrastructure
- Reusing webhook_deliveries rejected: a semantic stretch (the table is intended for external webhooks), and its polling latency is poor for cascades
Contract (R-rules)
R1 — Stream key naming
One global stream per source service:
| Stream key | Publisher | Consumers |
|---|---|---|
| koder:events:id | services/foundation/id/engine (identity) | auth, sso, keys |
| koder:events:auth (future) | services/foundation/id/engine (auth) | (TBD per event) |
The koder:events: prefix is reserved.
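As a minimal sketch of R1, a publisher could derive its stream key (and the DLQ key used in R4) from the source service name. StreamKey, DLQKey, and the lowercase-name rule are hypothetical helpers for illustration, not part of pkg/eventbus.

```go
package main

import (
	"fmt"
	"regexp"
)

// ReservedPrefix is the namespace R1 reserves for cross-service event streams.
const ReservedPrefix = "koder:events:"

// originPattern is an assumed rule: lowercase source-service names only.
var originPattern = regexp.MustCompile(`^[a-z][a-z0-9-]*$`)

// StreamKey returns the single global stream for a source service ("id" -> "koder:events:id").
func StreamKey(origin string) (string, error) {
	if !originPattern.MatchString(origin) {
		return "", fmt.Errorf("invalid source service %q", origin)
	}
	return ReservedPrefix + origin, nil
}

// DLQKey returns the dead-letter stream paired with a source stream (R4 uses koder:events:id:dlq).
func DLQKey(origin string) (string, error) {
	key, err := StreamKey(origin)
	if err != nil {
		return "", err
	}
	return key + ":dlq", nil
}
```

Keeping the prefix in one constant makes it harder for a service to publish outside the reserved namespace by accident.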
R2 — Event envelope
Each XADD carries key-value fields (the native Redis Streams format):
XADD koder:events:id * \
type identity.user.erased \
event_id evt_<ulid> \
occurred_at 2026-05-14T12:34:56Z \
tenant_id tenant-A \
subject_id user-u123 \
actor system \
data '{"erasure_id":"ERS-2026-05-14-abc12345"}'
Required fields:
- type: event type (e.g. identity.user.erased)
- event_id: globally unique (ULID); idempotency key for consumers
- occurred_at: ISO 8601 UTC
- tenant_id: tenant scope
- subject_id: primary subject (user_id, workspace_id, etc.)
- actor: who/what triggered the event (system, user:<id>, admin:<id>)
- data: JSON-encoded event-specific payload
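The R2 envelope maps naturally onto a struct that flattens to the XADD field list and is rebuilt from the field map a consumer reads back. This is a sketch assuming RFC 3339 timestamps for occurred_at (as in the example above); Envelope, Fields, ParseEnvelope, and ErrMalformed are hypothetical names.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

// Envelope mirrors the R2 required fields.
type Envelope struct {
	Type       string
	EventID    string
	OccurredAt time.Time
	TenantID   string
	SubjectID  string
	Actor      string
	Data       json.RawMessage
}

// requiredFields are the XADD keys R2 marks as mandatory.
var requiredFields = []string{"type", "event_id", "occurred_at", "tenant_id", "subject_id", "actor", "data"}

// ErrMalformed marks an entry that should go straight to the DLQ (N2).
var ErrMalformed = errors.New("malformed event")

// Fields flattens the envelope into the key/value pairs passed to XADD.
func (e Envelope) Fields() map[string]string {
	return map[string]string{
		"type":        e.Type,
		"event_id":    e.EventID,
		"occurred_at": e.OccurredAt.UTC().Format(time.RFC3339),
		"tenant_id":   e.TenantID,
		"subject_id":  e.SubjectID,
		"actor":       e.Actor,
		"data":        string(e.Data),
	}
}

// ParseEnvelope rebuilds an Envelope from the raw field map returned by XREADGROUP.
func ParseEnvelope(fields map[string]string) (Envelope, error) {
	for _, k := range requiredFields {
		if fields[k] == "" {
			return Envelope{}, fmt.Errorf("%w: missing %s", ErrMalformed, k)
		}
	}
	ts, err := time.Parse(time.RFC3339, fields["occurred_at"])
	if err != nil {
		return Envelope{}, fmt.Errorf("%w: occurred_at: %v", ErrMalformed, err)
	}
	if !json.Valid([]byte(fields["data"])) {
		return Envelope{}, fmt.Errorf("%w: data is not valid JSON", ErrMalformed)
	}
	return Envelope{
		Type: fields["type"], EventID: fields["event_id"], OccurredAt: ts,
		TenantID: fields["tenant_id"], SubjectID: fields["subject_id"],
		Actor: fields["actor"], Data: json.RawMessage(fields["data"]),
	}, nil
}
```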
R3 — Consumer group naming
Each consuming service creates its own consumer group:
<service-name>-<purpose>
e.g.:
auth-erasure-cascade
sso-erasure-cascade
keys-erasure-cascade
Pattern: <service>-<purpose>. Group names must be unique within a stream (Redis scopes consumer groups to a single stream).
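A hypothetical helper could enforce the <service>-<purpose> convention at boot, before XGROUP CREATE runs. It assumes service names are lowercase and hyphen-free, so the first hyphen separates service from purpose; the spec does not fix a character set.

```go
package main

import (
	"fmt"
	"regexp"
)

// groupPattern encodes the assumed <service>-<purpose> grammar:
// service = lowercase alphanumerics, purpose = hyphen-separated words.
var groupPattern = regexp.MustCompile(`^([a-z][a-z0-9]*)-([a-z0-9]+(?:-[a-z0-9]+)*)$`)

// GroupName builds and validates a consumer group name such as "auth-erasure-cascade".
func GroupName(service, purpose string) (string, error) {
	name := service + "-" + purpose
	if !groupPattern.MatchString(name) {
		return "", fmt.Errorf("consumer group %q does not match <service>-<purpose>", name)
	}
	return name, nil
}

// ServiceOf extracts the <service> part of a group name (the purpose may contain hyphens).
func ServiceOf(group string) (string, bool) {
	m := groupPattern.FindStringSubmatch(group)
	if m == nil {
		return "", false
	}
	return m[1], true
}
```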
R4 — Consumer behavior
- Boot-time: XGROUP CREATE koder:events:id <group> $ MKSTREAM (start from now; future events only). If the group already exists, Redis returns a BUSYGROUP error, which a restarting consumer can safely ignore.
- Loop: XREADGROUP GROUP <group> <consumer-id> COUNT 100 BLOCK 5000 STREAMS koder:events:id > (BLOCK is in milliseconds)
- Per message: deserialize → handle (idempotently: the same event_id may be redelivered)
- Success: XACK koder:events:id <group> <message_id>
- Failure: log and leave the entry un-acked; XPENDING tracks it; retry via XCLAIM after a timeout
- Hard failure (>3 retries): XCLAIM the entry, XADD it to the DLQ stream koder:events:id:dlq for ops, then XACK it in the original group
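The per-message rules above reduce to a small decision function. This sketch hides the Redis calls behind a hypothetical StreamOps interface so the ack, leave-pending, and DLQ branches can be exercised without a server. It assumes the delivery count comes from XPENDING (XCLAIM increments it on each reclaim) and takes the DLQ threshold of three failed deliveries from T4.

```go
package main

import (
	"errors"
	"fmt"
)

// MaxDeliveries is an assumption mirroring T4 ("handler fails 3 times").
// R4 words the threshold as ">3 retries"; adjust if that reading is intended.
const MaxDeliveries = 3

// requiredFields are the R2 keys; a missing one makes the entry malformed (N2).
var requiredFields = []string{"type", "event_id", "occurred_at", "tenant_id", "subject_id", "actor", "data"}

// Message is one stream entry plus its delivery count as reported by XPENDING.
type Message struct {
	ID         string
	Fields     map[string]string
	Deliveries int
}

// StreamOps is a hypothetical seam over the Redis calls this step needs.
type StreamOps interface {
	Ack(id string) error                     // XACK koder:events:id <group> <id>
	DeadLetter(m Message, cause error) error // XADD to koder:events:id:dlq, then XACK
}

// Outcome reports what Process did with a message.
type Outcome int

const (
	Acked        Outcome = iota
	LeftPending          // stays in the pending list until reclaimed via XCLAIM
	DeadLettered
)

var errMalformed = errors.New("malformed event")

// Process applies the R4 per-message rules to one delivered entry.
func Process(ops StreamOps, m Message, handle func(map[string]string) error) (Outcome, error) {
	for _, k := range requiredFields {
		if m.Fields[k] == "" {
			// N2: malformed entries skip retries and go straight to the DLQ.
			return DeadLettered, ops.DeadLetter(m, fmt.Errorf("%w: missing %s", errMalformed, k))
		}
	}
	err := handle(m.Fields)
	switch {
	case err == nil:
		return Acked, ops.Ack(m.ID)
	case m.Deliveries >= MaxDeliveries:
		return DeadLettered, ops.DeadLetter(m, err)
	default:
		// Leave un-acked: XPENDING keeps tracking it until a reclaimer XCLAIMs it.
		return LeftPending, nil
	}
}

// fakeOps records calls so the decision logic can be exercised without Redis.
type fakeOps struct{ acked, dlq []string }

func (f *fakeOps) Ack(id string) error { f.acked = append(f.acked, id); return nil }

func (f *fakeOps) DeadLetter(m Message, _ error) error {
	f.dlq = append(f.dlq, m.ID)
	return f.Ack(m.ID)
}
```

Dead-lettering ends with an XACK in the consumer's own group rather than an XDEL, because the same stream entry is still owed to the other groups (auth, sso, keys).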
R5 — Idempotency contract
Consumers MUST be idempotent per event_id. Recommended:
- Maintain a processed_events table with event_id as the PK and a TTL of 24h
- Skip the event if its event_id was already processed within that window
- For the erasure cascade: anonymization and deletion are naturally idempotent (re-applying is a no-op)
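An in-memory sketch of the recommended processed_events check follows. ProcessedEvents and manualClock are hypothetical; a real consumer would keep the table in its own database. Failed handler runs are deliberately not recorded, so a redelivered message is retried rather than skipped.

```go
package main

import (
	"sync"
	"time"
)

// DefaultTTL is the 24h dedup window recommended in R5.
const DefaultTTL = 24 * time.Hour

// ProcessedEvents is an in-memory stand-in for the processed_events table.
// With a database, record event_id in the same transaction as the side effect.
type ProcessedEvents struct {
	mu   sync.Mutex
	ttl  time.Duration
	now  func() time.Time // injectable clock
	seen map[string]time.Time
}

func NewProcessedEvents(ttl time.Duration, now func() time.Time) *ProcessedEvents {
	return &ProcessedEvents{ttl: ttl, now: now, seen: make(map[string]time.Time)}
}

// Run calls handle unless eventID was processed within the TTL window and
// reports whether handle ran. The lock is not held across handle, so two
// concurrent deliveries of one event_id could both run; handlers stay idempotent.
func (p *ProcessedEvents) Run(eventID string, handle func() error) (bool, error) {
	p.mu.Lock()
	at, ok := p.seen[eventID]
	p.mu.Unlock()
	if ok && p.now().Sub(at) < p.ttl {
		return false, nil // duplicate within the window: skip (the caller still XACKs)
	}
	if err := handle(); err != nil {
		return true, err // not recorded, so a redelivery retries it
	}
	p.mu.Lock()
	p.seen[eventID] = p.now()
	p.mu.Unlock()
	return true, nil
}

// Purge drops entries older than the TTL, mimicking the table's expiry.
func (p *ProcessedEvents) Purge() {
	p.mu.Lock()
	defer p.mu.Unlock()
	for id, at := range p.seen {
		if p.now().Sub(at) >= p.ttl {
			delete(p.seen, id)
		}
	}
}

// manualClock is a test clock that only moves when Advance is called.
type manualClock struct{ t time.Time }

func (c *manualClock) Now() time.Time    { return c.t }
func (c *manualClock) Advance(hours int) { c.t = c.t.Add(time.Duration(hours) * time.Hour) }
```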
R6 — Publisher behavior
Publisher (e.g. ErasureService.ExecuteErasure):
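- Generate event_id via ULID
- XADD koder:events:id MAXLEN ~ 100000 * type ... event_id ... data ...
- MAXLEN ~ 100000 (the ~ requests approximate trimming) keeps roughly the last 100k events; older entries are purged automatically. By default, trimming does not consider consumer-group progress, so a consumer that falls more than ~100k entries behind loses the oldest ones.
- On Redis failure: log and fall back to the existing in-process EventBus for SAME-process consumers. Cross-service consumers resume on their next poll once Redis recovers, but an event whose XADD failed is not in the stream and reaches them only if the publish is retried.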
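The publisher steps above can be sketched with the standard library alone: a ULID-style event_id (48-bit millisecond timestamp plus 80 random bits, Crockford base32) and the raw XADD argument vector with approximate trimming. NewEventID and BuildXAdd are hypothetical names; with the go-redis v9 client, the same trimming would be expressed through its XAddArgs type (MaxLen: 100000, Approx: true). This ID sketch does not implement ULID's monotonic mode for IDs minted within the same millisecond.

```go
package main

import (
	"crypto/rand"
	"strconv"
)

// crockford is the Crockford base32 alphabet used by ULIDs (no I, L, O, U).
const crockford = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"

// StreamMaxLen is the approximate retention from R6.
const StreamMaxLen = 100000

// NewEventID returns "evt_" + a 26-char ULID for a non-negative Unix time in
// milliseconds (e.g. time.Now().UnixMilli()).
func NewEventID(unixMilli int64) (string, error) {
	var rnd [10]byte
	if _, err := rand.Read(rnd[:]); err != nil {
		return "", err
	}
	out := make([]byte, 26)
	ms := uint64(unixMilli)
	for i := 9; i >= 0; i-- { // 10 chars (50 bits) cover the 48-bit timestamp
		out[i] = crockford[ms&31]
		ms >>= 5
	}
	for chunk := 0; chunk < 2; chunk++ { // 2 chunks of 40 random bits, 8 chars each
		var v uint64
		for _, b := range rnd[chunk*5 : chunk*5+5] {
			v = v<<8 | uint64(b)
		}
		for i := 7; i >= 0; i-- {
			out[10+chunk*8+i] = crockford[v&31]
			v >>= 5
		}
	}
	return "evt_" + string(out), nil
}

// BuildXAdd returns the raw R6 command; fields keep their R2 order.
func BuildXAdd(stream string, fields [][2]string) []string {
	args := []string{"XADD", stream, "MAXLEN", "~", strconv.Itoa(StreamMaxLen), "*"}
	for _, kv := range fields {
		args = append(args, kv[0], kv[1])
	}
	return args
}
```

Because the timestamp occupies the leading characters, event_ids sort lexicographically by creation millisecond, which keeps them easy to scan in logs and in the DLQ.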
R7 — TLS + auth
- Production: Redis TLS + AUTH password (env KODER_REDIS_URL=rediss://user:pass@host:6379/0)
- Development: plain Redis is OK
- Connection pooling via pkg/redisclient (existing)
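A boot-time guard for R7 could parse KODER_REDIS_URL with net/url and refuse plaintext or password-less URLs in production. CheckRedisURL, RedisTarget, and the production flag are assumptions for illustration; pkg/redisclient may already parse the URL (for example via go-redis's redis.ParseURL), so this shows only the policy check.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// RedisTarget is the subset of KODER_REDIS_URL this sketch inspects.
type RedisTarget struct {
	TLS      bool
	Host     string
	Username string
	DB       string
}

// CheckRedisURL parses KODER_REDIS_URL and, when production is true, enforces
// R7: rediss:// (TLS) plus an AUTH password.
func CheckRedisURL(raw string, production bool) (RedisTarget, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return RedisTarget{}, fmt.Errorf("KODER_REDIS_URL: %w", err)
	}
	if u.Scheme != "redis" && u.Scheme != "rediss" {
		return RedisTarget{}, fmt.Errorf("KODER_REDIS_URL: unsupported scheme %q", u.Scheme)
	}
	user, hasPass := "", false
	if u.User != nil {
		user = u.User.Username()
		_, hasPass = u.User.Password()
	}
	rt := RedisTarget{TLS: u.Scheme == "rediss", Host: u.Host, Username: user, DB: strings.TrimPrefix(u.Path, "/")}
	if production && !rt.TLS {
		return rt, fmt.Errorf("KODER_REDIS_URL: production requires rediss:// (TLS)")
	}
	if production && !hasPass {
		return rt, fmt.Errorf("KODER_REDIS_URL: production requires an AUTH password")
	}
	return rt, nil
}
```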
R8 — Observability
Publishers and consumers emit slog records:
- Publish: event_emitted {type, event_id, subject_id, stream}
- Consume start: event_received {type, event_id, group, consumer}
- Consume ack: event_acked {type, event_id, elapsed_ms}
- Consume fail: event_failed {type, event_id, error, retry_count}
- DLQ: event_dlq {type, event_id, final_error} (warn level)
Metrics (Prometheus, future): event_count_total, event_lag_seconds, event_dlq_total.
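One way to keep the R8 record names and keys consistent across services is a thin wrapper over log/slog. This is a sketch: EventLog and its methods are hypothetical, and R8 fixes only the DLQ level (warn), so Info for the other records is an assumption. recorder and hasAll exist only to make the output checkable.

```go
package main

import (
	"io"
	"log/slog"
	"strings"
)

// EventLog wraps slog so every service emits the R8 record names and keys.
type EventLog struct{ l *slog.Logger }

// NewEventLog writes one JSON record per line to w. The timestamp is dropped
// only to keep this sketch's output deterministic; real handlers should keep it.
func NewEventLog(w io.Writer) *EventLog {
	h := slog.NewJSONHandler(w, &slog.HandlerOptions{
		ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
			if len(groups) == 0 && a.Key == slog.TimeKey {
				return slog.Attr{} // a zero Attr is discarded
			}
			return a
		},
	})
	return &EventLog{l: slog.New(h)}
}

func (e *EventLog) Emitted(typ, eventID, subjectID, stream string) {
	e.l.Info("event_emitted", "type", typ, "event_id", eventID, "subject_id", subjectID, "stream", stream)
}

func (e *EventLog) Received(typ, eventID, group, consumer string) {
	e.l.Info("event_received", "type", typ, "event_id", eventID, "group", group, "consumer", consumer)
}

func (e *EventLog) Acked(typ, eventID string, elapsedMS int64) {
	e.l.Info("event_acked", "type", typ, "event_id", eventID, "elapsed_ms", elapsedMS)
}

func (e *EventLog) Failed(typ, eventID, errMsg string, retryCount int) {
	e.l.Info("event_failed", "type", typ, "event_id", eventID, "error", errMsg, "retry_count", retryCount)
}

// DLQ is the only record whose level R8 pins (warn).
func (e *EventLog) DLQ(typ, eventID, finalError string) {
	e.l.Warn("event_dlq", "type", typ, "event_id", eventID, "final_error", finalError)
}

// recorder captures each record the handler writes (test helper).
type recorder struct{ lines []string }

func (r *recorder) Write(p []byte) (int, error) {
	r.lines = append(r.lines, string(p))
	return len(p), nil
}

// hasAll reports whether s contains every fragment (test helper).
func hasAll(s string, frags ...string) bool {
	for _, f := range frags {
		if !strings.Contains(s, f) {
			return false
		}
	}
	return true
}
```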
R9 — Migration to kdb-stream
When kdb-stream layer ships per RFC-001:
- Add a KODER_EVENT_BUS=kdb-stream env flag
- New pkg/eventbus/kdb_stream.go implementation satisfying the same publisher/consumer interfaces
- Migration script copies in-flight stream state from Redis → kdb-stream
- Cutover: env flip + restart services
- Redis decommission
Drop-in because XADD/XREADGROUP have direct parallels in kdb-stream design.
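The env-flag cutover in R9 amounts to a backend switch at construction time. In this sketch the Publisher signature, the placeholder backends, and the "redis-streams" value (including treating an unset flag as Redis) are assumptions; the spec names only kdb-stream.

```go
package main

import (
	"context"
	"fmt"
)

// Publisher is a minimal stand-in for the pkg/eventbus publisher interface.
type Publisher interface {
	Publish(ctx context.Context, fields map[string]string) error
}

type redisStreams struct{} // placeholder for the Wave 1 Redis Streams impl
type kdbStream struct{}    // placeholder for pkg/eventbus/kdb_stream.go

func (redisStreams) Publish(context.Context, map[string]string) error { return nil }
func (kdbStream) Publish(context.Context, map[string]string) error    { return nil }

// NewPublisher picks the backend from KODER_EVENT_BUS; unknown values fail
// fast at boot instead of silently falling back.
func NewPublisher(getenv func(string) string) (Publisher, error) {
	switch v := getenv("KODER_EVENT_BUS"); v {
	case "", "redis-streams":
		return redisStreams{}, nil
	case "kdb-stream":
		return kdbStream{}, nil
	default:
		return nil, fmt.Errorf("KODER_EVENT_BUS: unknown backend %q", v)
	}
}
```

A service would call NewPublisher(os.Getenv); injecting getenv keeps the selection testable, and the "env flip + restart" step then only changes which branch is taken.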
Implementation
Wave 1 (now) — publisher abstraction in engine
- services/foundation/id/engine/pkg/eventbus/ package
- Publisher interface (Publish method) + Redis Streams impl
- Consumer interface (Subscribe + Process methods) + Redis Streams impl
- Wire the publisher into ErasureService.ExecuteErasure (replace the direct identity.PublishUserErased call with bus.Publish; the in-process EventBus keeps firing for SAME-process consumers via dual-emit)
Wave 2 — consumers in sibling services
- services/auth/internal/listener/erasure_listener.go (ticket #091)
- services/sso/internal/listener/erasure_listener.go (ticket #092)
- services/keys/internal/listener/erasure_listener.go (ticket #093)
Wave 3 — DLQ + observability + metrics
- DLQ stream + admin endpoint to inspect/replay
- Prometheus metrics
Test contract
T1 — Round-trip happy path
Publisher → XADD → Consumer XREADGROUP → handler invoked → XACK. Assert: message acked, handler received correct envelope.
T2 — Consumer idempotency
Same event delivered twice → handler invoked twice but business state changes once. Assert: anonymization/deletion are idempotent.
T3 — Retry on failure
Handler returns error → message stays un-ack'd → XCLAIM after timeout → retry. Assert: 2nd attempt invokes handler.
T4 — DLQ on persistent failure
Handler fails 3× → message moved to DLQ stream. Assert: the message is acked (no longer pending) for that group in the original stream, and the DLQ has an entry with the error.
T5 — Concurrent consumers
2 consumer instances in same group → messages distributed (load balanced). Assert: each msg processed exactly once across instances.
N1 — Redis down on publish
Publisher Redis call fails → falls back to the in-process EventBus; Redis is tried again on the next event. Assert: same-process consumers still work; cross-service consumers catch up once Redis recovers.
N2 — Malformed event
Consumer receives event with missing required field → moves to DLQ immediately, doesn't crash.
Out of scope
- Same-process pub/sub: the existing in-process EventBus continues to serve badges, webhooks, etc. This spec only covers CROSS-process delivery.
- Synchronous request/reply (RPC pattern): out of scope; events are fire-and-forget.
- Event sourcing / replay-from-zero: Wave 3 of the erasure flow (#094 covers replay from the erasure_requests table, not from the stream).
References
- Redis Streams docs: https://redis.io/docs/data-types/streams/
- Consumer groups: https://redis.io/docs/data-types/streams/#consumer-groups
- RFC-001 kdb-as-unified-data-plane § Layers (kdb-stream future)
- services/foundation/id/engine#097 (decision ticket)