Aller au contenu principal

Workers Architecture - Guide de test

Ce guide vous permet de tester l'architecture Workers BullMQ de bout en bout.

🧪 Prérequis

  1. ✅ Migrations exécutées : npm run migrate
  2. ✅ Types DB générés : npm run generate:types
  3. ✅ Redis accessible
  4. ✅ Workers activés : QUEUE_WORKERS_ENABLED=true
  5. ✅ Backend démarré
  6. ✅ Frontend démarré

📋 Checklist de test

1. Test Export Async

Backend

# 1. Créer un export async
curl -X GET "http://localhost:3000/api/export/async?entity=sessions&format=csv" \
-H "Authorization: Bearer YOUR_TOKEN" \
-H "Content-Type: application/json"

# Réponse attendue :
# {
# "exportJobId": "uuid",
# "jobRunId": "uuid",
# "status": "QUEUED",
# "pollUrl": "/api/job-runs/uuid"
# }

Vérifications DB

-- Vérifier que export_job est créé
SELECT * FROM export_jobs WHERE id = 'YOUR_EXPORT_JOB_ID';
-- Status doit être 'QUEUED'

-- Vérifier que job_run est créé
SELECT * FROM job_runs WHERE job_id = 'YOUR_JOB_ID';
-- Status doit être 'QUEUED' ou 'RUNNING'

-- Attendre quelques secondes, puis vérifier
SELECT * FROM job_runs WHERE job_id = 'YOUR_JOB_ID';
-- Status doit être 'COMPLETED' ou 'FAILED'

Frontend

// Tester le hook useJobRunStatus
import { useJobRunStatus } from "@/client/job-runs/useJobRuns";

function TestExport() {
const [jobRunId, setJobRunId] = useState<string | null>(null);
const { status, isRunning, isCompleted, jobRun } = useJobRunStatus(jobRunId);

// Vérifier que :
// 1. isRunning = true au début
// 2. Polling automatique toutes les 2 secondes
// 3. isCompleted = true quand terminé
// 4. jobRun.result contient fileObjectId
}

2. Test AI Task

Backend

// Créer une tâche IA
const aiTask = await aiTaskService.create({
orgId: userId,
userId,
type: "SUMMARIZE",
input: {
content: "Long text to summarize...",
maxLength: 500,
},
model: "gpt-4o-mini",
});

// Enqueue le job
const jobId = await workersQueueService.enqueueAiRun({
v: 1,
orgId: userId,
userId,
entityId: aiTask.id,
taskType: "SUMMARIZE",
input: aiTask.input,
model: "gpt-4o-mini",
});

Vérifications DB

-- Vérifier ai_task
SELECT * FROM ai_tasks WHERE id = 'YOUR_AI_TASK_ID';
-- Status doit passer de 'QUEUED' → 'RUNNING' → 'DONE'

-- Vérifier les coûts
SELECT tokens_in, tokens_out, cost_cents FROM ai_tasks WHERE id = 'YOUR_AI_TASK_ID';
-- cost_cents doit être calculé (ex: 150 = $1.50)

3. Test Document Processing

Backend

// Créer un document
const document = await documentService.create({
orgId: userId,
ownerUserId: userId,
source: "UPLOAD",
fileObjectId: fileObject.id,
mimeType: "application/pdf",
});

// Enqueue doc:ingest
const jobId = await workersQueueService.enqueueDocIngest({
v: 1,
orgId: userId,
userId,
entityId: document.id,
});

Vérifications DB

-- Vérifier document_pages créées
SELECT * FROM document_pages WHERE document_id = 'YOUR_DOCUMENT_ID';
-- Une page par page du PDF

-- Vérifier document_artifacts créés
SELECT * FROM document_artifacts WHERE document_id = 'YOUR_DOCUMENT_ID';
-- Type 'TEXT' avec le texte OCR

🔍 Vérifications générales

1. Vérifier les workers démarrent

# Vérifier les logs au démarrage
grep "Worker.*Module" logs/app.log
# Doit afficher :
# - WorkerExportModule
# - WorkerEmailModule
# - WorkerAiModule
# - WorkerDocModule

2. Vérifier les queues Redis

# Connecter à Redis
redis-cli

# Lister les queues
KEYS "bull:*"

# Vérifier une queue spécifique
LLEN "bull:exports-v2:wait"
LLEN "bull:ai-insights:wait"

3. Vérifier les job runs

-- Voir tous les job runs actifs
SELECT
id,
job_name,
status,
entity_type,
entity_id,
queued_at,
started_at,
finished_at,
attempt
FROM job_runs
WHERE status IN ('QUEUED', 'RUNNING')
ORDER BY queued_at DESC
LIMIT 10;

-- Voir les jobs en échec
SELECT
id,
job_name,
status,
error,
attempt
FROM job_runs
WHERE status = 'FAILED'
ORDER BY queued_at DESC
LIMIT 10;

4. Vérifier les file objects

-- Voir les fichiers créés par les workers
SELECT
fo.id,
fo.key,
fo.content_type,
fo.size_bytes,
fo.created_at
FROM file_objects fo
WHERE fo.created_at > NOW() - INTERVAL '1 hour'
ORDER BY fo.created_at DESC;

🐛 Tests d'erreur

1. Test idempotence

// Enqueue le même job deux fois avec le même jobId
const jobId1 = await workersQueueService.enqueueExportBuild(payload);
const jobId2 = await workersQueueService.enqueueExportBuild(payload);

// jobId1 et jobId2 doivent être identiques
console.assert(jobId1 === jobId2, "Job IDs must be identical for idempotence");

2. Test retry

// Créer un job qui va échouer (ex: export avec entity invalide)
// Vérifier que BullMQ retry selon la configuration (5 tentatives pour exports)

3. Test DLQ (Dead Letter Queue)

// Créer un job qui échoue définitivement
// Vérifier qu'il est déplacé vers la DLQ après toutes les tentatives

📊 Métriques à surveiller

1. Temps de traitement

-- Temps moyen de traitement par type de job
SELECT
job_name,
AVG(EXTRACT(EPOCH FROM (finished_at - started_at))) as avg_duration_seconds,
COUNT(*) as total_jobs
FROM job_runs
WHERE status = 'COMPLETED'
AND finished_at IS NOT NULL
AND started_at IS NOT NULL
GROUP BY job_name;

2. Taux de succès

-- Taux de succès par type de job
SELECT
job_name,
COUNT(*) FILTER (WHERE status = 'COMPLETED') as completed,
COUNT(*) FILTER (WHERE status = 'FAILED') as failed,
ROUND(
100.0 * COUNT(*) FILTER (WHERE status = 'COMPLETED') / COUNT(*),
2
) as success_rate_percent
FROM job_runs
GROUP BY job_name;

3. Coûts IA

-- Coûts totaux par utilisateur
SELECT
user_id,
SUM(cost_cents) as total_cost_cents,
COUNT(*) as total_tasks,
AVG(cost_cents) as avg_cost_cents
FROM ai_tasks
WHERE status = 'DONE'
AND cost_cents IS NOT NULL
GROUP BY user_id
ORDER BY total_cost_cents DESC;

✅ Checklist de validation

  • Export async fonctionne de bout en bout
  • Job run créé dans la DB
  • Worker traite le job
  • File object créé dans R2
  • Frontend peut poller le status
  • Frontend peut télécharger le fichier
  • Idempotence fonctionne (même jobId pour même payload)
  • Retries fonctionnent (job échoue puis retry)
  • Logs structurés avec corrélation
  • Métriques disponibles dans la DB

🎯 Scénarios de test complets

Scénario 1 : Export CSV de sessions

  1. Frontend : Cliquer sur "Export Sessions (CSV)"
  2. Backend : Appeler /export/async?entity=sessions&format=csv
  3. Worker : Générer le CSV, uploader vers R2
  4. Frontend : Afficher progress bar, puis bouton download
  5. Vérification : Fichier téléchargeable et contenu correct

Scénario 2 : Tâche IA de résumé

  1. Frontend : Demander un résumé de texte
  2. Backend : Créer ai_task, enqueue ai:run
  3. Worker : Appeler OpenAI, calculer coûts
  4. Frontend : Afficher le résumé et le coût
  5. Vérification : Résumé correct, coûts enregistrés

Scénario 3 : Séquences d'emails

  1. Scheduler : Cron toutes les 5 minutes
  2. Worker : Trouver enrollments dus, envoyer emails
  3. Vérification : Emails envoyés, deliveries créées
  4. Stop conditions : Vérifier que les séquences s'arrêtent correctement

🔧 Commandes utiles

# Voir les logs des workers
tail -f logs/app.log | grep "WORKER"

# Vérifier Redis
redis-cli MONITOR

# Vérifier les migrations
npm run migrate:status

# Générer les types DB
npm run generate:types

# Lancer les tests (si créés)
npm run test:workers