Feature: Job Queue System

Metadata

  • Issue ID: FEAT-33
  • Status: Done
  • Owner: Kzu0-afk
  • Related PRs: 33-job-queue-system -> dev
  • Canonical feature doc: docs/features/FEAT-33-Job-Queue-System.md

Overview

FEAT-33 adds a centralized background job queue that moves heavy or slow work out of the HTTP request cycle. The API validates requests, stores required metadata, enqueues work, and returns quickly, while dedicated workers process OCR, AI processing, indexing, email, and notification jobs asynchronously.

The queue system is backend infrastructure. It provides Redis-backed queues, worker processes, retry handling, job status tracking, logging, and idempotency protections so feature modules can enqueue background work without embedding long-running logic in controllers.


Frontend Behavior

No end-user UI is required for the initial implementation.

  • API actions that enqueue background work return 202 Accepted with a job_id.
  • Frontend flows should show a non-blocking processing state.
  • Features that depend on queued results should poll an existing resource endpoint or a future job-status endpoint.
  • User-facing messages must not expose queue names, Redis errors, stack traces, or worker internals.

Future admin monitoring may show queue health, job counts, failure counts, and retry history. Any dashboard, including Bull Board, must be admin-protected before exposure.


Backend Behavior

Current Codebase Context

The backend is a NestJS app using ConfigModule, PrismaModule, module-scoped services, and centralized logging through appLogger.

Relevant integration points:

  • backend/src/app.module.ts registers global config and feature modules.
  • backend/src/prisma/prisma.module.ts provides database access.
  • backend/src/common/logger/app-logger.ts provides centralized logging.
  • backend/src/notifications/notifications.service.ts persists notifications.
  • backend/src/mail/mail.service.ts sends email.
  • backend/src/documents/documents.service.ts owns document reads and must not process OCR/indexing directly.

Technology Choice

Use BullMQ with Redis as the queue backend.

Required backend dependencies:

  • bullmq
  • @nestjs/bullmq if using Nest processor decorators

Required environment variables:

REDIS_URL=redis://localhost:6379
JOB_QUEUE_DEFAULT_ATTEMPTS=3
JOB_QUEUE_DEFAULT_BACKOFF_MS=5000
JOB_QUEUE_WORKER_CONCURRENCY=5
JOB_QUEUE_REMOVE_ON_COMPLETE=10        # free-tier Redis (~30 MB ceiling); raise to 1000+ on paid plans
JOB_QUEUE_REMOVE_ON_FAIL=50            # see notes below; raise to ~500 on paid Redis

# Course-generation queue tuning (FEAT-84)
JOB_QUEUE_COURSE_GEN_CONCURRENCY=2     # how many DIFFERENT courses can advance in parallel
JOB_QUEUE_COURSE_GEN_LIMITER_MS=2000   # min gap between Gemini calls (was 8000; tune higher on paid Gemini tier)

Production/staging values must be configured through deployment secrets. Do not log Redis credentials.
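
As a rough illustration of how the variables above might be read, the sketch below parses them into a typed object. The names JobQueueConfig, readPositiveInt, and loadJobQueueConfig are hypothetical, and the fallback values simply reuse the sample values shown above; the real module may wire this through ConfigModule instead.

```typescript
// Hypothetical config shape; field names are illustrative, not from the codebase.
interface JobQueueConfig {
  redisUrl: string | undefined;
  defaultAttempts: number;
  defaultBackoffMs: number;
  workerConcurrency: number;
  removeOnComplete: number;
  removeOnFail: number;
}

// Returns the parsed value only when it is a positive integer; otherwise the fallback.
function readPositiveInt(raw: string | undefined, fallback: number): number {
  const parsed = Number(raw);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
}

// `env` is passed in (for example process.env) so the function is easy to test.
function loadJobQueueConfig(env: Record<string, string | undefined>): JobQueueConfig {
  return {
    redisUrl: env['REDIS_URL'], // never log this value
    defaultAttempts: readPositiveInt(env['JOB_QUEUE_DEFAULT_ATTEMPTS'], 3),
    defaultBackoffMs: readPositiveInt(env['JOB_QUEUE_DEFAULT_BACKOFF_MS'], 5000),
    workerConcurrency: readPositiveInt(env['JOB_QUEUE_WORKER_CONCURRENCY'], 5),
    removeOnComplete: readPositiveInt(env['JOB_QUEUE_REMOVE_ON_COMPLETE'], 10),
    removeOnFail: readPositiveInt(env['JOB_QUEUE_REMOVE_ON_FAIL'], 50),
  };
}
```

Passing the environment in as an argument keeps the parser free of global state, which makes misconfiguration cases straightforward to cover in unit tests.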

Redis must be configured with maxmemory-policy noeviction for reliable BullMQ behavior. Cache-oriented policies such as volatile-lru can evict queue keys under memory pressure and are not production-safe for background jobs.

Queue Names

Use centralized queue-name constants:

export enum JobQueueName {
  OCR = 'ocr_queue',
  AI_PROCESSING = 'ai_processing_queue',
  INDEXING = 'indexing_queue',
  EMAIL = 'email_queue',
  NOTIFICATION = 'notification_queue',
  COURSE_GENERATION = 'course_generation_queue',  // FEAT-84
}

| Queue | Purpose | Initial Processor Responsibility |
| --- | --- | --- |
| ocr_queue | Extract text from uploaded documents | Receive document ID, process OCR, store extracted text/status |
| ai_processing_queue | Run AI-heavy tasks | Receive target entity ID/task type, store output |
| indexing_queue | Update searchable indexes | Receive entity ID/type, update index state |
| email_queue | Send email asynchronously | Call MailService and log failures |
| notification_queue | Persist user notifications asynchronously | Call NotificationsService.create() |
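
A request such as POST /jobs/:queue receives the queue name as an untrusted string, so it has to be checked against the known names before anything is enqueued. The sketch below shows one way to do that with a type guard. QUEUE_NAMES and isQueueName are hypothetical names, and the plain array stands in for the JobQueueName enum only to keep the example dependency-free.

```typescript
// Mirrors the queue names listed above; a plain array is used for this sketch.
const QUEUE_NAMES = [
  'ocr_queue',
  'ai_processing_queue',
  'indexing_queue',
  'email_queue',
  'notification_queue',
  'course_generation_queue',
] as const;

type QueueName = (typeof QUEUE_NAMES)[number];

// Type guard: true only when the value is one of the known queue names.
function isQueueName(value: string): value is QueueName {
  return (QUEUE_NAMES as readonly string[]).includes(value);
}
```

A controller could call isQueueName on the route parameter and answer 400 when it returns false, before any database row or queue job is created.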

Proposed Module Structure

backend/src/job-queue/
├── dto/
│   ├── enqueue-job.dto.ts
│   └── job-status-query.dto.ts
├── enums/
│   ├── job-queue-name.enum.ts
│   ├── job-status.enum.ts
│   └── job-type.enum.ts
├── processors/
│   ├── ai-processing.processor.ts
│   ├── email.processor.ts
│   ├── indexing.processor.ts
│   ├── notification.processor.ts
│   └── ocr.processor.ts
├── job-queue.controller.ts
├── job-queue.module.ts
├── job-queue.service.ts
├── job-queue.types.ts
└── worker.ts

Implementation rules:

  • JobQueueModule registers BullMQ queues and exports JobQueueService.
  • JobQueueService is the single producer API used by other modules.
  • Processors own job execution and status updates.
  • worker.ts bootstraps worker-only runtime so workers can run independently from the API server.
  • The API server may enqueue jobs but must not process heavy jobs in request handlers.

Database Tracking

Add a Prisma model and SQL migration for a background_jobs table.

Required fields:

| Field | Purpose |
| --- | --- |
| id | Internal UUID primary key |
| queue_name | Queue where the job was submitted |
| job_type | Logical task type inside the queue |
| bull_job_id | BullMQ job ID for Redis lookup/debugging |
| status | pending, processing, completed, failed |
| payload_json | JSON payload used by the worker |
| result_json | Optional worker result metadata |
| error_message | Safe failure summary |
| attempts_made | Number of attempts already made |
| max_attempts | Maximum attempts configured for this job |
| idempotency_key | Unique key to prevent duplicate side effects |
| created_at | Job creation timestamp |
| started_at | First processing timestamp |
| finished_at | Completion/failure timestamp |
| updated_at | Last status update timestamp |

Indexes:

  • queue_name
  • status
  • created_at
  • idempotency_key unique where present
  • (queue_name, status)

Migration must follow /docs/database-schema/migration-convention: new folder, guarded SQL, no old migration edits, and PR to dev.
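
For orientation, the required fields above could be mirrored in application code roughly as follows. This is a sketch only: BackgroundJobRow and newPendingJob are hypothetical names, and which columns are nullable is an assumption inferred from the field descriptions, not taken from the actual Prisma model.

```typescript
type BackgroundJobStatus = 'pending' | 'processing' | 'completed' | 'failed';

// Assumed row shape; nullability is a guess based on the field descriptions.
interface BackgroundJobRow {
  id: string;
  queue_name: string;
  job_type: string;
  bull_job_id: string | null; // set after the queue accepts the job
  status: BackgroundJobStatus;
  payload_json: Record<string, unknown>;
  result_json: Record<string, unknown> | null;
  error_message: string | null;
  attempts_made: number;
  max_attempts: number;
  idempotency_key: string | null; // unique where present
  created_at: Date;
  started_at: Date | null;
  finished_at: Date | null;
  updated_at: Date;
}

// Builds the initial row for a newly accepted job.
function newPendingJob(
  id: string,
  queueName: string,
  jobType: string,
  payload: Record<string, unknown>,
  maxAttempts: number,
  idempotencyKey: string | null,
  now: Date,
): BackgroundJobRow {
  return {
    id,
    queue_name: queueName,
    job_type: jobType,
    bull_job_id: null,
    status: 'pending',
    payload_json: payload,
    result_json: null,
    error_message: null,
    attempts_made: 0,
    max_attempts: maxAttempts,
    idempotency_key: idempotencyKey,
    created_at: now,
    started_at: null,
    finished_at: null,
    updated_at: now,
  };
}
```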

Job Status Lifecycle

flowchart LR
  enqueue["API validates input"] --> pending["pending"]
  pending --> processing["processing"]
  processing --> completed["completed"]
  processing --> retry["retry scheduled"]
  retry --> processing
  processing --> failed["failed"]

| Status | Meaning |
| --- | --- |
| pending | Job accepted and queued, but not started |
| processing | Worker has started the job |
| completed | Worker finished successfully |
| failed | Worker exhausted retries or failed permanently |
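
The lifecycle above can be enforced with a small transition table so that a late or duplicate status update cannot move a finished job backwards. The sketch below is one possible encoding. ALLOWED_TRANSITIONS and canTransition are hypothetical names, and treating a retry as processing -> processing is an assumption, since "retry scheduled" is not one of the stored statuses.

```typescript
type JobStatus = 'pending' | 'processing' | 'completed' | 'failed';

// Allowed next statuses for each current status.
const ALLOWED_TRANSITIONS: Record<JobStatus, readonly JobStatus[]> = {
  pending: ['processing'],
  // processing -> processing models a retry attempt (assumption, see lead-in).
  processing: ['processing', 'completed', 'failed'],
  completed: [], // terminal
  failed: [], // terminal
};

// True when moving from `from` to `to` follows the lifecycle.
function canTransition(from: JobStatus, to: JobStatus): boolean {
  return ALLOWED_TRANSITIONS[from].includes(to);
}
```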

Producer Behavior

JobQueueService should expose typed enqueue methods:

  • enqueueOcrJob(payload, options?)
  • enqueueAiProcessingJob(payload, options?)
  • enqueueIndexingJob(payload, options?)
  • enqueueEmailJob(payload, options?)
  • enqueueNotificationJob(payload, options?)
  • getJobStatus(jobId)

Producer requirements:

  • Validate queue name and job type.
  • Require or generate an idempotency key.
  • Create a background_jobs row with pending status.
  • Submit the BullMQ job with configured attempts and exponential backoff.
  • Save BullMQ job.id in background_jobs.bull_job_id.
  • Return { job_id, status }.
  • Log enqueue failures through appLogger.
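
The producer requirements above can be sketched with in-memory stand-ins. Everything named here (QueueClient, JobRow, SketchProducer) is hypothetical: a real JobQueueService would write the row through Prisma and submit through BullMQ, both asynchronously. Returning the existing job for a repeated idempotency key is one of the two behaviors the spec allows; a conflict response is the other.

```typescript
// Hypothetical stand-in for a BullMQ queue. A real add() call is asynchronous.
interface QueueClient {
  add(jobType: string, payload: Record<string, unknown>): string; // returns the queue job ID
}

interface JobRow {
  id: string;
  queueName: string;
  jobType: string;
  status: 'pending';
  bullJobId: string | null;
}

class SketchProducer {
  private readonly rowsByKey = new Map<string, JobRow>();
  private nextId = 1;
  private readonly queue: QueueClient;

  constructor(queue: QueueClient) {
    this.queue = queue;
  }

  enqueue(
    queueName: string,
    jobType: string,
    payload: Record<string, unknown>,
    idempotencyKey: string,
  ): { job_id: string; status: string } {
    // Same key seen before: return the existing job and do not enqueue again.
    const existing = this.rowsByKey.get(idempotencyKey);
    if (existing) {
      return { job_id: existing.id, status: existing.status };
    }
    // Record the pending row first, then submit and store the queue's job ID.
    const row: JobRow = {
      id: `job-${this.nextId}`,
      queueName,
      jobType,
      status: 'pending',
      bullJobId: null,
    };
    this.nextId += 1;
    this.rowsByKey.set(idempotencyKey, row);
    row.bullJobId = this.queue.add(jobType, payload);
    return { job_id: row.id, status: row.status };
  }
}
```

In a real implementation the lookup and insert would need to be race-safe, which is what the unique index on idempotency_key provides at the database level.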

Processor Behavior

Each queue must have its own processor.

Processor requirements:

  • Mark job as processing when execution starts.
  • Log start time, end time, duration, queue name, job type, and job ID.
  • Use idempotency checks before applying side effects.
  • Mark job as completed on success.
  • Let BullMQ retry retryable failures.
  • Mark job as failed after max attempts are exhausted.
  • Store safe error summaries only.
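
The status handling in the processor requirements above might look roughly like the following. TrackedJob and runAttempt are hypothetical names, the handler is synchronous only to keep the sketch short, and logging and idempotency checks are left out. The key point is that the error is rethrown so the queue decides whether to retry, while the stored status only becomes failed once attempts are exhausted.

```typescript
interface TrackedJob {
  status: 'pending' | 'processing' | 'completed' | 'failed';
  attemptsMade: number;
  maxAttempts: number;
  errorMessage: string | null;
}

// Runs one attempt of a job. Rethrows on failure so the queue can schedule a retry.
function runAttempt(job: TrackedJob, handler: () => void): void {
  job.status = 'processing';
  job.attemptsMade += 1;
  try {
    handler();
    job.status = 'completed';
  } catch (error) {
    if (job.attemptsMade >= job.maxAttempts) {
      job.status = 'failed';
      // Store a safe summary only, never the raw error or stack trace.
      job.errorMessage = 'Job failed after maximum attempts';
    }
    throw error;
  }
}
```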

API Behavior

Recommended endpoints:

POST /jobs/:queue

  • Validates request body.
  • Enqueues a job through JobQueueService.
  • Returns 202 Accepted.
  • Restricted to internal/admin callers once auth policy is finalized.

Response:

{
  "job_id": "uuid",
  "status": "pending"
}

GET /jobs/:id

  • Returns job status for a known job ID.
  • Restricted to the job owner or admin for user-facing jobs.
  • Returns 404 if job does not exist or requester is not allowed to view it.
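
The GET /jobs/:id rule above, where a missing job and a forbidden job look identical to the caller, can be sketched as a pure function. StoredJob, Requester, and lookupJob are hypothetical names. The ownerId field is also an assumption, since the background_jobs field list does not name an owner column.

```typescript
interface StoredJob {
  id: string;
  ownerId: string; // assumed field; not part of the documented table
  status: string;
}

interface Requester {
  userId: string;
  isAdmin: boolean;
}

type LookupResult =
  | { httpStatus: 200; body: { job_id: string; status: string } }
  | { httpStatus: 404 };

// Returns 404 both when the job is missing and when the requester may not see it,
// so the response does not reveal whether a job ID exists.
function lookupJob(jobs: Map<string, StoredJob>, jobId: string, requester: Requester): LookupResult {
  const job = jobs.get(jobId);
  if (!job) {
    return { httpStatus: 404 };
  }
  const allowed = requester.isAdmin || job.ownerId === requester.userId;
  if (!allowed) {
    return { httpStatus: 404 };
  }
  // Only the ID and status are returned; the payload stays private.
  return { httpStatus: 200, body: { job_id: job.id, status: job.status } };
}
```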

Integration Points

Documents:

  • Upload/import flows enqueue OCR and indexing jobs.
  • Controllers/services must not perform OCR or indexing synchronously.

Notifications:

  • notification_queue processor calls NotificationsService.create().
  • NotificationsService remains responsible only for persistence and read/update operations.

Email:

  • email_queue processor calls MailService.
  • Email failures are logged and retried by queue policy.

AI Processing:

  • AI jobs should include cost-control and feature-access checks before enqueueing or before processor execution.
  • Direct AI calls should not be added to controllers as part of FEAT-33.

Worker Runtime

Workers must run independently from the API server.

Suggested scripts:

{
  "start:worker": "nest start --entryFile worker",
  "start:worker:dev": "nest start --watch --entryFile worker"
}

Deployment model:

  • API process: HTTP requests and job enqueueing.
  • Worker process: Redis queue consumption and processor execution.
  • Multiple worker processes may run concurrently with idempotency protections.

Retry, Backoff, and Concurrency

Default queue options:

  • Attempts: 3
  • Backoff: exponential
  • Initial delay: 5000ms
  • Remove completed jobs after retention threshold
  • Retain failed jobs for investigation

OCR and AI queues should use lower concurrency than email/notification queues.
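
To make the defaults above concrete, the sketch below computes the wait before each retry, assuming the usual doubling formula delay * 2^(retry - 1). BullMQ's built-in exponential strategy is believed to follow this formula, but treat that as an assumption and confirm it against the installed version. retryDelaysMs is a hypothetical helper name.

```typescript
// Returns the delay before each retry. With `attempts` total tries there are
// `attempts - 1` retries, and each delay doubles the previous one.
function retryDelaysMs(attempts: number, initialDelayMs: number): number[] {
  const delays: number[] = [];
  for (let retry = 1; retry < attempts; retry++) {
    delays.push(initialDelayMs * 2 ** (retry - 1));
  }
  return delays;
}

// With the defaults (3 attempts, 5000 ms): [5000, 10000]
const defaultDelays = retryDelaysMs(3, 5000);
```

Under these assumptions a job that fails every time is marked failed roughly 15 seconds after its first attempt, plus processing time.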

Logging and Security

Every job execution should log:

  • queue name
  • job type
  • job ID
  • status transition
  • start/end time
  • duration
  • attempts made
  • safe error message

Security requirements:

  • Do not expose internal queue payloads to unauthorized users.
  • Protect job endpoints with auth/role guards when exposed.
  • Validate all payloads with DTOs before enqueueing.
  • Do not log secrets, tokens, email credentials, Redis URLs, full AI prompts, or private document content.
  • Add rate limiting before public enqueue endpoints are introduced.
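
One way to satisfy the "safe error message" requirement above is to pass every error through a small sanitizer before logging or storing it. The sketch below is illustrative only: toSafeErrorMessage and MAX_ERROR_LENGTH are hypothetical names, and the regular expression covers connection-style URLs but not every kind of secret.

```typescript
const MAX_ERROR_LENGTH = 200;

// Reduces an unknown error to a short message with URLs removed.
function toSafeErrorMessage(error: unknown): string {
  // Never use the stack trace; fall back to a generic message for non-Error values.
  const raw = error instanceof Error ? error.message : 'Unknown error';
  // Redact anything that looks like a URL (for example redis://user:pass@host:6379).
  const redacted = raw.replace(/\b[a-z][a-z0-9+.-]*:\/\/\S+/gi, '[redacted-url]');
  // Truncate so a large payload echoed in an error cannot end up in storage.
  return redacted.length > MAX_ERROR_LENGTH ? redacted.slice(0, MAX_ERROR_LENGTH) : redacted;
}
```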

QA Test Scenarios

| Scenario ID | Description | Steps | Input | Expected Result |
| --- | --- | --- | --- | --- |
| FEAT-33-01 | Enqueue OCR job happy path | Submit valid OCR enqueue request | Valid document ID and idempotency key | Returns 202 with job_id; DB row is pending; BullMQ job is created |
| FEAT-33-02 | Worker completes OCR job | Start worker with a pending OCR job | Existing pending OCR job | Job transitions pending -> processing -> completed; duration is logged |
| FEAT-33-03 | Invalid queue name rejected | Submit enqueue request for unknown queue | Unsupported queue param | Returns 400; no DB job row or BullMQ job is created |
| FEAT-33-04 | Invalid payload rejected | Submit enqueue request missing required payload fields | Missing target entity ID | Returns 400 validation error; no job is created |
| FEAT-33-05 | Redis unavailable on enqueue | Stop Redis, then enqueue a job | Valid payload while Redis is unavailable | Returns retryable failure; error is logged without secrets |
| FEAT-33-06 | Worker retry on transient failure | Force processor dependency to fail once, then recover | Job with attempts remaining | Job retries with exponential backoff and eventually completes |
| FEAT-33-07 | Max retries exhausted | Force processor to fail on every attempt | Job configured with max attempts | Job status becomes failed; safe error summary is stored |
| FEAT-33-08 | Idempotency prevents duplicate jobs | Submit same idempotency key twice | Same queue, job type, and idempotency key | Second request returns existing job or conflict behavior; duplicate side effect is not produced |
| FEAT-33-09 | Notification queue integration | Enqueue notification job | Valid notification payload | Processor calls NotificationsService.create(); notification is persisted once |
| FEAT-33-10 | Email queue integration | Enqueue email job | Valid recipient and email metadata | Processor calls MailService; API request is not blocked by SMTP delivery |
| FEAT-33-11 | Job status lookup | Query existing job status | Valid job ID | Returns job status and timestamps without exposing private payload content |
| FEAT-33-12 | Unauthorized status lookup | Query another user's job status | Valid auth for wrong user | Returns 404 or 403 according to finalized auth policy |
| FEAT-33-13 | Concurrent workers | Run two workers on same queue | Multiple pending jobs | Jobs are distributed safely; no duplicate effects occur |
| FEAT-33-14 | Queue-specific concurrency | Configure low OCR concurrency and enqueue multiple OCR jobs | Several OCR jobs | Worker processes no more than configured concurrency |
| FEAT-33-15 | Logging coverage | Process successful and failed jobs | One success, one failure | Logs include queue, job ID, status, attempts, and duration |


Edge Cases

  • Duplicate enqueue requests: same idempotency key should not create duplicate side effects.
  • Worker crash mid-job: BullMQ should make the job retryable according to stalled-job handling and attempts policy.
  • Redis disconnect: API and worker should fail visibly through logs and health checks without leaking credentials.
  • Redis eviction policy: Redis must use maxmemory-policy noeviction; providers that force cache eviction policies should be treated as development-only for BullMQ.
  • DB status update failure: retry behavior must avoid marking a job completed when durable status was not saved.
  • Poison jobs: malformed but accepted jobs should exhaust retries and become failed, not retry forever.
  • Large payloads: store only identifiers and metadata in queue payloads; keep large file/document content in durable storage.
  • Deleted target entity: if a document/user is deleted before processing, processor logs and marks job failed or completed-noop based on job type.
  • Out-of-order jobs: indexing jobs may run before OCR jobs complete; processors must check current entity state before writing.
  • Multiple workers: all processors must be idempotent because more than one worker process can run concurrently.
  • Sensitive data: payloads and logs must not contain passwords, tokens, raw private document content, or full AI prompts.

Post-launch additions

These additions were made after the initial FEAT-33 merge to support scaling and FEAT-84:

Socket.IO Redis adapter (multi-replica realtime)

Without a Redis adapter, Socket.IO room broadcasts (server.to('course:abc').emit(...)) only reach clients connected to the same Node process. With multiple backend replicas behind a load balancer, an event emitted on replica B never reaches a client subscribed to that room on replica A, so the broadcast is silently lost.

backend/src/common/gateway/redis-io.adapter.ts wraps @socket.io/redis-adapter and is wired in main.ts:

const redisIoAdapter = new RedisIoAdapter(app);
await redisIoAdapter.connectToRedis();
app.useWebSocketAdapter(redisIoAdapter);

Two dedicated IORedis connections (pub and sub) are opened, separate from the shared BullMQ connection, so pub/sub traffic does not block queue commands. When REDIS_URL is unset, the gateway falls back to the default in-memory adapter (single-instance dev only).

UnrecoverableError pattern (no-retry sentinel)

Some failures should NOT be retried, for example credit exhaustion mid-pipeline. Throwing the standard BullMQ UnrecoverableError from a processor:

  1. Stops the current job immediately.
  2. Marks background_jobs.status = 'failed'.
  3. Does not consume additional attempts_made — the job is terminal after one attempt regardless of the configured max_attempts.

The course-generation processor catches credit-limit BadRequestException from SubscriptionsService.useAiCredit, marks the course failed_insufficient_credits, emits a course_failed socket event, then throws UnrecoverableError. The outer catch handler treats error instanceof UnrecoverableError as terminal — it does NOT take the fail-soft "advance to next step" branch — so no downstream jobs are enqueued.
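
The terminal-versus-retryable decision described above can be reduced to a small predicate. In the sketch below the UnrecoverableError class is a local stand-in so the example runs without dependencies; real processor code would import UnrecoverableError from bullmq. isTerminalFailure is a hypothetical helper name.

```typescript
// Local stand-in for the class exported by bullmq (assumption for this sketch).
class UnrecoverableError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'UnrecoverableError';
  }
}

// A failure is terminal when it is explicitly unrecoverable,
// or when the job has already used all of its attempts.
function isTerminalFailure(error: unknown, attemptsMade: number, maxAttempts: number): boolean {
  return error instanceof UnrecoverableError || attemptsMade >= maxAttempts;
}
```

A failed-job handler could use a predicate like this to decide when to write the failed status, instead of writing it on every failed attempt.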

Daily cleanup cron

QueueHealthService.cleanupStaleJobs() runs at 3:00 AM via @nestjs/schedule's @Cron(CronExpression.EVERY_DAY_AT_3AM):

  • queue.clean(24h, 1000, 'completed' | 'failed' | 'wait' | 'delayed') per queue.
  • Belt-and-suspenders for removeOnComplete / removeOnFail count-based rotation, which only trims old entries when another job finishes. An idle-but-stale queue can otherwise hold a tail of jobs indefinitely.
  • Idempotent: queue.clean is atomic, so duplicate runs across replicas (when the cron fires on multiple instances) just no-op.
  • Audit history in Postgres background_jobs is untouched — only Redis state.

Requires ScheduleModule.forRoot() in AppModule.imports.

Course-generation processor (FEAT-84)

The course_generation_queue runs a self-chaining pipeline:

outline → module_lessons (per module, batched) → module_exam (per module)
       → … → final_exam → mastery_analysis (terminal)

Processor decorator:

@Processor(JobQueueName.COURSE_GENERATION, {
  concurrency: COURSE_GEN_CONCURRENCY,
  limiter: { max: 1, duration: COURSE_GEN_LIMITER_MS },
})

  • COURSE_GEN_CONCURRENCY (default 2): different courses can advance in parallel; each course's own chain stays sequential because each step self-enqueues its successor.
  • COURSE_GEN_LIMITER_MS (default 2000): spaces back-to-back Gemini calls so that at most one job starts per window. Retune it if the Gemini tier's RPM limit changes.

Idempotency keys for every step (outline:${courseId}, module_lessons:${moduleId}, module_exam:${moduleId}, final_exam:${courseId}, mastery_analysis:${courseId}) prevent duplicates.
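
The key formats listed above can be produced by a single helper, so every step builds its key the same way. CourseStep and stepIdempotencyKey are hypothetical names; only the key formats themselves come from this document.

```typescript
type CourseStep =
  | 'outline'
  | 'module_lessons'
  | 'module_exam'
  | 'final_exam'
  | 'mastery_analysis';

// Module-scoped steps key on the module ID; all other steps key on the course ID.
function stepIdempotencyKey(
  step: CourseStep,
  ids: { courseId: string; moduleId?: string },
): string {
  if (step === 'module_lessons' || step === 'module_exam') {
    if (!ids.moduleId) {
      throw new Error(`${step} requires a moduleId`);
    }
    return `${step}:${ids.moduleId}`;
  }
  return `${step}:${ids.courseId}`;
}
```

Because the key depends only on the step and the entity ID, a step that is enqueued again after a retry produces the same key and is deduplicated.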

Full feature docs: 84-ai-course-generation.


Notes

  • Feature flags: Queue-backed features can be gated later using FEAT-35. The queue infrastructure should be enabled in deployed environments only when Redis is configured.
  • Dependencies: Redis, BullMQ, NestJS backend, Prisma, appLogger, NotificationsService, and MailService.
  • Migration dependency: Add background_jobs through a new migration following the migration convention.
  • Notification dependency: FEAT-40 assumes notification creation is triggered by the queue processor. FEAT-33 provides that queue layer.
  • No frontend dashboard in scope: Admin monitoring UI and Bull Board are future work.
  • No placeholder jobs/data: Do not seed fake jobs. Tests should create and clean up their own records.
  • Known limitation: OCR, AI provider, and search-index implementations are not owned by FEAT-33. FEAT-33 provides queue infrastructure and integration contracts.
  • Build gate: Run pnpm build before PR as required by docs/AGENTS.md.