Queues

Queues let you offload slow or unreliable work — sending emails, processing uploads, generating reports — to a background worker so your HTTP response returns immediately. Pearl's queue system is powered by BullMQ and backed by Redis.

Setup

Register QueueManager in your service provider:

src/providers/AppServiceProvider.ts
import { QueueManager } from '@pearl-framework/pearl'

// Inside register():
this.container.singleton(QueueManager, () =>
  new QueueManager({
    connection: {
      host: process.env.REDIS_HOST ?? 'localhost',
      port: Number(process.env.REDIS_PORT ?? 6379),
    },
    workers: [
      { queue: 'default', concurrency: 5 },
      { queue: 'mail',    concurrency: 2 },
    ],
  })
)

Create a job

Generate one with the CLI:

···
pearl make:job SendWelcomeEmailJob
# → src/jobs/SendWelcomeEmailJob.ts

Important: Job payload must be plain public properties — not constructor arguments. The worker reconstructs jobs using new JobClass() and then restores data via Object.assign. Constructor arguments are lost after serialization.

src/jobs/SendWelcomeEmailJob.ts
import { Job } from '@pearl-framework/pearl'

export class SendWelcomeEmailJob extends Job {
  readonly queue   = 'mail'
  get tries()      { return 3 }
  get retryDelay() { return 2_000 }  // ms — doubles on each retry

  // ✅ Payload as plain properties
  userId!: number

  async handle(): Promise<void> {
    const user = await User.find(db, this.userId)
    if (!user) return
    await mailer.send(new WelcomeMail(user))
  }

  async failed(error: Error): Promise<void> {
    // Called when all retry attempts are exhausted
    console.error(`SendWelcomeEmailJob failed for user ${this.userId}:`, error.message)
  }
}

Dispatch a job

Resolve QueueManager from the container and call dispatch(). The job runs in a background worker — your route handler returns immediately:

···
const queue = app.container.make(QueueManager)

// Dispatch immediately
const job = new SendWelcomeEmailJob()
job.userId = user.id
await queue.dispatch(job)

// Dispatch with a delay (runs in 5 seconds)
await queue.dispatchAfter(job, 5_000)

// Dispatch multiple jobs at once
await queue.dispatchBulk([
  Object.assign(new SendWelcomeEmailJob(), { userId: 1 }),
  Object.assign(new SendWelcomeEmailJob(), { userId: 2 }),
])

Multiple queues

Set readonly queue on a job to route it to a dedicated worker. Use this to isolate slow or high-priority work:

···
// High-priority — 10 concurrent workers
export class ProcessPayment extends Job {
  readonly queue = 'critical'
  paymentId!: number
  async handle() { /* ... */ }
}

// Low-priority — 1 worker
export class GenerateReport extends Job {
  readonly queue = 'reports'
  reportId!: number
  async handle() { /* ... */ }
}
src/providers/AppServiceProvider.ts
workers: [
  { queue: 'default',  concurrency: 5  },
  { queue: 'critical', concurrency: 10 },
  { queue: 'reports',  concurrency: 1  },
]

Custom BullMQ options

Override jobOptions on a job class for full control over BullMQ options — backoff strategy, priority, retention, and more:

···
import type { JobsOptions } from 'bullmq'

export class ProcessReport extends Job {
  readonly queue = 'reports'
  reportId!: number

  get jobOptions(): JobsOptions {
    return {
      attempts:         5,
      backoff:          { type: 'fixed', delay: 30_000 },
      removeOnComplete: true,
      removeOnFail:     false,  // keep failed jobs for inspection
      priority:         10,
    }
  }

  async handle(): Promise<void> {
    await generateReport(this.reportId)
  }
}