
Production Checklist

Before deploying Monque to production, review this checklist to avoid common pitfalls and ensure reliable job processing.

| Topic | Key Point |
|-------|-----------|
| Idempotency | Workers must handle duplicate execution safely |
| Deduplication | Use uniqueKey to prevent duplicate jobs |
| Replica Set | Required for Change Streams |
| Shutdown | Keep shutdownTimeout below your platform's termination grace period |
| Lock Timeout | Set lockTimeout to 2× the longest job duration |


1. Make Workers Idempotent

Jobs may be executed more than once due to:

  • Retries: Failed jobs are automatically retried with exponential backoff
  • Stale Recovery: If a worker crashes, the heartbeat mechanism recovers stuck jobs
  • Network Issues: Transient failures during completion acknowledgment
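Design handlers so that running them twice has the same effect as running them once:

import { Monque } from '@monque/core';

// Assumes `client` is a connected MongoClient, `db = client.db('myapp')`,
// and `chargePayment` is your (hypothetical) payment-provider call.
const monque = new Monque(db);

// ✅ Good: Check state before mutating
monque.register('charge-payment', async (job) => {
  const { paymentId } = job.data;
  const payments = db.collection('payments');

  const payment = await payments.findOne({ _id: paymentId });
  if (!payment) {
    throw new Error(`Payment ${paymentId} not found`); // Fail the job instead of crashing on null
  }
  if (payment.status === 'charged') {
    return; // Already processed, skip safely
  }

  // A crash between these two calls can still repeat the charge; if your
  // payment provider accepts an idempotency key, pass paymentId as that key.
  await chargePayment(payment);
  await payments.updateOne(
    { _id: paymentId },
    { $set: { status: 'charged' } }
  );
});

// ✅ Good: Use database transactions for atomic operations
monque.register('transfer-funds', async (job) => {
  const { fromAccount, toAccount, amount, transferId } = job.data;
  const accounts = db.collection('accounts');
  const transfers = db.collection('transfers');

  const session = client.startSession();
  try {
    await session.withTransaction(async () => {
      // Use transferId as the idempotency key. Checking inside the transaction
      // (ideally backed by a unique index on transferId) closes the race
      // between two concurrent executions of the same job.
      const existing = await transfers.findOne({ transferId }, { session });
      if (existing) return;

      await accounts.updateOne({ _id: fromAccount }, { $inc: { balance: -amount } }, { session });
      await accounts.updateOne({ _id: toAccount }, { $inc: { balance: amount } }, { session });
      await transfers.insertOne({ transferId, status: 'completed' }, { session });
    });
  } finally {
    await session.endSession(); // Always release the session
  }
});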

See Idempotent Operations for more patterns.
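To make the duplicate-execution problem concrete, here is a self-contained, in-memory sketch (no Monque or MongoDB involved) that runs the same transfer job twice, as a retry or stale recovery might. The Ledger class and TransferJob shape are hypothetical stand-ins for the accounts and transfers collections; the point is that recording the idempotency key together with the balance changes turns the second run into a no-op.

```typescript
// Hypothetical job payload, mirroring the transfer-funds example above.
interface TransferJob {
  transferId: string;
  fromAccount: string;
  toAccount: string;
  amount: number;
}

// In-memory stand-in for the accounts and transfers collections.
class Ledger {
  balances = new Map<string, number>();
  completedTransfers = new Set<string>();

  // Applies the transfer at most once per transferId. In MongoDB, the check,
  // the balance updates, and the transfer record would share one transaction.
  applyTransfer(job: TransferJob): 'applied' | 'skipped' {
    if (this.completedTransfers.has(job.transferId)) {
      return 'skipped'; // Duplicate execution: nothing left to do
    }
    const from = this.balances.get(job.fromAccount) ?? 0;
    const to = this.balances.get(job.toAccount) ?? 0;
    this.balances.set(job.fromAccount, from - job.amount);
    this.balances.set(job.toAccount, to + job.amount);
    this.completedTransfers.add(job.transferId);
    return 'applied';
  }
}

const ledger = new Ledger();
ledger.balances.set('alice', 100);
ledger.balances.set('bob', 0);

const job: TransferJob = { transferId: 't-1', fromAccount: 'alice', toAccount: 'bob', amount: 40 };
const first = ledger.applyTransfer(job);  // 'applied'
const second = ledger.applyTransfer(job); // 'skipped' (simulated retry)
```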


2. Use uniqueKey for Deduplication

Prevent duplicate jobs from being enqueued by passing a uniqueKey:

// Only one sync job per user at a time
await monque.enqueue('sync-user', { userId: '123' }, {
  uniqueKey: 'sync-user-123'
});

// ✅ Recurring jobs: safe across app restarts and scaling
await monque.schedule('0 * * * *', 'hourly-report', {}, {
  uniqueKey: 'hourly-report-singleton'
});

Without uniqueKey, calling schedule() multiple times (e.g., on each app restart) creates duplicate scheduled jobs.
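One way to picture uniqueKey is as an "insert if absent" keyed on the unique key. The sketch below models that in memory to show why repeated schedule() calls on restart collapse into a single job. It is a conceptual model, not Monque's storage logic: JobStore and enqueueUnique are hypothetical names, and it assumes a key only blocks new jobs while its job is pending or processing, as the "one sync job per user at a time" comment suggests.

```typescript
type Status = 'pending' | 'processing' | 'completed';

interface StoredJob {
  id: number;
  name: string;
  uniqueKey?: string;
  status: Status;
}

// Hypothetical in-memory model of uniqueKey deduplication.
// Assumption: a key blocks new jobs only while its job is not completed.
class JobStore {
  private jobs: StoredJob[] = [];
  private nextId = 1;

  enqueueUnique(name: string, uniqueKey?: string): StoredJob {
    if (uniqueKey !== undefined) {
      const active = this.jobs.find(
        (j) => j.uniqueKey === uniqueKey && j.status !== 'completed',
      );
      if (active) return active; // Reuse the existing job instead of inserting
    }
    const job: StoredJob = { id: this.nextId++, name, uniqueKey, status: 'pending' };
    this.jobs.push(job);
    return job;
  }

  complete(id: number): void {
    const job = this.jobs.find((j) => j.id === id);
    if (job) job.status = 'completed';
  }

  count(): number {
    return this.jobs.length;
  }
}

const store = new JobStore();
// Simulate three app restarts, each registering the same recurring job
for (let restart = 0; restart < 3; restart++) {
  store.enqueueUnique('hourly-report', 'hourly-report-singleton');
}
// Without a key, every call inserts another job
store.enqueueUnique('hourly-report');
store.enqueueUnique('hourly-report');
```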


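3. Use a MongoDB Replica Set

Monque picks up new jobs in real time through MongoDB Change Streams, which MongoDB only supports on replica sets and sharded clusters (a single-node replica set is enough). Without a replica set:

  • Monque falls back to polling only
  • Job pickup latency can be as high as pollInterval (default: 1000ms)
  • Real-time job notifications are unavailable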

For local development, run MongoDB as a single-node replica set with Docker Compose:

# docker-compose.yml
services:
  mongo:
    image: mongo:7
    command: ["--replSet", "rs0", "--bind_ip_all"]
    ports:
      - "27017:27017"

Initialize the replica set:

docker exec -it <container> mongosh --eval "rs.initiate()"

4. Configure Graceful Shutdown

Ensure in-progress jobs complete before your application exits:

const monque = new Monque(db, {
  shutdownTimeout: 30000 // Wait up to 30s for jobs to finish
});

process.on('SIGTERM', async () => {
  console.log('Shutting down...');
  await monque.stop(); // Waits for in-progress jobs
  await client.close();
  process.exit(0);
});
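The behavior described above, waiting for in-progress jobs but only up to shutdownTimeout, follows a common pattern: race the drain against a timer. The standalone sketch below illustrates that pattern. It is an assumption about the general approach rather than Monque's internal code, and stopWithTimeout and sleep are hypothetical helpers.

```typescript
// Resolves after `ms` milliseconds.
function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// Waits for all in-flight jobs, but gives up after timeoutMs.
// Returns which happened first so the caller can log it.
async function stopWithTimeout(
  inFlight: Promise<unknown>[],
  timeoutMs: number,
): Promise<'drained' | 'timeout'> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<'timeout'>((resolve) => {
    timer = setTimeout(() => resolve('timeout'), timeoutMs);
  });
  // Swallow individual job errors: a failing job should not abort the wait
  const drained = Promise.all(inFlight.map((p) => p.catch(() => undefined))).then(
    () => 'drained' as const,
  );
  try {
    return await Promise.race([drained, timeout]);
  } finally {
    clearTimeout(timer); // Don't keep the process alive for an unused timer
  }
}
```

Anything still running when the timer wins is cut off when the process exits, which is why the platform's grace period has to be longer than this wait.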

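On Kubernetes, set terminationGracePeriodSeconds higher than shutdownTimeout. The grace period also counts the time spent in the preStop hook, so leave room for both: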

spec:
  terminationGracePeriodSeconds: 45  # > preStop sleep (5s) + shutdownTimeout (30s)
  containers:
    - name: worker
      lifecycle:
        preStop:
          exec:
            command: ["sh", "-c", "sleep 5"]
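Because Kubernetes starts the grace-period countdown before the preStop hook runs, the budget has to cover the preStop sleep, the Monque shutdown wait, and any cleanup such as client.close(). A small check like the hypothetical graceBudgetOk below could catch misconfigurations in CI; the 5-second cleanup margin is an assumed value, not a Monque or Kubernetes default.

```typescript
interface ShutdownBudget {
  terminationGracePeriodSeconds: number; // From the pod spec
  preStopSeconds: number;                // e.g. `sleep 5` in the preStop hook
  shutdownTimeoutMs: number;             // Monque shutdownTimeout
  cleanupMarginSeconds: number;          // Assumed time for client.close() etc.
}

// True when the preStop hook, the Monque shutdown wait, and the cleanup margin
// all fit inside the grace period, i.e. finish before Kubernetes sends SIGKILL.
function graceBudgetOk(b: ShutdownBudget): boolean {
  const neededSeconds =
    b.preStopSeconds + b.shutdownTimeoutMs / 1000 + b.cleanupMarginSeconds;
  return b.terminationGracePeriodSeconds >= neededSeconds;
}

// Values from the examples above: 5 + 30 + 5 = 40 seconds needed, 45 available
const ok = graceBudgetOk({
  terminationGracePeriodSeconds: 45,
  preStopSeconds: 5,
  shutdownTimeoutMs: 30000,
  cleanupMarginSeconds: 5,
});
```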

5. Set Appropriate Lock Timeouts

The lockTimeout determines how long a job can remain in processing before it’s considered stale and recovered.

const monque = new Monque(db, {
  lockTimeout: 3600000,      // 1 hour (for jobs up to 30 min)
  heartbeatInterval: 30000   // Update heartbeat every 30s
});
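The 2× guideline can be captured in a small helper, together with a check that heartbeats arrive well within one lock window (with the values above, 120 heartbeats fit into one hour). This is a sketch: recommendedLockTimeout and validateLockConfig are hypothetical names, and the heartbeat rule is an illustrative sanity check, not validation that Monque performs.

```typescript
const MINUTE = 60 * 1000;

// Guideline from this page: lockTimeout = 2x the longest expected job duration.
function recommendedLockTimeout(longestJobMs: number): number {
  return 2 * longestJobMs;
}

// Illustrative sanity checks (assumptions, not Monque's own validation).
function validateLockConfig(opts: {
  lockTimeout: number;
  heartbeatInterval: number;
  longestJobMs: number;
}): string[] {
  const problems: string[] = [];
  if (opts.lockTimeout < recommendedLockTimeout(opts.longestJobMs)) {
    problems.push('lockTimeout is below 2x the longest job; healthy jobs may be recovered as stale');
  }
  if (opts.heartbeatInterval >= opts.lockTimeout) {
    problems.push('heartbeatInterval should be much shorter than lockTimeout');
  }
  return problems;
}

// Matches the example above: jobs up to 30 minutes -> 1 hour lock
const lockTimeout = recommendedLockTimeout(30 * MINUTE); // 3600000
const problems = validateLockConfig({
  lockTimeout,
  heartbeatInterval: 30000,
  longestJobMs: 30 * MINUTE,
}); // []
```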

| Symptom | Likely Cause |
|---------|--------------|
| Jobs running twice | lockTimeout too short |
| Jobs stuck in processing | Worker crashed and recoverStaleJobs is false |
| Slow failure detection | lockTimeout too long |


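6. Tune Concurrency

Set a default per-worker limit with workerConcurrency, then override it for individual workers with the concurrency option of register(), based on each workload: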

const monque = new Monque(db, {
  workerConcurrency: 5
});

// CPU-bound: low concurrency
monque.register('video-transcode', handler, { concurrency: 2 });

// I/O-bound: higher concurrency
monque.register('api-sync', handler, { concurrency: 10 });

// External API with rate limits: match the limit
monque.register('stripe-webhook', handler, { concurrency: 5 });

Use instanceConcurrency to limit the total jobs processed by a single instance across all workers:

const monque = new Monque(db, {
  instanceConcurrency: 20,      // Instance-wide limit
  workerConcurrency: 10         // Per-worker default
});

This prevents a single instance from overwhelming system resources when running multiple high-concurrency workers.

See Concurrency Strategy for guidelines.
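The two limits compose: a job can start only if its worker is below its own concurrency and the instance as a whole is below instanceConcurrency. The synchronous model below illustrates that rule with hypothetical names (ConcurrencyGate, tryAcquire, release); it is a sketch of the semantics described above, not Monque's scheduler.

```typescript
// Hypothetical model of per-worker and instance-wide concurrency limits.
class ConcurrencyGate {
  private active = new Map<string, number>();
  private total = 0;

  constructor(
    private readonly instanceConcurrency: number,
    private readonly workerLimits: Map<string, number>, // Per-worker overrides
    private readonly workerConcurrency: number,         // Default per-worker limit
  ) {}

  // Reserves a slot and returns true only if both limits allow another job.
  tryAcquire(worker: string): boolean {
    const limit = this.workerLimits.get(worker) ?? this.workerConcurrency;
    const running = this.active.get(worker) ?? 0;
    if (running >= limit || this.total >= this.instanceConcurrency) {
      return false;
    }
    this.active.set(worker, running + 1);
    this.total++;
    return true;
  }

  // Frees a slot when a job for this worker finishes.
  release(worker: string): void {
    const running = this.active.get(worker) ?? 0;
    if (running === 0) return;
    this.active.set(worker, running - 1);
    this.total--;
  }

  get totalActive(): number {
    return this.total;
  }
}

// instanceConcurrency 20, default per-worker limit 10, api-sync raised to 15
const gate = new ConcurrencyGate(20, new Map([['api-sync', 15]]), 10);

let apiStarted = 0;
while (gate.tryAcquire('api-sync')) apiStarted++;   // Stops at 15 (per-worker limit)

let reportStarted = 0;
while (gate.tryAcquire('report')) reportStarted++;  // Stops at 5 (instance limit of 20 reached)
```

The report worker never reaches its own limit of 10 here: the instance-wide cap is what protects the host when several workers are busy at once.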


7. Consider Producer-Consumer Architecture


For high-scale deployments, separate job producers (API servers) from consumers (worker servers):

  • Independent Scaling: Scale API and worker tiers separately based on load
  • Resource Isolation: API response times unaffected by background job processing
  • Deployment Flexibility: Deploy worker updates without touching API servers

Ts.ED Integration:

// API Server (Producer-only)
@Configuration({
  monque: {
    dbFactory: async () => client.db('myapp'),
    disableJobProcessing: true  // Only enqueue, don't process
  }
})
export class ApiServer {}

// Worker Server (Consumer)
@Configuration({
  monque: {
    dbFactory: async () => client.db('myapp'),
    instanceConcurrency: 20  // Limit total concurrent jobs
  }
})
export class WorkerServer {}

Core Package:

// Producer: Initialize but don't start
const producer = new Monque(db);
await producer.initialize();
// Don't call start() - just use enqueue()

// Consumer: Full lifecycle
const consumer = new Monque(db, { instanceConcurrency: 20 });
consumer.register('my-job', handler);
await consumer.initialize();
await consumer.start();

See Ts.ED Producer-only Mode for details.
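A common way to apply this split, assumed here rather than prescribed by Monque, is to ship one codebase and choose the role at startup from an environment variable. The sketch derives the options shown above from a hypothetical MONQUE_ROLE value; only disableJobProcessing and instanceConcurrency come from this page, and monqueOptionsForRole is illustrative. With the core package, the same flag would decide whether start() is called.

```typescript
interface RoleOptions {
  disableJobProcessing: boolean; // Ts.ED option from the example above
  instanceConcurrency?: number;  // Only meaningful for workers
}

// Maps a deployment role to Monque options. An unset role defaults to the
// safe producer-only mode; unknown values fail fast so a typo cannot
// silently turn an API pod into a worker.
function monqueOptionsForRole(raw: string | undefined): RoleOptions {
  if (raw === undefined || raw === 'producer') {
    return { disableJobProcessing: true };
  }
  if (raw === 'worker') {
    return { disableJobProcessing: false, instanceConcurrency: 20 };
  }
  throw new Error(`Unknown MONQUE_ROLE: ${raw}`);
}

// Usage at startup (Node): monqueOptionsForRole(process.env.MONQUE_ROLE)
```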


8. Set Up Monitoring

Subscribe to events for observability:

// Track failures
monque.on('job:fail', ({ job, error, willRetry }) => {
  logger.error('Job failed', {
    jobId: job._id,
    name: job.name,
    error: error.message,
    willRetry,
    failCount: job.failCount
  });
  
  if (!willRetry) {
    alerting.notify(`Job ${job.name} permanently failed`);
  }
});

// Monitor stale recovery
monque.on('stale:recovered', ({ count }) => {
  if (count > 0) {
    metrics.increment('monque.stale_jobs_recovered', count);
    logger.warn(`Recovered ${count} stale jobs`);
  }
});

// Track Change Stream health
monque.on('changestream:fallback', ({ reason }) => {
  logger.warn('Change Streams unavailable, using polling', { reason });
});

See MonqueEventMap for all available events.
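To turn job:fail events into actionable signals, it can help to count permanent failures per job name and alert once a threshold is reached, instead of paging on every failure. The sketch assumes the { job, error, willRetry } payload shown above (only the fields it reads); FailureTracker and the threshold of 2 are hypothetical, and the alert callback stands in for your alerting client.

```typescript
// Minimal payload shape assumed from the job:fail example above.
interface JobFailEvent {
  job: { name: string };
  error: Error;
  willRetry: boolean;
}

// Hypothetical helper: counts permanent failures per job name and calls
// `alert` once, when a name reaches the threshold.
class FailureTracker {
  private permanentFailures = new Map<string, number>();

  constructor(
    private readonly threshold: number,
    private readonly alert: (message: string) => void,
  ) {}

  onJobFail(event: JobFailEvent): void {
    if (event.willRetry) return; // Transient: a retry is already scheduled
    const name = event.job.name;
    const count = (this.permanentFailures.get(name) ?? 0) + 1;
    this.permanentFailures.set(name, count);
    if (count === this.threshold) {
      this.alert(`${name} has failed permanently ${count} times`);
    }
  }

  countFor(name: string): number {
    return this.permanentFailures.get(name) ?? 0;
  }
}

// Wiring (assumes `monque` from the examples above):
// monque.on('job:fail', (event) => tracker.onJobFail(event));
const alerts: string[] = [];
const tracker = new FailureTracker(2, (message) => alerts.push(message));

const failure = (willRetry: boolean): JobFailEvent => ({
  job: { name: 'send-email' },
  error: new Error('SMTP timeout'),
  willRetry,
});

tracker.onJobFail(failure(true));  // Transient: ignored
tracker.onJobFail(failure(false)); // Permanent #1
tracker.onJobFail(failure(false)); // Permanent #2: alert fires
tracker.onJobFail(failure(false)); // Permanent #3: no repeat alert
```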


9. Configure Job Retention

Prevent unbounded database growth by configuring retention:

const monque = new Monque(db, {
  jobRetention: {
    completed: 24 * 60 * 60 * 1000,    // 24 hours
    failed: 7 * 24 * 60 * 60 * 1000,   // 7 days
    interval: 60 * 60 * 1000           // Cleanup every hour
  }
});
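To estimate how long documents actually stay in the collection: assuming each cleanup run deletes jobs that are past their retention window, a job can outlive its window by up to one cleanup interval. The helpers below compute that worst case and a rough steady-state document count for the configuration above; maxLifetimeMs and maxRetainedDocs are hypothetical names, and the deletion model is an assumption rather than documented Monque behavior.

```typescript
const HOUR = 60 * 60 * 1000;

interface Retention {
  completed: number; // ms to keep completed jobs
  failed: number;    // ms to keep failed jobs
  interval: number;  // ms between cleanup runs
}

// Worst case: a job passes its retention window just after a cleanup run
// and is only deleted by the next run, one interval later.
function maxLifetimeMs(retention: Retention, status: 'completed' | 'failed'): number {
  return retention[status] + retention.interval;
}

// Steady-state upper bound on retained documents for one status,
// assuming a constant rate of jobs reaching that status.
function maxRetainedDocs(
  jobsPerHour: number,
  retention: Retention,
  status: 'completed' | 'failed',
): number {
  return Math.ceil((jobsPerHour * maxLifetimeMs(retention, status)) / HOUR);
}

const retention: Retention = {
  completed: 24 * HOUR,
  failed: 7 * 24 * HOUR,
  interval: HOUR,
};

const completedHours = maxLifetimeMs(retention, 'completed') / HOUR; // 25
const failedHours = maxLifetimeMs(retention, 'failed') / HOUR;       // 169
const completedDocs = maxRetainedDocs(1000, retention, 'completed'); // 25000
```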

10. Grant Index Permissions

Monque creates MongoDB indexes during initialize(). Your database user must have index creation permissions.

If index creation is restricted in production, create indexes ahead of time:

// Required indexes (see Jobs documentation for full list)
db.monque_jobs.createIndex({ status: 1, nextRunAt: 1 });
db.monque_jobs.createIndex({ name: 1, status: 1 });
db.monque_jobs.createIndex({ claimedBy: 1, status: 1 });
db.monque_jobs.createIndex({ status: 1, nextRunAt: 1, claimedBy: 1 });
// ... see Jobs docs for complete list

See Indexes for the complete list.


Summary

  • [ ] Workers are idempotent (safe for duplicate execution)
  • [ ] Scheduled jobs use uniqueKey
  • [ ] MongoDB is a replica set (for Change Streams)
  • [ ] SIGTERM handler calls monque.stop()
  • [ ] lockTimeout is set to 2× the longest job duration
  • [ ] Concurrency matches workload characteristics
  • [ ] instanceConcurrency configured if running multiple workers
  • [ ] Producer-consumer separation considered for high-scale deployments
  • [ ] Event handlers for job:fail and stale:recovered
  • [ ] Job retention is configured
  • [ ] Database user has index creation permissions (or indexes pre-created)