Skip to content

Jobs

Jobs are the fundamental unit of work in Monque. This guide explains how jobs work, their lifecycle, and how to work with them effectively.

| Method / Type | Description | |---------------|-------------| | Job | Job document interface | | enqueue() | Create a job with optional scheduling | | now() | Create a job for immediate execution | | schedule() | Create a recurring cron-based job | | getJob() | Retrieve a single job by ID | | getJobs() | Query jobs with filters | | EnqueueOptions | Options for enqueue() | | JobStatus | Status values object |

A job represents a unit of work to be processed asynchronously. Each job has:

  • Name: Identifies which worker should process it
  • Data: The payload containing information needed to complete the work
  • Status: Current state in the lifecycle (pending, processing, completed, failed). Can be referenced via the JobStatus Object from @monque/core.
  • Scheduling: When the job should run (nextRunAt)

For the full interface definition, see the Job API Reference.

stateDiagram-v2
    [*] --> pending: enqueue()
    pending --> processing: worker claims
    pending --> cancelled: cancelJob()
    processing --> completed: success
    processing --> pending: failure (retries left)
    processing --> failed: max retries exceeded
    completed --> pending: recurring job reschedules
    cancelled --> pending: retryJob()

| Status | Description | | ------------ | --------------------------------------------------- | | pending | Job is waiting to be picked up by a worker | | processing | A worker has claimed and is executing the job | | completed | Job finished successfully | | failed | Job permanently failed after exhausting all retries | | cancelled | Job was manually cancelled via cancelJob() |

Use now() for jobs that should run immediately. This is sugar for enqueue() without scheduling options:

import { Monque } from '@monque/core';
import { MongoClient } from 'mongodb';

const client = new MongoClient('mongodb://localhost:27017');
await client.connect();
const db = client.db('myapp');

const monque = new Monque(db);
await monque.initialize();

// Enqueue for immediate execution
const job = await monque.now('send-email', {
  to: 'user@example.com',
  subject: 'Hello!'
});

console.log(job._id); // ObjectId
console.log(job.status); // 'pending'

Use enqueue() with runAt for delayed execution:

// Run in 1 hour
const job = await monque.enqueue('send-reminder', {
  userId: '123',
  message: 'Check back soon!'
}, {
  runAt: new Date(Date.now() + 3600000)
});

Use schedule() for cron-based recurring jobs:

// Every day at midnight
const job = await monque.schedule('0 0 * * *', 'daily-report', {
  reportType: 'sales'
});

Use uniqueKey to prevent duplicate jobs:

// Only one sync job per user at a time
await monque.enqueue('sync-user', { userId: '123' }, {
  uniqueKey: 'sync-user-123'
});

// Attempting to enqueue again returns the existing job
const existing = await monque.enqueue('sync-user', { userId: '123' }, {
  uniqueKey: 'sync-user-123'
});
// existing._id === original job's _id
import { JobStatus } from '@monque/core';

// All pending email jobs
const pendingEmails = await monque.getJobs({
  name: 'send-email',
  status: JobStatus.PENDING
});

// All failed jobs (with pagination)
const failedJobs = await monque.getJobs({
  status: JobStatus.FAILED,
  limit: 50,
  skip: 0
});

// Multiple statuses
const activeJobs = await monque.getJobs({
  status: [JobStatus.PENDING, JobStatus.PROCESSING]
});
import { ObjectId } from 'mongodb';

const job = await monque.getJob(new ObjectId('...'));
if (job) {
  console.log(job.status, job.data);
}

Job data should be reference data, not full documents:

// ✅ Good - Store ID, fetch data in worker
await monque.enqueue('process-order', { orderId: '123' });

// ❌ Bad - Large embedded data
await monque.enqueue('process-order', { 
  order: { /* entire order document */ }
});

Use maxPayloadSize to enforce a hard BSON byte limit on all job payloads. When set, enqueue(), now(), and schedule() measure the payload using BSON.calculateObjectSize() before insertion and throw PayloadTooLargeError if the limit is exceeded.

import { Monque, PayloadTooLargeError } from '@monque/core';

const monque = new Monque(db, {
  maxPayloadSize: 100_000  // 100 KB hard limit
});

try {
  await monque.enqueue('process-report', hugePayload);
} catch (error) {
  if (error instanceof PayloadTooLargeError) {
    console.error(
      `Payload too large: ${error.actualSize} bytes (limit: ${error.maxSize} bytes)`
    );
  }
}

PayloadTooLargeError exposes two properties:

  • actualSize — measured BSON byte size of the payload
  • maxSize — the configured limit

When maxPayloadSize is not set (the default), no validation is performed.

Define interfaces for your job data:

interface ProcessOrderJob {
  orderId: string;
  priority: 'urgent' | 'normal';
}

// Type-safe enqueue
await monque.enqueue<ProcessOrderJob>('process-order', {
  orderId: '123',
  priority: 'urgent'
});

// Type-safe worker
monque.register<ProcessOrderJob>('process-order', async (job) => {
  const { orderId, priority } = job.data; // TypeScript knows the types
});

Design workers to handle duplicate execution safely:

monque.register('charge-payment', async (job) => {
  const { paymentId } = job.data;
  
  // Check if already processed
  const payment = await db.payments.findOne({ _id: paymentId });
  if (payment.status === 'charged') {
    return; // Already done, safe to skip
  }
  
  // Process payment
  await chargePayment(payment);
});

Monque provides built-in job retention to automatically clean up completed and failed jobs. This prevents your database from growing indefinitely.

You can configure retention when initializing Monque:

const monque = new Monque(db, {
  // ... other options
  jobRetention: {
    completed: 24 * 60 * 60 * 1000,    // Keep completed jobs for 24 hours
    failed: 7 * 24 * 60 * 60 * 1000,   // Keep failed jobs for 7 days
    interval: 60 * 60 * 1000           // Run cleanup every hour (default)
  }
});

Monque creates MongoDB indexes during initialize() to keep polling, claiming, deduplication, and recovery queries fast.

The indexes Monque creates are:

  • { status: 1, nextRunAt: 1 } for efficient polling (ready-to-run jobs)
  • { name: 1, uniqueKey: 1 } as a partial unique index for deduplication (only when uniqueKey exists and status is pending/processing)
  • { name: 1, status: 1 } for querying by job type and status
  • { claimedBy: 1, status: 1 } for ownership queries and heartbeat updates
  • { lastHeartbeat: 1, status: 1 } for heartbeat-based monitoring queries
  • { status: 1, nextRunAt: 1, claimedBy: 1 } for the atomic claim query
  • { status: 1, lockedAt: 1, lastHeartbeat: 1 } to support stale recovery queries

You can use the deletion APIs for manual cleanup:

  • deleteJob(jobId): Permanently delete a single job.
  • deleteJobs(filter): Delete multiple jobs matching criteria (name, status, age).
// Delete a specific job
await monque.deleteJob(jobId);

// Delete all failed jobs older than 7 days
await monque.deleteJobs({
  status: JobStatus.FAILED,
  olderThan: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000)
});

If you need more complex retention logic (e.g., keep important sales reports longer), implement a custom scheduled job that uses the MongoDB driver directly:

import { JobStatus } from '@monque/core';

await monque.schedule('0 3 * * *', 'custom-cleanup', {});

monque.register('custom-cleanup', async () => {
  // Access the jobs collection via your own MongoDB client.
  // Use the same collection name you configured for Monque (default: 'monque_jobs').
  const collection = db.collection('monque_jobs');
  
  const cutoffDate = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); // 30 days ago
  
  // Delete old completed/failed jobs (never delete 'processing' jobs!)
  await collection.deleteMany({
    status: { $in: [JobStatus.COMPLETED, JobStatus.FAILED] },
    updatedAt: { $lt: cutoffDate },
    // Add custom filter logic here, e.g., exclude important reports
    name: { $ne: 'important-report' }
  });
});