
Heartbeat Mechanism

The heartbeat mechanism, together with stale-job recovery, ensures jobs aren’t lost when a worker crashes. While a worker is processing, Monque periodically updates a timestamp on the jobs it has claimed. On startup, Monque can recover jobs that were left stuck in processing.

Without recovery, this is what happens when a worker crashes mid-processing:

  1. The job remains in processing status
  2. claimedBy still points to the crashed instance
  3. No other worker can pick it up
  4. The job is effectively lost

While a job is processing, Monque periodically updates lastHeartbeat to signal liveness. Separately, on startup, Monque can recover jobs whose lock (lockedAt) is older than lockTimeout by resetting them to pending.

It helps to think of these fields as serving different purposes:

  • lockedAt: Used for stale recovery. lockTimeout is an absolute limit on how long a job may remain locked/processing before it is considered stale and eligible for recovery.
  • lastHeartbeat: Used for monitoring and debugging. It lets you confirm a worker is still actively updating jobs, but it is not what the startup recovery logic uses to decide staleness.
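To make the distinction concrete, here is a minimal sketch of the staleness rule, assuming job documents shaped like the examples on this page. The `isStale` helper and the `JobLock` type are illustrative names, not Monque APIs. The sketch shows that a recent lastHeartbeat does not matter: only status and lockedAt decide.

```typescript
// Sketch of the staleness rule described above (not Monque's source code).
// Assumes job documents carry status, lockedAt and lastHeartbeat as on this page.
interface JobLock {
  status: 'pending' | 'processing' | 'completed' | 'failed';
  lockedAt?: Date;
  lastHeartbeat?: Date;
}

// A job is stale when it is still processing and its lock is older than
// lockTimeout. lastHeartbeat is deliberately ignored.
function isStale(job: JobLock, lockTimeoutMs: number, now: Date = new Date()): boolean {
  if (job.status !== 'processing' || job.lockedAt === undefined) return false;
  return job.lockedAt.getTime() < now.getTime() - lockTimeoutMs;
}

// Usage: a fresh heartbeat does not protect a job locked 60 minutes ago.
const now = new Date('2024-01-15T11:00:00Z');
const job: JobLock = {
  status: 'processing',
  lockedAt: new Date('2024-01-15T10:00:00Z'),      // locked 60 minutes ago
  lastHeartbeat: new Date('2024-01-15T10:59:30Z'), // heartbeat 30 seconds ago
};
console.log(isStale(job, 30 * 60 * 1000, now)); // true
```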
sequenceDiagram
    participant W as Worker
    participant DB as MongoDB
    participant S as New Worker (startup)
    
    W->>DB: Claim job (set lockedAt, lastHeartbeat)
    loop Every heartbeatInterval (30s by default)
        W->>DB: Update lastHeartbeat
    end
    
    Note over W: Worker crashes!
    
    S->>DB: initialize(): Find jobs with old lockedAt
    S->>DB: Reset to pending
    
    Note over DB: Job available for retry
const monque = new Monque(db, {
  heartbeatInterval: 30000,   // Update heartbeat every 30s (default)
  lockTimeout: 1800000,       // Consider stale after 30 minutes (default)
  recoverStaleJobs: true      // Recover on startup (default)
});
| Option | Default | Description |
|---|---|---|
| heartbeatInterval | 30000 (30 s) | How often Monque updates lastHeartbeat while processing (monitoring/debugging) |
| lockTimeout | 1800000 (30 min) | Maximum time since a job was claimed (lockedAt) before it is considered stale (an absolute duration limit, not a heartbeat timeout) |
| recoverStaleJobs | true | Whether to recover stale jobs on startup |
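One way to catch misconfigured timings (for example, seconds passed where milliseconds are expected) is a pre-flight check before constructing Monque. The sketch below is hypothetical: `checkTimingOptions` is not part of Monque, and the rule that lockTimeout should be well above heartbeatInterval is an assumption based on the two options' roles, not a documented validation.

```typescript
// Hypothetical pre-flight check for Monque's timing options (not a Monque API).
interface TimingOptions {
  heartbeatInterval?: number; // ms
  lockTimeout?: number;       // ms
}

// Defaults as listed in the options table.
const DEFAULTS = { heartbeatInterval: 30_000, lockTimeout: 1_800_000 };

function checkTimingOptions(opts: TimingOptions): string[] {
  const heartbeatInterval = opts.heartbeatInterval ?? DEFAULTS.heartbeatInterval;
  const lockTimeout = opts.lockTimeout ?? DEFAULTS.lockTimeout;
  const problems: string[] = [];
  // `!(x > 0)` also rejects NaN.
  if (!(heartbeatInterval > 0)) problems.push('heartbeatInterval must be a positive number of ms');
  if (!(lockTimeout > 0)) problems.push('lockTimeout must be a positive number of ms');
  // Assumption: a lock timeout no longer than one heartbeat period is almost
  // certainly a unit mix-up (seconds instead of milliseconds).
  if (lockTimeout <= heartbeatInterval) problems.push('lockTimeout should be much larger than heartbeatInterval');
  return problems;
}

console.log(checkTimingOptions({}).length); // 0 (defaults are consistent)
console.log(checkTimingOptions({ lockTimeout: 1800 })); // flags the seconds/milliseconds mix-up
```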

When a worker claims a job:

// Job document after claim
{
  status: 'processing',
  claimedBy: 'worker-1',
  lockedAt: ISODate("2024-01-15T10:00:00Z"),
  lastHeartbeat: ISODate("2024-01-15T10:00:00Z")
}

Every heartbeatInterval milliseconds, each instance refreshes lastHeartbeat on the jobs it owns:

// Update query (internal)
await collection.updateMany(
  { claimedBy: this.instanceId, status: 'processing' },
  { $set: { lastHeartbeat: new Date(), updatedAt: new Date() } }
);
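The ownership rule in this update can be modeled in memory: only jobs that are still processing and claimed by the current instance are touched. This is a sketch assuming the field names shown above; the `heartbeat` function is hypothetical and stands in for the updateMany call.

```typescript
// In-memory model of the heartbeat update; mirrors the filter
// { claimedBy: instanceId, status: 'processing' }. Not Monque's implementation.
interface Job {
  id: string;
  status: string;
  claimedBy?: string;
  lastHeartbeat?: Date;
  updatedAt?: Date;
}

// Returns how many jobs matched the filter and were touched.
function heartbeat(jobs: Job[], instanceId: string, now: Date = new Date()): number {
  let touched = 0;
  for (const job of jobs) {
    if (job.claimedBy === instanceId && job.status === 'processing') {
      job.lastHeartbeat = now;
      job.updatedAt = now;
      touched++;
    }
  }
  return touched;
}

const jobs: Job[] = [
  { id: 'a', status: 'processing', claimedBy: 'worker-1' },
  { id: 'b', status: 'processing', claimedBy: 'worker-2' }, // owned by another instance
  { id: 'c', status: 'completed' },                         // no longer claimed
];
const touched = heartbeat(jobs, 'worker-1', new Date('2024-01-15T10:00:30Z'));
console.log(touched); // 1
```

In a running worker, a function like this would be called from a timer such as `setInterval(..., heartbeatInterval)` and stopped with `clearInterval` on shutdown; that wiring is an assumption for the model, not a description of Monque's internals.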

When a job completes or fails, Monque clears its claim fields:

// Fields cleared (unset)
{
  status: 'completed',  // or 'failed' / 'pending' for retry
  // claimedBy, lockedAt, lastHeartbeat, heartbeatInterval removed
}
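The cleanup step can be modeled on a plain object. This sketch assumes the four claim fields named in the comment above; `releaseJob` is a hypothetical helper, and Monque itself performs the equivalent with `$unset` in MongoDB.

```typescript
// Sketch of releasing a job's claim on a plain object (not a Monque API).
type FinalStatus = 'completed' | 'failed' | 'pending';

interface ClaimedJob {
  status: string;
  claimedBy?: string;
  lockedAt?: Date;
  lastHeartbeat?: Date;
  heartbeatInterval?: number;
  [field: string]: unknown;
}

// Claim-related fields removed when a job finishes or is returned for retry.
const CLAIM_FIELDS = ['claimedBy', 'lockedAt', 'lastHeartbeat', 'heartbeatInterval'] as const;

function releaseJob(job: ClaimedJob, status: FinalStatus): ClaimedJob {
  const released: ClaimedJob = { ...job, status }; // copy; input is not mutated
  for (const field of CLAIM_FIELDS) delete released[field];
  return released;
}

const done = releaseJob(
  { name: 'send-email', status: 'processing', claimedBy: 'worker-1', lockedAt: new Date(), lastHeartbeat: new Date() },
  'completed',
);
console.log(Object.keys(done).join(', ')); // name, status
```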

During initialize() (when recoverStaleJobs: true), Monque recovers jobs that were left stuck in processing:

// Find and reset stale jobs
const cutoffTime = new Date(Date.now() - lockTimeout);

const result = await collection.updateMany(
  {
    status: 'processing',
    lockedAt: { $lt: cutoffTime }
  },
  {
    $set: { status: 'pending', updatedAt: new Date() },
    $unset: { lockedAt: '', claimedBy: '', lastHeartbeat: '', heartbeatInterval: '' }
  }
);

// Event emitted (internal)
monque.emit('stale:recovered', { count: result.modifiedCount });

Listen for this event to log or track recoveries:

monque.on('stale:recovered', ({ count }) => {
  if (count > 0) {
    console.log(`Recovered ${count} stale jobs`);
    metrics.increment('monque.stale_jobs_recovered', count);
  }
});

If you prefer to control recovery yourself, disable automatic recovery and run the query manually:

const monque = new Monque(db, {
  recoverStaleJobs: false
});

// Manual recovery
async function recoverStaleJobs() {
  const cutoff = new Date(Date.now() - 30 * 60 * 1000); // keep in sync with your lockTimeout (30 min here)
  
  const result = await db.collection('monque_jobs').updateMany(
    { status: 'processing', lockedAt: { $lt: cutoff } },
    {
      $set: { status: 'pending', updatedAt: new Date() },
      $unset: { lockedAt: '', claimedBy: '', lastHeartbeat: '', heartbeatInterval: '' }
    }
  );
  
  return result.modifiedCount;
}
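The manual recovery above hard-codes 30 minutes. A variation is to build the filter and update from the same lockTimeout you pass to Monque, so manual and automatic recovery agree. `staleRecoveryOps` is a hypothetical helper; it only returns plain objects, which you would then pass to `updateMany` on the jobs collection.

```typescript
// Hypothetical helper: builds the stale-recovery filter and update from a
// configurable lockTimeout. Field names come from this page.
function staleRecoveryOps(lockTimeoutMs: number, now: Date = new Date()) {
  const cutoff = new Date(now.getTime() - lockTimeoutMs);
  const filter = { status: 'processing', lockedAt: { $lt: cutoff } };
  const update = {
    $set: { status: 'pending', updatedAt: now },
    $unset: { lockedAt: '', claimedBy: '', lastHeartbeat: '', heartbeatInterval: '' },
  };
  return { filter, update };
}

// Example with a 1-hour lockTimeout evaluated at a fixed time.
const { filter, update } = staleRecoveryOps(60 * 60 * 1000, new Date('2024-01-15T12:00:00Z'));
console.log(filter.lockedAt.$lt.toISOString()); // 2024-01-15T11:00:00.000Z
```

With the MongoDB Node.js driver, the result would be used as `await db.collection('monque_jobs').updateMany(filter, update)`, returning `modifiedCount` as in the function above.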
Tune heartbeatInterval and lockTimeout to match how long your jobs run:

// Short jobs (< 1 minute)
const shortJobMonque = new Monque(db, {
  heartbeatInterval: 10000,   // 10 seconds
  lockTimeout: 120000         // 2 minutes
});

// Long jobs (up to 1 hour)
const longJobMonque = new Monque(db, {
  heartbeatInterval: 60000,   // 1 minute
  lockTimeout: 3600000        // 1 hour
});
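One way to sanity-check these settings is to compare lockTimeout with the longest run you expect for that job type. The helpers below are hypothetical, and the 1.5x margin is an assumed safety factor rather than a Monque recommendation.

```typescript
// Hypothetical sizing helpers; the default 1.5x margin is an assumption.
function hasHeadroom(maxJobDurationMs: number, lockTimeoutMs: number, margin = 1.5): boolean {
  return lockTimeoutMs >= maxJobDurationMs * margin;
}

function suggestLockTimeout(maxJobDurationMs: number, margin = 1.5): number {
  return Math.ceil(maxJobDurationMs * margin);
}

console.log(hasHeadroom(60_000, 120_000));      // true  (short-job config above)
console.log(hasHeadroom(3_600_000, 3_600_000)); // false (long-job config leaves no margin)
console.log(suggestLockTimeout(3_600_000));     // 5400000 (90 minutes)
```

By this check, the long-job configuration leaves no margin for jobs that genuinely take a full hour; a lockTimeout somewhat above the expected maximum avoids resetting jobs that are still running.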

Monque manages heartbeats automatically in the background. If a heartbeat update fails (e.g., database connectivity issues), Monque emits a job:error event.

Note that stale recovery is based on lockedAt + lockTimeout; lastHeartbeat is primarily an observability signal to verify worker liveness.

To verify heartbeats are updating normally during debugging, you can check the job status using the public API:

const job = await monque.getJob(jobId);
console.log('Last heartbeat:', job?.lastHeartbeat); // undefined if the job doesn't exist or is no longer processing
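When inspecting a job returned by getJob, it can help to compute how old its heartbeat is. This sketch assumes lastHeartbeat is a Date (or absent), as in the documents above; `heartbeatAgeMs`, `heartbeatLooksStuck`, and the threshold of three missed intervals are illustrative choices, not Monque APIs. It is an observability aid only, since recovery still keys on lockedAt.

```typescript
// Hypothetical debugging helpers for heartbeat age (not Monque APIs).
interface HeartbeatInfo {
  status: string;
  lastHeartbeat?: Date | null;
}

function heartbeatAgeMs(job: HeartbeatInfo, now: Date = new Date()): number | undefined {
  return job.lastHeartbeat ? now.getTime() - job.lastHeartbeat.getTime() : undefined;
}

// Flags processing jobs with no heartbeat, or one older than 3 intervals (assumed threshold).
function heartbeatLooksStuck(job: HeartbeatInfo, heartbeatIntervalMs = 30_000, now: Date = new Date()): boolean {
  if (job.status !== 'processing') return false;
  const age = heartbeatAgeMs(job, now);
  return age === undefined || age > 3 * heartbeatIntervalMs;
}

const now = new Date('2024-01-15T10:05:00Z');
console.log(heartbeatAgeMs({ status: 'processing', lastHeartbeat: new Date('2024-01-15T10:04:30Z') }, now)); // 30000
console.log(heartbeatLooksStuck({ status: 'processing', lastHeartbeat: new Date('2024-01-15T10:00:00Z') }, 30_000, now)); // true
```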

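For jobs that may run longer than lockTimeout, set lockTimeout above their worst-case duration. Heartbeats alone do not stop such a job from being recovered, because staleness is measured from lockedAt. Processing the work in awaited chunks still helps: if processChunk does asynchronous work, each await keeps the event loop responsive, so heartbeats continue to update while the job runs.

monque.register('long-analysis', async (job) => {
  const chunks = splitIntoChunks(job.data);
  
  for (const chunk of chunks) {
    // Awaiting asynchronous work between chunks lets the background heartbeat run
    await processChunk(chunk);
  }
  // Heartbeats do not extend lockedAt: keep total runtime below lockTimeout
});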
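`splitIntoChunks` and `processChunk` in the example are your own helpers. Below is one possible chunking helper, assuming `job.data` is an array, plus a hypothetical `yieldToEventLoop` you could await between CPU-bound chunks so pending timers (such as a background heartbeat) get a chance to fire.

```typescript
// One possible splitIntoChunks, assuming the job data is an array.
function splitIntoChunks<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) throw new RangeError('size must be a positive integer');
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Resolves on a later macrotask, so timers queued in the meantime can run.
// Awaiting an already-resolved promise would only yield to microtasks.
function yieldToEventLoop(): Promise<void> {
  return new Promise<void>((resolve) => setTimeout(() => resolve(), 0));
}

console.log(JSON.stringify(splitIntoChunks([1, 2, 3, 4, 5], 2))); // [[1,2],[3,4],[5]]
```

Inside the handler loop, `await processChunk(chunk); await yieldToEventLoop();` would give timers a turn after each chunk. This keeps heartbeats flowing but does not move the lockTimeout deadline.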
To watch for recoveries in production, log them together with their likely causes:

monque.on('stale:recovered', ({ count }) => {
  if (count > 0) {
    logger.warn('Stale jobs recovered', {
      count,
      possibleCauses: [
        'Worker crash',
        'Network partition',
        'Job exceeded lockTimeout'
      ]
    });
  }
});

Symptom: Jobs in progress are being reset

Solution: Increase lockTimeout so it exceeds your longest expected job duration:

const monque = new Monque(db, {
  lockTimeout: 3600000  // 1 hour instead of 30 minutes
});

Symptom: Stale jobs remain stuck

Checks:

  1. Verify recoverStaleJobs: true
  2. Check lockTimeout isn’t too high
  3. Ensure initialize() was called

Symptom: Many jobs recovered on each startup

Possible Causes:

  • Frequent worker crashes
  • Jobs taking longer than lockTimeout
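  • Network instability that leaves workers unable to finish or update their jobs (failed heartbeats alone do not trigger recovery, which is based on lockedAt)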

Investigation:

// Query to find stale job patterns
const lockTimeout = 30 * 60 * 1000; // 30 minutes

const staleJobs = await db.collection('monque_jobs').find({
  status: 'processing',
  lockedAt: { $lt: new Date(Date.now() - lockTimeout) }
}).toArray();

// Analyze by job name
const byName = staleJobs.reduce((acc, job) => {
  acc[job.name] = (acc[job.name] || 0) + 1;
  return acc;
}, {});

console.log('Stale jobs by type:', byName);
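Grouping the same results by claimedBy can show whether one instance accounts for most stale jobs, which points toward crashes on that host rather than slow job types. The `countBy` helper and the sample data below are illustrative; in practice you would pass the `staleJobs` array returned by the query above.

```typescript
// Generic counting helper; group stale jobs by any field.
interface StaleJob {
  name: string;
  claimedBy?: string;
}

function countBy<T>(items: readonly T[], key: (item: T) => string): Record<string, number> {
  const counts: Record<string, number> = {};
  for (const item of items) {
    const k = key(item);
    counts[k] = (counts[k] ?? 0) + 1;
  }
  return counts;
}

// Sample data for illustration only.
const staleJobs: StaleJob[] = [
  { name: 'send-email', claimedBy: 'worker-1' },
  { name: 'long-analysis', claimedBy: 'worker-1' },
  { name: 'send-email', claimedBy: 'worker-2' },
];

const byName = countBy(staleJobs, (j) => j.name);
const byInstance = countBy(staleJobs, (j) => j.claimedBy ?? '(unclaimed)');
console.log(JSON.stringify(byName));     // {"send-email":2,"long-analysis":1}
console.log(JSON.stringify(byInstance)); // {"worker-1":2,"worker-2":1}
```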

Monque creates indexes to support both recovery and observability queries:

// Index used for stale recovery scans (and time-window analysis)
{ status: 1, lockedAt: 1, lastHeartbeat: 1 }

// Index used for monitoring (e.g., finding jobs with old heartbeats)
{ lastHeartbeat: 1, status: 1 }

// Index for heartbeat updates
{ claimedBy: 1, status: 1 }

The stale recovery query shown in this document filters by status (equality) and lockedAt (range), so the recovery index is ordered to start with { status: 1, lockedAt: 1 }. The lastHeartbeat suffix mainly supports additional monitoring/debugging access patterns (and leaves room for recovery strategies that also consider heartbeat age), rather than implying that lockTimeout is a heartbeat timeout.
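The reasoning about key order can be sketched as a simple check. This is a deliberately simplified model, assuming a single range field and ignoring sorts, multikey indexes, and the query planner's real heuristics; `leadingKeysMatch` is a hypothetical name. It only asks whether the equality fields occupy the leading index keys, with the range field immediately after.

```typescript
// Simplified model of index key order for an equality + range query.
function leadingKeysMatch(indexKeys: readonly string[], equality: readonly string[], range?: string): boolean {
  // Equality fields must fill the first equality.length keys, in any order.
  const leading = indexKeys.slice(0, equality.length);
  const equalityOk = leading.length === equality.length && equality.every((f) => leading.includes(f));
  // The range field, if any, must come immediately after them.
  const rangeOk = range === undefined || indexKeys[equality.length] === range;
  return equalityOk && rangeOk;
}

const recoveryIndex = ['status', 'lockedAt', 'lastHeartbeat'];
const monitoringIndex = ['lastHeartbeat', 'status'];
const ownershipIndex = ['claimedBy', 'status'];

console.log(leadingKeysMatch(recoveryIndex, ['status'], 'lockedAt'));   // true  (stale recovery query)
console.log(leadingKeysMatch(monitoringIndex, ['status'], 'lockedAt')); // false
console.log(leadingKeysMatch(ownershipIndex, ['claimedBy', 'status'])); // true  (heartbeat update filter)
```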

In a multi-instance deployment, each scheduler instance:

  1. Maintains its own heartbeat interval
  2. Only updates heartbeats for jobs it owns (claimedBy: thisInstanceId)
  3. Can recover stale jobs from any crashed instance
// Instance 1 crashes while processing job
// Instance 2 (or 3, 4...) recovers the job on next startup or stale check
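The multi-instance behavior can be simulated in memory. Instance names, timestamps, and the shared array standing in for the jobs collection are assumptions for illustration; the recovery predicate mirrors the status/lockedAt filter used on this page and ignores which instance claimed the job.

```typescript
// In-memory simulation: any instance can recover a stale job, whoever claimed it.
interface SimJob {
  id: string;
  status: 'pending' | 'processing';
  claimedBy?: string;
  lockedAt?: number; // epoch ms
}

// Mirrors { status: 'processing', lockedAt: { $lt: now - lockTimeout } } plus the $unset.
function recoverStale(jobs: SimJob[], lockTimeoutMs: number, now: number): string[] {
  const recovered: string[] = [];
  for (const job of jobs) {
    if (job.status === 'processing' && job.lockedAt !== undefined && job.lockedAt < now - lockTimeoutMs) {
      job.status = 'pending';
      delete job.claimedBy;
      delete job.lockedAt;
      recovered.push(job.id);
    }
  }
  return recovered;
}

const MIN = 60_000;
const jobs: SimJob[] = [
  // instance-1 claimed this job at t=0 and then crashed
  { id: 'job-1', status: 'processing', claimedBy: 'instance-1', lockedAt: 0 },
  // instance-2 is healthy and claimed this job recently
  { id: 'job-2', status: 'processing', claimedBy: 'instance-2', lockedAt: 40 * MIN },
];

// instance-3 starts at t=45min with the default 30-minute lockTimeout.
const recovered = recoverStale(jobs, 30 * MIN, 45 * MIN);
console.log(recovered.join(', ')); // job-1
```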