Architecture Workers BullMQ - Aaperture
Blueprint : Déporter les actions longues/lourdes (CPU/IO/externe) dans des workers BullMQ tout en conservant la logique métier transactionnelle dans l'API NestJS.
Objectif
Améliorer la maintenabilité et la scalabilité en déportant les actions longues/lourdes (CPU/IO/externe) dans des workers BullMQ tout en conservant la logique métier transactionnelle dans l'API NestJS.
Principes
- API synchrone = valider (Zod) + persister (Postgres) + enqueue job (BullMQ) + retourner rapidement.
- Workers = exécuter + retries/backoff + idempotence + DLQ + observabilité (
job_runs). - Outbox recommandé dès qu'il faut garantir "DB commit → job/event".
- Contrats de jobs stables : payloads versionnés
{ v: 1, ... }+ validation Zod côté worker. - Artefacts (exports, OCR results) stockés en Cloudflare R2 (ou équivalent) via un port
StoragePort.
Découpage cible (process)
api(NestJS) : REST + auth + règles métier + outbox/enqueue.worker-email(NestJS) : emails planifiés, tracking, webhooks.worker-ai(NestJS) : appels OpenAI, post-processing, quotas/rate limiting, coûts.worker-export(NestJS) : génération CSV/PDF/ZIP, upload R2, presigned.worker-doc(NestJS) : orchestration pipeline documents (split pages, enqueue OCR/extraction).ocr-service(Python) optionnel : OCR réel + extraction layout (si besoin libs Python).
Quand activer Python
- PDF scannés / OCR réel / parsing layout avancé.
- Dépendances natives ou ML plus pertinentes en Python.
Catalogue standard des jobs (BullMQ)
Nommage
email-seq:run_due- Exécution des séquences d'emails duesemail:send- Envoi d'email individuelai:run- Exécution de tâche IAexport:build- Génération d'exportdoc:ingest- Ingestion de documentdoc:ocr- OCR de document/pagedoc:extract_structured(optionnel) - Extraction structuréedoc:index_search(optionnel) - Indexation pour recherche
Standard job payload
- Payload =
{ v: 1, orgId, userId?, entityId, ... } - Validation Zod côté worker.
jobIdstable pour idempotence :
jobId = ${orgId}:${jobName}:${entityId}:${hash(stablePayloadPart)}- Config retries/backoff standardisée :
attempts: 5,backoff: { type: "exponential", delay: 3000 }removeOnComplete: 1000,removeOnFail: 5000(à adapter)
Exemple de payload
// email:send
{
v: 1,
orgId: "uuid",
userId: "uuid",
entityId: "uuid", // scheduled_email.id
templateKey: "quote_reminder",
recipientEmail: "client@example.com",
variables: { quoteNumber: "2024-001", ... }
}
// export:build
{
v: 1,
orgId: "uuid",
requestedBy: "uuid",
entityId: "uuid", // export_job.id
type: "ACCOUNTING",
params: { dateRange: {...}, format: "CSV" }
}
Tables transversales (recommandées)
outbox_events
Garantit transaction DB → publication job/event.
CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
org_id UUID NOT NULL,
type TEXT NOT NULL, -- ex: INVOICE_PAID, QUOTE_ACCEPTED, DOC_UPLOADED
payload JSONB NOT NULL,
status TEXT NOT NULL DEFAULT 'PENDING', -- PENDING|PUBLISHED|FAILED
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ,
error TEXT
);
CREATE INDEX idx_outbox_events_status ON outbox_events(status, created_at);
CREATE INDEX idx_outbox_events_org ON outbox_events(org_id);
job_runs
Suivi d'exécution et corrélation logs.
CREATE TABLE job_runs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
org_id UUID NOT NULL,
job_name TEXT NOT NULL, -- ex: email:send, export:build
job_id TEXT NOT NULL UNIQUE, -- stable id pour idempotence
status TEXT NOT NULL, -- QUEUED|RUNNING|COMPLETED|FAILED|CANCELED
entity_type TEXT, -- ex: DOCUMENT|EXPORT|AI_TASK
entity_id UUID,
payload JSONB NOT NULL,
result JSONB,
error TEXT,
attempt INT NOT NULL DEFAULT 1,
queued_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
started_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ
);
CREATE INDEX idx_job_runs_status ON job_runs(status, queued_at);
CREATE INDEX idx_job_runs_org ON job_runs(org_id);
CREATE INDEX idx_job_runs_entity ON job_runs(entity_type, entity_id);
file_objects (si non unifié)
Référentiel d'artefacts (R2).
CREATE TABLE file_objects (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
org_id UUID NOT NULL,
provider TEXT NOT NULL DEFAULT 'R2', -- R2|S3|LOCAL
bucket TEXT NOT NULL,
key TEXT NOT NULL,
content_type TEXT,
size_bytes BIGINT,
checksum TEXT, -- SHA-256
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
created_by UUID
);
CREATE INDEX idx_file_objects_org ON file_objects(org_id);
CREATE UNIQUE INDEX idx_file_objects_key ON file_objects(provider, bucket, key);
Pattern Outbox
Pour garantir "DB commit → job/event", utiliser le pattern Outbox :
- Transaction DB : Insérer dans
outbox_eventsavecstatus = 'PENDING' - Commit transaction
- Poller/Processor : Lire
outbox_eventsavecstatus = 'PENDING' - Enqueue job : Créer job BullMQ depuis
outbox_events.payload - Update :
status = 'PUBLISHED',published_at = NOW()
Avantages :
- Garantit que le job est créé si et seulement si la transaction DB est commitée
- Évite les jobs orphelins
- Permet retry en cas d'échec d'enqueue
Idempotence
Tous les jobs doivent être idempotents :
- JobId stable :
jobId = ${orgId}:${jobName}:${entityId}:${hash(stablePayloadPart)} - Vérification : Avant exécution, vérifier si
job_runsavec mêmejob_idetstatus = 'COMPLETED'existe - Skip si déjà fait : Retourner le résultat existant sans réexécuter
Exemple :
async process(job: Job<EmailSendPayload>) {
const { orgId, entityId } = job.data;
const jobId = `${orgId}:email:send:${entityId}`;
// Vérifier idempotence
const existing = await this.jobRunsRepo.findByJobId(jobId);
if (existing?.status === 'COMPLETED') {
return existing.result;
}
// Exécuter...
}
Retries & Backoff
Configuration standardisée :
{
attempts: 5,
backoff: {
type: 'exponential',
delay: 3000, // 3s, 6s, 12s, 24s, 48s
},
removeOnComplete: 1000, // Garder 1000 jobs complétés
removeOnFail: 5000, // Garder 5000 jobs échoués
}
Stratégies par type de job :
- Email : 3 tentatives, backoff 2s
- IA : 2 tentatives, backoff 5s (coûts)
- Export : 3 tentatives, backoff 5s
- OCR : 5 tentatives, backoff 3s (peut être lent)
Dead Letter Queue (DLQ)
Jobs échoués après toutes les tentatives → DLQ.
Gestion DLQ :
- Monitoring : Dashboard pour visualiser les jobs en DLQ
- Replay : Possibilité de rejouer un job depuis la DLQ
- Alertes : Notification si DLQ dépasse un seuil (ex: 100 jobs)
Observabilité
Corrélation
requestId(API) →jobId(BullMQ) →job_runs.id- Tous les logs incluent ces IDs pour traçabilité
Métriques
- Queue depth : Nombre de jobs en attente par queue
- Success/fail rate : Taux de succès/échec par type de job
- P95 duration : Durée d'exécution (percentile 95)
- Coûts IA : Suivi des coûts OpenAI par organisation
Logs structurés
{
level: 'info',
message: 'Job started',
jobId: 'uuid',
jobName: 'export:build',
orgId: 'uuid',
entityId: 'uuid',
attempt: 1,
timestamp: '2024-01-01T00:00:00Z'
}
Sécurité / Multi-tenant / Permissions
- Toutes les tables et payloads ont
orgId. - Guards API pour droits (exports, docs, outreach).
- Workers valident
orgId+ existence entité avant exécution. - Audit trail sur opérations sensibles.
Checklist d'implémentation
- Standardiser
jobName/jobId+ payload versionné{ v: 1 } - Zod validation des payloads côté worker
- Ajouter
job_runs+outbox_eventssi manquants - Isoler 4 workers : email / ai / export / doc (process séparés)
- Centraliser
StoragePort(R2) et mappingfile_objects - Mettre en place polling status sur frontend pour jobs longs
- Ajouter DLQ + replay (optionnel)
- Mettre à jour la documentation (fichiers listés ci-dessus)