Quick Start

This guide walks you through building a working job queue in under 5 minutes.

You will build a simple email notification system that:

  • Accepts email jobs via enqueue()
  • Processes them asynchronously with a worker
  • Handles failures with automatic retries
  • Provides observability via events

Start by connecting to MongoDB and creating a Monque instance:

import { MongoClient } from 'mongodb';
import { Monque } from '@monque/core';

// Connect to MongoDB
const client = new MongoClient('mongodb://localhost:27017');
await client.connect();
const db = client.db('myapp');

// Create Monque instance
const monque = new Monque(db);
await monque.initialize();

TypeScript generics ensure type safety for your job payloads:

interface EmailJobData {
  to: string;
  subject: string;
  body: string;
  priority?: 'high' | 'normal' | 'low';
}
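
Generics are checked at compile time only, so a payload that arrives from outside your code (an HTTP request body, for example) is not validated by them. A minimal sketch of a runtime guard follows; `isEmailJobData` is a hypothetical helper, not part of Monque:

```typescript
interface EmailJobData {
  to: string;
  subject: string;
  body: string;
  priority?: 'high' | 'normal' | 'low';
}

// Hypothetical helper (not part of Monque): narrows unknown input to EmailJobData.
function isEmailJobData(value: unknown): value is EmailJobData {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  // priority is optional, but when present it must be one of the three literals.
  const priorityOk =
    v['priority'] === undefined ||
    v['priority'] === 'high' ||
    v['priority'] === 'normal' ||
    v['priority'] === 'low';
  return (
    typeof v['to'] === 'string' &&
    typeof v['subject'] === 'string' &&
    typeof v['body'] === 'string' &&
    priorityOk
  );
}
```

Calling the guard before enqueueing keeps malformed payloads out of the queue instead of letting them fail later inside a worker.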

Workers process jobs of a specific type:

monque.register<EmailJobData>('send-email', async (job) => {
  console.log(`📧 Sending email to ${job.data.to}`);
  
  // Your email sending logic here
  await sendEmail(job.data.to, job.data.subject, job.data.body);
  
  console.log(`✅ Email sent successfully`);
});
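
The handler above calls `sendEmail`, which this guide does not define. While you try things out, a stand-in along these lines may be enough. It is a sketch only: `outbox` and `assertValidRecipient` are hypothetical names, and the assumption that a rejected handler promise is what marks a job as failed should be checked against the Monque documentation.

```typescript
// Hypothetical stand-in for a real mail client; nothing here is part of Monque.
interface SentEmail {
  to: string;
  subject: string;
  body: string;
}

// In-memory record of "sent" messages, useful for local testing.
const outbox: SentEmail[] = [];

function assertValidRecipient(to: string): void {
  // Deliberately loose check: exactly one "@" with non-space text on both sides.
  if (!/^[^@\s]+@[^@\s]+$/.test(to)) {
    throw new Error(`Invalid recipient address: ${to}`);
  }
}

async function sendEmail(to: string, subject: string, body: string): Promise<void> {
  // Throwing inside an async function rejects the returned promise.
  assertValidRecipient(to);
  outbox.push({ to, subject, body });
}
```

Replace the body of `sendEmail` with a call to your mail provider once the queue itself is working.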

Monitor job lifecycle for logging and metrics:

monque.on('job:start', (job) => {
  console.log(`🚀 Starting job: ${job.name} (${job._id})`);
});

monque.on('job:complete', ({ job, duration }) => {
  console.log(`✅ Completed: ${job.name} in ${duration}ms`);
});

monque.on('job:fail', ({ job, error, willRetry }) => {
  console.log(`❌ Failed: ${job.name} - ${error.message}`);
  if (willRetry) {
    console.log(`🔄 Will retry (attempt ${job.failCount + 1})`);
  }
});

Start the scheduler to begin processing jobs:

monque.start();
console.log('🎯 Monque scheduler started');
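
Beyond logging, the same events can feed simple metrics. The sketch below assumes only the payload fields used in the handlers above (`job.name`, `duration`, `error`, `willRetry`) and that `duration` is in milliseconds, as the log message suggests. `JobMetrics` is a hypothetical class, not a Monque export.

```typescript
// Minimal shapes inferred from the handlers above; the real Monque types may carry more fields.
interface JobLike {
  name: string;
}
interface CompleteEvent {
  job: JobLike;
  duration: number;
}
interface FailEvent {
  job: JobLike;
  error: Error;
  willRetry: boolean;
}

interface JobStats {
  completed: number;
  failed: number;
  totalDurationMs: number;
}

class JobMetrics {
  private readonly stats = new Map<string, JobStats>();

  private statsFor(name: string): JobStats {
    let s = this.stats.get(name);
    if (!s) {
      s = { completed: 0, failed: 0, totalDurationMs: 0 };
      this.stats.set(name, s);
    }
    return s;
  }

  // Arrow properties keep `this` bound, so they can be passed directly as listeners.
  onComplete = ({ job, duration }: CompleteEvent): void => {
    const s = this.statsFor(job.name);
    s.completed += 1;
    s.totalDurationMs += duration;
  };

  onFail = ({ job }: FailEvent): void => {
    this.statsFor(job.name).failed += 1;
  };

  averageDurationMs(name: string): number {
    const s = this.stats.get(name);
    return s && s.completed > 0 ? s.totalDurationMs / s.completed : 0;
  }

  failureCount(name: string): number {
    return this.stats.get(name)?.failed ?? 0;
  }
}
```

With an instance `metrics`, the listeners would be attached as `monque.on('job:complete', metrics.onComplete)` and `monque.on('job:fail', metrics.onFail)`.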

Now you can enqueue jobs from anywhere in your application:

// Simple immediate job
await monque.now('send-email', {
  to: 'user@example.com',
  subject: 'Welcome!',
  body: 'Thanks for signing up.'
});

// With options
await monque.enqueue('send-email', {
  to: 'vip@example.com',
  subject: 'Exclusive Offer',
  body: 'Special deal just for you!',
  priority: 'high'
}, {
  // Prevent duplicates
  uniqueKey: 'welcome-email-vip@example.com',
  // Schedule for later
  runAt: new Date(Date.now() + 60000) // 1 minute from now
});
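
Hard-coding `uniqueKey` and `runAt` gets repetitive once several call sites enqueue the same kind of job. One possible approach is to build both values with small helpers. `uniqueKeyFor` and `delayFrom` are hypothetical names, and lowercasing the address assumes your system treats recipients case-insensitively.

```typescript
// Hypothetical helpers; only the `uniqueKey` and `runAt` option names come from the example above.
function uniqueKeyFor(kind: string, recipient: string): string {
  // Normalise so " VIP@Example.com" and "vip@example.com" produce the same key.
  return `${kind}-${recipient.trim().toLowerCase()}`;
}

function delayFrom(start: Date, delayMs: number): Date {
  // Returns a new Date; the input is left untouched.
  return new Date(start.getTime() + delayMs);
}

// A fixed start time keeps the example deterministic; use `new Date()` in real code.
const exampleStart = new Date('2024-01-01T00:00:00.000Z');
const enqueueOptions = {
  uniqueKey: uniqueKeyFor('welcome-email', ' VIP@Example.com'),
  runAt: delayFrom(exampleStart, 60000), // 1 minute after exampleStart
};
```

The resulting object has the same shape as the options passed as the third argument to `enqueue()` above.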

Ensure in-progress jobs complete before your application exits:

process.on('SIGTERM', async () => {
  console.log('🛑 Shutting down...');
  await monque.stop(); // Waits for in-progress jobs
  await client.close();
  console.log('👋 Goodbye!');
  process.exit(0);
});

If stop() reaches the configured shutdownTimeout before in-progress jobs finish, those jobs are left in the processing state so that stale job recovery can safely re-claim them later.
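
A process can receive a termination signal more than once, for example when an operator presses Ctrl+C twice. The sketch below wraps the shutdown steps so they run in order and only once, however many times the handler fires. `createShutdown` is a hypothetical helper, not part of Monque.

```typescript
type ShutdownStep = () => Promise<void> | void;

// Hypothetical helper: runs the steps sequentially, at most once.
function createShutdown(steps: ShutdownStep[]): () => Promise<void> {
  let inFlight: Promise<void> | undefined;
  return () => {
    if (!inFlight) {
      inFlight = (async () => {
        for (const step of steps) {
          await step(); // each step finishes before the next one starts
        }
      })();
    }
    // Repeat calls get the same promise instead of starting a second shutdown.
    return inFlight;
  };
}

// Possible usage with the objects from this guide (left commented out so the sketch stays self-contained):
// const shutdown = createShutdown([() => monque.stop(), () => client.close()]);
// process.on('SIGTERM', () => void shutdown().then(() => process.exit(0)));
```

Ordering matters here: stopping Monque before closing the MongoDB client lets in-progress jobs finish while the connection is still open.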

Here’s the full working example:

import { MongoClient } from 'mongodb';
import { Monque } from '@monque/core';

// Types
interface EmailJobData {
  to: string;
  subject: string;
  body: string;
}

async function main() {
  // Setup
  const client = new MongoClient('mongodb://localhost:27017');
  await client.connect();
  const db = client.db('myapp');
  
  const monque = new Monque(db);
  await monque.initialize();

  // Register worker
  monque.register<EmailJobData>('send-email', async (job) => {
    console.log(`📧 Sending to ${job.data.to}: ${job.data.subject}`);
    // Simulate sending email
    await new Promise(resolve => setTimeout(resolve, 100));
  });

  // Events
  monque.on('job:complete', ({ job, duration }) => {
    console.log(`✅ ${job.name} completed in ${duration}ms`);
  });

  // Start processing
  monque.start();

  // Enqueue some jobs
  await monque.enqueue('send-email', {
    to: 'alice@example.com',
    subject: 'Hello Alice',
    body: 'Welcome to our platform!'
  });

  await monque.enqueue('send-email', {
    to: 'bob@example.com',
    subject: 'Hello Bob',
    body: 'Welcome to our platform!'
  });

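  // Graceful shutdown (Ctrl+C sends SIGINT, process managers usually send SIGTERM)
  const shutdown = async () => {
    await monque.stop();
    await client.close();
    process.exit(0);
  };
  process.on('SIGTERM', shutdown);
  process.on('SIGINT', shutdown);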

  console.log('🎯 Scheduler running. Press Ctrl+C to stop.');
}

main().catch(console.error);

For scheduled tasks, use cron expressions:

// Run every hour at minute 0
await monque.schedule('0 * * * *', 'hourly-cleanup', {
  type: 'temp-files',
  maxAge: 3600
});

// Daily at midnight
await monque.schedule('0 0 * * *', 'daily-report', {
  reportType: 'sales'
});

// Every 5 minutes
await monque.schedule('*/5 * * * *', 'health-check', {
  endpoints: ['api', 'database', 'cache']
});
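
If the five-field syntax is unfamiliar, the fields are minute, hour, day of month, month, and day of week. The sketch below is an illustration of how the three expressions above are read, not Monque's parser: it handles only `*`, `*/n`, and single numbers, requires every field to match, and evaluates in UTC to stay deterministic. `cronMatches` is a hypothetical name.

```typescript
// Illustration only: supports "*", "*/n", and single numbers.
function fieldMatches(field: string, value: number, min: number): boolean {
  if (field === '*') return true;
  if (field.startsWith('*/')) {
    const step = Number(field.slice(2));
    // Steps count from the start of the field's range (0 for minutes, 1 for months).
    return Number.isInteger(step) && step > 0 && (value - min) % step === 0;
  }
  return Number(field) === value;
}

function cronMatches(expression: string, date: Date): boolean {
  const fields = expression.trim().split(/\s+/);
  if (fields.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${fields.length}`);
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] =
    fields as [string, string, string, string, string];
  return (
    fieldMatches(minute, date.getUTCMinutes(), 0) &&
    fieldMatches(hour, date.getUTCHours(), 0) &&
    fieldMatches(dayOfMonth, date.getUTCDate(), 1) &&
    fieldMatches(month, date.getUTCMonth() + 1, 1) && // getUTCMonth() is 0-based
    fieldMatches(dayOfWeek, date.getUTCDay(), 0) // 0 = Sunday
  );
}
```

For instance, `cronMatches('*/5 * * * *', date)` holds whenever the UTC minute of `date` is a multiple of five, which is how the health-check schedule above reads.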

For the full list of options and defaults, see the MonqueOptions API Reference.

Customize Monque behavior by passing options as the second constructor argument:

const monque = new Monque(db, {
  collectionName: 'jobs',
  // Durations below are presumably in milliseconds
  // (1000 = 1 s, 30000 = 30 s, 1800000 = 30 min)
  pollInterval: 1000,
  maxRetries: 10,
  baseRetryInterval: 1000,
  shutdownTimeout: 30000,
  defaultConcurrency: 5,
  lockTimeout: 1800000,
  heartbeatInterval: 30000,
});
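
To get a feel for how `maxRetries` and `baseRetryInterval` might interact, the sketch below assumes exponential backoff of the form baseRetryInterval × 2^failCount. That formula is an assumption made for illustration, so confirm the actual behavior in the MonqueOptions API Reference. `retryDelayMs` is a hypothetical helper.

```typescript
// Assumed formula (not confirmed by this guide): delay = baseRetryInterval * 2^failCount.
function retryDelayMs(failCount: number, baseRetryInterval: number): number {
  return baseRetryInterval * 2 ** failCount;
}

// Delays for fail counts 1 through 10 with a 1000 ms base interval.
const exampleDelays = Array.from({ length: 10 }, (_, i) => retryDelayMs(i + 1, 1000));
```

Under that assumption the delay doubles with every failure, so a large `maxRetries` spreads attempts over a long period even with a small base interval.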