Aller au contenu principal

Observabilité - Workers & Jobs - Aaperture

Observabilité complète : Corrélation logs, métriques, monitoring des jobs BullMQ, DLQ et replay.

Objectif

Fournir une observabilité complète pour les workers et jobs BullMQ : corrélation logs, métriques, monitoring, DLQ et replay.


Corrélation

IDs de corrélation

Chaîne de corrélation :

requestId (API) → jobId (BullMQ) → job_runs.id

Tous les logs incluent ces IDs pour traçabilité complète.

Exemple de log structuré

{
level: 'info',
message: 'Job started',
requestId: 'req_abc123', // Si déclenché depuis API
jobId: 'org_123:export:build:entity_456:hash789',
jobRunId: 'run_xyz789',
jobName: 'export:build',
orgId: 'org_123',
userId: 'user_456',
entityId: 'entity_456',
attempt: 1,
timestamp: '2024-01-01T00:00:00Z'
}

Implémentation

Middleware API :

@Injectable()
export class CorrelationMiddleware implements NestMiddleware {
use(req: Request, res: Response, next: NextFunction) {
const requestId = req.headers['x-request-id'] || generateId();
req['requestId'] = requestId;
res.setHeader('X-Request-ID', requestId);
next();
}
}

Logger avec corrélation :

class WorkerLogger {
log(job: Job, message: string, metadata?: Record<string, any>) {
this.logger.log({
...metadata,
jobId: job.id,
jobName: job.name,
jobRunId: job.data.jobRunId,
requestId: job.data.requestId,
message
});
}
}

Métriques

Métriques clés

Queue Depth

Nombre de jobs en attente par queue.

Métrique : bullmq.queue.depth{queue="export"} Type : Gauge Labels : queue, orgId (optionnel)

Success/Fail Rate

Taux de succès/échec par type de job.

Métriques :

  • bullmq.job.success{jobName="export:build"}
  • bullmq.job.fail{jobName="export:build"} Type : Counter Labels : jobName, orgId (optionnel)

P95 Duration

Durée d'exécution (percentile 95) par type de job.

Métrique : bullmq.job.duration{jobName="export:build",percentile="95"} Type : Histogram Labels : jobName, percentile

Coûts IA

Suivi des coûts OpenAI par organisation.

Métrique : ai.cost_cents{orgId="org_123"} Type : Counter Labels : orgId, model

Exposition métriques

Prometheus (recommandé) :

import { Registry, Counter, Histogram, Gauge } from 'prom-client';

const registry = new Registry();

const jobDuration = new Histogram({
name: 'bullmq_job_duration_seconds',
help: 'Job execution duration',
labelNames: ['jobName', 'status'],
registers: [registry]
});

const queueDepth = new Gauge({
name: 'bullmq_queue_depth',
help: 'Number of jobs waiting in queue',
labelNames: ['queue'],
registers: [registry]
});

Endpoint : GET /metrics (format Prometheus)


Logs structurés

Format standard

Start :

{
level: 'info',
message: 'Job started',
jobId: string,
jobName: string,
jobRunId: string,
orgId: string,
userId?: string,
entityId?: string,
attempt: number,
timestamp: string
}

Success :

{
level: 'info',
message: 'Job completed',
jobId: string,
jobName: string,
jobRunId: string,
duration: number, // ms
result?: Record<string, any>,
timestamp: string
}

Error :

{
level: 'error',
message: 'Job failed',
jobId: string,
jobName: string,
jobRunId: string,
error: string,
stack?: string,
attempt: number,
willRetry: boolean,
timestamp: string
}

Centralisation

Options :

  • ELK Stack (Elasticsearch, Logstash, Kibana)
  • CloudWatch (AWS)
  • Datadog
  • Loki (Grafana)

Format : JSON structuré avec champs standardisés.


Monitoring Dashboard

Dashboard Bull Board

Intégration existante : /ops/queues (production recommandée via https://queues.aaperture.com)

Fonctionnalités :

  • Visualisation en temps réel de toutes les queues
  • Gestion des jobs (retry, purge, filtrage)
  • Statistiques par queue

Dashboard Métriques

Grafana (recommandé) ou équivalent :

Panels :

  1. Queue Depth : Graphique temps réel par queue
  2. Job Success/Fail Rate : Graphique avec taux de succès/échec
  3. P95 Duration : Graphique durée d'exécution par type de job
  4. Coûts IA : Graphique coûts cumulés par organisation
  5. Jobs par heure : Graphique volume de jobs

Alertes :

  • Queue depth > 1000 → Alerte
  • Fail rate > 10% → Alerte
  • P95 duration > 60s → Alerte
  • Coûts IA > 100€/jour → Alerte

Dead Letter Queue (DLQ)

Gestion DLQ

Jobs échoués après toutes les tentatives → DLQ.

Table job_runs :

  • status = 'FAILED' + attempt >= maxAttempts → En DLQ
  • Flag in_dlq = true (optionnel)

Monitoring DLQ

Métrique :

const dlqDepth = new Gauge({
name: 'bullmq_dlq_depth',
help: 'Number of jobs in Dead Letter Queue',
labelNames: ['jobName'],
registers: [registry]
});

Dashboard :

  • Liste jobs en DLQ avec filtres (jobName, orgId, date)
  • Détails erreur pour chaque job
  • Actions : replay, delete, view logs

Replay

Endpoint : POST /admin/jobs/:jobRunId/replay

Processus :

  1. Charger job_runs avec jobRunId
  2. Vérifier status = 'FAILED' et in_dlq = true
  3. Créer nouveau job avec même payload
  4. Mettre à jour job_runs : in_dlq = false, nouveau jobRunId

UI :

  • Bouton "Replay" dans liste DLQ
  • Confirmation avant replay
  • Notification si replay réussi/échoué

Job Runs Table

Structure

Voir WORKERS_ARCHITECTURE.md pour structure complète.

Requêtes utiles

Jobs en cours :

SELECT * FROM job_runs 
WHERE status = 'RUNNING'
ORDER BY started_at DESC;

Jobs échoués récents :

SELECT * FROM job_runs 
WHERE status = 'FAILED'
AND finished_at > NOW() - INTERVAL '24 hours'
ORDER BY finished_at DESC;

Jobs lents (P95) :

SELECT job_name, 
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY EXTRACT(EPOCH FROM (finished_at - started_at))) as p95_duration
FROM job_runs
WHERE status = 'COMPLETED'
AND finished_at > NOW() - INTERVAL '7 days'
GROUP BY job_name;

Coûts IA par organisation :

SELECT org_id, 
SUM(cost_cents) as total_cost_cents
FROM job_runs jr
JOIN ai_tasks at ON jr.entity_id = at.id
WHERE jr.job_name = 'ai:run'
AND jr.finished_at > NOW() - INTERVAL '30 days'
GROUP BY org_id
ORDER BY total_cost_cents DESC;

Alertes

Configuration alertes

Prometheus Alertmanager ou équivalent :

Alertes critiques :

groups:
- name: workers
rules:
- alert: HighQueueDepth
expr: bullmq_queue_depth > 1000
for: 5m
annotations:
summary: "Queue depth is high"

- alert: HighFailRate
expr: rate(bullmq_job_fail[5m]) / rate(bullmq_job_total[5m]) > 0.1
for: 5m
annotations:
summary: "Job fail rate is high"

- alert: SlowJobs
expr: bullmq_job_duration_seconds{percentile="95"} > 60
for: 10m
annotations:
summary: "Jobs are slow (P95 > 60s)"

- alert: HighAICosts
expr: sum(rate(ai_cost_cents[1d])) > 10000
for: 1h
annotations:
summary: "AI costs are high (>100€/day)"

Notifications :

  • Email
  • Slack
  • PagerDuty
  • Webhook

UI Admin

Page Monitoring

Route : /admin/monitoring

Sections :

  1. Overview :
    • Métriques clés (queue depth, success rate, P95 duration)
    • Graphiques temps réel
  2. Jobs :
    • Liste job_runs avec filtres (status, jobName, orgId, date)
    • Détails job (payload, result, error, logs)
    • Actions : replay, cancel, view logs
  3. DLQ :
    • Liste jobs en DLQ
    • Actions : replay, delete
  4. Métriques :
    • Graphiques métriques (queue depth, success rate, duration, coûts)
    • Filtres par queue, jobName, orgId

Composants Frontend

MonitoringDashboard

Dashboard principal avec overview et graphiques.

JobRunsList

Liste job_runs avec :

  • Filtres (status, jobName, orgId, date)
  • Pagination
  • Actions : replay, cancel, view logs

DLQPanel

Panel DLQ avec :

  • Liste jobs en DLQ
  • Actions : replay, delete
  • Statistiques (nombre jobs, par type)

Tests

Unit

  • Corrélation IDs : Propagation correcte
  • Métriques : Incrémentation correcte
  • Logs : Format structuré correct

Integration

  • Job runs : Enregistrement correct dans DB
  • Métriques : Exposition Prometheus correcte
  • DLQ : Jobs échoués → DLQ

E2E

  • Monitoring dashboard : Affichage métriques temps réel
  • Replay : Replay job depuis DLQ → succès

Checklist d'implémentation

  • Corrélation : requestIdjobIdjob_runs.id
  • Logs structurés : Format standard avec IDs
  • Métriques Prometheus : Queue depth, success/fail rate, P95 duration, coûts IA
  • Dashboard Grafana : Panels métriques + alertes
  • DLQ : Gestion jobs échoués + replay
  • UI Admin : Page monitoring avec job runs, DLQ, métriques
  • Alertes : Configuration Prometheus Alertmanager

Références