Change Streams

MongoDB Change Streams provide real-time notifications when jobs are created or updated, eliminating polling latency for instant job processing.

Change Streams allow applications to subscribe to real-time data changes in MongoDB collections. Instead of polling (“Are there new jobs?”), Monque receives push notifications (“A new job was just added!”).

When you call start(), Monque:

  1. Opens a Change Stream on the jobs collection
  2. Listens for events: new job insertions and status changes
  3. Triggers immediate processing when relevant changes occur
  4. Maintains polling as a backup in case Change Streams fail

const monque = new Monque(db);
await monque.initialize();
monque.start();

// Behind the scenes:
// - Change stream opened
// - Polling continues at `pollInterval`
// - Change stream events trigger a debounced poll for low-latency processing

// Monque watches for:
{
  $or: [
    { operationType: 'insert' },  // New jobs
    {
      operationType: 'update',
      'updateDescription.updatedFields.status': { $exists: true }
    }  // Status changes (e.g., retry → pending)
  ]
}
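
To make the filter above concrete, here is a minimal sketch that re-expresses it as a plain TypeScript predicate. `ChangeEventLike` and `isRelevantChange` are illustrative names and not part of Monque's API; in a real deployment a filter like this would more likely run server-side, for example as a `$match` stage passed to the driver's `collection.watch()`.

```typescript
// Hypothetical, trimmed-down shape of a change event (only the fields the filter reads).
interface ChangeEventLike {
  operationType: string;
  updateDescription?: { updatedFields?: Record<string, unknown> };
}

// Mirrors the $or filter: inserts always match, updates match only
// when the top-level `status` field is among the updated fields.
function isRelevantChange(event: ChangeEventLike): boolean {
  if (event.operationType === 'insert') {
    return true;
  }
  if (event.operationType === 'update') {
    const fields = event.updateDescription?.updatedFields ?? {};
    return Object.prototype.hasOwnProperty.call(fields, 'status');
  }
  return false;
}
```

Under this sketch, an update that only touches a field other than `status` would not trigger a poll, while a transition back to a pending status would.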
Without Change Streams       | With Change Streams
---------------------------- | ----------------------------------------------------------------
Reacts on poll cycle         | Reacts on change events
Latency: up to pollInterval  | Latency: typically within the debounce window + processing time
Poll queries run while idle  | Polling still runs, but change events reduce time-to-start
More idle work at scale      | Fewer “wait for poll” delays at scale

Change Streams require:

  • MongoDB 4.0+
  • Replica Set or Sharded Cluster (not standalone)
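
If you want to check these requirements from application code, one possible approach is to inspect the server's replies to the `hello` and `buildInfo` commands. The sketch below is an assumption about how such a check could look, not something Monque exposes: `supportsChangeStreams` and `HelloLike` are hypothetical names, and the function only examines values you have already fetched.

```typescript
// Subset of the `hello` command reply that this check relies on.
interface HelloLike {
  setName?: string; // present on replica set members
  msg?: string;     // 'isdbgrid' when connected through mongos (sharded cluster)
}

// Returns true when the topology and version match the requirements listed above.
function supportsChangeStreams(hello: HelloLike, serverVersion: string): boolean {
  const major = Number.parseInt(serverVersion.split('.')[0] ?? '', 10);
  const isReplicaSet = typeof hello.setName === 'string';
  const isMongos = hello.msg === 'isdbgrid';
  return major >= 4 && (isReplicaSet || isMongos);
}

// Possible usage with the MongoDB Node.js driver (not executed here):
//   const hello = await db.admin().command({ hello: 1 });
//   const { version } = await db.admin().command({ buildInfo: 1 });
//   const ok = supportsChangeStreams(hello, version);
```

A standalone server returns neither `setName` nor `isdbgrid`, so the check reports `false` and you can expect polling-only behavior.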

When Monque successfully opens a Change Stream:

monque.on('changestream:connected', () => {
  console.log('✅ Change Stream connected - instant job notifications active');
});

When a job is inserted or becomes ready:

// Internal flow:
// 1. Change Stream receives notification
// 2. Debounce timer (100ms) prevents claim storms
// 3. Worker attempts to claim job
// 4. Job is processed if claim succeeds

If the Change Stream encounters an error:

monque.on('changestream:error', ({ error }) => {
  console.error('Change Stream error:', error.message);
  // Monque will attempt to reconnect
});

After 3 failed reconnection attempts:

monque.on('changestream:fallback', ({ reason }) => {
  console.warn('Falling back to polling:', reason);
  // System continues working, just with higher latency
});

On graceful shutdown:

monque.on('changestream:closed', () => {
  console.log('Change Stream closed');
});

pollInterval always controls how often the scheduler polls the database:

const monque = new Monque(db, {
  pollInterval: 10000
});

Monque implements exponential backoff for reconnection:

// Reconnection attempts:
// Attempt 1: Wait 1 second
// Attempt 2: Wait 2 seconds
// Attempt 3: Wait 4 seconds
// After 3 failures: Fall back to polling-only mode
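
The schedule above doubles the wait on each attempt. As a rough sketch, it can be written as a small function; `reconnectDelayMs` and the two constants are illustrative names chosen here, and Monque's internal implementation may differ.

```typescript
// Values taken from the schedule above; the names are hypothetical.
const MAX_RECONNECT_ATTEMPTS = 3;
const BASE_DELAY_MS = 1000;

// Returns the wait (in ms) before the given 1-based attempt,
// or null once the attempts are exhausted and polling-only mode takes over.
function reconnectDelayMs(attempt: number): number | null {
  if (!Number.isInteger(attempt) || attempt < 1) {
    throw new RangeError('attempt must be a positive integer');
  }
  if (attempt > MAX_RECONNECT_ATTEMPTS) {
    return null;
  }
  return BASE_DELAY_MS * 2 ** (attempt - 1);
}

// reconnectDelayMs(1) → 1000, reconnectDelayMs(2) → 2000,
// reconnectDelayMs(3) → 4000, reconnectDelayMs(4) → null
```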

// isHealthy() indicates the scheduler is started and initialized.
// Change Streams are optional and do not affect this.
if (monque.isHealthy()) {
  // Scheduler is running
}

// Track Change Stream state
let changeStreamActive = false;

monque.on('changestream:connected', () => {
  changeStreamActive = true;
  metrics.gauge('monque.changestream_active', 1);
});

monque.on('changestream:fallback', () => {
  changeStreamActive = false;
  metrics.gauge('monque.changestream_active', 0);
  alerting.warn('Monque Change Stream fell back to polling');
});

monque.on('changestream:closed', () => {
  changeStreamActive = false;
  metrics.gauge('monque.changestream_active', 0);
});

To prevent “claim storms” when many jobs arrive simultaneously:

// Internal behavior:
// - Multiple change events within 100ms are batched
// - Single claim attempt processes available jobs
// - Prevents overwhelming the database with concurrent claims
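
To illustrate the batching effect, the sketch below simulates a debounce over a list of event timestamps and returns the times at which a claim attempt would fire. It assumes a trailing debounce in which every new event restarts the 100ms timer; whether Monque restarts the timer or uses a fixed window is not stated here, and `simulateDebounce` is a hypothetical helper, not a Monque function.

```typescript
// Given event arrival times (ms), return the times at which a debounced
// claim attempt fires. Assumption: each event restarts the timer.
function simulateDebounce(eventTimesMs: number[], windowMs = 100): number[] {
  const fireTimes: number[] = [];
  let pendingFireAt: number | null = null;

  for (const t of [...eventTimesMs].sort((a, b) => a - b)) {
    // The pending timer already expired before this event arrived.
    if (pendingFireAt !== null && t >= pendingFireAt) {
      fireTimes.push(pendingFireAt);
    }
    // Start (or restart) the timer from this event.
    pendingFireAt = t + windowMs;
  }

  // Flush the timer left running after the last event.
  if (pendingFireAt !== null) {
    fireTimes.push(pendingFireAt);
  }
  return fireTimes;
}

// Three events in a burst plus one straggler collapse into two claim attempts:
// simulateDebounce([0, 10, 50, 300]) → [150, 400]
```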

If the Change Stream does not connect:

  1. Check MongoDB version: Must be 4.0+

    mongosh --eval "db.version()"
  2. Verify replica set: Change Streams require a replica set or sharded cluster

    mongosh --eval "rs.status()"
  3. Check events: Listen for error events

    monque.on('changestream:error', ({ error }) => {
      console.error('Change Stream error:', error);
    });

If pickup latency stays high even with Change Streams:

  1. Check if fallback occurred: Listen for changestream:fallback
  2. Verify network latency: High latency to MongoDB delays notifications
  3. Monitor debounce timing: Under very high job volume, the 100ms debounce adds visible delay

If you see repeated connection/disconnection:

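// Count errors in a rolling window. Resetting a counter on every successful
// connect would hide flapping, because each reconnect would clear it.
// The 10-minute window and the threshold of 10 are example values.
const WINDOW_MS = 10 * 60 * 1000;
let errorTimes = [];

monque.on('changestream:error', () => {
  const now = Date.now();
  // Drop errors that fell out of the window, then record this one
  errorTimes = errorTimes.filter((t) => now - t < WINDOW_MS);
  errorTimes.push(now);
  if (errorTimes.length > 10) {
    console.error('Frequent Change Stream disconnections - check MongoDB stability');
  }
});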

Monque does not publish benchmark numbers for Change Streams.

In general:

  • Polling-only pickup latency is bounded by pollInterval.
  • With Change Streams enabled, pickup latency is typically bounded by the Change Stream notification delay plus Monque’s 100ms debounce window and processing time.

The exact numbers depend on MongoDB deployment (local vs Atlas), network latency, job volume, and worker load. If you want hard data, measure pickup latency in your application:

// Register the handler first so a worker is ready when the job is enqueued
monque.register('measured-job', async (job) => {
  // Comparing Date.now() values assumes producer and worker clocks are in sync
  const pickupLatency = Date.now() - job.data.enqueueTime;
  // Replace `yourMetrics` with your metrics client (Prometheus, StatsD, OpenTelemetry, ...)
  yourMetrics.histogram('monque.pickup_latency_ms', pickupLatency);
});

const enqueueTime = Date.now();

await monque.enqueue('measured-job', { enqueueTime });

To compare polling-only vs Change Streams:

  • Run against a standalone MongoDB (no replica set) to force polling-only mode.
  • Run against a replica set/sharded cluster (e.g., Atlas) to enable Change Streams.
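
If you do not have a metrics backend at hand, one simple way to compare the two runs is to collect the pickup latencies into an array per run and summarize them as percentiles. The helpers below are a sketch using the nearest-rank method; `percentile` and `summarize` are hypothetical names, not part of Monque.

```typescript
// Nearest-rank percentile: the smallest sample such that at least p% of
// all samples are less than or equal to it.
function percentile(samplesMs: number[], p: number): number {
  if (samplesMs.length === 0) {
    throw new RangeError('at least one sample is required');
  }
  if (p <= 0 || p > 100) {
    throw new RangeError('p must be in the range (0, 100]');
  }
  const sorted = [...samplesMs].sort((a, b) => a - b);
  const rank = Math.ceil((p * sorted.length) / 100);
  return sorted[rank - 1] as number;
}

// Condenses one run into the figures most useful for a side-by-side comparison.
function summarize(samplesMs: number[]): { p50: number; p95: number; max: number } {
  return {
    p50: percentile(samplesMs, 50),
    p95: percentile(samplesMs, 95),
    max: percentile(samplesMs, 100),
  };
}
```

Call `summarize` once on the samples from the polling-only run and once on the samples from the Change Streams run, then compare the `p50` and `p95` values.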

Enabling Change Streams maintains an open cursor to the MongoDB server. While typically lightweight, the actual memory footprint depends on your specific workload:

  • Driver Buffering: The MongoDB Node.js driver buffers events internally.
  • Event Volume: High update rates will increase memory pressure if consumers lag.
  • Document Size: Large updateDescription fields in change events increase payload size.

The overhead is negligible for most use cases, but it varies with workload. If memory usage is a critical concern, profile your application under load.

monque.on('changestream:connected', () => {
  logger.info('Change Stream connected');
});

monque.on('changestream:fallback', ({ reason }) => {
  logger.warn('Change Stream fallback', { reason });
});

monque.on('changestream:error', ({ error }) => {
  logger.error('Change Stream error', { error: error.message });
});

// Your application works whether Change Streams are active or not
// Just with different latency characteristics

The `measured-job` snippet shown earlier is one simple way to capture pickup latency.