Workers Architecture - Guide de test
Ce guide vous permet de tester l'architecture Workers BullMQ de bout en bout.
🧪 Prérequis
- ✅ Migrations exécutées :
npm run migrate - ✅ Types DB générés :
npm run generate:types - ✅ Redis accessible
- ✅ Workers activés :
QUEUE_WORKERS_ENABLED=true - ✅ Backend démarré
- ✅ Frontend démarré
📋 Checklist de test
1. Test Export Async
Backend
# 1. Créer un export async
curl -X GET "http://localhost:3000/api/export/async?entity=sessions&format=csv" \
-H "Authorization: Bearer YOUR_TOKEN" \
-H "Content-Type: application/json"
# Réponse attendue :
# {
# "exportJobId": "uuid",
# "jobRunId": "uuid",
# "status": "QUEUED",
# "pollUrl": "/api/job-runs/uuid"
# }
Vérifications DB
-- Vérifier que export_job est créé
SELECT * FROM export_jobs WHERE id = 'YOUR_EXPORT_JOB_ID';
-- Status doit être 'QUEUED'
-- Vérifier que job_run est créé
SELECT * FROM job_runs WHERE job_id = 'YOUR_JOB_ID';
-- Status doit être 'QUEUED' ou 'RUNNING'
-- Attendre quelques secondes, puis vérifier
SELECT * FROM job_runs WHERE job_id = 'YOUR_JOB_ID';
-- Status doit être 'COMPLETED' ou 'FAILED'
Frontend
// Tester le hook useJobRunStatus
import { useJobRunStatus } from "@/client/job-runs/useJobRuns";
function TestExport() {
const [jobRunId, setJobRunId] = useState<string | null>(null);
const { status, isRunning, isCompleted, jobRun } = useJobRunStatus(jobRunId);
// Vérifier que :
// 1. isRunning = true au début
// 2. Polling automatique toutes les 2 secondes
// 3. isCompleted = true quand terminé
// 4. jobRun.result contient fileObjectId
}
2. Test AI Task
Backend
// Créer une tâche IA
const aiTask = await aiTaskService.create({
orgId: userId,
userId,
type: "SUMMARIZE",
input: {
content: "Long text to summarize...",
maxLength: 500,
},
model: "gpt-4o-mini",
});
// Enqueue le job
const jobId = await workersQueueService.enqueueAiRun({
v: 1,
orgId: userId,
userId,
entityId: aiTask.id,
taskType: "SUMMARIZE",
input: aiTask.input,
model: "gpt-4o-mini",
});
Vérifications DB
-- Vérifier ai_task
SELECT * FROM ai_tasks WHERE id = 'YOUR_AI_TASK_ID';
-- Status doit passer de 'QUEUED' → 'RUNNING' → 'DONE'
-- Vérifier les coûts
SELECT tokens_in, tokens_out, cost_cents FROM ai_tasks WHERE id = 'YOUR_AI_TASK_ID';
-- cost_cents doit être calculé (ex: 150 = $1.50)
3. Test Document Processing
Backend
// Créer un document
const document = await documentService.create({
orgId: userId,
ownerUserId: userId,
source: "UPLOAD",
fileObjectId: fileObject.id,
mimeType: "application/pdf",
});
// Enqueue doc:ingest
const jobId = await workersQueueService.enqueueDocIngest({
v: 1,
orgId: userId,
userId,
entityId: document.id,
});
Vérifications DB
-- Vérifier document_pages créées
SELECT * FROM document_pages WHERE document_id = 'YOUR_DOCUMENT_ID';
-- Une page par page du PDF
-- Vérifier document_artifacts créés
SELECT * FROM document_artifacts WHERE document_id = 'YOUR_DOCUMENT_ID';
-- Type 'TEXT' avec le texte OCR
🔍 Vérifications générales
1. Vérifier les workers démarrent
# Vérifier les logs au démarrage
grep "Worker.*Module" logs/app.log
# Doit afficher :
# - WorkerExportModule
# - WorkerEmailModule
# - WorkerAiModule
# - WorkerDocModule
2. Vérifier les queues Redis
# Connecter à Redis
redis-cli
# Lister les queues
KEYS "bull:*"
# Vérifier une queue spécifique
LLEN "bull:exports-v2:wait"
LLEN "bull:ai-insights:wait"
3. Vérifier les job runs
-- Voir tous les job runs actifs
SELECT
id,
job_name,
status,
entity_type,
entity_id,
queued_at,
started_at,
finished_at,
attempt
FROM job_runs
WHERE status IN ('QUEUED', 'RUNNING')
ORDER BY queued_at DESC
LIMIT 10;
-- Voir les jobs en échec
SELECT
id,
job_name,
status,
error,
attempt
FROM job_runs
WHERE status = 'FAILED'
ORDER BY queued_at DESC
LIMIT 10;
4. Vérifier les file objects
-- Voir les fichiers créés par les workers
SELECT
fo.id,
fo.key,
fo.content_type,
fo.size_bytes,
fo.created_at
FROM file_objects fo
WHERE fo.created_at > NOW() - INTERVAL '1 hour'
ORDER BY fo.created_at DESC;
🐛 Tests d'erreur
1. Test idempotence
// Enqueue le même job deux fois avec le même jobId
const jobId1 = await workersQueueService.enqueueExportBuild(payload);
const jobId2 = await workersQueueService.enqueueExportBuild(payload);
// jobId1 et jobId2 doivent être identiques
console.assert(jobId1 === jobId2, "Job IDs must be identical for idempotence");
2. Test retry
// Créer un job qui va échouer (ex: export avec entity invalide)
// Vérifier que BullMQ retry selon la configuration (5 tentatives pour exports)
3. Test DLQ (Dead Letter Queue)
// Créer un job qui échoue définitivement
// Vérifier qu'il est déplacé vers la DLQ après toutes les tentatives
📊 Métriques à surveiller
1. Temps de traitement
-- Temps moyen de traitement par type de job
SELECT
job_name,
AVG(EXTRACT(EPOCH FROM (finished_at - started_at))) as avg_duration_seconds,
COUNT(*) as total_jobs
FROM job_runs
WHERE status = 'COMPLETED'
AND finished_at IS NOT NULL
AND started_at IS NOT NULL
GROUP BY job_name;
2. Taux de succès
-- Taux de succès par type de job
SELECT
job_name,
COUNT(*) FILTER (WHERE status = 'COMPLETED') as completed,
COUNT(*) FILTER (WHERE status = 'FAILED') as failed,
ROUND(
100.0 * COUNT(*) FILTER (WHERE status = 'COMPLETED') / COUNT(*),
2
) as success_rate_percent
FROM job_runs
GROUP BY job_name;
3. Coûts IA
-- Coûts totaux par utilisateur
SELECT
user_id,
SUM(cost_cents) as total_cost_cents,
COUNT(*) as total_tasks,
AVG(cost_cents) as avg_cost_cents
FROM ai_tasks
WHERE status = 'DONE'
AND cost_cents IS NOT NULL
GROUP BY user_id
ORDER BY total_cost_cents DESC;
✅ Checklist de validation
- Export async fonctionne de bout en bout
- Job run créé dans la DB
- Worker traite le job
- File object créé dans R2
- Frontend peut poller le status
- Frontend peut télécharger le fichier
- Idempotence fonctionne (même jobId pour même payload)
- Retries fonctionnent (job échoue puis retry)
- Logs structurés avec corrélation
- Métriques disponibles dans la DB
🎯 Scénarios de test complets
Scénario 1 : Export CSV de sessions
- Frontend : Cliquer sur "Export Sessions (CSV)"
- Backend : Appeler
/export/async?entity=sessions&format=csv - Worker : Générer le CSV, uploader vers R2
- Frontend : Afficher progress bar, puis bouton download
- Vérification : Fichier téléchargeable et contenu correct
Scénario 2 : Tâche IA de résumé
- Frontend : Demander un résumé de texte
- Backend : Créer ai_task, enqueue ai:run
- Worker : Appeler OpenAI, calculer coûts
- Frontend : Afficher le résumé et le coût
- Vérification : Résumé correct, coûts enregistrés
Scénario 3 : Séquences d'emails
- Scheduler : Cron toutes les 5 minutes
- Worker : Trouver enrollments dus, envoyer emails
- Vérification : Emails envoyés, deliveries créées
- Stop conditions : Vérifier que les séquences s'arrêtent correctement
🔧 Commandes utiles
# Voir les logs des workers
tail -f logs/app.log | grep "WORKER"
# Vérifier Redis
redis-cli MONITOR
# Vérifier les migrations
npm run migrate:status
# Générer les types DB
npm run generate:types
# Lancer les tests (si créés)
npm run test:workers