Aller au contenu principal

Système de Queue pour Jobs Lourds

Dernière mise à jour : 2025-01-XX

Documentation complète du système de queue basé sur BullMQ pour le traitement asynchrone des jobs lourds.

🎯 Vue d'ensemble

Le système de queue permet de découpler les opérations longues du thread principal de l'application, offrant :

  • Traitement asynchrone : Les jobs sont exécutés en arrière-plan sans bloquer les requêtes HTTP
  • Retry automatique : Retry avec backoff exponentiel en cas d'échec
  • Scalabilité horizontale : Support de workers multiples pour traiter les jobs en parallèle
  • Monitoring : Suivi des jobs (en cours, en attente, échoués)
  • Priorisation : Possibilité de prioriser certains types de jobs
  • Rate limiting : Limitation du nombre de jobs par type
  • Mises à jour en temps réel : Notifications WebSocket pour suivre la progression des jobs

🏗️ Architecture

┌─────────────────┐
│ Controller │
│ / Scheduler │
└────────┬────────┘


┌─────────────────┐
│ QueueService │
│ (add jobs) │
└────────┬────────┘


┌─────────────────┐
│ Queue (Redis) │
│ (BullMQ) │
└────────┬────────┘


┌─────────────────┐
│ Processor │
│ (WorkerHost) │
└────────┬────────┘


┌─────────────────┐
│ Service │
│ (Business │
│ Logic) │
└─────────────────┘


┌─────────────────┐
│ WebSocket │
│ (Status │
│ Updates) │
└─────────────────┘

Flux de traitement

  1. Enqueue : Un controller ou scheduler ajoute un job à la queue via QueueService
  2. Queue : Le job est stocké dans Redis (BullMQ)
  3. Process : Un processor (WorkerHost) récupère et traite le job
  4. Update : Les mises à jour de progression sont envoyées via WebSocket
  5. Complete/Fail : Le job est marqué comme complété ou échoué

⚙️ Configuration

Variables d'environnement

Le système utilise les mêmes variables Redis que le cache :

# Redis configuration (utilisé par BullMQ)
REDIS_ENABLED=true # Activer Redis (défaut: true)
REDIS_HOST=redis # Host Redis (défaut: redis)
REDIS_PORT=6379 # Port Redis (défaut: 6379)
REDIS_PASSWORD= # Mot de passe Redis (optionnel)

Configuration des queues

Les configurations par queue sont définies dans backend/src/queue/queue.config.ts :

export const queueJobOptions: Record<string, JobsOptions> = {
exports: {
attempts: 3, // Nombre de tentatives en cas d'échec
backoff: {
delay: 2000, // Délai initial (ms)
type: "exponential", // Type de backoff
},
removeOnComplete: {
age: 3600, // Garder les jobs complétés pendant 1h
count: 100, // Garder les 100 derniers jobs complétés
},
removeOnFail: {
age: 86400, // Garder les jobs échoués pendant 24h
count: 1000, // Garder les 1000 derniers jobs échoués
},
},
// ... autres queues
};

📦 Queues disponibles

1. Exports (exports)

Description : Traitement asynchrone des exports de données (CSV, Excel, PDF, Word, JSON)

Configuration :

  • Tentatives : 3
  • Timeout : 5 minutes (implicite via processor)

Job Data :

{
entity: "sessions" | "contacts" | "quotes" | "invoices" | "all",
format: "csv" | "excel" | "pdf" | "docx" | "json",
userId: string
}

Processor : ExportProcessor

Utilisation :

const jobId = await queueService.addExportJob({
entity: "sessions",
format: "pdf",
userId: "user-123",
});

2. Google Calendar Sync (google-calendar-sync)

Description : Synchronisation bidirectionnelle Google Calendar ↔ Sessions

Configuration :

  • Tentatives : 5
  • Timeout : 10 minutes (implicite)

Job Data :

{
userId: string;
}

Processor : GoogleCalendarSyncProcessor

Utilisation :

const jobId = await queueService.addGoogleCalendarSyncJob({
userId: "user-123",
});

3. Scheduled Emails (scheduled-emails)

Description : Envoi d'emails planifiés

Configuration :

  • Tentatives : 3
  • Timeout : 1 minute (implicite)

Job Data :

{
emailId: string,
userId: string
}

Processor : ScheduledEmailsProcessor

Utilisation :

const jobId = await queueService.addScheduledEmailJob({
emailId: "email-123",
userId: "user-123",
});

4. Workflow Tasks (workflow-tasks)

Description : Exécution de tâches de workflow

Configuration :

  • Tentatives : 3
  • Timeout : 2 minutes (implicite)

Job Data :

{
taskId: string,
userId: string
}

Processor : WorkflowTasksProcessor

Utilisation :

const jobId = await queueService.addWorkflowTaskJob({
taskId: "task-123",
userId: "user-123",
});

5. Recurring Sessions (recurring-sessions)

Description : Génération d'occurrences futures pour les sessions récurrentes

Configuration :

  • Tentatives : 2
  • Timeout : 10 minutes (implicite)

Job Data :

{
userId?: string // Optionnel : si non fourni, traite tous les utilisateurs
}

Processor : RecurringSessionsProcessor

Utilisation :

const jobId = await queueService.addRecurringSessionsJob();
// ou pour un utilisateur spécifique :
const jobId = await queueService.addRecurringSessionsJob({
userId: "user-123",
});

6. Audit Cleanup (audit-cleanup)

Description : Nettoyage des anciens logs d'audit

Configuration :

  • Tentatives : 2
  • Timeout : 5 minutes (implicite)

Job Data :

{
daysToKeep?: number // Optionnel : défaut 30 jours
}

Processor : AuditCleanupProcessor

Utilisation :

const jobId = await queueService.addAuditCleanupJob({ daysToKeep: 30 });

🔧 Processors

Les processors sont des classes qui étendent WorkerHost et implémentent la méthode process() pour traiter les jobs.

Structure d'un Processor

import { Inject } from "@nestjs/common";
import { Processor, WorkerHost } from "@nestjs/bullmq";
import { Job } from "bullmq";
import type { LoggerService } from "@nestjs/common";

import { QueueNames } from "../types/queue-names.types.js";
import { QueueService } from "../queue.service.js";

@Processor(QueueNames.EXPORTS)
export class ExportProcessor extends WorkerHost {
constructor(
private readonly exportService: ExportService,
private readonly queueService: QueueService,
@Inject("LoggerService") logger: LoggerService
) {
super();
}

async process(job: Job<ExportJobData>): Promise<ExportJobResult> {
// 1. Émettre progression initiale
this.queueService.emitJobProgress({
jobId: job.id!,
status: "active",
progress: 0,
message: "Starting...",
});

try {
// 2. Traiter le job
await job.updateProgress(50);

// 3. Émettre progression
this.queueService.emitJobProgress({
jobId: job.id!,
status: "active",
progress: 50,
message: "Processing...",
});

// 4. Compléter le job
await job.updateProgress(100);
this.queueService.emitJobProgress({
jobId: job.id!,
status: "completed",
progress: 100,
message: "Completed",
result: { success: true },
});

return { success: true };
} catch (error) {
// 5. Gérer les erreurs
this.queueService.emitJobProgress({
jobId: job.id!,
status: "failed",
message: error.message,
result: { success: false },
});
throw error;
}
}
}

💻 Utilisation

Dans un Controller

import { QueueService } from "../queue/queue.service.js";

@Controller("export")
export class ExportController {
constructor(private readonly queueService: QueueService) {}

@Get()
async export(@Req() req: RequestWithUser, @Query("entity") entity: string) {
// Ajouter le job à la queue
const jobId = await this.queueService.addExportJob({
entity,
format: "pdf",
userId: req.user.id,
});

// Retourner immédiatement avec le jobId
return {
jobId,
status: "queued",
message: "Export job has been queued",
statusUrl: `/export/status/${jobId}`,
};
}

@Get("status/:jobId")
async getStatus(@Param("jobId") jobId: string) {
const status = await this.queueService.getJobStatus(
QueueNames.EXPORTS,
jobId
);
return status;
}
}

Dans un Scheduler

import { QueueService } from "../queue/queue.service.js";

@Injectable()
export class MyScheduler {
constructor(private readonly queueService: QueueService) {}

@Cron(CronExpression.EVERY_5_MINUTES)
async handleScheduledTask() {
// Au lieu d'exécuter directement :
// await this.service.doWork();

// Ajouter des jobs à la queue :
const jobId = await this.queueService.addSomeJob({
// ... job data
});
}
}

📡 WebSocket & Mises à jour en temps réel

Les mises à jour de progression sont envoyées automatiquement via WebSocket.

Événement WebSocket

Event : queue:job:progress

Payload :

{
jobId: string,
status: "waiting" | "active" | "completed" | "failed" | "delayed",
progress?: number, // 0-100
message?: string,
result?: {
success: boolean,
message?: string,
data?: unknown
}
}

Écoute côté Frontend

import { useAuthenticatedSocketIO } from "@/client/websocket";

const { socket } = useAuthenticatedSocketIO();

socket.on("queue:job:progress", (data) => {
console.log(`Job ${data.jobId}: ${data.status} - ${data.progress}%`);
if (data.status === "completed") {
// Traiter le résultat
console.log("Result:", data.result);
}
});

📊 Monitoring & Debugging

Vérifier le statut d'un job

const status = await queueService.getJobStatus(QueueNames.EXPORTS, jobId);

// Retourne :
{
id: string,
name: string,
state: "waiting" | "active" | "completed" | "failed" | "delayed",
progress?: number,
data: JobData,
returnvalue?: JobResult,
failedReason?: string
}

Logs

Tous les processors loggent automatiquement :

  • Début de traitement
  • Progression
  • Complétion
  • Erreurs

Les logs incluent :

  • Job ID
  • User ID (si applicable)
  • Progression
  • Messages d'erreur détaillés

🔄 Migration depuis les schedulers

Avant (exécution directe)

@Cron(CronExpression.EVERY_5_MINUTES)
async handleScheduledEmailsExecution() {
const readyEmails = await this.service.getEmailsReadyToSend();
for (const email of readyEmails) {
await this.service.sendScheduledEmail(email.id); // Bloque le thread
}
}

Après (avec queue)

@Cron(CronExpression.EVERY_5_MINUTES)
async handleScheduledEmailsExecution() {
const readyEmails = await this.service.getEmailsReadyToSend();
for (const email of readyEmails) {
// Ajoute à la queue, ne bloque pas
await this.queueService.addScheduledEmailJob({
emailId: email.id,
userId: email.owner_id,
});
}
}

📁 Structure des fichiers

backend/src/queue/
├── README.md # Documentation technique du module
├── queue.module.ts # Module principal avec configuration BullMQ
├── queue.service.ts # Service pour gérer les queues et jobs
├── queue.config.ts # Configuration des queues (retry, cleanup, etc.)
├── types/
│ ├── job-data.types.ts # Types de données des jobs
│ └── queue-names.types.ts # Noms des queues (enum)
└── processors/
├── export.processor.ts # Processor pour les exports
├── google-calendar-sync.processor.ts
├── scheduled-emails.processor.ts
├── workflow-tasks.processor.ts
├── recurring-sessions.processor.ts
└── audit-cleanup.processor.ts

Documentation :

  • QUEUE_SYSTEM.md - Documentation complète avec exemples
  • backend/src/queue/README.md (documentation technique) - Documentation technique du module

✅ Avantages

Performance

  • Non-bloquant : Les requêtes HTTP retournent immédiatement
  • Parallélisation : Plusieurs workers peuvent traiter les jobs simultanément
  • Scalabilité : Ajout facile de workers supplémentaires

Fiabilité

  • Retry automatique : Les jobs échoués sont automatiquement réessayés
  • Persistance : Les jobs sont stockés dans Redis, survivent aux redémarrages
  • Monitoring : Suivi complet des jobs (en cours, en attente, échoués)

Expérience utilisateur

  • Feedback en temps réel : Mises à jour de progression via WebSocket
  • Pas de timeout HTTP : Les jobs longs ne causent plus de timeout
  • Meilleure réactivité : L'interface reste responsive pendant le traitement

🚀 Améliorations possibles

Monitoring & Observabilité

Outils de monitoring disponibles

Plusieurs outils permettent de suivre et gérer les queues BullMQ :

1. Bull Board (Recommandé - Open Source) ⭐ ✅ IMPLÉMENTÉ

Description : Dashboard open-source officiel pour BullMQ, intégration facile avec NestJS.

Installation :

npm install @bull-board/nestjs @bull-board/api @bull-board/express

Intégration :

Le module QueueBoardModule est déjà configuré et intégré dans l'application.

Accès : http://localhost:8080/admin/queues (ou votre URL backend + /admin/queues)

Note : La route est exclue du préfixe global /api pour un accès direct. Elle est accessible sans authentification pour le moment (à protéger en production).

Fichiers :

  • backend/src/queue/queue-board.module.ts - Module Bull Board
  • Intégré dans backend/src/app.module.ts

Fonctionnalités :

  • ✅ Visualisation des jobs en temps réel
  • ✅ Statistiques par queue (waiting, active, completed, failed)
  • ✅ Retry manuel des jobs échoués
  • ✅ Purge des queues
  • ✅ Filtrage et recherche de jobs
  • ✅ Détails complets des jobs (payload, logs, etc.)

Lien : https://github.com/felixmosh/bull-board

2. Upqueue.io (SaaS - Payant)

Description : Plateforme SaaS complète pour la gestion de BullMQ.

Fonctionnalités :

  • Connexions multiples avec auto-détection des queues
  • Alertes configurables (Slack, email, webhooks)
  • Dashboard avec analytics et métriques
  • Gestion avancée des jobs (retry, delay, etc.)

Prix : À partir de $19.99/mois (code promo : EARLYBIRD40)

Lien : https://upqueue.io

3. Queuedash (SaaS - Freemium)

Description : Monitoring en temps réel avec historique et analytics.

Fonctionnalités :

  • Live Job Stream (mises à jour instantanées)
  • Alertes intelligentes (stalls, spikes)
  • Historique recherchable
  • Analytics de performance (throughput, latency, error rates)

Prix : Plan gratuit (1000 jobs), puis payant

Lien : https://queuedash.dev

4. Queue Monitor (SaaS - Multi-queues)

Description : Plateforme unifiée supportant BullMQ, AWS SQS, Azure Service Bus.

Fonctionnalités :

  • Support multi-queues
  • Analytics en temps réel
  • Alertes personnalisées
  • Gestion complète des queues

Prix : Essai gratuit, puis payant

Lien : https://queuemonitor.org

5. BullMQ Exporter (Open Source)

Description : Exporteur Prometheus + dashboard sécurisé pour BullMQ.

Fonctionnalités :

  • Métriques Prometheus
  • Dashboard sécurisé (login requis)
  • Intégration avec Grafana

Lien : https://github.com/ron96G/bullmq-exporter

6. Redis Insight (Gratuit - Redis Labs)

Description : Interface graphique officielle pour Redis (inclut la visualisation des queues).

Fonctionnalités :

  • Visualisation des données Redis
  • Analyse des clés et structures
  • Monitoring des performances
  • Support des structures BullMQ

Lien : https://redis.io/insight

Recommandation

Pour un projet open-source ou auto-hébergé, Bull Board est la meilleure option :

  • ✅ Gratuit et open-source
  • ✅ Intégration facile avec NestJS
  • ✅ Fonctionnalités complètes
  • ✅ Communauté active
  • ✅ Pas de dépendance externe

Pour un projet nécessitant un monitoring avancé et des alertes, Upqueue.io ou Queuedash sont de bonnes options SaaS.

Métriques Prometheus

Exporter des métriques pour monitoring :

// Exemple de métriques à exposer
queue_jobs_total{queue="exports",status="waiting"} 5
queue_jobs_total{queue="exports",status="active"} 2
queue_jobs_total{queue="exports",status="completed"} 150
queue_jobs_total{queue="exports",status="failed"} 3
queue_job_duration_seconds{queue="exports"} 45.2

Priorisation des jobs

Ajouter des priorités pour traiter certains jobs en premier :

// Dans queue.service.ts
async addExportJob(data: ExportJobData, priority: number = 0) {
const job = await this.exportsQueue.add("export", data, {
priority, // Plus le nombre est élevé, plus la priorité est haute
});
return job.id!;
}

// Utilisation
await queueService.addExportJob(data, 10); // Haute priorité
await queueService.addExportJob(data, 0); // Priorité normale

Rate Limiting par type de job

Limiter le nombre de jobs traités simultanément :

// Dans queue.config.ts
export const queueJobOptions: Record<string, JobsOptions> = {
exports: {
...defaultJobOptions,
concurrency: 2, // Maximum 2 exports en parallèle
},
"scheduled-emails": {
...defaultJobOptions,
concurrency: 5, // Maximum 5 emails en parallèle
},
};

Jobs récurrents (Scheduled Jobs)

Planifier des jobs récurrents directement dans BullMQ :

// Ajouter un job récurrent
await this.exportsQueue.add(
"daily-export",
{ userId: "user-123", entity: "sessions", format: "csv" },
{
repeat: {
pattern: "0 2 * * *", // Tous les jours à 2h du matin
},
}
);

Dead Letter Queue

Créer une queue spéciale pour les jobs qui échouent définitivement :

// Dans queue.config.ts
export const queueJobOptions: Record<string, JobsOptions> = {
exports: {
...defaultJobOptions,
attempts: 3,
removeOnFail: false, // Garder les jobs échoués
},
};

// Processor pour dead letter queue
@Processor(QueueNames.DEAD_LETTER)
export class DeadLetterProcessor extends WorkerHost {
async process(job: Job) {
// Logger l'erreur, envoyer une alerte, etc.
this.logger.error(`Job failed permanently: ${job.id}`);
}
}

Job Progress détaillé

Améliorer le suivi de progression avec des étapes nommées :

interface DetailedProgress {
currentStep: string;
totalSteps: number;
stepProgress: number; // 0-100 pour l'étape actuelle
overallProgress: number; // 0-100 global
}

// Dans le processor
await job.updateProgress({
currentStep: "Generating PDF",
totalSteps: 5,
stepProgress: 75,
overallProgress: 35, // Étape 3 sur 5, 75% de l'étape = 35% global
});

Job Batching

Grouper plusieurs jobs similaires pour traitement en batch :

// Ajouter plusieurs jobs en batch
const jobs = await this.exportsQueue.addBulk([
{ name: "export", data: { entity: "sessions", userId: "user-1" } },
{ name: "export", data: { entity: "contacts", userId: "user-1" } },
{ name: "export", data: { entity: "quotes", userId: "user-1" } },
]);

Job Delay

Retarder l'exécution d'un job :

// Exécuter le job dans 1 heure
await this.queueService.addExportJob(data, {
delay: 3600000, // 1 heure en millisecondes
});

Job Timeout personnalisé

Configurer des timeouts spécifiques par job :

// Dans le processor
async process(job: Job) {
// Timeout de 10 minutes pour ce job spécifique
const timeout = setTimeout(() => {
job.moveToFailed(new Error("Job timeout"), "0");
}, 600000);

try {
// Traitement du job
await this.doWork();
clearTimeout(timeout);
} catch (error) {
clearTimeout(timeout);
throw error;
}
}

Job Dependencies

Créer des dépendances entre jobs :

// Job 2 dépend de Job 1
const job1 = await this.queue.add("step1", data1);
const job2 = await this.queue.add("step2", data2, {
parent: {
id: job1.id!,
queue: this.queue.name,
},
});

Job Events & Hooks

Écouter les événements de jobs pour logging/alertes :

// Dans queue.module.ts ou un service dédié
this.exportsQueue.on("completed", (job) => {
this.logger.log(`Job ${job.id} completed`);
});

this.exportsQueue.on("failed", (job, error) => {
this.logger.error(`Job ${job.id} failed: ${error.message}`);
// Envoyer une alerte si nécessaire
});

Retry avec stratégie personnalisée

Stratégies de retry plus sophistiquées :

// Retry avec délai personnalisé
const retryStrategy = (attemptsMade: number, error: Error) => {
if (error.message.includes("rate limit")) {
return 60000; // Attendre 1 minute pour rate limit
}
return Math.min(attemptsMade * 2000, 30000); // Backoff exponentiel max 30s
};

await this.queue.add("job", data, {
attempts: 5,
backoff: {
type: "custom",
delay: retryStrategy,
},
});

Job Metadata & Tags

Ajouter des métadonnées et tags pour faciliter le filtrage :

await this.queue.add("export", data, {
jobId: `export-${userId}-${Date.now()}`,
tags: ["user-export", "pdf", "priority-high"],
meta: {
userId,
entity,
format,
requestedAt: new Date().toISOString(),
},
});

📖 Meilleures pratiques

1. Gestion des erreurs

Toujours logger les erreurs avec le contexte complet :

catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);

this.logger.error(
`Job failed → JobID: ${job.id} | UserID: ${userId} | Error: ${errorMessage}`,
error instanceof Error ? error.stack : undefined,
undefined,
{
emoji: "❌",
scope: "QUEUE_PROCESSOR",
jobId: job.id,
userId,
},
);

throw error; // Laisser BullMQ gérer le retry
}

2. Mises à jour de progression

Émettre des mises à jour régulières pour les jobs longs :

// Bon : Mises à jour régulières
await job.updateProgress(25);
this.queueService.emitJobProgress({ ... });

await job.updateProgress(50);
this.queueService.emitJobProgress({ ... });

// Mauvais : Pas de mise à jour
// Le job tourne sans feedback pendant 5 minutes

3. Idempotence

Rendre les jobs idempotents (peuvent être exécutés plusieurs fois sans effet de bord) :

async process(job: Job) {
// Vérifier si le job a déjà été traité
const existing = await this.db.getJobResult(job.id);
if (existing) {
return existing; // Retourner le résultat existant
}

// Traiter le job
const result = await this.doWork();

// Stocker le résultat
await this.db.saveJobResult(job.id, result);

return result;
}

4. Limitation de la taille des données

Éviter de passer de grandes quantités de données dans le job :

// Bon : Passer seulement les IDs
await queueService.addExportJob({
userId: "user-123",
entity: "sessions",
format: "pdf",
});

// Mauvais : Passer toutes les données
await queueService.addExportJob({
userId: "user-123",
data: [...10000 sessions...], // Trop de données
});

5. Nettoyage des jobs

Configurer correctement le nettoyage pour éviter l'accumulation :

removeOnComplete: {
age: 3600, // Garder 1h
count: 100, // Ou les 100 derniers
},
removeOnFail: {
age: 86400, // Garder 24h pour debugging
count: 1000, // Ou les 1000 derniers
},

🔍 Debugging

Vérifier le statut d'un job

const status = await queueService.getJobStatus(QueueNames.EXPORTS, jobId);
console.log(status);
// {
// id: "export-user-123-sessions-1234567890",
// name: "export",
// state: "completed",
// progress: 100,
// data: { entity: "sessions", format: "pdf", userId: "user-123" },
// returnvalue: { success: true, url: "..." },
// }

Lister les jobs d'une queue

// Jobs en attente
const waiting = await this.exportsQueue.getWaiting();

// Jobs actifs
const active = await this.exportsQueue.getActive();

// Jobs complétés
const completed = await this.exportsQueue.getCompleted(0, 10);

// Jobs échoués
const failed = await this.exportsQueue.getFailed(0, 10);

Retry manuel d'un job échoué

const job = await this.exportsQueue.getJob(jobId);
if (job && (await job.getState()) === "failed") {
await job.retry(); // Réessayer le job
}

Purger une queue

// Supprimer tous les jobs complétés
await this.exportsQueue.clean(0, 100, "completed");

// Supprimer tous les jobs échoués
await this.exportsQueue.clean(0, 100, "failed");

📊 Exemples d'utilisation avancés

Export avec notification email

// Dans ExportProcessor
async process(job: Job<ExportJobData>) {
// ... génération de l'export ...

// Envoyer un email de notification
await this.emailService.send({
to: user.email,
subject: "Votre export est prêt",
body: `Votre export ${entity} est disponible : ${signedUrl}`,
});

return result;
}

Job avec étapes multiples

async process(job: Job) {
// Étape 1
await job.updateProgress(10);
const data = await this.fetchData();

// Étape 2
await job.updateProgress(30);
const processed = await this.processData(data);

// Étape 3
await job.updateProgress(60);
const formatted = await this.formatData(processed);

// Étape 4
await job.updateProgress(90);
const uploaded = await this.uploadData(formatted);

// Complété
await job.updateProgress(100);
return uploaded;
}

Job conditionnel

async process(job: Job) {
const { userId, entity } = job.data;

// Vérifier si l'utilisateur a le droit d'exporter
const canExport = await this.permissionsService.canExport(userId, entity);
if (!canExport) {
throw new Error("User does not have permission to export");
}

// Continuer le traitement
// ...
}

🚀 Prochaines améliorations

  • Dashboard de monitoring des queues (Bull Board)
  • Priorisation des jobs (high/medium/low)
  • Rate limiting par type de job
  • Jobs récurrents (scheduled jobs)
  • Dead letter queue pour les jobs qui échouent définitivement
  • Métriques et alertes (Prometheus/Grafana)
  • Job batching pour optimiser les traitements groupés
  • Job dependencies pour chaîner les jobs
  • Job events & hooks pour logging avancé
  • Retry avec stratégie personnalisée selon le type d'erreur

🎓 Exemples pratiques

Exemple 1 : Export avec suivi de progression

Frontend :

// pages/export/ExportPage.tsx
const { socket } = useAuthenticatedSocketIO();
const [exportStatus, setExportStatus] = useState<JobProgress | null>(null);

useEffect(() => {
socket.on("queue:job:progress", (progress: JobProgress) => {
if (progress.jobId === currentJobId) {
setExportStatus(progress);

if (progress.status === "completed") {
// Télécharger le fichier
window.location.href = progress.result?.data?.url;
}
}
});

return () => {
socket.off("queue:job:progress");
};
}, [currentJobId]);

const handleExport = async () => {
const response = await api.get("/export", {
params: { entity: "sessions", format: "pdf" },
});

setCurrentJobId(response.data.jobId);
setExportStatus({ status: "waiting", jobId: response.data.jobId });
};

Backend :

// export.controller.ts
@Get()
async export(@Req() req: RequestWithUser, @Query("entity") entity: string) {
const jobId = await this.queueService.addExportJob({
entity,
format: "pdf",
userId: req.user.id,
});

return { jobId, status: "queued" };
}

Exemple 2 : Scheduler avec queue

Avant (bloquant) :

@Cron(CronExpression.EVERY_5_MINUTES)
async handleScheduledEmails() {
const emails = await this.service.getEmailsReadyToSend();
for (const email of emails) {
await this.service.sendEmail(email.id); // Bloque pendant l'envoi
}
}

Après (non-bloquant) :

@Cron(CronExpression.EVERY_5_MINUTES)
async handleScheduledEmails() {
const emails = await this.service.getEmailsReadyToSend();
for (const email of emails) {
// Ajoute à la queue, retourne immédiatement
await this.queueService.addScheduledEmailJob({
emailId: email.id,
userId: email.owner_id,
});
}
}

Exemple 3 : Job avec retry conditionnel

async process(job: Job) {
try {
await this.externalApiCall();
} catch (error) {
// Si c'est une erreur temporaire (rate limit), retry
if (error.status === 429) {
throw error; // BullMQ va retry automatiquement
}

// Si c'est une erreur permanente, ne pas retry
if (error.status === 400) {
await job.moveToFailed(error, "0"); // Ne pas retry
return;
}

throw error;
}
}

🔧 Configuration avancée

Connexion Redis personnalisée

// Dans queue.module.ts
BullModule.forRoot({
connection: {
host: process.env.REDIS_HOST || "redis",
port: parseInt(process.env.REDIS_PORT || "6379", 10),
password: process.env.REDIS_PASSWORD,
maxRetriesPerRequest: 3,
retryStrategy: (times: number) => {
const delay = Math.min(times * 50, 2000);
return delay;
},
// Options supplémentaires
enableReadyCheck: true,
enableOfflineQueue: true,
},
});

Configuration par environnement

// queue.config.ts
const isProduction = process.env.NODE_ENV === "production";

export const queueJobOptions: Record<string, JobsOptions> = {
exports: {
...defaultJobOptions,
attempts: isProduction ? 3 : 1, // Moins de retries en dev
removeOnComplete: {
age: isProduction ? 3600 : 300, // Garder moins longtemps en dev
count: isProduction ? 100 : 10,
},
},
};

📚 Références