Système de Queue pour Jobs Lourds
Dernière mise à jour : 2025-01-XX
Documentation complète du système de queue basé sur BullMQ pour le traitement asynchrone des jobs lourds.
🎯 Vue d'ensemble
Le système de queue permet de découpler les opérations longues du thread principal de l'application, offrant :
- ✅ Traitement asynchrone : Les jobs sont exécutés en arrière-plan sans bloquer les requêtes HTTP
- ✅ Retry automatique : Retry avec backoff exponentiel en cas d'échec
- ✅ Scalabilité horizontale : Support de workers multiples pour traiter les jobs en parallèle
- ✅ Monitoring : Suivi des jobs (en cours, en attente, échoués)
- ✅ Priorisation : Possibilité de prioriser certains types de jobs
- ✅ Rate limiting : Limitation du nombre de jobs par type
- ✅ Mises à jour en temps réel : Notifications WebSocket pour suivre la progression des jobs
🏗️ Architecture
┌─────────────────┐
│ Controller │
│ / Scheduler │
└────────┬────────┘
│
▼
┌─────────────────┐
│ QueueService │
│ (add jobs) │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Queue (Redis) │
│ (BullMQ) │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Processor │
│ (WorkerHost) │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Service │
│ (Business │
│ Logic) │
└─────────────────┘
│
▼
┌─────────────────┐
│ WebSocket │
│ (Status │
│ Updates) │
└─────────────────┘
Flux de traitement
- Enqueue : Un controller ou scheduler ajoute un job à la queue via
QueueService - Queue : Le job est stocké dans Redis (BullMQ)
- Process : Un processor (WorkerHost) récupère et traite le job
- Update : Les mises à jour de progression sont envoyées via WebSocket
- Complete/Fail : Le job est marqué comme complété ou échoué
⚙️ Configuration
🧵 Mode Worker (BullMQ) — API vs Worker
En dev, BullMQ est exécuté dans un process worker séparé (recommandé) :
- API (HTTP) : ajoute des jobs (
enqueue) mais ne lance pas les processors - Worker : lance les processors BullMQ (ex: exports, scheduled-emails, google-calendar-sync, etc.)
Pourquoi ?
- évite que l’API charge les processors / leurs dépendances au boot
- évite que les jobs longs/retries impactent les requêtes HTTP
- permet de scaler les workers indépendamment
RUN_MODE
Le même entrypoint backend/src/main.ts gère les deux modes :
RUN_MODE=api: démarreAppModuleet écoute sur un portRUN_MODE=worker: démarre un module minimal viacreateApplicationContext()(pas de serveur HTTP)
Variables d’environnement (queue runtime)
# Active BullMQ
QUEUE_ENABLED=true
# Active le chargement des processors (worker)
QUEUE_WORKERS_ENABLED=true
# Désactive Bull Board si non nécessaire
QUEUE_BOARD_ENABLED=false
# Recommandé: désactiver schedule/event emitter côté worker (évite doublons)
SCHEDULE_ENABLED=false
EVENT_EMITTER_ENABLED=false
Note : Bull Board suppose que les queues BullMQ existent. Si
QUEUE_ENABLED=false, il faut garderQUEUE_BOARD_ENABLED=false(sinon le module de dashboard échoue au boot).
Modules impliqués
QueueRuntimeModule: module “switch” global.- si
QUEUE_ENABLED=false→ fournit unQueueServiceno-op - si
QUEUE_ENABLED=trueetQUEUE_WORKERS_ENABLED=false→ charge le mode client (API) - si
QUEUE_ENABLED=trueetQUEUE_WORKERS_ENABLED=true→ charge le mode worker
- si
QueueCoreModule: connexion BullMQ + enregistrement des queues (pas deQueueService).QueueClientModule(API) :QueueCoreModule+QueueService+ émission de progress via WebSocket.QueueWorkerModule(Worker) :QueueCoreModule+QueueService+ processors “base” (ex: audit-cleanup) + progress emitter no-op (pas de WebSocket).WorkerModule: module minimal qui charge BullMQ + processor modules + dépendances strictes.
Note (DI) :
QUEUE_PROGRESS_EMITTER_TOKENest un token Symbol (pas une string) pour éviter les collisions et rendre les injections plus robustes.
⚠️ Règle : évitez de démarrer le worker avec
AppModule(trop lourd, charge des features non liées aux jobs).
Variables d'environnement
Le système utilise les mêmes variables Redis que le cache :
# Redis configuration (utilisé par BullMQ)
REDIS_ENABLED=true # Activer Redis (défaut: true)
REDIS_HOST=redis # Host Redis (défaut: redis)
REDIS_PORT=6379 # Port Redis (défaut: 6379)
REDIS_PASSWORD= # Mot de passe Redis (optionnel)
Configuration des queues
Les configurations par queue sont définies dans backend/src/queue/queue.config.ts :
export const queueJobOptions: Record<string, JobsOptions> = {
exports: {
attempts: 3, // Nombre de tentatives en cas d'échec
backoff: {
delay: 2000, // Délai initial (ms)
type: "exponential", // Type de backoff
},
removeOnComplete: {
age: 3600, // Garder les jobs complétés pendant 1h
count: 100, // Garder les 100 derniers jobs complétés
},
removeOnFail: {
age: 86400, // Garder les jobs échoués pendant 24h
count: 1000, // Garder les 1000 derniers jobs échoués
},
},
// ... autres queues
};
📦 Queues disponibles
1. Exports (exports)
Description : Traitement asynchrone des exports de données (CSV, Excel, PDF, Word, JSON)
Configuration :
- Tentatives : 3
- Timeout : 5 minutes (implicite via processor)
Job Data :
{
entity: "sessions" | "contacts" | "quotes" | "invoices" | "all",
format: "csv" | "excel" | "pdf" | "docx" | "json",
userId: string
}
Processor : ExportProcessor
Utilisation :
const jobId = await queueService.addExportJob({
entity: "sessions",
format: "pdf",
userId: "user-123",
});
2. Google Calendar Sync (google-calendar-sync)
Description : Synchronisation bidirectionnelle Google Calendar ↔ Sessions
Configuration :
- Tentatives : 5
- Timeout : 10 minutes (implicite)
Job Data :
{
userId: string;
}
Processor : GoogleCalendarSyncProcessor
Utilisation :
const jobId = await queueService.addGoogleCalendarSyncJob({
userId: "user-123",
});
3. Scheduled Emails (scheduled-emails)
Description : Envoi d'emails planifiés
Configuration :
- Tentatives : 3
- Timeout : 1 minute (implicite)
Job Data :
{
emailId: string,
userId: string
}
Processor : ScheduledEmailsProcessor
Utilisation :
const jobId = await queueService.addScheduledEmailJob({
emailId: "email-123",
userId: "user-123",
});
4. Workflow Tasks (workflow-tasks)
Description : Exécution de tâches de workflow
Configuration :
- Tentatives : 3
- Timeout : 2 minutes (implicite)
Job Data :
{
taskId: string,
userId: string
}
Processor : WorkflowTasksProcessor
Utilisation :
const jobId = await queueService.addWorkflowTaskJob({
taskId: "task-123",
userId: "user-123",
});
5. Recurring Sessions (recurring-sessions)
Description : Génération d'occurrences futures pour les sessions récurrentes
Configuration :
- Tentatives : 2
- Timeout : 10 minutes (implicite)
Job Data :
{
userId?: string // Optionnel : si non fourni, traite tous les utilisateurs
}
Processor : RecurringSessionsProcessor
Utilisation :
const jobId = await queueService.addRecurringSessionsJob();
// ou pour un utilisateur spécifique :
const jobId = await queueService.addRecurringSessionsJob({
userId: "user-123",
});
6. Audit Cleanup (audit-cleanup)
Description : Nettoyage des anciens logs d'audit
Configuration :
- Tentatives : 2
- Timeout : 5 minutes (implicite)
Job Data :
{
daysToKeep?: number // Optionnel : défaut 30 jours
}
Processor : AuditCleanupProcessor
Utilisation :
const jobId = await queueService.addAuditCleanupJob({ daysToKeep: 30 });
🔧 Processors
Les processors sont des classes qui étendent WorkerHost et implémentent la méthode process() pour traiter les jobs.
Structure d'un Processor
import { Inject } from "@nestjs/common";
import { Processor, WorkerHost } from "@nestjs/bullmq";
import { Job } from "bullmq";
import type { LoggerService } from "@nestjs/common";
import { QueueNames } from "../types/queue-names.types.js";
import { QueueService } from "../queue.service.js";
@Processor(QueueNames.EXPORTS)
export class ExportProcessor extends WorkerHost {
constructor(
private readonly exportService: ExportService,
private readonly queueService: QueueService,
@Inject("LoggerService") logger: LoggerService
) {
super();
}
async process(job: Job<ExportJobData>): Promise<ExportJobResult> {
// 1. Émettre progression initiale
this.queueService.emitJobProgress({
jobId: job.id!,
status: "active",
progress: 0,
message: "Starting...",
});
try {
// 2. Traiter le job
await job.updateProgress(50);
// 3. Émettre progression
this.queueService.emitJobProgress({
jobId: job.id!,
status: "active",
progress: 50,
message: "Processing...",
});
// 4. Compléter le job
await job.updateProgress(100);
this.queueService.emitJobProgress({
jobId: job.id!,
status: "completed",
progress: 100,
message: "Completed",
result: { success: true },
});
return { success: true };
} catch (error) {
// 5. Gérer les erreurs
this.queueService.emitJobProgress({
jobId: job.id!,
status: "failed",
message: error.message,
result: { success: false },
});
throw error;
}
}
}
💻 Utilisation
Dans un Controller
import { QueueService } from "../queue/queue.service.js";
@Controller("export")
export class ExportController {
constructor(private readonly queueService: QueueService) {}
@Get()
async export(@Req() req: RequestWithUser, @Query("entity") entity: string) {
// Ajouter le job à la queue
const jobId = await this.queueService.addExportJob({
entity,
format: "pdf",
userId: req.user.id,
});
// Retourner immédiatement avec le jobId
return {
jobId,
status: "queued",
message: "Export job has been queued",
statusUrl: `/export/status/${jobId}`,
};
}
@Get("status/:jobId")
async getStatus(@Param("jobId") jobId: string) {
const status = await this.queueService.getJobStatus(
QueueNames.EXPORTS,
jobId
);
return status;
}
}
Dans un Scheduler
import { QueueService } from "../queue/queue.service.js";
@Injectable()
export class MyScheduler {
constructor(private readonly queueService: QueueService) {}
@Cron(CronExpression.EVERY_5_MINUTES)
async handleScheduledTask() {
// Au lieu d'exécuter directement :
// await this.service.doWork();
// Ajouter des jobs à la queue :
const jobId = await this.queueService.addSomeJob({
// ... job data
});
}
}
📡 WebSocket & Mises à jour en temps réel
Les mises à jour de progression sont envoyées automatiquement via WebSocket.
Événement WebSocket
Event : queue:job:progress
Payload :
{
jobId: string,
status: "waiting" | "active" | "completed" | "failed" | "delayed",
progress?: number, // 0-100
message?: string,
result?: {
success: boolean,
message?: string,
data?: unknown
}
}
Écoute côté Frontend
import { useAuthenticatedSocketIO } from "@/client/websocket";
const { socket } = useAuthenticatedSocketIO();
socket.on("queue:job:progress", (data) => {
console.log(`Job ${data.jobId}: ${data.status} - ${data.progress}%`);
if (data.status === "completed") {
// Traiter le résultat
console.log("Result:", data.result);
}
});
📊 Monitoring & Debugging
Vérifier le statut d'un job
const status = await queueService.getJobStatus(QueueNames.EXPORTS, jobId);
// Retourne :
{
id: string,
name: string,
state: "waiting" | "active" | "completed" | "failed" | "delayed",
progress?: number,
data: JobData,
returnvalue?: JobResult,
failedReason?: string
}
Logs
Tous les processors loggent automatiquement :
- Début de traitement
- Progression
- Complétion
- Erreurs
Les logs incluent :
- Job ID
- User ID (si applicable)
- Progression
- Messages d'erreur détaillés
🔄 Migration depuis les schedulers
Avant (exécution directe)
@Cron(CronExpression.EVERY_5_MINUTES)
async handleScheduledEmailsExecution() {
const readyEmails = await this.service.getEmailsReadyToSend();
for (const email of readyEmails) {
await this.service.sendScheduledEmail(email.id); // Bloque le thread
}
}
Après (avec queue)
@Cron(CronExpression.EVERY_5_MINUTES)
async handleScheduledEmailsExecution() {
const readyEmails = await this.service.getEmailsReadyToSend();
for (const email of readyEmails) {
// Ajoute à la queue, ne bloque pas
await this.queueService.addScheduledEmailJob({
emailId: email.id,
userId: email.owner_id,
});
}
}
📁 Structure des fichiers
backend/src/queue/
├── README.md # Documentation technique du module
├── queue-core.module.ts # Connexion BullMQ + registerQueue + QueueService
├── queue-client.module.ts # API runtime (enqueue + WebSocket progress)
├── queue-worker.module.ts # Worker runtime (processors base + progress no-op)
├── queue.service.ts # Service pour gérer les queues et jobs
├── queue-runtime.module.ts # Switch runtime (enabled/disabled + client/worker)
├── queue-progress-emitter.ts # Token + interface d’émission de progress
├── queue.config.ts # Configuration des queues (retry, cleanup, etc.)
├── types/
│ ├── job-data.types.ts # Types de données des jobs
│ └── queue-names.types.ts # Noms des queues (enum)
└── processors/
├── export.processor.ts # Processor pour les exports
├── google-calendar-sync.processor.ts
├── scheduled-emails.processor.ts
├── workflow-tasks.processor.ts
├── recurring-sessions.processor.ts
└── audit-cleanup.processor.ts
Documentation :
- QUEUE_SYSTEM.md - Documentation complète avec exemples
- backend/src/queue/README.md (documentation technique) - Documentation technique du module
✅ Avantages
Performance
- Non-bloquant : Les requêtes HTTP retournent immédiatement
- Parallélisation : Plusieurs workers peuvent traiter les jobs simultanément
- Scalabilité : Ajout facile de workers supplémentaires
Fiabilité
- Retry automatique : Les jobs échoués sont automatiquement réessayés
- Persistance : Les jobs sont stockés dans Redis, survivent aux redémarrages
- Monitoring : Suivi complet des jobs (en cours, en attente, échoués)
Expérience utilisateur
- Feedback en temps réel : Mises à jour de progression via WebSocket
- Pas de timeout HTTP : Les jobs longs ne causent plus de timeout
- Meilleure réactivité : L'interface reste responsive pendant le traitement
🚀 Améliorations possibles
Monitoring & Observabilité
Outils de monitoring disponibles
Plusieurs outils permettent de suivre et gérer les queues BullMQ :
1. Bull Board (Recommandé - Open Source) ⭐ ✅ IMPLÉMENTÉ
Description : Dashboard open-source officiel pour BullMQ, intégration facile avec NestJS.
Installation :
npm install @bull-board/nestjs @bull-board/api @bull-board/express
Intégration :
Le module QueueBoardModule est déjà configuré et intégré dans l'application.
Accès : http://localhost:8080/admin/queues (ou votre URL backend + /admin/queues)
Note : La route est exclue du préfixe global /api pour un accès direct. Elle est accessible sans authentification pour le moment (à protéger en production).
Fichiers :
backend/src/queue/queue-board.module.ts- Module Bull Board- Intégré dans
backend/src/app.module.ts
Fonctionnalités :
- ✅ Visualisation des jobs en temps réel
- ✅ Statistiques par queue (waiting, active, completed, failed)
- ✅ Retry manuel des jobs échoués
- ✅ Purge des queues
- ✅ Filtrage et recherche de jobs
- ✅ Détails complets des jobs (payload, logs, etc.)
Lien : https://github.com/felixmosh/bull-board
2. Upqueue.io (SaaS - Payant)
Description : Plateforme SaaS complète pour la gestion de BullMQ.
Fonctionnalités :
- Connexions multiples avec auto-détection des queues
- Alertes configurables (Slack, email, webhooks)
- Dashboard avec analytics et métriques
- Gestion avancée des jobs (retry, delay, etc.)
Prix : À partir de $19.99/mois (code promo : EARLYBIRD40)
Lien : https://upqueue.io
3. Queuedash (SaaS - Freemium)
Description : Monitoring en temps réel avec historique et analytics.
Fonctionnalités :
- Live Job Stream (mises à jour instantanées)
- Alertes intelligentes (stalls, spikes)
- Historique recherchable
- Analytics de performance (throughput, latency, error rates)
Prix : Plan gratuit (1000 jobs), puis payant
Lien : https://queuedash.dev
4. Queue Monitor (SaaS - Multi-queues)
Description : Plateforme unifiée supportant BullMQ, AWS SQS, Azure Service Bus.
Fonctionnalités :
- Support multi-queues
- Analytics en temps réel
- Alertes personnalisées
- Gestion complète des queues
Prix : Essai gratuit, puis payant
Lien : https://queuemonitor.org
5. BullMQ Exporter (Open Source)
Description : Exporteur Prometheus + dashboard sécurisé pour BullMQ.
Fonctionnalités :
- Métriques Prometheus
- Dashboard sécurisé (login requis)
- Intégration avec Grafana
Lien : https://github.com/ron96G/bullmq-exporter
6. Redis Insight (Gratuit - Redis Labs)
Description : Interface graphique officielle pour Redis (inclut la visualisation des queues).
Fonctionnalités :
- Visualisation des données Redis
- Analyse des clés et structures
- Monitoring des performances
- Support des structures BullMQ
Lien : https://redis.io/insight
Recommandation
Pour un projet open-source ou auto-hébergé, Bull Board est la meilleure option :
- ✅ Gratuit et open-source
- ✅ Intégration facile avec NestJS
- ✅ Fonctionnalités complètes
- ✅ Communauté active
- ✅ Pas de dépendance externe
Pour un projet nécessitant un monitoring avancé et des alertes, Upqueue.io ou Queuedash sont de bonnes options SaaS.
Métriques Prometheus
Exporter des métriques pour monitoring :
// Exemple de métriques à exposer
queue_jobs_total{queue="exports",status="waiting"} 5
queue_jobs_total{queue="exports",status="active"} 2
queue_jobs_total{queue="exports",status="completed"} 150
queue_jobs_total{queue="exports",status="failed"} 3
queue_job_duration_seconds{queue="exports"} 45.2
Priorisation des jobs
Ajouter des priorités pour traiter certains jobs en premier :
// Dans queue.service.ts
async addExportJob(data: ExportJobData, priority: number = 0) {
const job = await this.exportsQueue.add("export", data, {
priority, // Plus le nombre est élevé, plus la priorité est haute
});
return job.id!;
}
// Utilisation
await queueService.addExportJob(data, 10); // Haute priorité
await queueService.addExportJob(data, 0); // Priorité normale
Rate Limiting par type de job
Limiter le nombre de jobs traités simultanément :
// Dans queue.config.ts
export const queueJobOptions: Record<string, JobsOptions> = {
exports: {
...defaultJobOptions,
concurrency: 2, // Maximum 2 exports en parallèle
},
"scheduled-emails": {
...defaultJobOptions,
concurrency: 5, // Maximum 5 emails en parallèle
},
};
Jobs récurrents (Scheduled Jobs)
Planifier des jobs récurrents directement dans BullMQ :
// Ajouter un job récurrent
await this.exportsQueue.add(
"daily-export",
{ userId: "user-123", entity: "sessions", format: "csv" },
{
repeat: {
pattern: "0 2 * * *", // Tous les jours à 2h du matin
},
}
);
Dead Letter Queue
Créer une queue spéciale pour les jobs qui échouent définitivement :
// Dans queue.config.ts
export const queueJobOptions: Record<string, JobsOptions> = {
exports: {
...defaultJobOptions,
attempts: 3,
removeOnFail: false, // Garder les jobs échoués
},
};
// Processor pour dead letter queue
@Processor(QueueNames.DEAD_LETTER)
export class DeadLetterProcessor extends WorkerHost {
async process(job: Job) {
// Logger l'erreur, envoyer une alerte, etc.
this.logger.error(`Job failed permanently: ${job.id}`);
}
}
Job Progress détaillé
Améliorer le suivi de progression avec des étapes nommées :
interface DetailedProgress {
currentStep: string;
totalSteps: number;
stepProgress: number; // 0-100 pour l'étape actuelle
overallProgress: number; // 0-100 global
}
// Dans le processor
await job.updateProgress({
currentStep: "Generating PDF",
totalSteps: 5,
stepProgress: 75,
overallProgress: 35, // Étape 3 sur 5, 75% de l'étape = 35% global
});
Job Batching
Grouper plusieurs jobs similaires pour traitement en batch :
// Ajouter plusieurs jobs en batch
const jobs = await this.exportsQueue.addBulk([
{ name: "export", data: { entity: "sessions", userId: "user-1" } },
{ name: "export", data: { entity: "contacts", userId: "user-1" } },
{ name: "export", data: { entity: "quotes", userId: "user-1" } },
]);
Job Delay
Retarder l'exécution d'un job :
// Exécuter le job dans 1 heure
await this.queueService.addExportJob(data, {
delay: 3600000, // 1 heure en millisecondes
});
Job Timeout personnalisé
Configurer des timeouts spécifiques par job :
// Dans le processor
async process(job: Job) {
// Timeout de 10 minutes pour ce job spécifique
const timeout = setTimeout(() => {
job.moveToFailed(new Error("Job timeout"), "0");
}, 600000);
try {
// Traitement du job
await this.doWork();
clearTimeout(timeout);
} catch (error) {
clearTimeout(timeout);
throw error;
}
}
Job Dependencies
Créer des dépendances entre jobs :
// Job 2 dépend de Job 1
const job1 = await this.queue.add("step1", data1);
const job2 = await this.queue.add("step2", data2, {
parent: {
id: job1.id!,
queue: this.queue.name,
},
});
Job Events & Hooks
Écouter les événements de jobs pour logging/alertes :
// Dans queue.module.ts ou un service dédié
this.exportsQueue.on("completed", (job) => {
this.logger.log(`Job ${job.id} completed`);
});
this.exportsQueue.on("failed", (job, error) => {
this.logger.error(`Job ${job.id} failed: ${error.message}`);
// Envoyer une alerte si nécessaire
});
Retry avec stratégie personnalisée
Stratégies de retry plus sophistiquées :
// Retry avec délai personnalisé
const retryStrategy = (attemptsMade: number, error: Error) => {
if (error.message.includes("rate limit")) {
return 60000; // Attendre 1 minute pour rate limit
}
return Math.min(attemptsMade * 2000, 30000); // Backoff exponentiel max 30s
};
await this.queue.add("job", data, {
attempts: 5,
backoff: {
type: "custom",
delay: retryStrategy,
},
});
Job Metadata & Tags
Ajouter des métadonnées et tags pour faciliter le filtrage :
await this.queue.add("export", data, {
jobId: `export-${userId}-${Date.now()}`,
tags: ["user-export", "pdf", "priority-high"],
meta: {
userId,
entity,
format,
requestedAt: new Date().toISOString(),
},
});
📖 Meilleures pratiques
1. Gestion des erreurs
Toujours logger les erreurs avec le contexte complet :
catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
this.logger.error(
`Job failed → JobID: ${job.id} | UserID: ${userId} | Error: ${errorMessage}`,
error instanceof Error ? error.stack : undefined,
undefined,
{
emoji: "❌",
scope: "QUEUE_PROCESSOR",
jobId: job.id,
userId,
},
);
throw error; // Laisser BullMQ gérer le retry
}
2. Mises à jour de progression
Émettre des mises à jour régulières pour les jobs longs :
// Bon : Mises à jour régulières
await job.updateProgress(25);
this.queueService.emitJobProgress({ ... });
await job.updateProgress(50);
this.queueService.emitJobProgress({ ... });
// Mauvais : Pas de mise à jour
// Le job tourne sans feedback pendant 5 minutes
3. Idempotence
Rendre les jobs idempotents (peuvent être exécutés plusieurs fois sans effet de bord) :
async process(job: Job) {
// Vérifier si le job a déjà été traité
const existing = await this.db.getJobResult(job.id);
if (existing) {
return existing; // Retourner le résultat existant
}
// Traiter le job
const result = await this.doWork();
// Stocker le résultat
await this.db.saveJobResult(job.id, result);
return result;
}
4. Limitation de la taille des données
Éviter de passer de grandes quantités de données dans le job :
// Bon : Passer seulement les IDs
await queueService.addExportJob({
userId: "user-123",
entity: "sessions",
format: "pdf",
});
// Mauvais : Passer toutes les données
await queueService.addExportJob({
userId: "user-123",
data: [...10000 sessions...], // Trop de données
});
5. Nettoyage des jobs
Configurer correctement le nettoyage pour éviter l'accumulation :
removeOnComplete: {
age: 3600, // Garder 1h
count: 100, // Ou les 100 derniers
},
removeOnFail: {
age: 86400, // Garder 24h pour debugging
count: 1000, // Ou les 1000 derniers
},
🔍 Debugging
Vérifier le statut d'un job
const status = await queueService.getJobStatus(QueueNames.EXPORTS, jobId);
console.log(status);
// {
// id: "export-user-123-sessions-1234567890",
// name: "export",
// state: "completed",
// progress: 100,
// data: { entity: "sessions", format: "pdf", userId: "user-123" },
// returnvalue: { success: true, url: "..." },
// }
Lister les jobs d'une queue
// Jobs en attente
const waiting = await this.exportsQueue.getWaiting();
// Jobs actifs
const active = await this.exportsQueue.getActive();
// Jobs complétés
const completed = await this.exportsQueue.getCompleted(0, 10);
// Jobs échoués
const failed = await this.exportsQueue.getFailed(0, 10);
Retry manuel d'un job échoué
const job = await this.exportsQueue.getJob(jobId);
if (job && (await job.getState()) === "failed") {
await job.retry(); // Réessayer le job
}
Purger une queue
// Supprimer tous les jobs complétés
await this.exportsQueue.clean(0, 100, "completed");
// Supprimer tous les jobs échoués
await this.exportsQueue.clean(0, 100, "failed");
📊 Exemples d'utilisation avancés
Export avec notification email
// Dans ExportProcessor
async process(job: Job<ExportJobData>) {
// ... génération de l'export ...
// Envoyer un email de notification
await this.emailService.send({
to: user.email,
subject: "Votre export est prêt",
body: `Votre export ${entity} est disponible : ${signedUrl}`,
});
return result;
}
Job avec étapes multiples
async process(job: Job) {
// Étape 1
await job.updateProgress(10);
const data = await this.fetchData();
// Étape 2
await job.updateProgress(30);
const processed = await this.processData(data);
// Étape 3
await job.updateProgress(60);
const formatted = await this.formatData(processed);
// Étape 4
await job.updateProgress(90);
const uploaded = await this.uploadData(formatted);
// Complété
await job.updateProgress(100);
return uploaded;
}
Job conditionnel
async process(job: Job) {
const { userId, entity } = job.data;
// Vérifier si l'utilisateur a le droit d'exporter
const canExport = await this.permissionsService.canExport(userId, entity);
if (!canExport) {
throw new Error("User does not have permission to export");
}
// Continuer le traitement
// ...
}
🚀 Prochaines améliorations
- Dashboard de monitoring des queues (Bull Board)
- Priorisation des jobs (high/medium/low)
- Rate limiting par type de job
- Jobs récurrents (scheduled jobs)
- Dead letter queue pour les jobs qui échouent définitivement
- Métriques et alertes (Prometheus/Grafana)
- Job batching pour optimiser les traitements groupés
- Job dependencies pour chaîner les jobs
- Job events & hooks pour logging avancé
- Retry avec stratégie personnalisée selon le type d'erreur
🎓 Exemples pratiques
Exemple 1 : Export avec suivi de progression
Frontend :
// pages/export/ExportPage.tsx
const { socket } = useAuthenticatedSocketIO();
const [exportStatus, setExportStatus] = useState<JobProgress | null>(null);
useEffect(() => {
socket.on("queue:job:progress", (progress: JobProgress) => {
if (progress.jobId === currentJobId) {
setExportStatus(progress);
if (progress.status === "completed") {
// Télécharger le fichier
window.location.href = progress.result?.data?.url;
}
}
});
return () => {
socket.off("queue:job:progress");
};
}, [currentJobId]);
const handleExport = async () => {
const response = await api.get("/export", {
params: { entity: "sessions", format: "pdf" },
});
setCurrentJobId(response.data.jobId);
setExportStatus({ status: "waiting", jobId: response.data.jobId });
};
Backend :
// export.controller.ts
@Get()
async export(@Req() req: RequestWithUser, @Query("entity") entity: string) {
const jobId = await this.queueService.addExportJob({
entity,
format: "pdf",
userId: req.user.id,
});
return { jobId, status: "queued" };
}
Exemple 2 : Scheduler avec queue
Avant (bloquant) :
@Cron(CronExpression.EVERY_5_MINUTES)
async handleScheduledEmails() {
const emails = await this.service.getEmailsReadyToSend();
for (const email of emails) {
await this.service.sendEmail(email.id); // Bloque pendant l'envoi
}
}
Après (non-bloquant) :
@Cron(CronExpression.EVERY_5_MINUTES)
async handleScheduledEmails() {
const emails = await this.service.getEmailsReadyToSend();
for (const email of emails) {
// Ajoute à la queue, retourne immédiatement
await this.queueService.addScheduledEmailJob({
emailId: email.id,
userId: email.owner_id,
});
}
}
Exemple 3 : Job avec retry conditionnel
async process(job: Job) {
try {
await this.externalApiCall();
} catch (error) {
// Si c'est une erreur temporaire (rate limit), retry
if (error.status === 429) {
throw error; // BullMQ va retry automatiquement
}
// Si c'est une erreur permanente, ne pas retry
if (error.status === 400) {
await job.moveToFailed(error, "0"); // Ne pas retry
return;
}
throw error;
}
}
🔧 Configuration avancée
Connexion Redis personnalisée
// Dans queue.module.ts
BullModule.forRoot({
connection: {
host: process.env.REDIS_HOST || "redis",
port: parseInt(process.env.REDIS_PORT || "6379", 10),
password: process.env.REDIS_PASSWORD,
maxRetriesPerRequest: 3,
retryStrategy: (times: number) => {
const delay = Math.min(times * 50, 2000);
return delay;
},
// Options supplémentaires
enableReadyCheck: true,
enableOfflineQueue: true,
},
});
Configuration par environnement
// queue.config.ts
const isProduction = process.env.NODE_ENV === "production";
export const queueJobOptions: Record<string, JobsOptions> = {
exports: {
...defaultJobOptions,
attempts: isProduction ? 3 : 1, // Moins de retries en dev
removeOnComplete: {
age: isProduction ? 3600 : 300, // Garder moins longtemps en dev
count: isProduction ? 100 : 10,
},
},
};
📚 Références
- BullMQ Documentation
- @nestjs/bullmq Documentation
- Redis Documentation
- Bull Board - Dashboard pour BullMQ