Workers

Workers are the execution engines of Monque. They pick up jobs and execute your business logic.

Method / Type | Description
register() | Register a job handler
WorkerOptions | Worker configuration (concurrency, replace)
start() | Start processing jobs
stop() | Graceful shutdown
now() | Enqueue for immediate execution
MonqueOptions.workerConcurrency | Global concurrency default

Use the register() method to register a handler for a specific job type:

monque.register('job-name', async (job) => {
  // Your processing logic
});

Define the expected data type for better IDE support and compile-time checks:

interface ImageProcessJob {
  imageUrl: string;
  width: number;
  height: number;
  format: 'jpeg' | 'png' | 'webp';
}

monque.register<ImageProcessJob>('process-image', async (job) => {
  const { imageUrl, width, height, format } = job.data;
  // TypeScript knows all property types
  await processImage(imageUrl, { width, height, format });
});

Configure worker-specific settings via WorkerOptions:

monque.register('heavy-computation', async (job) => {
  await compute(job.data);
}, {
  concurrency: 2 // Only 2 concurrent jobs (overrides workerConcurrency)
});

Option | Type | Default | Description
concurrency | number | 5 (from workerConcurrency) | Maximum concurrent jobs for this worker
replace | boolean | false | Replace an existing worker with the same name instead of throwing
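
The replace flag only matters when a name collides. A minimal self-contained sketch of the semantics (an illustrative model, not Monque's internals):

```typescript
// Sketch of register/replace semantics: registering a duplicate name
// throws unless replace is set. Illustrative only - not Monque's code.
type Handler = (job: unknown) => Promise<void>;

class WorkerRegistry {
  private workers = new Map<string, Handler>();

  register(name: string, handler: Handler, opts: { replace?: boolean } = {}): void {
    if (this.workers.has(name) && !opts.replace) {
      throw new Error(`Worker "${name}" is already registered`);
    }
    this.workers.set(name, handler);
  }
}
```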

When start() is called, Monque begins looking for jobs:

  1. Polling runs every pollInterval milliseconds
  2. Change Streams (if available) trigger an additional, debounced poll for low-latency processing
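
The debounce matters because a burst of change events should trigger at most one extra poll. A self-contained sketch of how the two triggers can be combined (names and delays are illustrative, not Monque's internals):

```typescript
// Sketch: a steady poll loop plus a debounced trigger for change-stream
// events. A burst of onChangeEvent() calls collapses into a single poll.
function createPoller(poll: () => void, pollInterval: number, debounceMs: number) {
  const timer = setInterval(poll, pollInterval); // regular polling
  let pending: ReturnType<typeof setTimeout> | null = null;

  return {
    // Called for every change-stream event.
    onChangeEvent(): void {
      if (pending) return; // a poll is already scheduled
      pending = setTimeout(() => {
        pending = null;
        poll();
      }, debounceMs);
    },
    stop(): void {
      clearInterval(timer);
      if (pending) clearTimeout(pending);
    },
  };
}
```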

Jobs are claimed atomically using findOneAndUpdate:

// Simplified internal logic
const job = await collection.findOneAndUpdate(
  {
    name: workerName,
    status: 'pending',
    nextRunAt: { $lte: new Date() },
    $or: [{ claimedBy: null }, { claimedBy: { $exists: false } }]
  },
  {
    $set: {
      status: 'processing',
      claimedBy: schedulerInstanceId,
      lockedAt: new Date(),
      lastHeartbeat: new Date(),
      heartbeatInterval: 30000,
      updatedAt: new Date()
    }
  }
);

This prevents two scheduler instances from claiming the same pending job at the same time.

Your handler function is called with the full job document:

monque.register('example', async (job) => {
  console.log(job._id);      // ObjectId
  console.log(job.name);     // 'example'
  console.log(job.data);     // Your payload
  console.log(job.failCount); // Previous failures
});

The handler's outcome determines the job's fate:

  • Success: the job's status is set to completed
  • Error thrown: the job is retried or set to failed (see Retry & Backoff)
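
Conceptually, the worker wraps each handler call like this (a simplified model of the behavior described above, not Monque's actual code; maxRetries is an assumed setting):

```typescript
// Sketch of how a handler's outcome maps to a job status.
type JobDoc = { name: string; failCount: number; status: string };

async function runHandler(
  job: JobDoc,
  handler: (job: JobDoc) => Promise<void>,
  maxRetries: number,
): Promise<string> {
  try {
    await handler(job);
    job.status = 'completed';
  } catch {
    job.failCount += 1;
    // Retry until the attempt budget is exhausted, then mark failed.
    job.status = job.failCount < maxRetries ? 'pending' : 'failed';
  }
  return job.status;
}
```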

If your handler throws or returns a rejected promise, the job is automatically retried:

monque.register('flaky-api', async (job) => {
  const response = await fetch(job.data.apiUrl);
  if (!response.ok) {
    throw new Error(`API returned ${response.status}`);
  }
  // Job will be retried with exponential backoff
});
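
Exponential backoff typically means the retry delay doubles with each failure up to a cap. Monque's exact formula is documented in Retry & Backoff; the base and cap values below are assumptions for the sketch:

```typescript
// Illustrative exponential backoff: delay doubles per failure, capped.
// base/cap values here are assumptions, not Monque's defaults.
function backoffDelayMs(failCount: number, baseMs = 1000, capMs = 60_000): number {
  return Math.min(baseMs * 2 ** failCount, capMs);
}
```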

To control retry behavior for expected failures, check job.failCount or throw a MonqueError. Note that any error thrown inside a registered handler triggers the retry policy by default.

monque.register('check-external-resource', async (job) => {
  // Check if resource exists before retrying expensive operations
  if (job.failCount > 0) {
    const exists = await checkResourceExists(job.data.resourceId);
    if (!exists) {
      console.error('Resource deleted, skipping retry');
      return; // Skip retry for deleted resources
    }
  }

  await processResource(job.data.resourceId);
});

Set the global default in MonqueOptions; individual workers can override it:

const monque = new Monque(db, {
  workerConcurrency: 10 // All workers default to 10 concurrent jobs
});
// Low-priority background tasks: higher concurrency
monque.register('log-analytics', handler, { concurrency: 20 });

// Resource-intensive tasks: lower concurrency
monque.register('video-transcode', handler, { concurrency: 2 });

// External API calls: match rate limits
monque.register('api-sync', handler, { concurrency: 5 });

Use instanceConcurrency to limit the total number of jobs processed by a single scheduler instance across all workers:

const monque = new Monque(db, {
  instanceConcurrency: 10,   // Instance processes max 10 jobs total
  workerConcurrency: 5       // Each worker defaults to 5 concurrent jobs
});

// With 3 workers at concurrency 5, normally 15 jobs could run simultaneously.
// With instanceConcurrency: 10, only 10 jobs run at any time across all workers.
monque.register('emails', emailHandler);      // Up to 5 concurrent
monque.register('notifications', notifyHandler); // Up to 5 concurrent
monque.register('webhooks', webhookHandler);  // Up to 5 concurrent
// But total across all three never exceeds 10

Scenario | Recommended Concurrency
CPU-bound tasks | 1-2 per core
I/O-bound tasks (database, API) | 5-20
Memory-intensive tasks | Limited by available RAM
External API with rate limits | Match API limits
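
The interaction between the per-worker cap (workerConcurrency) and the instance-wide cap (instanceConcurrency) can be modeled as two counting semaphores that a job must acquire before running. A self-contained sketch (an illustrative model, not Monque's code):

```typescript
// Two nested concurrency limits as counting semaphores: a job needs a
// permit from its worker's semaphore AND the instance-wide semaphore.
class Semaphore {
  private queue: Array<() => void> = [];
  constructor(private permits: number) {}

  async acquire(): Promise<void> {
    if (this.permits > 0) { this.permits--; return; }
    await new Promise<void>(res => this.queue.push(res));
  }

  release(): void {
    const next = this.queue.shift();
    if (next) next(); // hand the permit straight to a waiter
    else this.permits++;
  }
}

async function runJob(
  workerLimit: Semaphore,   // per-worker cap (workerConcurrency)
  instanceLimit: Semaphore, // instance-wide cap (instanceConcurrency)
  job: () => Promise<void>,
): Promise<void> {
  await workerLimit.acquire();
  await instanceLimit.acquire();
  try {
    await job();
  } finally {
    instanceLimit.release();
    workerLimit.release();
  }
}
```

Even if the worker's own limit is higher, the tighter instance-wide limit wins, mirroring the 15-vs-10 example above.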

Register multiple workers for different job types:

// Email processing
monque.register<EmailJob>('send-email', async (job) => {
  await sendEmail(job.data);
});

// Image processing
monque.register<ImageJob>('resize-image', async (job) => {
  await resizeImage(job.data);
});

// Notification dispatch
monque.register<NotificationJob>('push-notification', async (job) => {
  await sendPushNotification(job.data);
});

Workers can be registered before or after start():

// Register all workers first
monque.register('job-a', handlerA);
monque.register('job-b', handlerB);

// Then start processing
monque.start();

Each worker should do one thing well:

// ✅ Good - Single responsibility
monque.register('send-email', sendEmailHandler);
monque.register('update-stats', updateStatsHandler);

// ❌ Bad - Too much in one handler
monque.register('process-order', async (job) => {
  await sendConfirmationEmail();
  await updateInventory();
  await notifyShipping();
  await updateAnalytics();
});

Design for idempotency:

monque.register('sync-data', async (job) => {
  const { batchId } = job.data;
  
  // Track progress in external state
  const progress = await getProgress(batchId);
  
  // Resume from where we left off
  for (const item of progress.remaining) {
    await processItem(item);
    await updateProgress(batchId, item);
  }
});

Use job events for centralized logging:

monque.on('job:start', (job) => {
  logger.info('Job started', { jobId: job._id, name: job.name });
});

monque.on('job:complete', ({ job, duration }) => {
  logger.info('Job completed', { 
    jobId: job._id, 
    name: job.name, 
    duration 
  });
});

monque.on('job:fail', ({ job, error, willRetry }) => {
  logger.error('Job failed', {
    jobId: job._id,
    name: job.name,
    error: error.message,
    willRetry,
    failCount: job.failCount
  });
});