Workers Architecture - Guide de démarrage rapide
Ce guide vous permet de démarrer rapidement avec l'architecture Workers BullMQ.
🚀 Installation
1. Exécuter les migrations
npm run migrate
Cela créera les tables suivantes :
job_runs: Tracking des jobsoutbox_events: Pattern outboxfile_objects: Métadonnées des fichiers R2export_jobs: Jobs d'exportai_tasks: Tâches IAdocuments,document_pages,document_artifacts: Documents et OCR
2. Générer les types DB
# Générer les types TypeScript depuis la DB
npm run generate:types
3. Démarrer les workers
Les workers démarrent automatiquement si QUEUE_WORKERS_ENABLED=true dans votre .env.
# Vérifier que les workers sont activés
grep QUEUE_WORKERS_ENABLED .env
📝 Utilisation
Exemple 1 : Export asynchrone
Backend :
// GET /api/export/async?entity=sessions&format=csv
const response = await api.get("/export/async", {
entity: "sessions",
format: "csv",
});
// Retourne : { exportJobId, jobRunId, status: "QUEUED", pollUrl }
Frontend :
import { useJobRunStatus } from "@/client/job-runs/useJobRuns";
import { JobRunProgress } from "@/components/job-runs/JobRunProgress";
function ExportButton() {
const [jobRunId, setJobRunId] = useState<string | null>(null);
const { isCompleted, jobRun } = useJobRunStatus(jobRunId);
const handleExport = async () => {
const response = await api.get("/export/async", {
entity: "sessions",
format: "csv",
});
setJobRunId(response.jobRunId);
};
return (
<div>
{!jobRunId && <button onClick={handleExport}>Export</button>}
{jobRunId && !isCompleted && <JobRunProgress jobRunId={jobRunId} />}
{isCompleted && (
<a href={`/api/file-objects/${jobRun?.result?.fileObjectId}/download`}>
Download
</a>
)}
</div>
);
}
Exemple 2 : Tâche IA
Backend :
import { WorkersQueueService } from "@/workers/workers-queue.service";
import { AiTaskService } from "@/workers/worker-ai/ai-task.service";
// Créer une tâche IA
const aiTask = await aiTaskService.create({
orgId: userId,
userId,
type: "SUMMARIZE",
input: { content: "Long text to summarize", maxLength: 500 },
model: "gpt-4o-mini",
});
// Enqueue le job
const jobId = await workersQueueService.enqueueAiRun({
v: 1,
orgId: userId,
userId,
entityId: aiTask.id,
taskType: "SUMMARIZE",
input: aiTask.input,
model: "gpt-4o-mini",
});
Frontend :
const { status, jobRun, isCompleted } = useJobRunStatus(jobRunId);
if (isCompleted && jobRun?.result) {
console.log("Summary:", jobRun.result.data.output.content);
console.log("Cost:", jobRun.result.data.costCents, "cents");
}
🔍 Vérification
Vérifier que les workers fonctionnent
- Vérifier les logs :
# Les workers doivent loguer au démarrage
grep "Worker.*Module" logs/app.log
- Vérifier les queues :
# Vérifier que les queues sont créées dans Redis
redis-cli KEYS "bull:*"
- Tester un export :
curl -X GET "http://localhost:3000/api/export/async?entity=sessions&format=csv" \
-H "Authorization: Bearer YOUR_TOKEN"
Vérifier les jobs dans la DB
-- Voir les job runs actifs
SELECT * FROM job_runs
WHERE status IN ('QUEUED', 'RUNNING')
ORDER BY queued_at DESC;
-- Voir les exports en cours
SELECT * FROM export_jobs
WHERE status IN ('QUEUED', 'RUNNING')
ORDER BY created_at DESC;
-- Voir les tâches IA
SELECT * FROM ai_tasks
WHERE status IN ('QUEUED', 'RUNNING')
ORDER BY created_at DESC;
🐛 Dépannage
Les workers ne démarrent pas
- Vérifier
QUEUE_WORKERS_ENABLED=truedans.env - Vérifier que Redis est accessible
- Vérifier les logs pour les erreurs de démarrage
Les jobs restent en QUEUED
- Vérifier que les workers sont démarrés
- Vérifier la connexion Redis
- Vérifier les logs des workers pour les erreurs
Erreurs de migration
- Vérifier que PostgreSQL est accessible
- Vérifier que les migrations précédentes sont appliquées
- Vérifier les permissions DB
📚 Documentation complète
- Architecture :
docs/WORKERS_ARCHITECTURE.md - Frontend Usage :
docs/WORKERS_FRONTEND_USAGE.md - Intégration :
docs/WORKERS_INTEGRATION_EXAMPLE.md - Status :
docs/WORKERS_IMPLEMENTATION_COMPLETE.md
✅ Checklist de démarrage
- Migrations exécutées (
npm run migrate) - Types DB générés (
npm run generate:types) -
QUEUE_WORKERS_ENABLED=truedans.env - Redis accessible
- Workers démarrés (vérifier les logs)
- Test d'un export async réussi
- Frontend : hooks testés avec un job