Jobs

Jobs are the fundamental unit of work in Monque. This guide explains how jobs work, their lifecycle, and how to work with them effectively.

Method / Type    | Description
Job              | Job document interface
enqueue()        | Create a job with optional scheduling
now()            | Create a job for immediate execution
schedule()       | Create a recurring cron-based job
getJob()         | Retrieve a single job by ID
getJobs()        | Query jobs with filters
EnqueueOptions   | Options for enqueue()
JobStatus        | Status values object

A job represents a unit of work to be processed asynchronously. Each job has:

  • Name: Identifies which worker should process it
  • Data: The payload containing information needed to complete the work
  • Status: Current state in the lifecycle (pending, processing, completed, failed, cancelled). Status values can be referenced via the JobStatus object exported from @monque/core.
  • Scheduling: When the job should run (nextRunAt)

For the full interface definition, see the Job API Reference.

stateDiagram-v2
    [*] --> pending: enqueue()
    pending --> processing: worker claims
    pending --> cancelled: cancelJob()
    processing --> completed: success
    processing --> pending: failure (retries left)
    processing --> failed: max retries exceeded
    completed --> pending: recurring job reschedules
    cancelled --> pending: retryJob()

Status      | Description
pending     | Job is waiting to be picked up by a worker
processing  | A worker has claimed and is executing the job
completed   | Job finished successfully
failed      | Job permanently failed after exhausting all retries
cancelled   | Job was manually cancelled via cancelJob()
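
The lifecycle diagram can also be read as a lookup table of allowed transitions. The following is a minimal sketch of that reading; the `transitions` map and `canTransition` helper are hypothetical names for illustration and are not part of Monque's API.

```typescript
// Status names come from the status table; the transition map is an
// illustrative reading of the state diagram, not Monque's internal code.
type Status = 'pending' | 'processing' | 'completed' | 'failed' | 'cancelled';

const transitions: Record<Status, Status[]> = {
  pending: ['processing', 'cancelled'],           // worker claims, or cancelJob()
  processing: ['completed', 'pending', 'failed'], // success, retry, or retries exhausted
  completed: ['pending'],                         // recurring job reschedules
  failed: [],                                     // the diagram shows no outgoing transition
  cancelled: ['pending'],                         // retryJob()
};

// Returns true when the diagram allows moving from `from` to `to`.
function canTransition(from: Status, to: Status): boolean {
  return transitions[from].includes(to);
}

console.log(canTransition('pending', 'processing')); // true
console.log(canTransition('completed', 'failed'));   // false
```

A table like this can be handy in tests or dashboards that need to flag unexpected status changes.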

Use now() for jobs that should run immediately. It is shorthand for calling enqueue() without scheduling options:

import { Monque } from '@monque/core';
import { MongoClient } from 'mongodb';

const client = new MongoClient('mongodb://localhost:27017');
await client.connect();
const db = client.db('myapp');

const monque = new Monque(db);
await monque.initialize();

// Enqueue for immediate execution
const job = await monque.now('send-email', {
  to: 'user@example.com',
  subject: 'Hello!'
});

console.log(job._id); // ObjectId
console.log(job.status); // 'pending'

Use enqueue() with runAt for delayed execution:

// Run in 1 hour
const job = await monque.enqueue('send-reminder', {
  userId: '123',
  message: 'Check back soon!'
}, {
  runAt: new Date(Date.now() + 3600000)
});
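
Raw millisecond arithmetic such as `3600000` is easy to get wrong. One option is a small helper that converts a relative delay into the absolute Date that runAt expects. The `runAtIn` function and `Delay` type below are hypothetical and not part of Monque.

```typescript
// Hypothetical helper (not part of Monque): turn a relative delay into the
// absolute Date that the runAt option expects.
interface Delay {
  hours?: number;
  minutes?: number;
  seconds?: number;
}

function runAtIn(delay: Delay, from: Date = new Date()): Date {
  const ms =
    (delay.hours ?? 0) * 3_600_000 +
    (delay.minutes ?? 0) * 60_000 +
    (delay.seconds ?? 0) * 1_000;
  return new Date(from.getTime() + ms);
}

// A fixed base time keeps the example deterministic.
const base = new Date('2024-01-01T00:00:00.000Z');
console.log(runAtIn({ hours: 1, minutes: 30 }, base).toISOString());
// 2024-01-01T01:30:00.000Z
```

The result could then be passed as `{ runAt: runAtIn({ hours: 1 }) }` in the enqueue() options.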

Use schedule() for cron-based recurring jobs:

// Every day at midnight
const job = await monque.schedule('0 0 * * *', 'daily-report', {
  reportType: 'sales'
});

Use uniqueKey to prevent duplicate jobs:

// Only one sync job per user at a time
await monque.enqueue('sync-user', { userId: '123' }, {
  uniqueKey: 'sync-user-123'
});

// Attempting to enqueue again returns the existing job
const existing = await monque.enqueue('sync-user', { userId: '123' }, {
  uniqueKey: 'sync-user-123'
});
// existing._id === original job's _id

Use getJobs() to query jobs with filters such as name and status:

import { JobStatus } from '@monque/core';

// All pending email jobs
const pendingEmails = await monque.getJobs({
  name: 'send-email',
  status: JobStatus.PENDING
});

// All failed jobs (with pagination)
const failedJobs = await monque.getJobs({
  status: JobStatus.FAILED,
  limit: 50,
  skip: 0
});

// Multiple statuses
const activeJobs = await monque.getJobs({
  status: [JobStatus.PENDING, JobStatus.PROCESSING]
});

Use getJob() to retrieve a single job by ID:

import { ObjectId } from 'mongodb';

const job = await monque.getJob(new ObjectId('...'));
if (job) {
  console.log(job.status, job.data);
}

Job data should hold references (such as IDs), not full documents:

// ✅ Good - Store ID, fetch data in worker
await monque.enqueue('process-order', { orderId: '123' });

// ❌ Bad - Large embedded data
await monque.enqueue('process-order', { 
  order: { /* entire order document */ }
});

Define interfaces for your job data:

interface ProcessOrderJob {
  orderId: string;
  priority: 'urgent' | 'normal';
}

// Type-safe enqueue
await monque.enqueue<ProcessOrderJob>('process-order', {
  orderId: '123',
  priority: 'urgent'
});

// Type-safe worker
monque.register<ProcessOrderJob>('process-order', async (job) => {
  const { orderId, priority } = job.data; // TypeScript knows the types
});

Design workers to handle duplicate execution safely:

monque.register('charge-payment', async (job) => {
  const { paymentId } = job.data;
  
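  // Check if already processed (db is the MongoDB Db handle created earlier)
  const payment = await db.collection('payments').findOne({ _id: paymentId });
  if (!payment) {
    throw new Error(`Payment ${paymentId} not found`);
  }
  if (payment.status === 'charged') {
    return; // Already done, safe to skip
  }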
  
  // Process payment
  await chargePayment(payment);
});
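
To see why this check makes duplicate execution safe, the sketch below simulates the same job being delivered twice. It uses an in-memory map instead of a database, and every name in it (`payments`, `chargeOnce`, `chargeCount`) is hypothetical.

```typescript
// In-memory stand-ins for a payments collection and a payment gateway.
// All names here are hypothetical and exist only for this sketch.
type Payment = { id: string; status: 'open' | 'charged' };

const payments = new Map<string, Payment>([
  ['p1', { id: 'p1', status: 'open' }],
]);
let chargeCount = 0;

async function chargeOnce(paymentId: string): Promise<void> {
  const payment = payments.get(paymentId);
  if (!payment) throw new Error(`Payment ${paymentId} not found`);
  if (payment.status === 'charged') return; // already done, safe to skip

  chargeCount += 1;           // stands in for the real charge call
  payment.status = 'charged'; // record the outcome so a retry can see it
}

async function main(): Promise<void> {
  await chargeOnce('p1');
  await chargeOnce('p1'); // simulated duplicate delivery
  console.log(chargeCount); // 1
}

main();
```

A read-then-write check like this does not protect against two workers running the same job at exactly the same time. In a real database, a conditional update (for example, updating only where the status is not yet 'charged') is one way to close that gap.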

Monque provides built-in job retention to automatically clean up completed and failed jobs. This prevents your database from growing indefinitely.

You can configure retention when initializing Monque:

const monque = new Monque(db, {
  // ... other options
  jobRetention: {
    completed: 24 * 60 * 60 * 1000,    // Keep completed jobs for 24 hours
    failed: 7 * 24 * 60 * 60 * 1000,   // Keep failed jobs for 7 days
    interval: 60 * 60 * 1000           // Run cleanup every hour (default)
  }
});
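
The retention windows are plain millisecond durations. The sketch below shows how such windows could be applied to decide whether a finished job is old enough to remove. It is illustrative only: the `isExpired` helper, the types, and the use of updatedAt as the age reference are assumptions, not Monque's actual cleanup logic.

```typescript
// Mirrors the shape of the jobRetention option; the eligibility rule below
// is an assumption for illustration, not Monque's implementation.
interface Retention {
  completed?: number; // max age in ms for completed jobs
  failed?: number;    // max age in ms for failed jobs
}

interface FinishedJob {
  status: 'completed' | 'failed';
  updatedAt: Date;
}

function isExpired(job: FinishedJob, retention: Retention, now: Date): boolean {
  const maxAgeMs = retention[job.status];
  if (maxAgeMs === undefined) return false; // no window set for this status
  return now.getTime() - job.updatedAt.getTime() > maxAgeMs;
}

const DAY = 24 * 60 * 60 * 1000;
const retention: Retention = { completed: DAY, failed: 7 * DAY };
const now = new Date('2024-01-10T00:00:00Z');
const twoDaysAgo = new Date(now.getTime() - 2 * DAY);

console.log(isExpired({ status: 'completed', updatedAt: twoDaysAgo }, retention, now)); // true
console.log(isExpired({ status: 'failed', updatedAt: twoDaysAgo }, retention, now));    // false
```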

Monque creates MongoDB indexes during initialize() to keep polling, claiming, deduplication, and recovery queries fast.

The indexes Monque creates are:

  • { status: 1, nextRunAt: 1 } for efficient polling (ready-to-run jobs)
  • { name: 1, uniqueKey: 1 } as a partial unique index for deduplication (only when uniqueKey exists and status is pending/processing)
  • { name: 1, status: 1 } for querying by job type and status
  • { claimedBy: 1, status: 1 } for ownership queries and heartbeat updates
  • { lastHeartbeat: 1, status: 1 } for heartbeat-based monitoring queries
  • { status: 1, nextRunAt: 1, claimedBy: 1 } for the atomic claim query
  • { status: 1, lockedAt: 1, lastHeartbeat: 1 } to support stale recovery queries
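
As a rough illustration of what the { status: 1, nextRunAt: 1 } index serves, a "ready-to-run" query presumably filters on a pending status and a nextRunAt that is not in the future. The filter shape and helper names below are assumptions; Monque's real polling query is not shown in this guide.

```typescript
// Assumed shape of a "ready-to-run" filter. Both helpers are hypothetical.
interface SchedulableJob {
  status: string;
  nextRunAt: Date;
}

// MongoDB-style filter object that an index on status + nextRunAt could serve.
function readyToRunFilter(now: Date) {
  return { status: 'pending', nextRunAt: { $lte: now } };
}

// The same rule expressed as an in-memory predicate.
function isReady(job: SchedulableJob, now: Date): boolean {
  return job.status === 'pending' && job.nextRunAt.getTime() <= now.getTime();
}

const checkTime = new Date('2024-01-01T12:00:00Z');
const due: SchedulableJob = { status: 'pending', nextRunAt: new Date('2024-01-01T11:00:00Z') };
const later: SchedulableJob = { status: 'pending', nextRunAt: new Date('2024-01-01T13:00:00Z') };

console.log(isReady(due, checkTime));   // true
console.log(isReady(later, checkTime)); // false
```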

You can use the deletion APIs for manual cleanup:

  • deleteJob(jobId): Permanently delete a single job.
  • deleteJobs(filter): Delete multiple jobs matching criteria (name, status, age).

// Delete a specific job
await monque.deleteJob(jobId);

// Delete all failed jobs older than 7 days
await monque.deleteJobs({
  status: JobStatus.FAILED,
  olderThan: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000)
});

If you need more complex retention logic (e.g., keep important sales reports longer), implement a custom scheduled job that uses the MongoDB driver directly:

import { JobStatus } from '@monque/core';

await monque.schedule('0 3 * * *', 'custom-cleanup', {});

monque.register('custom-cleanup', async () => {
  // Access the jobs collection via your own MongoDB client.
  // Use the same collection name you configured for Monque (default: 'monque_jobs').
  const collection = db.collection('monque_jobs');
  
  const cutoffDate = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); // 30 days ago
  
  // Delete old completed/failed jobs (never delete 'processing' jobs!)
  await collection.deleteMany({
    status: { $in: [JobStatus.COMPLETED, JobStatus.FAILED] },
    updatedAt: { $lt: cutoffDate },
    // Add custom filter logic here, e.g., exclude important reports
    name: { $ne: 'important-report' }
  });
});