Production Checklist

Before deploying Monque to production, review this checklist to avoid common pitfalls and ensure reliable job processing.

Topic           Key Point
Idempotency     Workers must handle duplicate execution safely
Deduplication   Use uniqueKey to prevent duplicate jobs
Replica Set     Required for Change Streams
Shutdown        Configure shutdownTimeout appropriately
Lock Timeout    Set lockTimeout to 2× the longest job duration

1. Ensure Workers Are Idempotent

Jobs may be executed more than once due to:

  • Retries: Failed jobs are automatically retried with exponential backoff
  • Stale Recovery: If a worker crashes, the heartbeat mechanism recovers stuck jobs
  • Network Issues: Transient failures during completion acknowledgment

import { Monque } from '@monque/core';

// ✅ Good: Check state before mutating
monque.register('charge-payment', async (job) => {
  const { paymentId } = job.data;
  
  const payment = await db.payments.findOne({ _id: paymentId });
  if (!payment) throw new Error(`Payment ${paymentId} not found`);
  if (payment.status === 'charged') {
    return; // Already processed, skip safely
  }
  
  await chargePayment(payment);
  await db.payments.updateOne(
    { _id: paymentId },
    { $set: { status: 'charged' } }
  );
});

// ✅ Good: Use database transactions for atomic operations
monque.register('transfer-funds', async (job) => {
  const { fromAccount, toAccount, amount, transferId } = job.data;
  
  // Use transferId as idempotency key
  const existing = await db.transfers.findOne({ transferId });
  if (existing) return;
  
  const session = client.startSession();
  try {
    await session.withTransaction(async () => {
      await db.accounts.updateOne({ _id: fromAccount }, { $inc: { balance: -amount } }, { session });
      await db.accounts.updateOne({ _id: toAccount }, { $inc: { balance: amount } }, { session });
      await db.transfers.insertOne({ transferId, status: 'completed' }, { session });
    });
  } finally {
    await session.endSession(); // Always release the session
  }
});
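
A third pattern (a sketch, not from the Monque docs: the processedJobs collection and sendWelcomeEmail helper are hypothetical) uses a unique index as an idempotency receipt, so only the first execution proceeds:

// ✅ Good: Unique index as an idempotency receipt
// Assumes db.processedJobs has a unique index on { key: 1 }
monque.register('send-welcome-email', async (job) => {
  const { userId } = job.data;
  try {
    await db.processedJobs.insertOne({ key: `welcome-${userId}` });
  } catch (err) {
    if (err.code === 11000) return; // Duplicate key: already processed
    throw err;
  }
  await sendWelcomeEmail(userId);
});

Trade-off: if the side effect fails after the receipt is written, a retry will skip it; delete the receipt in a catch block if retries should re-attempt the work.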

See Idempotent Operations for more patterns.


2. Deduplicate with uniqueKey

Prevent duplicate jobs from being enqueued using uniqueKey:

// Only one sync job per user at a time
await monque.enqueue('sync-user', { userId: '123' }, {
  uniqueKey: 'sync-user-123'
});

// ✅ Safe for app restarts and scaling
await monque.schedule('0 * * * *', 'hourly-report', {}, {
  uniqueKey: 'hourly-report-singleton'
});

Without uniqueKey, calling schedule() multiple times (e.g., on each app restart) creates duplicate scheduled jobs.


3. Run MongoDB as a Replica Set

Change Streams require a replica set. Without one:

  • Monque falls back to polling only
  • Job pickup latency increases to pollInterval (default: 1000ms)
  • Real-time job notifications are unavailable
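
If you must run without a replica set for a while (e.g., a quick local experiment), you can at least tighten the polling fallback. A sketch using the pollInterval option mentioned above:

// Sketch: compensate for the loss of real-time notifications
const monque = new Monque(db, {
  pollInterval: 250 // Check for new jobs every 250ms instead of the 1000ms default
});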

Use Docker Compose for local development:

# docker-compose.yml
services:
  mongo:
    image: mongo:7
    command: ["--replSet", "rs0", "--bind_ip_all"]
    ports:
      - "27017:27017"

Initialize the replica set:

docker exec -it <container> mongosh --eval "rs.initiate()"
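
When connecting from the host to a single-node replica set inside Docker, rs.initiate() advertises the container hostname, which is usually unreachable from outside. A common workaround (an assumption about a local Docker setup, not a Monque requirement) is a direct connection:

import { MongoClient } from 'mongodb';

// directConnection bypasses discovery of the advertised (container-internal) hostname
const client = new MongoClient('mongodb://localhost:27017/?directConnection=true');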

4. Configure Graceful Shutdown

Ensure in-progress jobs complete before your application exits:

const monque = new Monque(db, {
  shutdownTimeout: 30000 // Wait up to 30s for jobs to finish
});

process.on('SIGTERM', async () => {
  console.log('Shutting down...');
  await monque.stop(); // Waits for in-progress jobs
  await client.close();
  process.exit(0);
});

If you deploy on Kubernetes, set terminationGracePeriodSeconds higher than shutdownTimeout (plus any preStop delay) so the pod is not killed mid-drain:

spec:
  terminationGracePeriodSeconds: 45  # > shutdownTimeout (30s)
  containers:
    - name: worker
      lifecycle:
        preStop:
          exec:
            command: ["sh", "-c", "sleep 5"]

5. Set lockTimeout Appropriately

The lockTimeout determines how long a job can remain in processing before it's considered stale and recovered. A good rule of thumb is 2× your longest job duration.

const monque = new Monque(db, {
  lockTimeout: 3600000,      // 1 hour (for jobs up to 30 min)
  heartbeatInterval: 30000   // Update heartbeat every 30s
});

Symptom                     Likely Cause
Jobs running twice          lockTimeout too short
Jobs stuck in processing    Worker crashed with recoverStaleJobs: false
Slow failure detection      lockTimeout too long
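
If jobs sit in processing after a crash, confirm stale recovery hasn't been turned off. A sketch using the recoverStaleJobs flag from the table above (treat the exact default as an assumption and check your version):

const monque = new Monque(db, {
  recoverStaleJobs: true,    // Keep crash recovery enabled
  lockTimeout: 3600000,
  heartbeatInterval: 30000
});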

6. Tune Concurrency

Configure workerConcurrency and per-worker concurrency based on your workload:

const monque = new Monque(db, {
  workerConcurrency: 5
});

// CPU-bound: low concurrency
monque.register('video-transcode', handler, { concurrency: 2 });

// I/O-bound: higher concurrency
monque.register('api-sync', handler, { concurrency: 10 });

// External API with rate limits: match the limit
monque.register('stripe-webhook', handler, { concurrency: 5 });

Use instanceConcurrency to cap the total number of jobs processed concurrently by a single instance across all workers:

const monque = new Monque(db, {
  instanceConcurrency: 20,      // Instance-wide limit
  workerConcurrency: 10         // Per-worker default
});

This prevents a single instance from overwhelming system resources when running multiple high-concurrency workers.

See Concurrency Strategy for guidelines.


7. Consider Producer-Consumer Architecture

For high-scale deployments, separate job producers (API servers) from consumers (worker servers):

  • Independent Scaling: Scale API and worker tiers separately based on load
  • Resource Isolation: API response times unaffected by background job processing
  • Deployment Flexibility: Deploy worker updates without touching API servers

Ts.ED Integration:

// API Server (Producer-only)
@Configuration({
  monque: {
    dbFactory: async () => client.db('myapp'),
    disableJobProcessing: true  // Only enqueue, don't process
  }
})
export class ApiServer {}

// Worker Server (Consumer)
@Configuration({
  monque: {
    dbFactory: async () => client.db('myapp'),
    instanceConcurrency: 20  // Limit total concurrent jobs
  }
})
export class WorkerServer {}

Core Package:

// Producer: Initialize but don't start
const producer = new Monque(db);
await producer.initialize();
// Don't call start() - just use enqueue()

// Consumer: Full lifecycle
const consumer = new Monque(db, { instanceConcurrency: 20 });
consumer.register('my-job', handler);
await consumer.initialize();
await consumer.start();

See Ts.ED Producer-only Mode for details.


8. Monitor with Events

Subscribe to events for observability:

// Track failures
monque.on('job:fail', ({ job, error, willRetry }) => {
  logger.error('Job failed', {
    jobId: job._id,
    name: job.name,
    error: error.message,
    willRetry,
    failCount: job.failCount
  });
  
  if (!willRetry) {
    alerting.notify(`Job ${job.name} permanently failed`);
  }
});

// Monitor stale recovery
monque.on('stale:recovered', ({ count }) => {
  if (count > 0) {
    metrics.increment('monque.stale_jobs_recovered', count);
    logger.warn(`Recovered ${count} stale jobs`);
  }
});

// Track Change Stream health
monque.on('changestream:fallback', ({ reason }) => {
  logger.warn('Change Streams unavailable, using polling', { reason });
});

See MonqueEventMap for all available events.


9. Configure Job Retention

Prevent unbounded database growth by configuring retention:

const monque = new Monque(db, {
  jobRetention: {
    completed: 24 * 60 * 60 * 1000,    // 24 hours
    failed: 7 * 24 * 60 * 60 * 1000,   // 7 days
    interval: 60 * 60 * 1000           // Cleanup every hour
  }
});

10. Ensure Index Creation Permissions

Monque creates MongoDB indexes during initialize(). Your database user must have index creation permissions.

If index creation is restricted in production, create indexes ahead of time:

// Required indexes (see Jobs documentation for full list)
db.monque_jobs.createIndex({ status: 1, nextRunAt: 1 });
db.monque_jobs.createIndex({ name: 1, status: 1 });
db.monque_jobs.createIndex({ claimedBy: 1, status: 1 });
db.monque_jobs.createIndex({ status: 1, nextRunAt: 1, claimedBy: 1 });
// ... see Jobs docs for complete list
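
After initialization you can confirm the indexes exist with standard mongosh:

// In mongosh, against your application database
db.monque_jobs.getIndexes()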

See Indexes for the complete list.


Final Checklist

  • Workers are idempotent (safe for duplicate execution)
  • Scheduled jobs use uniqueKey
  • MongoDB is a replica set (for Change Streams)
  • SIGTERM handler calls monque.stop()
  • lockTimeout is 2× the longest job duration
  • Concurrency matches workload characteristics
  • instanceConcurrency configured if running multiple workers
  • Producer-consumer separation considered for high-scale deployments
  • Event handlers for job:fail and stale:recovered
  • Job retention is configured
  • Database user has index creation permissions (or indexes pre-created)