Queues
Queues let you offload slow or unreliable work — sending emails, processing uploads, generating reports — to a background worker so your HTTP response returns immediately. Pearl's queue system is powered by BullMQ and backed by Redis.
Setup
Register QueueManager in your service provider:
import { QueueManager } from '@pearl-framework/pearl'
// Inside register():
this.container.singleton(QueueManager, () =>
new QueueManager({
connection: {
host: process.env.REDIS_HOST ?? 'localhost',
port: Number(process.env.REDIS_PORT ?? 6379),
},
workers: [
{ queue: 'default', concurrency: 5 },
{ queue: 'mail', concurrency: 2 },
],
})
)Create a job
Generate one with the CLI:
pearl make:job SendWelcomeEmailJob
# → src/jobs/SendWelcomeEmailJob.tsImportant: Job payload must be plain public properties — not constructor arguments. The worker reconstructs jobs using new JobClass() and then restores data via Object.assign. Constructor arguments are lost after serialization.
import { Job } from '@pearl-framework/pearl'
export class SendWelcomeEmailJob extends Job {
readonly queue = 'mail'
get tries() { return 3 }
get retryDelay() { return 2_000 } // ms — doubles on each retry
// ✅ Payload as plain properties
userId!: number
async handle(): Promise<void> {
const user = await User.find(db, this.userId)
if (!user) return
await mailer.send(new WelcomeMail(user))
}
async failed(error: Error): Promise<void> {
// Called when all retry attempts are exhausted
console.error(`SendWelcomeEmailJob failed for user ${this.userId}:`, error.message)
}
}Dispatch a job
Resolve QueueManager from the container and call dispatch(). The job runs in a background worker — your route handler returns immediately:
const queue = app.container.make(QueueManager)
// Dispatch immediately
const job = new SendWelcomeEmailJob()
job.userId = user.id
await queue.dispatch(job)
// Dispatch with a delay (runs in 5 seconds)
await queue.dispatchAfter(job, 5_000)
// Dispatch multiple jobs at once
await queue.dispatchBulk([
Object.assign(new SendWelcomeEmailJob(), { userId: 1 }),
Object.assign(new SendWelcomeEmailJob(), { userId: 2 }),
])Multiple queues
Set readonly queue on a job to route it to a dedicated worker. Use this to isolate slow or high-priority work:
// High-priority — 10 concurrent workers
export class ProcessPayment extends Job {
readonly queue = 'critical'
paymentId!: number
async handle() { /* ... */ }
}
// Low-priority — 1 worker
export class GenerateReport extends Job {
readonly queue = 'reports'
reportId!: number
async handle() { /* ... */ }
}workers: [
{ queue: 'default', concurrency: 5 },
{ queue: 'critical', concurrency: 10 },
{ queue: 'reports', concurrency: 1 },
]Custom BullMQ options
Override jobOptions on a job class for full control over BullMQ options — backoff strategy, priority, retention, and more:
import type { JobsOptions } from 'bullmq'
export class ProcessReport extends Job {
readonly queue = 'reports'
reportId!: number
get jobOptions(): JobsOptions {
return {
attempts: 5,
backoff: { type: 'fixed', delay: 30_000 },
removeOnComplete: true,
removeOnFail: false, // keep failed jobs for inspection
priority: 10,
}
}
async handle(): Promise<void> {
await generateReport(this.reportId)
}
}