Aller au contenu principal

Architecture Workers BullMQ - Aaperture

Blueprint : Déporter les actions longues/lourdes (CPU/IO/externe) dans des workers BullMQ tout en conservant la logique métier transactionnelle dans l'API NestJS.

Objectif

Améliorer la maintenabilité et la scalabilité en déportant les actions longues/lourdes (CPU/IO/externe) dans des workers BullMQ tout en conservant la logique métier transactionnelle dans l'API NestJS.

Principes

  • API synchrone = valider (Zod) + persister (Postgres) + enqueue job (BullMQ) + retourner rapidement.
  • Workers = exécuter + retries/backoff + idempotence + DLQ + observabilité (job_runs).
  • Outbox recommandé dès qu'il faut garantir "DB commit → job/event".
  • Contrats de jobs stables : payloads versionnés { v: 1, ... } + validation Zod côté worker.
  • Artefacts (exports, OCR results) stockés en Cloudflare R2 (ou équivalent) via un port StoragePort.

Découpage cible (process)

  1. api (NestJS) : REST + auth + règles métier + outbox/enqueue.
  2. worker-email (NestJS) : emails planifiés, tracking, webhooks.
  3. worker-ai (NestJS) : appels OpenAI, post-processing, quotas/rate limiting, coûts.
  4. worker-export (NestJS) : génération CSV/PDF/ZIP, upload R2, presigned.
  5. worker-doc (NestJS) : orchestration pipeline documents (split pages, enqueue OCR/extraction).
  6. ocr-service (Python) optionnel : OCR réel + extraction layout (si besoin libs Python).

Quand activer Python

  • PDF scannés / OCR réel / parsing layout avancé.
  • Dépendances natives ou ML plus pertinentes en Python.

Catalogue standard des jobs (BullMQ)

Nommage

  • email-seq:run_due - Exécution des séquences d'emails dues
  • email:send - Envoi d'email individuel
  • ai:run - Exécution de tâche IA
  • export:build - Génération d'export
  • doc:ingest - Ingestion de document
  • doc:ocr - OCR de document/page
  • doc:extract_structured (optionnel) - Extraction structurée
  • doc:index_search (optionnel) - Indexation pour recherche

Standard job payload

  • Payload = { v: 1, orgId, userId?, entityId, ... }
  • Validation Zod côté worker.
  • jobId stable pour idempotence :
    jobId = ${orgId}:${jobName}:${entityId}:${hash(stablePayloadPart)}
  • Config retries/backoff standardisée :
    • attempts: 5, backoff: { type: "exponential", delay: 3000 }
    • removeOnComplete: 1000, removeOnFail: 5000 (à adapter)

Exemple de payload

// email:send
{
v: 1,
orgId: "uuid",
userId: "uuid",
entityId: "uuid", // scheduled_email.id
templateKey: "quote_reminder",
recipientEmail: "client@example.com",
variables: { quoteNumber: "2024-001", ... }
}

// export:build
{
v: 1,
orgId: "uuid",
requestedBy: "uuid",
entityId: "uuid", // export_job.id
type: "ACCOUNTING",
params: { dateRange: {...}, format: "CSV" }
}

Tables transversales (recommandées)

outbox_events

Garantit transaction DB → publication job/event.

CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
org_id UUID NOT NULL,
type TEXT NOT NULL, -- ex: INVOICE_PAID, QUOTE_ACCEPTED, DOC_UPLOADED
payload JSONB NOT NULL,
status TEXT NOT NULL DEFAULT 'PENDING', -- PENDING|PUBLISHED|FAILED
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ,
error TEXT
);

CREATE INDEX idx_outbox_events_status ON outbox_events(status, created_at);
CREATE INDEX idx_outbox_events_org ON outbox_events(org_id);

job_runs

Suivi d'exécution et corrélation logs.

CREATE TABLE job_runs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
org_id UUID NOT NULL,
job_name TEXT NOT NULL, -- ex: email:send, export:build
job_id TEXT NOT NULL UNIQUE, -- stable id pour idempotence
status TEXT NOT NULL, -- QUEUED|RUNNING|COMPLETED|FAILED|CANCELED
entity_type TEXT, -- ex: DOCUMENT|EXPORT|AI_TASK
entity_id UUID,
payload JSONB NOT NULL,
result JSONB,
error TEXT,
attempt INT NOT NULL DEFAULT 1,
queued_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
started_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ
);

CREATE INDEX idx_job_runs_status ON job_runs(status, queued_at);
CREATE INDEX idx_job_runs_org ON job_runs(org_id);
CREATE INDEX idx_job_runs_entity ON job_runs(entity_type, entity_id);

file_objects (si non unifié)

Référentiel d'artefacts (R2).

CREATE TABLE file_objects (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
org_id UUID NOT NULL,
provider TEXT NOT NULL DEFAULT 'R2', -- R2|S3|LOCAL
bucket TEXT NOT NULL,
key TEXT NOT NULL,
content_type TEXT,
size_bytes BIGINT,
checksum TEXT, -- SHA-256
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
created_by UUID
);

CREATE INDEX idx_file_objects_org ON file_objects(org_id);
CREATE UNIQUE INDEX idx_file_objects_key ON file_objects(provider, bucket, key);

Pattern Outbox

Pour garantir "DB commit → job/event", utiliser le pattern Outbox :

  1. Transaction DB : Insérer dans outbox_events avec status = 'PENDING'
  2. Commit transaction
  3. Poller/Processor : Lire outbox_events avec status = 'PENDING'
  4. Enqueue job : Créer job BullMQ depuis outbox_events.payload
  5. Update : status = 'PUBLISHED', published_at = NOW()

Avantages :

  • Garantit que le job est créé si et seulement si la transaction DB est commitée
  • Évite les jobs orphelins
  • Permet retry en cas d'échec d'enqueue

Idempotence

Tous les jobs doivent être idempotents :

  1. JobId stable : jobId = ${orgId}:${jobName}:${entityId}:${hash(stablePayloadPart)}
  2. Vérification : Avant exécution, vérifier si job_runs avec même job_id et status = 'COMPLETED' existe
  3. Skip si déjà fait : Retourner le résultat existant sans réexécuter

Exemple :

async process(job: Job<EmailSendPayload>) {
const { orgId, entityId } = job.data;
const jobId = `${orgId}:email:send:${entityId}`;

// Vérifier idempotence
const existing = await this.jobRunsRepo.findByJobId(jobId);
if (existing?.status === 'COMPLETED') {
return existing.result;
}

// Exécuter...
}

Retries & Backoff

Configuration standardisée :

{
attempts: 5,
backoff: {
type: 'exponential',
delay: 3000, // 3s, 6s, 12s, 24s, 48s
},
removeOnComplete: 1000, // Garder 1000 jobs complétés
removeOnFail: 5000, // Garder 5000 jobs échoués
}

Stratégies par type de job :

  • Email : 3 tentatives, backoff 2s
  • IA : 2 tentatives, backoff 5s (coûts)
  • Export : 3 tentatives, backoff 5s
  • OCR : 5 tentatives, backoff 3s (peut être lent)

Dead Letter Queue (DLQ)

Jobs échoués après toutes les tentatives → DLQ.

Gestion DLQ :

  1. Monitoring : Dashboard pour visualiser les jobs en DLQ
  2. Replay : Possibilité de rejouer un job depuis la DLQ
  3. Alertes : Notification si DLQ dépasse un seuil (ex: 100 jobs)

Observabilité

Corrélation

  • requestId (API) → jobId (BullMQ) → job_runs.id
  • Tous les logs incluent ces IDs pour traçabilité

Métriques

  • Queue depth : Nombre de jobs en attente par queue
  • Success/fail rate : Taux de succès/échec par type de job
  • P95 duration : Durée d'exécution (percentile 95)
  • Coûts IA : Suivi des coûts OpenAI par organisation

Logs structurés

{
level: 'info',
message: 'Job started',
jobId: 'uuid',
jobName: 'export:build',
orgId: 'uuid',
entityId: 'uuid',
attempt: 1,
timestamp: '2024-01-01T00:00:00Z'
}

Sécurité / Multi-tenant / Permissions

  • Toutes les tables et payloads ont orgId.
  • Guards API pour droits (exports, docs, outreach).
  • Workers valident orgId + existence entité avant exécution.
  • Audit trail sur opérations sensibles.

Checklist d'implémentation

  • Standardiser jobName/jobId + payload versionné { v: 1 }
  • Zod validation des payloads côté worker
  • Ajouter job_runs + outbox_events si manquants
  • Isoler 4 workers : email / ai / export / doc (process séparés)
  • Centraliser StoragePort (R2) et mapping file_objects
  • Mettre en place polling status sur frontend pour jobs longs
  • Ajouter DLQ + replay (optionnel)
  • Mettre à jour la documentation (fichiers listés ci-dessus)

Références