Système de Queue pour Jobs Lourds
Dernière mise à jour : 2025-01-XX
Documentation complète du système de queue basé sur BullMQ pour le traitement asynchrone des jobs lourds.
🎯 Vue d'ensemble
Le système de queue permet de découpler les opérations longues du thread principal de l'application, offrant :
- ✅ Traitement asynchrone : Les jobs sont exécutés en arrière-plan sans bloquer les requêtes HTTP
- ✅ Retry automatique : Retry avec backoff exponentiel en cas d'échec
- ✅ Scalabilité horizontale : Support de workers multiples pour traiter les jobs en parallèle
- ✅ Monitoring : Suivi des jobs (en cours, en attente, échoués)
- ✅ Priorisation : Possibilité de prioriser certains types de jobs
- ✅ Rate limiting : Limitation du nombre de jobs par type
- ✅ Mises à jour en temps réel : Notifications WebSocket pour suivre la progression des jobs
🏗️ Architecture
┌─────────────────┐
│ Controller │
│ / Scheduler │
└────────┬────────┘
│
▼
┌─────────────────┐
│ QueueService │
│ (add jobs) │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Queue (Redis) │
│ (BullMQ) │
└────────┬────────┘
│
▼
┌──────────────── ─┐
│ Processor │
│ (WorkerHost) │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Service │
│ (Business │
│ Logic) │
└─────────────────┘
│
▼
┌─────────────────┐
│ WebSocket │
│ (Status │
│ Updates) │
└─────────────────┘
Flux de traitement
- Enqueue : Un controller ou scheduler ajoute un job à la queue via
QueueService - Queue : Le job est stocké dans Redis (BullMQ)
- Process : Un processor (WorkerHost) récupère et traite le job
- Update : Les mises à jour de progression sont envoyées via WebSocket
- Complete/Fail : Le job est marqué comme complété ou échoué
⚙️ Configuration
Variables d'environnement
Le système utilise les mêmes variables Redis que le cache :
# Redis configuration (utilisé par BullMQ)
REDIS_ENABLED=true # Activer Redis (défaut: true)
REDIS_HOST=redis # Host Redis (défaut: redis)
REDIS_PORT=6379 # Port Redis (défaut: 6379)
REDIS_PASSWORD= # Mot de passe Redis (optionnel)
Configuration des queues
Les configurations par queue sont définies dans backend/src/queue/queue.config.ts :
export const queueJobOptions: Record<string, JobsOptions> = {
exports: {
attempts: 3, // Nombre de tentatives en cas d'échec
backoff: {
delay: 2000, // Délai initial (ms)
type: "exponential", // Type de backoff
},
removeOnComplete: {
age: 3600, // Garder les jobs complétés pendant 1h
count: 100, // Garder les 100 derniers jobs complétés
},
removeOnFail: {
age: 86400, // Garder les jobs échoués pendant 24h
count: 1000, // Garder les 1000 derniers jobs échoués
},
},
// ... autres queues
};
📦 Queues disponibles
1. Exports (exports)
Description : Traitement asynchrone des exports de données (CSV, Excel, PDF, Word, JSON)
Configuration :
- Tentatives : 3
- Timeout : 5 minutes (implicite via processor)
Job Data :
{
entity: "sessions" | "contacts" | "quotes" | "invoices" | "all",
format: "csv" | "excel" | "pdf" | "docx" | "json",
userId: string
}
Processor : ExportProcessor
Utilisation :
const jobId = await queueService.addExportJob({
entity: "sessions",
format: "pdf",
userId: "user-123",
});
2. Google Calendar Sync (google-calendar-sync)
Description : Synchronisation bidirectionnelle Google Calendar ↔ Sessions
Configuration :
- Tentatives : 5
- Timeout : 10 minutes (implicite)
Job Data :
{
userId: string;
}
Processor : GoogleCalendarSyncProcessor
Utilisation :
const jobId = await queueService.addGoogleCalendarSyncJob({
userId: "user-123",
});
3. Scheduled Emails (scheduled-emails)
Description : Envoi d'emails planifiés
Configuration :
- Tentatives : 3
- Timeout : 1 minute (implicite)
Job Data :
{
emailId: string,
userId: string
}
Processor : ScheduledEmailsProcessor
Utilisation :
const jobId = await queueService.addScheduledEmailJob({
emailId: "email-123",
userId: "user-123",
});
4. Workflow Tasks (workflow-tasks)
Description : Exécution de tâches de workflow
Configuration :
- Tentatives : 3
- Timeout : 2 minutes (implicite)
Job Data :
{
taskId: string,
userId: string
}
Processor : WorkflowTasksProcessor
Utilisation :
const jobId = await queueService.addWorkflowTaskJob({
taskId: "task-123",
userId: "user-123",
});
5. Recurring Sessions (recurring-sessions)
Description : Génération d'occurrences futures pour les sessions récurrentes
Configuration :
- Tentatives : 2
- Timeout : 10 minutes (implicite)
Job Data :
{
userId?: string // Optionnel : si non fourni, traite tous les utilisateurs
}
Processor : RecurringSessionsProcessor
Utilisation :
const jobId = await queueService.addRecurringSessionsJob();
// ou pour un utilisateur spécifique :
const jobId = await queueService.addRecurringSessionsJob({
userId: "user-123",
});
6. Audit Cleanup (audit-cleanup)
Description : Nettoyage des anciens logs d'audit
Configuration :
- Tentatives : 2
- Timeout : 5 minutes (implicite)
Job Data :
{
daysToKeep?: number // Optionnel : défaut 30 jours
}
Processor : AuditCleanupProcessor
Utilisation :
const jobId = await queueService.addAuditCleanupJob({ daysToKeep: 30 });
🔧 Processors
Les processors sont des classes qui étendent WorkerHost et implémentent la méthode process() pour traiter les jobs.
Structure d'un Processor
import { Inject } from "@nestjs/common";
import { Processor, WorkerHost } from "@nestjs/bullmq";
import { Job } from "bullmq";
import type { LoggerService } from "@nestjs/common";
import { QueueNames } from "../types/queue-names.types.js";
import { QueueService } from "../queue.service.js";
@Processor(QueueNames.EXPORTS)
export class ExportProcessor extends WorkerHost {
constructor(
private readonly exportService: ExportService,
private readonly queueService: QueueService,
@Inject("LoggerService") logger: LoggerService
) {
super();
}
async process(job: Job<ExportJobData>): Promise<ExportJobResult> {
// 1. Émettre progression initiale
this.queueService.emitJobProgress({
jobId: job.id!,
status: "active",
progress: 0,
message: "Starting...",
});
try {
// 2. Traiter le job
await job.updateProgress(50);
// 3. Émettre progression
this.queueService.emitJobProgress({
jobId: job.id!,
status: "active",
progress: 50,
message: "Processing...",
});
// 4. Compléter le job
await job.updateProgress(100);
this.queueService.emitJobProgress({
jobId: job.id!,
status: "completed",
progress: 100,
message: "Completed",
result: { success: true },
});
return { success: true };
} catch (error) {
// 5. Gérer les erreurs
this.queueService.emitJobProgress({
jobId: job.id!,
status: "failed",
message: error.message,
result: { success: false },
});
throw error;
}
}
}
💻 Utilisation
Dans un Controller
import { QueueService } from "../queue/queue.service.js";
@Controller("export")
export class ExportController {
constructor(private readonly queueService: QueueService) {}
@Get()
async export(@Req() req: RequestWithUser, @Query("entity") entity: string) {
// Ajouter le job à la queue
const jobId = await this.queueService.addExportJob({
entity,
format: "pdf",
userId: req.user.id,
});
// Retourner immédiatement avec le jobId
return {
jobId,
status: "queued",
message: "Export job has been queued",
statusUrl: `/export/status/${jobId}`,
};
}
@Get("status/:jobId")
async getStatus(@Param("jobId") jobId: string) {
const status = await this.queueService.getJobStatus(
QueueNames.EXPORTS,
jobId
);
return status;
}
}
Dans un Scheduler
import { QueueService } from "../queue/queue.service.js";
@Injectable()
export class MyScheduler {
constructor(private readonly queueService: QueueService) {}
@Cron(CronExpression.EVERY_5_MINUTES)
async handleScheduledTask() {
// Au lieu d'exécuter directement :
// await this.service.doWork();
// Ajouter des jobs à la queue :
const jobId = await this.queueService.addSomeJob({
// ... job data
});
}
}
📡 WebSocket & Mises à jour en temps réel
Les mises à jour de progression sont envoyées automatiquement via WebSocket.
Événement WebSocket
Event : queue:job:progress
Payload :
{
jobId: string,
status: "waiting" | "active" | "completed" | "failed" | "delayed",
progress?: number, // 0-100
message?: string,
result?: {
success: boolean,
message?: string,
data?: unknown
}
}
Écoute côté Frontend
import { useAuthenticatedSocketIO } from "@/client/websocket";
const { socket } = useAuthenticatedSocketIO();
socket.on("queue:job:progress", (data) => {
console.log(`Job ${data.jobId}: ${data.status} - ${data.progress}%`);
if (data.status === "completed") {
// Traiter le résultat
console.log("Result:", data.result);
}
});
📊 Monitoring & Debugging
Vérifier le statut d'un job
const status = await queueService.getJobStatus(QueueNames.EXPORTS, jobId);
// Retourne :
{
id: string,
name: string,
state: "waiting" | "active" | "completed" | "failed" | "delayed",
progress?: number,
data: JobData,
returnvalue?: JobResult,
failedReason?: string
}
Logs
Tous les processors loggent automatiquement :
- Début de traitement
- Progression
- Complétion
- Erreurs
Les logs incluent :
- Job ID
- User ID (si applicable)
- Progression
- Messages d'erreur détaillés
🔄 Migration depuis les schedulers
Avant (exécution directe)
@Cron(CronExpression.EVERY_5_MINUTES)
async handleScheduledEmailsExecution() {
const readyEmails = await this.service.getEmailsReadyToSend();
for (const email of readyEmails) {
await this.service.sendScheduledEmail(email.id); // Bloque le thread
}
}
Après (avec queue)
@Cron(CronExpression.EVERY_5_MINUTES)
async handleScheduledEmailsExecution() {
const readyEmails = await this.service.getEmailsReadyToSend();
for (const email of readyEmails) {
// Ajoute à la queue, ne bloque pas
await this.queueService.addScheduledEmailJob({
emailId: email.id,
userId: email.owner_id,
});
}
}
📁 Structure des fichiers
backend/src/queue/
├── README.md # Documentation technique du module
├── queue.module.ts # Module principal avec configuration BullMQ
├── queue.service.ts # Service pour gérer les queues et jobs
├── queue.config.ts # Configuration des queues (retry, cleanup, etc.)
├── types/
│ ├── job-data.types.ts # Types de données des jobs
│ └── queue-names.types.ts # Noms des queues (enum)
└── processors/
├── export.processor.ts # Processor pour les exports
├── google-calendar-sync.processor.ts
├── scheduled-emails.processor.ts
├── workflow-tasks.processor.ts
├── recurring-sessions.processor.ts
└── audit-cleanup.processor.ts
Documentation :
- QUEUE_SYSTEM.md - Documentation complète avec exemples
- backend/src/queue/README.md (documentation technique) - Documentation technique du module
✅ Avantages
Performance
- Non-bloquant : Les requêtes HTTP retournent immédiatement
- Parallélisation : Plusieurs workers peuvent traiter les jobs simultanément
- Scalabilité : Ajout facile de workers supplémentaires
Fiabilité
- Retry automatique : Les jobs échoués sont automatiquement réessayés
- Persistance : Les jobs sont stockés dans Redis, survivent aux redémarrages
- Monitoring : Suivi complet des jobs (en cours, en attente, échoués)
Expérience utilisateur
- Feedback en temps réel : Mises à jour de progression via WebSocket
- Pas de timeout HTTP : Les jobs longs ne causent plus de timeout
- Meilleure réactivité : L'interface reste responsive pendant le traitement
🚀 Améliorations possibles
Monitoring & Observabilité
Outils de monitoring disponibles
Plusieurs outils permettent de suivre et gérer les queues BullMQ :