Aller au contenu principal

Workers Architecture - Guide de démarrage rapide

Ce guide vous permet de démarrer rapidement avec l'architecture Workers BullMQ.

🚀 Installation

1. Exécuter les migrations

npm run migrate

Cela créera les tables suivantes :

  • job_runs : Tracking des jobs
  • outbox_events : Pattern outbox
  • file_objects : Métadonnées des fichiers R2
  • export_jobs : Jobs d'export
  • ai_tasks : Tâches IA
  • documents, document_pages, document_artifacts : Documents et OCR

2. Générer les types DB

# Générer les types TypeScript depuis la DB
npm run generate:types

3. Démarrer les workers

Les workers démarrent automatiquement si QUEUE_WORKERS_ENABLED=true dans votre .env.

# Vérifier que les workers sont activés
grep QUEUE_WORKERS_ENABLED .env

📝 Utilisation

Exemple 1 : Export asynchrone

Backend :

// GET /api/export/async?entity=sessions&format=csv
const response = await api.get("/export/async", {
entity: "sessions",
format: "csv",
});
// Retourne : { exportJobId, jobRunId, status: "QUEUED", pollUrl }

Frontend :

import { useJobRunStatus } from "@/client/job-runs/useJobRuns";
import { JobRunProgress } from "@/components/job-runs/JobRunProgress";

function ExportButton() {
const [jobRunId, setJobRunId] = useState<string | null>(null);
const { isCompleted, jobRun } = useJobRunStatus(jobRunId);

const handleExport = async () => {
const response = await api.get("/export/async", {
entity: "sessions",
format: "csv",
});
setJobRunId(response.jobRunId);
};

return (
<div>
{!jobRunId && <button onClick={handleExport}>Export</button>}
{jobRunId && !isCompleted && <JobRunProgress jobRunId={jobRunId} />}
{isCompleted && (
<a href={`/api/file-objects/${jobRun?.result?.fileObjectId}/download`}>
Download
</a>
)}
</div>
);
}

Exemple 2 : Tâche IA

Backend :

import { WorkersQueueService } from "@/workers/workers-queue.service";
import { AiTaskService } from "@/workers/worker-ai/ai-task.service";

// Créer une tâche IA
const aiTask = await aiTaskService.create({
orgId: userId,
userId,
type: "SUMMARIZE",
input: { content: "Long text to summarize", maxLength: 500 },
model: "gpt-4o-mini",
});

// Enqueue le job
const jobId = await workersQueueService.enqueueAiRun({
v: 1,
orgId: userId,
userId,
entityId: aiTask.id,
taskType: "SUMMARIZE",
input: aiTask.input,
model: "gpt-4o-mini",
});

Frontend :

const { status, jobRun, isCompleted } = useJobRunStatus(jobRunId);

if (isCompleted && jobRun?.result) {
console.log("Summary:", jobRun.result.data.output.content);
console.log("Cost:", jobRun.result.data.costCents, "cents");
}

🔍 Vérification

Vérifier que les workers fonctionnent

  1. Vérifier les logs :
# Les workers doivent loguer au démarrage
grep "Worker.*Module" logs/app.log
  1. Vérifier les queues :
# Vérifier que les queues sont créées dans Redis
redis-cli KEYS "bull:*"
  1. Tester un export :
curl -X GET "http://localhost:3000/api/export/async?entity=sessions&format=csv" \
-H "Authorization: Bearer YOUR_TOKEN"

Vérifier les jobs dans la DB

-- Voir les job runs actifs
SELECT * FROM job_runs
WHERE status IN ('QUEUED', 'RUNNING')
ORDER BY queued_at DESC;

-- Voir les exports en cours
SELECT * FROM export_jobs
WHERE status IN ('QUEUED', 'RUNNING')
ORDER BY created_at DESC;

-- Voir les tâches IA
SELECT * FROM ai_tasks
WHERE status IN ('QUEUED', 'RUNNING')
ORDER BY created_at DESC;

🐛 Dépannage

Les workers ne démarrent pas

  1. Vérifier QUEUE_WORKERS_ENABLED=true dans .env
  2. Vérifier que Redis est accessible
  3. Vérifier les logs pour les erreurs de démarrage

Les jobs restent en QUEUED

  1. Vérifier que les workers sont démarrés
  2. Vérifier la connexion Redis
  3. Vérifier les logs des workers pour les erreurs

Erreurs de migration

  1. Vérifier que PostgreSQL est accessible
  2. Vérifier que les migrations précédentes sont appliquées
  3. Vérifier les permissions DB

📚 Documentation complète

  • Architecture : docs/WORKERS_ARCHITECTURE.md
  • Frontend Usage : docs/WORKERS_FRONTEND_USAGE.md
  • Intégration : docs/WORKERS_INTEGRATION_EXAMPLE.md
  • Status : docs/WORKERS_IMPLEMENTATION_COMPLETE.md

✅ Checklist de démarrage

  • Migrations exécutées (npm run migrate)
  • Types DB générés (npm run generate:types)
  • QUEUE_WORKERS_ENABLED=true dans .env
  • Redis accessible
  • Workers démarrés (vérifier les logs)
  • Test d'un export async réussi
  • Frontend : hooks testés avec un job