This guide will walk you through creating a working job queue in under 5 minutes.
You will build a simple email notification system that:
- Accepts email jobs via enqueue()
- Processes them asynchronously with a worker
- Handles failures with automatic retries
- Provides observability via events
import { MongoClient } from 'mongodb';
import { Monque } from '@monque/core';
// Connect to MongoDB
const client = new MongoClient('mongodb://localhost:27017');
await client.connect();
const db = client.db('myapp');
// Create Monque instance
const monque = new Monque(db);
await monque.initialize();
TypeScript generics ensure type safety for your job payloads:
interface EmailJobData {
  to: string;
  subject: string;
  body: string;
  priority?: 'high' | 'normal' | 'low';
}
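Generics check payloads at compile time only. When job data originates outside your typed code (an HTTP request body, for instance), a runtime check before enqueueing can help. The following is a minimal sketch; `isEmailJobData` is a hypothetical helper written for this guide, not part of Monque.

```typescript
// Mirrors the EmailJobData interface from this guide.
interface EmailJobData {
  to: string;
  subject: string;
  body: string;
  priority?: 'high' | 'normal' | 'low';
}

// Hypothetical helper (not a Monque API): narrows unknown input to EmailJobData.
function isEmailJobData(value: unknown): value is EmailJobData {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  const priorities: unknown[] = ['high', 'normal', 'low'];
  return (
    typeof v['to'] === 'string' &&
    typeof v['subject'] === 'string' &&
    typeof v['body'] === 'string' &&
    (v['priority'] === undefined || priorities.indexOf(v['priority']) !== -1)
  );
}
```

A caller would typically reject the input when the guard returns false, and pass it on as a job payload only when it returns true.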
Workers process jobs of a specific type:
monque.register<EmailJobData>('send-email', async (job) => {
  console.log(`📧 Sending email to ${job.data.to}`);
  // Your email sending logic here (sendEmail is a placeholder for your own mailer function)
  await sendEmail(job.data.to, job.data.subject, job.data.body);
  console.log(`✅ Email sent successfully`);
});
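The worker above calls `sendEmail`, which the guide leaves to you. To run the example without a real mail provider, a stand-in along these lines may be enough. Both functions are hypothetical, and the sketch assumes that an error thrown from a handler is what marks the job as failed, which is consistent with the `error` delivered by the `job:fail` event.

```typescript
// Hypothetical validation helper: throws on an obviously malformed address.
function assertValidRecipient(to: string): void {
  if (!/^[^@\s]+@[^@\s]+$/.test(to)) {
    throw new Error(`Invalid recipient address: ${to}`);
  }
}

// Hypothetical stand-in for a real mailer call; replace with your provider's client.
async function sendEmail(to: string, subject: string, body: string): Promise<void> {
  assertValidRecipient(to);
  // Simulate network latency.
  await new Promise<void>((resolve) => setTimeout(resolve, 50));
  console.log(`Sent "${subject}" to ${to} (${body.length} characters)`);
}
```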
Monitor job lifecycle for logging and metrics:
monque.on('job:start', (job) => {
  console.log(`🚀 Starting job: ${job.name} (${job._id})`);
});
monque.on('job:complete', ({ job, duration }) => {
  console.log(`✅ Completed: ${job.name} in ${duration}ms`);
});
monque.on('job:fail', ({ job, error, willRetry }) => {
  console.log(`❌ Failed: ${job.name} - ${error.message}`);
  if (willRetry) {
    console.log(`🔄 Will retry (attempt ${job.failCount + 1})`);
  }
});
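Beyond logging, the same events can feed simple counters. The sketch below is one possible in-memory collector. `createJobMetrics` and its methods are hypothetical names, and the wiring comments rely only on the `duration` and `willRetry` fields shown in the listeners above.

```typescript
// Hypothetical in-memory metrics collector; not part of Monque.
interface JobMetrics {
  completed: number;
  failed: number;
  retriesScheduled: number;
  totalDurationMs: number;
}

function createJobMetrics() {
  const metrics: JobMetrics = { completed: 0, failed: 0, retriesScheduled: 0, totalDurationMs: 0 };
  return {
    recordComplete(durationMs: number): void {
      metrics.completed += 1;
      metrics.totalDurationMs += durationMs;
    },
    recordFail(willRetry: boolean): void {
      metrics.failed += 1;
      if (willRetry) metrics.retriesScheduled += 1;
    },
    // Mean duration of completed jobs; 0 when nothing has completed yet.
    averageDurationMs(): number {
      return metrics.completed === 0 ? 0 : metrics.totalDurationMs / metrics.completed;
    },
    // Returns a copy so callers cannot mutate the internal counters.
    snapshot(): JobMetrics {
      return { ...metrics };
    },
  };
}

// Wiring, using the event payloads shown in this guide:
// const jobMetrics = createJobMetrics();
// monque.on('job:complete', ({ duration }) => jobMetrics.recordComplete(duration));
// monque.on('job:fail', ({ willRetry }) => jobMetrics.recordFail(willRetry));
```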
monque.start();
console.log('🎯 Monque scheduler started');
Now you can enqueue jobs from anywhere in your application:
// Simple immediate job
await monque.now('send-email', {
  to: 'user@example.com',
  subject: 'Welcome!',
  body: 'Thanks for signing up.'
});
// With options
await monque.enqueue('send-email', {
  to: 'vip@example.com',
  subject: 'Exclusive Offer',
  body: 'Special deal just for you!',
  priority: 'high'
}, {
  // Prevent duplicates
  uniqueKey: 'welcome-email-vip@example.com',
  // Schedule for later
  runAt: new Date(Date.now() + 60000) // 1 minute from now
});
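Building `uniqueKey` and `runAt` values by hand at every call site invites small inconsistencies, such as the same address in different letter case producing different keys. Two small helpers can centralize that. This is a sketch with hypothetical names (`emailUniqueKey`, `runAtIn`); the key format simply follows the one used in the example above.

```typescript
// Hypothetical helper: builds a stable deduplication key for an email job.
// Normalizing the address means "VIP@Example.com" and "vip@example.com" share a key.
function emailUniqueKey(kind: string, to: string): string {
  return `${kind}-${to.trim().toLowerCase()}`;
}

// Hypothetical helper: returns a Date `delayMs` milliseconds after `now`.
function runAtIn(delayMs: number, now: Date = new Date()): Date {
  return new Date(now.getTime() + delayMs);
}

// Possible usage with the options shown in this guide:
// await monque.enqueue('send-email', payload, {
//   uniqueKey: emailUniqueKey('welcome-email', payload.to),
//   runAt: runAtIn(60_000),
// });
```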
Ensure in-progress jobs complete before your application exits:
process.on('SIGTERM', async () => {
  console.log('🛑 Shutting down...');
  await monque.stop(); // Waits for in-progress jobs
  await client.close();
  console.log('👋 Goodbye!');
  process.exit(0);
});
If stop() reaches the configured shutdownTimeout before in-progress jobs finish, those jobs are left in the processing state so that stale job recovery can safely re-claim them later.
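To picture what a bounded wait looks like, the general pattern is a race between the pending work and a timer. The sketch below illustrates that pattern only. It is not Monque's internal implementation, and `raceWithTimeout` is a hypothetical helper.

```typescript
// Resolves with 'done' if `work` settles successfully in time, otherwise 'timeout'.
// A rejection from `work` before the deadline is propagated to the caller.
async function raceWithTimeout(
  work: Promise<unknown>,
  timeoutMs: number,
): Promise<'done' | 'timeout'> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<'timeout'>((resolve) => {
    timer = setTimeout(() => resolve('timeout'), timeoutMs);
  });
  try {
    return await Promise.race([work.then(() => 'done' as const), timeout]);
  } finally {
    // Always clear the timer so it cannot keep the process alive.
    clearTimeout(timer);
  }
}
```

An application could use the result to log which case occurred before exiting, for example warning that some jobs were still running when the deadline passed.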
Here’s the full working example:
import { MongoClient } from 'mongodb';
import { Monque } from '@monque/core';
// Types
interface EmailJobData {
  to: string;
  subject: string;
  body: string;
}
async function main() {
  // Setup
  const client = new MongoClient('mongodb://localhost:27017');
  await client.connect();
  const db = client.db('myapp');
  const monque = new Monque(db);
  await monque.initialize();
  // Register worker
  monque.register<EmailJobData>('send-email', async (job) => {
    console.log(`📧 Sending to ${job.data.to}: ${job.data.subject}`);
    // Simulate sending email
    await new Promise(resolve => setTimeout(resolve, 100));
  });
  // Events
  monque.on('job:complete', ({ job, duration }) => {
    console.log(`✅ ${job.name} completed in ${duration}ms`);
  });
  // Start processing
  monque.start();
  // Enqueue some jobs
  await monque.enqueue('send-email', {
    to: 'alice@example.com',
    subject: 'Hello Alice',
    body: 'Welcome to our platform!'
  });
  await monque.enqueue('send-email', {
    to: 'bob@example.com',
    subject: 'Hello Bob',
    body: 'Welcome to our platform!'
  });
  // Graceful shutdown: Ctrl+C sends SIGINT, process managers usually send SIGTERM
  const shutdown = async () => {
    await monque.stop();
    await client.close();
    process.exit(0);
  };
  process.on('SIGINT', shutdown);
  process.on('SIGTERM', shutdown);
  console.log('🎯 Scheduler running. Press Ctrl+C to stop.');
}
main().catch(console.error);
For scheduled tasks, use cron expressions:
// Run every hour at minute 0
await monque.schedule('0 * * * *', 'hourly-cleanup', {
  type: 'temp-files',
  maxAge: 3600
});
// Daily at midnight
await monque.schedule('0 0 * * *', 'daily-report', {
  reportType: 'sales'
});
// Every 5 minutes
await monque.schedule('*/5 * * * *', 'health-check', {
  endpoints: ['api', 'database', 'cache']
});
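If the five-field syntax is unfamiliar, a tiny matcher can make it concrete. The fields are minute, hour, day of month, month and day of week. The sketch below is a simplified illustration, not the parser Monque uses: it supports only `*`, plain numbers, lists, ranges and steps, requires all five fields to match, and evaluates in UTC (how Monque handles time zones is not covered here). `fieldMatches` and `cronMatches` are hypothetical names.

```typescript
// Checks one cron field (for example "*/5", "0", "1,15" or "9-17") against a value.
// `min` is the smallest legal value for the field, so "*/5" counts from there.
function fieldMatches(field: string, value: number, min: number): boolean {
  return field.split(',').some((part) => {
    const [range = '', stepText] = part.split('/');
    const step = stepText === undefined ? 1 : Number(stepText);
    if (range === '' || !Number.isInteger(step) || step < 1) return false;
    let start = min;
    let end = Number.POSITIVE_INFINITY;
    if (range !== '*') {
      const bounds = range.split('-').map(Number);
      start = bounds[0] ?? Number.NaN;
      end = bounds[1] ?? start;
    }
    // Comparisons with NaN are false, so malformed numbers never match.
    return value >= start && value <= end && (value - start) % step === 0;
  });
}

// Returns true when `date` (read in UTC) satisfies every field of the expression.
function cronMatches(expression: string, date: Date): boolean {
  const fields = expression.trim().split(/\s+/);
  if (fields.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${fields.length}`);
  }
  const values = [
    date.getUTCMinutes(),
    date.getUTCHours(),
    date.getUTCDate(),
    date.getUTCMonth() + 1, // cron months are 1-12
    date.getUTCDay(), // 0 = Sunday
  ];
  const mins = [0, 0, 1, 1, 0];
  return fields.every((field, i) => fieldMatches(field, values[i] ?? Number.NaN, mins[i] ?? 0));
}
```

For instance, `cronMatches('*/5 * * * *', someDate)` is true whenever the UTC minute of `someDate` is a multiple of five.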
For the full list of options and defaults, see the MonqueOptions API Reference.
Customize Monque behavior with options:
const monque = new Monque(db, {
  collectionName: 'jobs',
  pollInterval: 1000, // 1 second, in ms
  maxRetries: 10,
  baseRetryInterval: 1000, // 1 second, in ms
  shutdownTimeout: 30000, // 30 seconds, in ms
  defaultConcurrency: 5,
  lockTimeout: 1800000, // 30 minutes, in ms
  heartbeatInterval: 30000, // 30 seconds, in ms
});
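To get a feel for how maxRetries and baseRetryInterval interact, it can help to tabulate retry delays. The sketch below assumes exponential backoff of the form baseRetryInterval × 2^failCount. That formula is an assumption made for illustration, so check the MonqueOptions API Reference for the exact behavior. `retryDelayMs` is a hypothetical helper.

```typescript
// ASSUMPTION: delay doubles with each failure (base * 2^failCount).
// `failCount` is the number of failures recorded so far; `maxDelayMs` is an optional cap.
function retryDelayMs(
  failCount: number,
  baseRetryInterval: number,
  maxDelayMs: number = Number.POSITIVE_INFINITY,
): number {
  return Math.min(baseRetryInterval * Math.pow(2, failCount), maxDelayMs);
}

// Lists the delay before each retry for the values used in the configuration above.
function retrySchedule(maxRetries: number, baseRetryInterval: number): number[] {
  const delays: number[] = [];
  for (let failCount = 1; failCount <= maxRetries; failCount++) {
    delays.push(retryDelayMs(failCount, baseRetryInterval));
  }
  return delays;
}
```

Under this assumption, `retrySchedule(10, 1000)` starts at 2000 ms and ends at 1024000 ms (about 17 minutes), which shows why a modest base interval can still spread retries over a long period.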