Feature: Job Queue System
Metadata
- Issue ID: FEAT-33
- Status: Done
- Owner: Kzu0-afk
- Related PRs: 33-job-queue-system->dev
- Canonical feature doc: docs/features/FEAT-33-Job-Queue-System.md
Overview
Centralized background job queue system for moving heavy or slow work out of the HTTP request cycle. The API validates requests, stores required metadata, enqueues work, and returns quickly while dedicated workers process OCR, AI processing, indexing, email, and notification jobs asynchronously.
The queue system is backend infrastructure. It provides Redis-backed queues, worker processes, retry handling, job status tracking, logging, and idempotency protections so feature modules can enqueue background work without embedding long-running logic in controllers.
Frontend Behavior
No end-user UI is required for the initial implementation.
- API actions that enqueue background work return 202 Accepted with a job_id.
- Frontend flows should show a non-blocking processing state.
- Features that depend on queued results should poll an existing resource endpoint or a future job-status endpoint.
- User-facing messages must not expose queue names, Redis errors, stack traces, or worker internals.
Future admin monitoring may show queue health, job counts, failure counts, and retry history. Any dashboard, including Bull Board, must be admin-protected before exposure.
Backend Behavior
Current Codebase Context
The backend is a NestJS app using ConfigModule, PrismaModule, module-scoped services, and centralized logging through appLogger.
Relevant integration points:
- backend/src/app.module.ts registers global config and feature modules.
- backend/src/prisma/prisma.module.ts provides database access.
- backend/src/common/logger/app-logger.ts provides centralized logging.
- backend/src/notifications/notifications.service.ts persists notifications.
- backend/src/mail/mail.service.ts sends email.
- backend/src/documents/documents.service.ts owns document reads and must not process OCR/indexing directly.
Technology Choice
Use BullMQ with Redis as the queue backend.
Required backend dependencies:
- bullmq
- @nestjs/bullmq, if using Nest processor decorators
Required environment variables:
REDIS_URL=redis://localhost:6379
JOB_QUEUE_DEFAULT_ATTEMPTS=3
JOB_QUEUE_DEFAULT_BACKOFF_MS=5000
JOB_QUEUE_WORKER_CONCURRENCY=5
JOB_QUEUE_REMOVE_ON_COMPLETE=10 # free-tier Redis (~30 MB ceiling); raise to 1000+ on paid plans
JOB_QUEUE_REMOVE_ON_FAIL=50 # see notes below; raise to ~500 on paid Redis
# Course-generation queue tuning (FEAT-84)
JOB_QUEUE_COURSE_GEN_CONCURRENCY=2 # how many DIFFERENT courses can advance in parallel
JOB_QUEUE_COURSE_GEN_LIMITER_MS=2000 # min gap between Gemini calls (was 8000; tune higher on paid Gemini tier)
Production/staging values must be configured through deployment secrets. Do not log Redis credentials.
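The variables above could be read into a typed config object at startup. The following is a minimal sketch, not the project's actual loader: `loadJobQueueConfig`, `JobQueueConfig`, and `readPositiveInt` are hypothetical names, and the fallback values simply mirror the sample values listed above. Error messages name the variable only, so a malformed REDIS_URL never ends up in logs.

```typescript
// Hypothetical config loader; names and defaults are assumptions for illustration.
interface JobQueueConfig {
  redisUrl: string;
  defaultAttempts: number;
  defaultBackoffMs: number;
  workerConcurrency: number;
  removeOnComplete: number;
  removeOnFail: number;
}

type Env = Record<string, string | undefined>;

// Parses a positive integer, falling back when the variable is unset or blank.
function readPositiveInt(env: Env, key: string, fallback: number): number {
  const raw = env[key];
  if (raw === undefined || raw.trim() === '') return fallback;
  const value = Number(raw);
  if (!Number.isInteger(value) || value <= 0) {
    // Report the variable name only, never its value.
    throw new Error(key + ' must be a positive integer');
  }
  return value;
}

function loadJobQueueConfig(env: Env): JobQueueConfig {
  const redisUrl = env['REDIS_URL'];
  if (!redisUrl) throw new Error('REDIS_URL is required');
  return {
    redisUrl,
    defaultAttempts: readPositiveInt(env, 'JOB_QUEUE_DEFAULT_ATTEMPTS', 3),
    defaultBackoffMs: readPositiveInt(env, 'JOB_QUEUE_DEFAULT_BACKOFF_MS', 5000),
    workerConcurrency: readPositiveInt(env, 'JOB_QUEUE_WORKER_CONCURRENCY', 5),
    removeOnComplete: readPositiveInt(env, 'JOB_QUEUE_REMOVE_ON_COMPLETE', 10),
    removeOnFail: readPositiveInt(env, 'JOB_QUEUE_REMOVE_ON_FAIL', 50),
  };
}
```

In a NestJS app the `env` argument would typically be `process.env` or values obtained from ConfigService; passing it in explicitly keeps the function easy to test.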
Redis must be configured with maxmemory-policy noeviction for reliable BullMQ behavior. Cache-oriented policies such as volatile-lru can evict queue keys under memory pressure and are not production-safe for background jobs.
Queue Names
Use centralized queue-name constants:
export enum JobQueueName {
  OCR = 'ocr_queue',
  AI_PROCESSING = 'ai_processing_queue',
  INDEXING = 'indexing_queue',
  EMAIL = 'email_queue',
  NOTIFICATION = 'notification_queue',
  COURSE_GENERATION = 'course_generation_queue', // FEAT-84
}
| Queue | Purpose | Initial Processor Responsibility |
|---|---|---|
| ocr_queue | Extract text from uploaded documents | Receive document ID, process OCR, store extracted text/status |
| ai_processing_queue | Run AI-heavy tasks | Receive target entity ID/task type, store output |
| indexing_queue | Update searchable indexes | Receive entity ID/type, update index state |
| email_queue | Send email asynchronously | Call MailService and log failures |
| notification_queue | Persist user notifications asynchronously | Call NotificationsService.create() |
Proposed Module Structure
backend/src/job-queue/
├── dto/
│ ├── enqueue-job.dto.ts
│ └── job-status-query.dto.ts
├── enums/
│ ├── job-queue-name.enum.ts
│ ├── job-status.enum.ts
│ └── job-type.enum.ts
├── processors/
│ ├── ai-processing.processor.ts
│ ├── email.processor.ts
│ ├── indexing.processor.ts
│ ├── notification.processor.ts
│ └── ocr.processor.ts
├── job-queue.controller.ts
├── job-queue.module.ts
├── job-queue.service.ts
├── job-queue.types.ts
└── worker.ts
Implementation rules:
- JobQueueModule registers BullMQ queues and exports JobQueueService.
- JobQueueService is the single producer API used by other modules.
- Processors own job execution and status updates.
- worker.ts bootstraps a worker-only runtime so workers can run independently from the API server.
- The API server may enqueue jobs but must not process heavy jobs in request handlers.
Database Tracking
Add a Prisma model and SQL migration for a background_jobs table.
Required fields:
| Field | Purpose |
|---|---|
| id | Internal UUID primary key |
| queue_name | Queue where the job was submitted |
| job_type | Logical task type inside the queue |
| bull_job_id | BullMQ job ID for Redis lookup/debugging |
| status | pending, processing, completed, failed |
| payload_json | JSON payload used by the worker |
| result_json | Optional worker result metadata |
| error_message | Safe failure summary |
| attempts_made | Number of attempts already made |
| max_attempts | Maximum attempts configured for this job |
| idempotency_key | Unique key to prevent duplicate side effects |
| created_at | Job creation timestamp |
| started_at | First processing timestamp |
| finished_at | Completion/failure timestamp |
| updated_at | Last status update timestamp |
Indexes:
- queue_name
- status
- created_at
- idempotency_key, unique where present
- (queue_name, status)
Migration must follow /docs/database-schema/migration-convention: new folder, guarded SQL, no old migration edits, and PR to dev.
Job Status Lifecycle
flowchart LR
enqueue["API validates input"] --> pending["pending"]
pending --> processing["processing"]
processing --> completed["completed"]
processing --> retry["retry scheduled"]
retry --> processing
processing --> failed["failed"]
| Status | Meaning |
|---|---|
| pending | Job accepted and queued, but not started |
| processing | Worker has started the job |
| completed | Worker finished successfully |
| failed | Worker exhausted retries or failed permanently |
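The lifecycle above can be enforced with a small transition guard before any status write. This is a sketch under assumptions: the enum stands in for the contents of job-status.enum.ts, `canTransition` is a hypothetical helper, and a retry is modeled as processing -> processing because "retry scheduled" is not one of the four stored statuses.

```typescript
// Assumed contents of job-status.enum.ts; the four values come from the status table.
enum JobStatus {
  PENDING = 'pending',
  PROCESSING = 'processing',
  COMPLETED = 'completed',
  FAILED = 'failed',
}

// Allowed next statuses for each current status.
// A retry re-enters PROCESSING, so PROCESSING -> PROCESSING is permitted.
const ALLOWED_TRANSITIONS: Record<JobStatus, readonly JobStatus[]> = {
  [JobStatus.PENDING]: [JobStatus.PROCESSING],
  [JobStatus.PROCESSING]: [JobStatus.PROCESSING, JobStatus.COMPLETED, JobStatus.FAILED],
  [JobStatus.COMPLETED]: [],
  [JobStatus.FAILED]: [],
};

// Hypothetical guard: returns true only for transitions shown in the flowchart.
function canTransition(from: JobStatus, to: JobStatus): boolean {
  return ALLOWED_TRANSITIONS[from].indexOf(to) !== -1;
}
```

Treating completed and failed as terminal keeps a late or duplicated worker update from reopening a finished job.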
Producer Behavior
JobQueueService should expose typed enqueue methods:
- enqueueOcrJob(payload, options?)
- enqueueAiProcessingJob(payload, options?)
- enqueueIndexingJob(payload, options?)
- enqueueEmailJob(payload, options?)
- enqueueNotificationJob(payload, options?)
- getJobStatus(jobId)
Producer requirements:
- Validate queue name and job type.
- Require or generate an idempotency key.
- Create a background_jobs row with pending status.
- Submit the BullMQ job with configured attempts and exponential backoff.
- Save BullMQ job.id in background_jobs.bull_job_id.
- Return { job_id, status }.
- Log enqueue failures through appLogger.
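The producer requirements can be sketched as one function over two small interfaces. Everything here is hypothetical: `JobStore` stands in for the Prisma-backed background_jobs access, `QueueClient` stands in for a BullMQ queue (only the name/data/options shape of an add call is mirrored), and `logError` stands in for appLogger. Returning the existing job on a repeated idempotency key is one of the two behaviors the spec allows; a conflict response would be the other.

```typescript
// All names below are hypothetical stand-ins, not the real JobQueueService.
interface JobRow { id: string; status: string; }

interface NewJobInput {
  queueName: string;
  jobType: string;
  idempotencyKey: string;
  payload: unknown;
}

interface JobStore {
  findByIdempotencyKey(key: string): Promise<JobRow | null>;
  createPending(input: NewJobInput, maxAttempts: number): Promise<JobRow>;
  saveBullJobId(id: string, bullJobId: string): Promise<void>;
}

interface QueueClient {
  add(
    jobType: string,
    payload: unknown,
    options: { attempts: number; backoff: { type: 'exponential'; delay: number } },
  ): Promise<{ id: string }>;
}

interface EnqueueDeps {
  store: JobStore;
  queues: Record<string, QueueClient | undefined>;
  logError: (message: string, context: Record<string, unknown>) => void;
  attempts: number;
  backoffMs: number;
}

async function enqueueJob(
  deps: EnqueueDeps,
  request: NewJobInput,
): Promise<{ job_id: string; status: string }> {
  // 1. Validate queue name, job type, and idempotency key.
  const queue = deps.queues[request.queueName];
  if (!queue) throw new Error('Unknown queue: ' + request.queueName);
  if (!request.jobType) throw new Error('jobType is required');
  if (!request.idempotencyKey) throw new Error('idempotencyKey is required');

  // 2. A repeated key returns the existing job instead of creating a duplicate.
  const existing = await deps.store.findByIdempotencyKey(request.idempotencyKey);
  if (existing) return { job_id: existing.id, status: existing.status };

  // 3. Durable row first, then the queue submission.
  const row = await deps.store.createPending(request, deps.attempts);
  try {
    const job = await queue.add(request.jobType, request.payload, {
      attempts: deps.attempts,
      backoff: { type: 'exponential', delay: deps.backoffMs },
    });
    await deps.store.saveBullJobId(row.id, job.id);
  } catch (err) {
    // Log identifiers only; payloads and connection details stay out of logs.
    deps.logError('Failed to enqueue job', {
      queueName: request.queueName,
      jobType: request.jobType,
      jobId: row.id,
    });
    throw err;
  }
  return { job_id: row.id, status: row.status };
}
```

The sketch leaves open what happens to the pending row when the queue submission fails; marking it failed or deleting it are both plausible and would need to be decided alongside the FEAT-33-05 scenario.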
Processor Behavior
Each queue must have its own processor.
Processor requirements:
- Mark job as processing when execution starts.
- Log start time, end time, duration, queue name, job type, and job ID.
- Use idempotency checks before applying side effects.
- Mark job as completed on success.
- Let BullMQ retry retryable failures.
- Mark job as failed after max attempts are exhausted.
- Store safe error summaries only.
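The processor requirements can be expressed as a wrapper around a single attempt. This is a sketch under assumptions: `runAttempt`, `AttemptContext`, and `StatusStore` are hypothetical names, `attemptsMade` is taken to mean attempts finished before the current one, and the safe error summary is reduced to the error's name. The error is rethrown so that the queue library, not the wrapper, decides whether to retry.

```typescript
// Hypothetical wrapper; real processors would receive these values from the queue job.
interface AttemptContext {
  jobId: string;
  queueName: string;
  jobType: string;
  attemptsMade: number; // attempts finished before this one (assumption)
  maxAttempts: number;
}

interface StatusStore {
  markProcessing(jobId: string): Promise<void>;
  markCompleted(jobId: string): Promise<void>;
  markFailed(jobId: string, safeMessage: string): Promise<void>;
}

type LogFn = (entry: Record<string, unknown>) => void;

async function runAttempt(
  ctx: AttemptContext,
  handler: () => Promise<void>,
  store: StatusStore,
  log: LogFn,
): Promise<void> {
  const startedAt = Date.now();
  await store.markProcessing(ctx.jobId);
  try {
    await handler();
    // If this write fails, the attempt counts as failed and can be retried,
    // so a job is never reported completed without a durable status.
    await store.markCompleted(ctx.jobId);
    log({ ...ctx, status: 'completed', startedAt, durationMs: Date.now() - startedAt });
  } catch (err) {
    const safeMessage = err instanceof Error ? err.name : 'UnknownError';
    const isLastAttempt = ctx.attemptsMade + 1 >= ctx.maxAttempts;
    if (isLastAttempt) {
      // Only the final attempt moves the job to failed.
      await store.markFailed(ctx.jobId, safeMessage);
    }
    log({
      ...ctx,
      status: isLastAttempt ? 'failed' : 'retry scheduled',
      startedAt,
      durationMs: Date.now() - startedAt,
      error: safeMessage,
    });
    throw err; // let the queue library apply its retry policy
  }
}
```

Because the handler may run more than once, any side effect inside it still needs its own idempotency check; the wrapper only manages status and logging.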
API Behavior
Recommended endpoints:
POST /jobs/:queue
- Validates request body.
- Enqueues a job through JobQueueService.
- Returns 202 Accepted.
- Restricted to internal/admin callers once auth policy is finalized.
Response:
{
  "job_id": "uuid",
  "status": "pending"
}
GET /jobs/:id
- Returns job status for a known job ID.
- Restricted to the job owner or admin for user-facing jobs.
- Returns 404 if job does not exist or requester is not allowed to view it.
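The GET /jobs/:id rules can be sketched as a pure mapping from a stored row to a response view. The names are hypothetical, and `ownerId` in particular is an assumption: the background_jobs field list does not define an owner column, so ownership would have to come from the payload or a related entity. Returning `null` for both "missing" and "not allowed" lets the controller answer 404 in either case without revealing that the job exists.

```typescript
// Hypothetical shapes; ownerId is an assumption, not a documented column.
interface StoredJob {
  id: string;
  ownerId: string | null;
  status: string;
  payloadJson: unknown;
  createdAt: string;
  startedAt: string | null;
  finishedAt: string | null;
}

interface Requester { userId: string; isAdmin: boolean; }

interface JobStatusView {
  job_id: string;
  status: string;
  created_at: string;
  started_at: string | null;
  finished_at: string | null;
}

// Returns null when the job is missing OR the requester may not see it,
// so the caller can respond 404 for both cases.
function toJobStatusView(job: StoredJob | undefined, requester: Requester): JobStatusView | null {
  if (!job) return null;
  const allowed = requester.isAdmin || job.ownerId === requester.userId;
  if (!allowed) return null;
  // The payload is deliberately left out of the view.
  return {
    job_id: job.id,
    status: job.status,
    created_at: job.createdAt,
    started_at: job.startedAt,
    finished_at: job.finishedAt,
  };
}
```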
Integration Points
Documents:
- Upload/import flows enqueue OCR and indexing jobs.
- Controllers/services must not perform OCR or indexing synchronously.
Notifications:
- notification_queue processor calls NotificationsService.create().
- NotificationsService remains responsible only for persistence and read/update operations.
Email:
- email_queue processor calls MailService.
- Email failures are logged and retried by queue policy.
AI Processing:
- AI jobs should include cost-control and feature-access checks before enqueueing or before processor execution.
- Direct AI calls should not be added to controllers as part of FEAT-33.
Worker Runtime
Workers must run independently from the API server.
Suggested scripts:
{
  "start:worker": "nest start --entryFile worker",
  "start:worker:dev": "nest start --watch --entryFile worker"
}
Deployment model:
- API process: HTTP requests and job enqueueing.
- Worker process: Redis queue consumption and processor execution.
- Multiple worker processes may run concurrently with idempotency protections.
Retry, Backoff, and Concurrency
Default queue options:
- Attempts: 3
- Backoff: exponential
- Initial delay: 5000ms
- Remove completed jobs after retention threshold
- Retain failed jobs for investigation
OCR and AI queues should use lower concurrency than email/notification queues.
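As a worked example of the defaults, 3 attempts with a 5000 ms initial delay allow two retries. The sketch below assumes the delay doubles on each retry (initial delay × 2^(retry − 1)), which is how BullMQ's built-in exponential strategy is generally described; jitter or a custom backoff strategy would change the numbers, so check against the installed version. `exponentialBackoffDelays` and `buildDefaultJobOptions` are hypothetical helpers, and the options object only mirrors the general shape of queue job options.

```typescript
// Delay before each retry, assuming initialDelayMs * 2^(retry - 1).
function exponentialBackoffDelays(attempts: number, initialDelayMs: number): number[] {
  const delays: number[] = [];
  // The first attempt runs immediately; only retries are delayed.
  for (let retry = 1; retry < attempts; retry++) {
    delays.push(initialDelayMs * Math.pow(2, retry - 1));
  }
  return delays;
}

// Hypothetical builder for per-job options from the configured defaults.
function buildDefaultJobOptions(config: {
  attempts: number;
  backoffMs: number;
  removeOnComplete: number;
  removeOnFail: number;
}) {
  return {
    attempts: config.attempts,
    backoff: { type: 'exponential' as const, delay: config.backoffMs },
    // Count-based retention: keep at most this many finished jobs in Redis.
    removeOnComplete: config.removeOnComplete,
    removeOnFail: config.removeOnFail,
  };
}

// exponentialBackoffDelays(3, 5000) → [5000, 10000]
```

With these defaults a job that fails every attempt reaches failed roughly 15 seconds after its first attempt, plus processing time.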
Logging and Security
Every job execution should log:
- queue name
- job type
- job ID
- status transition
- start/end time
- duration
- attempts made
- safe error message
Security requirements:
- Do not expose internal queue payloads to unauthorized users.
- Protect job endpoints with auth/role guards when exposed.
- Validate all payloads with DTOs before enqueueing.
- Do not log secrets, tokens, email credentials, Redis URLs, full AI prompts, or private document content.
- Add rate limiting before public enqueue endpoints are introduced.
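One way to apply the logging rules is to pass every log context through a redaction step before it reaches appLogger. This is a sketch, not an existing utility: `redactForLog` and the key pattern are assumptions, and a key-name match is only a heuristic that should be combined with keeping sensitive data out of payloads in the first place.

```typescript
// Hypothetical key pattern covering the categories listed above.
const SENSITIVE_KEY = /pass(word)?|secret|token|credential|authorization|prompt|content|redis_?url/i;

// Credentials embedded in a Redis URL, e.g. redis://user:pass@host:6379
const REDIS_CREDENTIALS = /(rediss?:\/\/)[^@\s\/]+@/gi;

// Returns a copy that is safe to log: sensitive keys are masked and
// Redis credentials inside strings are stripped.
function redactForLog(value: unknown): unknown {
  if (Array.isArray(value)) return value.map(redactForLog);
  if (value !== null && typeof value === 'object') {
    const source = value as Record<string, unknown>;
    const out: Record<string, unknown> = {};
    for (const key of Object.keys(source)) {
      out[key] = SENSITIVE_KEY.test(key) ? '[REDACTED]' : redactForLog(source[key]);
    }
    return out;
  }
  if (typeof value === 'string') {
    return value.replace(REDIS_CREDENTIALS, '$1[REDACTED]@');
  }
  return value;
}
```

Identifiers such as job ID, queue name, and document ID pass through unchanged, which keeps logs useful for debugging.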
QA Test Scenarios
| Scenario ID | Description | Steps | Input | Expected Result |
|---|---|---|---|---|
| FEAT-33-01 | Enqueue OCR job happy path | Submit valid OCR enqueue request | Valid document ID and idempotency key | Returns 202 with job_id; DB row is pending; BullMQ job is created |
| FEAT-33-02 | Worker completes OCR job | Start worker with a pending OCR job | Existing pending OCR job | Job transitions pending -> processing -> completed; duration is logged |
| FEAT-33-03 | Invalid queue name rejected | Submit enqueue request for unknown queue | Unsupported queue param | Returns 400; no DB job row or BullMQ job is created |
| FEAT-33-04 | Invalid payload rejected | Submit enqueue request missing required payload fields | Missing target entity ID | Returns 400 validation error; no job is created |
| FEAT-33-05 | Redis unavailable on enqueue | Stop Redis, then enqueue a job | Valid payload while Redis is unavailable | Returns retryable failure; error is logged without secrets |
| FEAT-33-06 | Worker retry on transient failure | Force processor dependency to fail once, then recover | Job with attempts remaining | Job retries with exponential backoff and eventually completes |
| FEAT-33-07 | Max retries exhausted | Force processor to fail on every attempt | Job configured with max attempts | Job status becomes failed; safe error summary is stored |
| FEAT-33-08 | Idempotency prevents duplicate jobs | Submit same idempotency key twice | Same queue, job type, and idempotency key | Second request returns existing job or conflict behavior; duplicate side effect is not produced |
| FEAT-33-09 | Notification queue integration | Enqueue notification job | Valid notification payload | Processor calls NotificationsService.create(); notification is persisted once |
| FEAT-33-10 | Email queue integration | Enqueue email job | Valid recipient and email metadata | Processor calls MailService; API request is not blocked by SMTP delivery |
| FEAT-33-11 | Job status lookup | Query existing job status | Valid job ID | Returns job status and timestamps without exposing private payload content |
| FEAT-33-12 | Unauthorized status lookup | Query another user's job status | Valid auth for wrong user | Returns 404 or 403 according to finalized auth policy |
| FEAT-33-13 | Concurrent workers | Run two workers on same queue | Multiple pending jobs | Jobs are distributed safely; no duplicate effects occur |
| FEAT-33-14 | Queue-specific concurrency | Configure low OCR concurrency and enqueue multiple OCR jobs | Several OCR jobs | Worker processes no more than configured concurrency |
| FEAT-33-15 | Logging coverage | Process successful and failed jobs | One success, one failure | Logs include queue, job ID, status, attempts, and duration |
Edge Cases
- Duplicate enqueue requests: same idempotency key should not create duplicate side effects.
- Worker crash mid-job: BullMQ should make the job retryable according to stalled-job handling and attempts policy.
- Redis disconnect: API and worker should fail visibly through logs and health checks without leaking credentials.
- Redis eviction policy: Redis must use maxmemory-policy noeviction; providers that force cache eviction policies should be treated as development-only for BullMQ.
- DB status update failure: retry behavior must avoid marking a job completed when durable status was not saved.
- Poison jobs: malformed but accepted jobs should exhaust retries and become failed, not retry forever.
- Large payloads: store only identifiers and metadata in queue payloads; keep large file/document content in durable storage.
- Deleted target entity: if a document/user is deleted before processing, processor logs and marks job failed or completed-noop based on job type.
- Out-of-order jobs: indexing jobs may run before OCR jobs complete; processors must check current entity state before writing.
- Multiple workers: all processors must be idempotent because more than one worker process can run concurrently.
- Sensitive data: payloads and logs must not contain passwords, tokens, raw private document content, or full AI prompts.
Post-launch additions
These additions were made after the initial FEAT-33 merge to support scaling and FEAT-84:
Socket.IO Redis adapter (multi-replica realtime)
Without a Redis adapter, Socket.IO room broadcasts (server.to('course:abc').emit(...)) only reach clients connected to the same Node process. With multiple backend replicas behind a load balancer, the worker emitting an event lands on replica B while the client subscribed to that room is on replica A — the broadcast is silently lost.
backend/src/common/gateway/redis-io.adapter.ts wraps @socket.io/redis-adapter and is wired in main.ts:
const redisIoAdapter = new RedisIoAdapter(app);
await redisIoAdapter.connectToRedis();
app.useWebSocketAdapter(redisIoAdapter);
Two dedicated IORedis connections (pub + sub) are opened, separate from the BullMQ shared connection so pub/sub traffic does not block queue commands. Falls back to the default in-memory adapter when REDIS_URL is unset (single-instance dev only).
UnrecoverableError pattern (no-retry sentinel)
Some failures should NOT be retried — e.g. credit exhaustion mid-pipeline. The standard BullMQ UnrecoverableError thrown from a processor:
- Stops the current job immediately.
- Marks background_jobs.status = 'failed'.
- Does not consume additional attempts_made: the job is terminal after one attempt regardless of the configured max_attempts.
The course-generation processor catches credit-limit BadRequestException from SubscriptionsService.useAiCredit, marks the course failed_insufficient_credits, emits a course_failed socket event, then throws UnrecoverableError. The outer catch handler treats error instanceof UnrecoverableError as terminal — it does NOT take the fail-soft "advance to next step" branch — so no downstream jobs are enqueued.
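The control flow described above can be sketched without the real services. Both error classes here are local stand-ins so the example runs on its own: in the processor, `UnrecoverableError` would be imported from bullmq and the credit failure would be the BadRequestException from SubscriptionsService.useAiCredit. `runCourseStep` and the `hooks` callbacks are hypothetical names.

```typescript
// Local stand-in for BullMQ's UnrecoverableError (assumption for runnability).
class UnrecoverableError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'UnrecoverableError';
    Object.setPrototypeOf(this, UnrecoverableError.prototype);
  }
}

// Local stand-in for the credit-limit BadRequestException.
class InsufficientCreditsError extends Error {
  constructor() {
    super('insufficient credits');
    this.name = 'InsufficientCreditsError';
    Object.setPrototypeOf(this, InsufficientCreditsError.prototype);
  }
}

interface CourseStepHooks {
  markCourseFailed(status: string): Promise<void>;
  emitCourseFailed(): void;
  logStepFailure(error: unknown): void;
  enqueueNextStep(): Promise<void>;
}

async function runCourseStep(step: () => Promise<void>, hooks: CourseStepHooks): Promise<void> {
  try {
    try {
      await step();
    } catch (err) {
      if (err instanceof InsufficientCreditsError) {
        // Credit exhaustion: record it, notify the client, then stop for good.
        await hooks.markCourseFailed('failed_insufficient_credits');
        hooks.emitCourseFailed();
        throw new UnrecoverableError('insufficient credits');
      }
      throw err;
    }
  } catch (err) {
    // Terminal: rethrow so the queue fails the job without retrying,
    // and skip the fail-soft branch below.
    if (err instanceof UnrecoverableError) throw err;
    // Fail-soft: log the step failure and still advance the pipeline.
    hooks.logStepFailure(err);
  }
  await hooks.enqueueNextStep();
}
```

The key property is that `enqueueNextStep` is unreachable once an UnrecoverableError has been thrown, so a course that ran out of credits never schedules further Gemini work.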
Daily cleanup cron
QueueHealthService.cleanupStaleJobs() runs at 3:00 AM via @nestjs/schedule's @Cron(CronExpression.EVERY_DAY_AT_3AM):
- queue.clean(24h, 1000, 'completed' | 'failed' | 'wait' | 'delayed') per queue.
- Belt-and-suspenders for removeOnComplete/removeOnFail count-based rotation, which only fires on new enqueues. An idle-but-stale queue can otherwise hold a tail of jobs indefinitely.
- Idempotent: queue.clean is atomic, so duplicate runs across replicas (when the cron fires on multiple instances) just no-op.
- Audit history in Postgres background_jobs is untouched; only Redis state is cleaned.
Requires ScheduleModule.forRoot() in AppModule.imports.
Course-generation processor (FEAT-84)
The course_generation_queue runs a self-chaining pipeline:
outline → module_lessons (per module, batched) → module_exam (per module)
→ … → final_exam → mastery_analysis (terminal)
Processor decorator:
@Processor(JobQueueName.COURSE_GENERATION, {
  concurrency: COURSE_GEN_CONCURRENCY,
  limiter: { max: 1, duration: COURSE_GEN_LIMITER_MS },
})
- COURSE_GEN_CONCURRENCY (default 2): different courses can advance in parallel; each course's own chain stays sequential because each step self-enqueues its successor.
- COURSE_GEN_LIMITER_MS (default 2000): spaces back-to-back Gemini calls. Tune up if the Gemini tier's RPM limit changes.
Idempotency keys for every step (outline:${courseId}, module_lessons:${moduleId}, module_exam:${moduleId}, final_exam:${courseId}, mastery_analysis:${courseId}) prevent duplicates.
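The key formats listed above can be produced by one typed helper so that every step builds its key the same way. `CourseGenStep` and `courseGenIdempotencyKey` are hypothetical names; only the key formats themselves come from this document.

```typescript
// Each pipeline step is scoped either to a course or to a module.
type CourseGenStep =
  | { kind: 'outline'; courseId: string }
  | { kind: 'module_lessons'; moduleId: string }
  | { kind: 'module_exam'; moduleId: string }
  | { kind: 'final_exam'; courseId: string }
  | { kind: 'mastery_analysis'; courseId: string };

// Builds "<step>:<id>" using the course ID or module ID that scopes the step.
function courseGenIdempotencyKey(step: CourseGenStep): string {
  switch (step.kind) {
    case 'outline':
    case 'final_exam':
    case 'mastery_analysis':
      return step.kind + ':' + step.courseId;
    case 'module_lessons':
    case 'module_exam':
      return step.kind + ':' + step.moduleId;
  }
}

// courseGenIdempotencyKey({ kind: 'module_exam', moduleId: 'm7' }) → 'module_exam:m7'
```

Because the key depends only on the step and its scope, a retried or re-enqueued step maps to the same key and is deduplicated by the producer.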
Full feature docs: 84-ai-course-generation.
Notes
- Feature flags: Queue-backed features can be gated later using FEAT-35. The queue infrastructure should be enabled in deployed environments only when Redis is configured.
- Dependencies: Redis, BullMQ, NestJS backend, Prisma, appLogger, NotificationsService, and MailService.
- Migration dependency: Add background_jobs through a new migration following the migration convention.
- Notification dependency: FEAT-40 assumes notification creation is triggered by the queue processor. FEAT-33 provides that queue layer.
- No frontend dashboard in scope: Admin monitoring UI and Bull Board are future work.
- No placeholder jobs/data: Do not seed fake jobs. Tests should create and clean up their own records.
- Known limitation: OCR, AI provider, and search-index implementations are not owned by FEAT-33. FEAT-33 provides queue infrastructure and integration contracts.
- Build gate: Run pnpm build before PR as required by docs/AGENTS.md.