Observabilité - Workers & Jobs - Aaperture
Observabilité complète : Corrélation logs, métriques, monitoring des jobs BullMQ, DLQ et replay.
Objectif
Fournir une observabilité complète pour les workers et jobs BullMQ : corrélation logs, métriques, monitoring, DLQ et replay.
Corrélation
IDs de corrélation
Chaîne de corrélation :
requestId (API) → jobId (BullMQ) → job_runs.id
Tous les logs incluent ces IDs pour traçabilité complète.
Exemple de log structuré
{
level: 'info',
message: 'Job started',
requestId: 'req_abc123', // Si déclenché depuis API
jobId: 'org_123:export:build:entity_456:hash789',
jobRunId: 'run_xyz789',
jobName: 'export:build',
orgId: 'org_123',
userId: 'user_456',
entityId: 'entity_456',
attempt: 1,
timestamp: '2024-01-01T00:00:00Z'
}
Implémentation
Middleware API :
@Injectable()
export class CorrelationMiddleware implements NestMiddleware {
use(req: Request, res: Response, next: NextFunction) {
const requestId = req.headers['x-request-id'] || generateId();
req['requestId'] = requestId;
res.setHeader('X-Request-ID', requestId);
next();
}
}
Logger avec corrélation :
class WorkerLogger {
log(job: Job, message: string, metadata?: Record<string, any>) {
this.logger.log({
...metadata,
jobId: job.id,
jobName: job.name,
jobRunId: job.data.jobRunId,
requestId: job.data.requestId,
message
});
}
}
Métriques
Métriques clés
Queue Depth
Nombre de jobs en attente par queue.
Métrique : bullmq.queue.depth{queue="export"}
Type : Gauge
Labels : queue, orgId (optionnel)
Success/Fail Rate
Taux de succès/échec par type de job.
Métriques :
bullmq.job.success{jobName="export:build"}bullmq.job.fail{jobName="export:build"}Type : Counter Labels :jobName,orgId(optionnel)
P95 Duration
Durée d'exécution (percentile 95) par type de job.
Métrique : bullmq.job.duration{jobName="export:build",percentile="95"}
Type : Histogram
Labels : jobName, percentile
Coûts IA
Suivi des coûts OpenAI par organisation.
Métrique : ai.cost_cents{orgId="org_123"}
Type : Counter
Labels : orgId, model
Exposition métriques
Prometheus (recommandé) :
import { Registry, Counter, Histogram, Gauge } from 'prom-client';
const registry = new Registry();
const jobDuration = new Histogram({
name: 'bullmq_job_duration_seconds',
help: 'Job execution duration',
labelNames: ['jobName', 'status'],
registers: [registry]
});
const queueDepth = new Gauge({
name: 'bullmq_queue_depth',
help: 'Number of jobs waiting in queue',
labelNames: ['queue'],
registers: [registry]
});
Endpoint : GET /metrics (format Prometheus)
Logs structurés
Format standard
Start :
{
level: 'info',
message: 'Job started',
jobId: string,
jobName: string,
jobRunId: string,
orgId: string,
userId?: string,
entityId?: string,
attempt: number,
timestamp: string
}
Success :
{
level: 'info',
message: 'Job completed',
jobId: string,
jobName: string,
jobRunId: string,
duration: number, // ms
result?: Record<string, any>,
timestamp: string
}
Error :
{
level: 'error',
message: 'Job failed',
jobId: string,
jobName: string,
jobRunId: string,
error: string,
stack?: string,
attempt: number,
willRetry: boolean,
timestamp: string
}
Centralisation
Options :
- ELK Stack (Elasticsearch, Logstash, Kibana)
- CloudWatch (AWS)
- Datadog
- Loki (Grafana)
Format : JSON structuré avec champs standardisés.
Monitoring Dashboard
Dashboard Bull Board
Intégration existante : /ops/queues (production recommandée via https://queues.aaperture.com)
Fonctionnalités :
- Visualisation en temps réel de toutes les queues
- Gestion des jobs (retry, purge, filtrage)
- Statistiques par queue
Dashboard Métriques
Grafana (recommandé) ou équivalent :
Panels :
- Queue Depth : Graphique temps réel par queue
- Job Success/Fail Rate : Graphique avec taux de succès/échec
- P95 Duration : Graphique durée d'exécution par type de job
- Coûts IA : Graphique coûts cumulés par organisation
- Jobs par heure : Graphique volume de jobs
Alertes :
- Queue depth > 1000 → Alerte
- Fail rate > 10% → Alerte
- P95 duration > 60s → Alerte
- Coûts IA > 100€/jour → Alerte
Dead Letter Queue (DLQ)
Gestion DLQ
Jobs échoués après toutes les tentatives → DLQ.
Table job_runs :
status = 'FAILED'+attempt >= maxAttempts→ En DLQ- Flag
in_dlq = true(optionnel)
Monitoring DLQ
Métrique :
const dlqDepth = new Gauge({
name: 'bullmq_dlq_depth',
help: 'Number of jobs in Dead Letter Queue',
labelNames: ['jobName'],
registers: [registry]
});
Dashboard :
- Liste jobs en DLQ avec filtres (jobName, orgId, date)
- Détails erreur pour chaque job
- Actions : replay, delete, view logs
Replay
Endpoint : POST /admin/jobs/:jobRunId/replay
Processus :
- Charger
job_runsavecjobRunId - Vérifier
status = 'FAILED'etin_dlq = true - Créer nouveau job avec même payload
- Mettre à jour
job_runs:in_dlq = false, nouveaujobRunId
UI :
- Bouton "Replay" dans liste DLQ
- Confirmation avant replay
- Notification si replay réussi/échoué
Job Runs Table
Structure
Voir WORKERS_ARCHITECTURE.md pour structure complète.
Requêtes utiles
Jobs en cours :
SELECT * FROM job_runs
WHERE status = 'RUNNING'
ORDER BY started_at DESC;
Jobs échoués récents :
SELECT * FROM job_runs
WHERE status = 'FAILED'
AND finished_at > NOW() - INTERVAL '24 hours'
ORDER BY finished_at DESC;
Jobs lents (P95) :
SELECT job_name,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY EXTRACT(EPOCH FROM (finished_at - started_at))) as p95_duration
FROM job_runs
WHERE status = 'COMPLETED'
AND finished_at > NOW() - INTERVAL '7 days'
GROUP BY job_name;
Coûts IA par organisation :
SELECT org_id,
SUM(cost_cents) as total_cost_cents
FROM job_runs jr
JOIN ai_tasks at ON jr.entity_id = at.id
WHERE jr.job_name = 'ai:run'
AND jr.finished_at > NOW() - INTERVAL '30 days'
GROUP BY org_id
ORDER BY total_cost_cents DESC;
Alertes
Configuration alertes
Prometheus Alertmanager ou équivalent :
Alertes critiques :
groups:
- name: workers
rules:
- alert: HighQueueDepth
expr: bullmq_queue_depth > 1000
for: 5m
annotations:
summary: "Queue depth is high"
- alert: HighFailRate
expr: rate(bullmq_job_fail[5m]) / rate(bullmq_job_total[5m]) > 0.1
for: 5m
annotations:
summary: "Job fail rate is high"
- alert: SlowJobs
expr: bullmq_job_duration_seconds{percentile="95"} > 60
for: 10m
annotations:
summary: "Jobs are slow (P95 > 60s)"
- alert: HighAICosts
expr: sum(rate(ai_cost_cents[1d])) > 10000
for: 1h
annotations:
summary: "AI costs are high (>100€/day)"
Notifications :
- Slack
- PagerDuty
- Webhook
UI Admin
Page Monitoring
Route : /admin/monitoring
Sections :
- Overview :
- Métriques clés (queue depth, success rate, P95 duration)
- Graphiques temps réel
- Jobs :
- Liste
job_runsavec filtres (status, jobName, orgId, date) - Détails job (payload, result, error, logs)
- Actions : replay, cancel, view logs
- Liste
- DLQ :
- Liste jobs en DLQ
- Actions : replay, delete
- Métriques :
- Graphiques métriques (queue depth, success rate, duration, coûts)
- Filtres par queue, jobName, orgId
Composants Frontend
MonitoringDashboard
Dashboard principal avec overview et graphiques.
JobRunsList
Liste job_runs avec :
- Filtres (status, jobName, orgId, date)
- Pagination
- Actions : replay, cancel, view logs
DLQPanel
Panel DLQ avec :
- Liste jobs en DLQ
- Actions : replay, delete
- Statistiques (nombre jobs, par type)
Tests
Unit
- Corrélation IDs : Propagation correcte
- Métriques : Incrémentation correcte
- Logs : Format structuré correct
Integration
- Job runs : Enregistrement correct dans DB
- Métriques : Exposition Prometheus correcte
- DLQ : Jobs échoués → DLQ
E2E
- Monitoring dashboard : Affichage métriques temps réel
- Replay : Replay job depuis DLQ → succès
Checklist d'implémentation
- Corrélation :
requestId→jobId→job_runs.id - Logs structurés : Format standard avec IDs
- Métriques Prometheus : Queue depth, success/fail rate, P95 duration, coûts IA
- Dashboard Grafana : Panels métriques + alertes
- DLQ : Gestion jobs échoués + replay
- UI Admin : Page monitoring avec job runs, DLQ, métriques
- Alertes : Configuration Prometheus Alertmanager