Skip to content

Job Management

Monque serves a comprehensive set of APIs for managing jobs throughout their lifecycle.

MethodDescription
cancelJob()Cancel a single job
retryJob()Retry a job
rescheduleJob()Reschedule a job
deleteJob()Delete a job
cancelJobs()Bulk cancel
retryJobs()Bulk retry
deleteJobs()Bulk delete
getJobsWithCursor()Paginated list
getQueueStats()Queue statistics
getQueueViewSummaries()Queue View summaries

Manage individual jobs using their unique ID.

Use cancelJob() to prevent a pending or scheduled job from running.

const job = await monque.cancelJob('job-id');
  • Effect: Status becomes cancelled.
  • Event: Emits job:cancelled only when the job transitions from pending to cancelled.
  • Constraint: Cannot cancel processing, completed, or failed jobs (unless they return to pending).

Use retryJob() to retry a failed or cancelled job immediately.

const job = await monque.retryJob('job-id');
  • Effect: Status becomes pending, nextRunAt set to now, failure count reset.
  • Event: Emits job:retried.

Use rescheduleJob() to change the execution time of a pending job.

const nextHour = new Date(Date.now() + 3600000);
const job = await monque.rescheduleJob('job-id', nextHour);

Use deleteJob() to permanently remove a job from the queue.

const success = await monque.deleteJob('job-id');
  • Effect: Document is removed from MongoDB.
  • Event: Emits job:deleted.

Perform operations on multiple jobs matching a filter.

Use cancelJobs() to cancel all pending jobs matching a criteria.

const result = await monque.cancelJobs({
  name: 'email-sender',
  status: 'pending'
});

console.log(`Cancelled ${result.count} jobs`);

Use retryJobs() to retry all failed jobs.

const result = await monque.retryJobs({
  status: 'failed'
});

Use deleteJobs() to clean up old jobs.

const result = await monque.deleteJobs({
  status: ['completed', 'cancelled'],
  olderThan: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000) // 7 days ago
});
  • Effect: Removing matching documents from MongoDB.
  • Event: Emits jobs:deleted with count.

Efficiently iterate through large sets of jobs using getJobsWithCursor().

// First page
const page1 = await monque.getJobsWithCursor({ limit: 50 });

// Next page
if (page1.hasNextPage && page1.cursor) {
  const page2 = await monque.getJobsWithCursor({
    cursor: page1.cursor,
    limit: 50
  });
}

Supports filtering:

const failedEmails = await monque.getJobsWithCursor({
  filter: { name: 'email', status: 'failed' },
  limit: 20
});

Get real-time insights into your queue health using getQueueStats().

const stats = await monque.getQueueStats();

console.log('Pending:', stats.pending);
console.log('Processing:', stats.processing);
console.log('Failed:', stats.failed);

You can also scope statistics to a specific job name:

const emailStats = await monque.getQueueStats({ name: 'email' });

getQueueStats() runs a MongoDB aggregation pipeline. To avoid re-running that pipeline on every call (e.g. in a metrics endpoint), Monque caches results with a TTL+LRU strategy.

By default the TTL is 5 seconds. Configure it via statsCacheTtlMs:

const monque = new Monque(db, {
  statsCacheTtlMs: 10_000  // Cache stats for 10 seconds
});

Behaviour details:

  • Each unique filter (global vs. name-scoped) has its own cache entry.
  • The cache is size-capped at 100 entries. When full, it uses an LRU eviction policy—the least-recently-used filter entry is evicted to make room for new entries.
  • Each per-filter entry adheres to the configured TTL (default 5s).
  • Cached results are returned without hitting MongoDB until the TTL expires.
  • The cache is cleared entirely when stop() is called.
  • Set statsCacheTtlMs: 0 to disable caching entirely and always run the aggregation live.
const monque = new Monque(db, {
  statsCacheTtlMs: 0  // Always run live aggregation
});

Use getQueueViewSummaries() to list operator-facing Job Name groups so callers don’t need to run their own MongoDB queries.

const queueViews = await monque.getQueueViewSummaries();

for (const view of queueViews) {
  console.log(view.name, view.stats.total, view.worker?.activeCount ?? 0);
}

Each summary includes:

  • persisted Job statistics for that Job Name
  • whether persisted Jobs exist
  • whether this scheduler has a local Worker registered
  • local Worker concurrency and activeCount when registered

The summaries include historical-only Job Names, registered-worker-only Job Names, and Job Names that have both persisted Jobs and Workers. Output is sorted by Job Name. Worker internals such as handler functions, Worker maps, and active Job ids are not exposed.