Workers
Workers are the execution engines of Monque. They pick up jobs and execute your business logic.
Quick Reference
| Method / Type | Description |
|---|---|
| register() | Register a job handler |
| WorkerOptions | Worker configuration (concurrency, replace) |
| start() | Start processing jobs |
| stop() | Graceful shutdown |
| now() | Enqueue for immediate execution |
| MonqueOptions.workerConcurrency | Global concurrency default |
Registering a Worker
Use the register() method to register a handler for a specific job type:
With Type Safety
Define the expected data type for better IDE support and compile-time checks:
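A minimal sketch of what typed registration might look like. `SketchScheduler` and `JobLike` are hypothetical in-memory stand-ins, not Monque's classes, and the `data` field on the job is an assumption. The point is only that a type parameter on register() lets the compiler check how the handler reads the payload.

```typescript
// Assumed job shape: only `name` and `data` are modelled in this sketch.
interface JobLike<T> {
  name: string;
  data: T;
}

type Handler<T> = (job: JobLike<T>) => Promise<void> | void;

// Hypothetical stand-in that mimics the register<T>() call shape.
class SketchScheduler {
  private handlers = new Map<string, Handler<unknown>>();

  register<T>(name: string, handler: Handler<T>): void {
    // The payload type is erased when the handler is stored.
    this.handlers.set(name, handler as Handler<unknown>);
  }

  // Helper for this sketch: invoke the handler registered for a job name.
  run<T>(name: string, data: T): Promise<void> | void {
    const handler = this.handlers.get(name);
    if (!handler) throw new Error(`No worker registered for "${name}"`);
    return handler({ name, data });
  }
}

interface EmailJobData {
  to: string;
  subject: string;
}

const sent: string[] = [];
const scheduler = new SketchScheduler();

scheduler.register<EmailJobData>("send-email", (job) => {
  // job.data is EmailJobData here, so a typo such as job.data.too fails to compile.
  sent.push(`${job.data.to}: ${job.data.subject}`);
});
```

The type parameter only constrains the handler at compile time. Nothing here validates the payload at runtime, so data written by other producers may still need checking.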
With Worker Options
Configure worker-specific settings via WorkerOptions:
Worker Options
| Option | Type | Default | Description |
|---|---|---|---|
| concurrency | number | 5 (from workerConcurrency) | Maximum concurrent jobs for this worker |
| replace | boolean | false | Replace an existing worker with the same name instead of throwing |
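The semantics in the table can be illustrated with a small in-memory registry. This is a sketch under stated assumptions, not Monque's implementation: `WorkerRegistry`, `WorkerOptionsSketch`, and `concurrencyOf` are hypothetical names, and the error message is invented for illustration.

```typescript
// Hypothetical option shape mirroring the two options in the table.
interface WorkerOptionsSketch {
  concurrency?: number;
  replace?: boolean;
}

type Handler = (job: { name: string }) => void;

class WorkerRegistry {
  private workers = new Map<string, { handler: Handler; concurrency: number }>();
  private workerConcurrency: number;

  // 5 mirrors the documented default for workerConcurrency.
  constructor(workerConcurrency: number = 5) {
    this.workerConcurrency = workerConcurrency;
  }

  register(name: string, handler: Handler, options: WorkerOptionsSketch = {}): void {
    // Without replace: true, a duplicate name is rejected.
    if (this.workers.has(name) && !options.replace) {
      throw new Error(`Worker "${name}" is already registered`);
    }
    this.workers.set(name, {
      handler,
      // Fall back to the global default when no per-worker value is given.
      concurrency: options.concurrency ?? this.workerConcurrency,
    });
  }

  concurrencyOf(name: string): number | undefined {
    return this.workers.get(name)?.concurrency;
  }
}

const registry = new WorkerRegistry();
registry.register("resize-image", () => {});
// Re-registering the same name is allowed only because replace is true.
registry.register("resize-image", () => {}, { concurrency: 2, replace: true });
```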
Worker Lifecycle
1. Job Acquisition
When start() is called, Monque begins looking for jobs:
- Polling runs every pollInterval milliseconds
- Change Streams (if available) trigger an additional, debounced poll for low-latency processing
2. Atomic Claim
Jobs are claimed atomically using findOneAndUpdate:
This prevents two scheduler instances from claiming the same pending job at the same time.
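To make the claim step concrete, here is an in-memory sketch of the idea. It is not Monque's actual query: the `processing` status and the `nextRunAt` and `claimedBy` fields are assumptions, and `claimNext` is a hypothetical helper. The filter and update it models are shown in the comment.

```typescript
type JobStatus = "pending" | "processing" | "completed" | "failed";

interface JobDoc {
  id: number;
  name: string;
  status: JobStatus;
  nextRunAt: number;        // assumed field: epoch ms at which the job becomes due
  claimedBy: string | null; // assumed field: id of the instance holding the job
}

// In-memory stand-in for one findOneAndUpdate call of roughly this shape:
//   filter: { name, status: "pending", nextRunAt: { $lte: now } }
//   update: { $set: { status: "processing", claimedBy: instanceId } }
// The match and the write happen as a single step, so a second caller
// no longer sees the job as pending.
function claimNext(
  jobs: JobDoc[],
  name: string,
  instanceId: string,
  now: number,
): JobDoc | null {
  const job = jobs.find(
    (j) => j.name === name && j.status === "pending" && j.nextRunAt <= now,
  );
  if (!job) return null;
  job.status = "processing";
  job.claimedBy = instanceId;
  return job;
}

const queue: JobDoc[] = [
  { id: 1, name: "send-email", status: "pending", nextRunAt: 0, claimedBy: null },
];

// Two instances race for the same job; only the first claim succeeds.
const first = claimNext(queue, "send-email", "instance-a", 1000);
const second = claimNext(queue, "send-email", "instance-b", 1000);
```

In this single-threaded sketch the find and the mutation cannot interleave. With MongoDB, that guarantee comes from findOneAndUpdate matching and modifying the document in one operation.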
3. Execution
Your handler function is called with the full job document:
4. Completion or Failure
- Success: Job status set to completed
- Error thrown: Job retried or set to failed (see Retry & Backoff)
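The two outcomes above can be sketched as a small state transition. This is an illustration under assumptions rather than Monque's code: `settle`, `JobState`, `failReason`, and the `maxRetries` parameter are hypothetical, and the backoff delay before a retry is left out.

```typescript
type JobStatus = "pending" | "completed" | "failed";

interface JobState {
  status: JobStatus;
  failCount: number;
  failReason?: string; // assumed field holding the last error message
}

// Runs a handler and returns the job's next state.
// maxRetries is a hypothetical limit used only by this sketch.
function settle(job: JobState, handler: () => void, maxRetries: number): JobState {
  try {
    handler();
    return { ...job, status: "completed" };
  } catch (error) {
    const failCount = job.failCount + 1;
    const failReason = error instanceof Error ? error.message : String(error);
    // Back to pending for another attempt, or failed once the limit is reached.
    const status: JobStatus = failCount >= maxRetries ? "failed" : "pending";
    return { status, failCount, failReason };
  }
}

const fresh: JobState = { status: "pending", failCount: 0 };
const boom = () => {
  throw new Error("SMTP timeout");
};

const succeeded = settle(fresh, () => {}, 2);    // handler returns normally
const afterFirstFailure = settle(fresh, boom, 2); // one failure, retry allowed
const afterSecondFailure = settle(afterFirstFailure, boom, 2); // limit reached
```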
Error Handling
Automatic Retry
If your handler throws or returns a rejected promise, the job is automatically retried:
Manual Failure
To control retry behavior for expected failures, check job.failCount or throw MonqueError. Note that any error thrown inside a monque.register() handler triggers the retry policy by default.
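One way to use job.failCount is to stop retrying after a handler-defined number of attempts by returning normally instead of throwing. The sketch below assumes a `data` payload on the job. `syncInventory`, `MAX_ATTEMPTS`, and the `escalated` list are hypothetical names, and MonqueError is not modelled because its constructor is not described here.

```typescript
interface JobLike {
  failCount: number;
  data: { orderId: string }; // assumed payload shape
}

const MAX_ATTEMPTS = 3; // hypothetical threshold chosen by the handler
const escalated: string[] = [];

function syncInventory(job: JobLike, callInventoryApi: (orderId: string) => void): void {
  if (job.failCount >= MAX_ATTEMPTS) {
    // Returning without throwing counts as success, so the job is not retried again.
    escalated.push(job.data.orderId);
    return;
  }
  // Any error thrown by this call propagates and triggers the retry policy.
  callInventoryApi(job.data.orderId);
}
```

The trade-off is that the job ends up completed rather than failed, so the escalation path (here just a list) has to carry the information that something went wrong.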
Concurrency Control
Global Default
Set in MonqueOptions:
Per-Worker Override
Instance-level Concurrency
Use instanceConcurrency to limit the total number of jobs processed by a single scheduler instance across all workers:
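The three settings above (global default, per-worker override, instance-level cap) plausibly combine as "the tighter limit wins". The helper below is a sketch of that reading, not Monque's scheduling code: `availableSlots` and `ConcurrencyLimits` are hypothetical names, and the exact way the limits interact is an assumption.

```typescript
interface ConcurrencyLimits {
  workerConcurrency: number;    // global default from MonqueOptions
  instanceConcurrency?: number; // optional cap across all workers on one instance
}

// How many more jobs a worker could pick up right now, under the assumption
// that the per-worker limit and the instance-wide limit both apply.
function availableSlots(
  limits: ConcurrencyLimits,
  workerOverride: number | undefined, // per-worker concurrency, if set
  activeForWorker: number,            // jobs this worker is currently running
  activeOnInstance: number,           // jobs running across all workers
): number {
  const workerLimit = workerOverride ?? limits.workerConcurrency;
  const workerFree = workerLimit - activeForWorker;
  const instanceFree =
    limits.instanceConcurrency === undefined
      ? Infinity
      : limits.instanceConcurrency - activeOnInstance;
  return Math.max(0, Math.min(workerFree, instanceFree));
}

const limits: ConcurrencyLimits = { workerConcurrency: 5, instanceConcurrency: 8 };

// Worker has 3 free slots, but the instance has only 1 left: the cap wins.
const cappedByInstance = availableSlots(limits, undefined, 2, 7);
// Override of 10 exceeds the instance cap of 8 on an idle instance.
const cappedOverride = availableSlots(limits, 10, 0, 0);
```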
Concurrency Strategy
| Scenario | Recommended Concurrency |
|---|---|
| CPU-bound tasks | 1-2 per core |
| I/O-bound tasks (database, API) | 5-20 |
| Memory-intensive tasks | Limited by available RAM |
| External API with rate limits | Match API limits |
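For the CPU-bound and rate-limited rows, the numbers can be derived rather than guessed. The helpers below are a rough sketch with hypothetical names. The rate-limit case uses Little's law: at steady state a worker running `c` jobs of average duration `d` seconds completes about `c / d` jobs per second, so `c` should stay at or below the allowed jobs per second times `d`.

```typescript
// CPU-bound: scale with core count (the table suggests 1-2 jobs per core).
function cpuBoundConcurrency(cores: number, jobsPerCore: number = 1): number {
  return Math.max(1, Math.floor(cores * jobsPerCore));
}

// Rate-limited API: keep concurrency / avgJobSeconds within the allowed rate.
function rateLimitedConcurrency(
  requestsPerSecond: number, // the API's published limit
  avgJobSeconds: number,     // measured average job duration
  requestsPerJob: number = 1,
): number {
  const maxJobsPerSecond = requestsPerSecond / requestsPerJob;
  // Clamped to 1 so the worker can still make progress; below that point
  // the limit has to be enforced some other way, such as delaying requests.
  return Math.max(1, Math.floor(maxJobsPerSecond * avgJobSeconds));
}

// 10 requests/s allowed, each job takes 0.5 s and makes one request: 10 * 0.5 = 5.
const apiWorkers = rateLimitedConcurrency(10, 0.5);
// 4 cores at 2 jobs per core: 4 * 2 = 8.
const cpuWorkers = cpuBoundConcurrency(4, 2);
```

These are starting points only. Measured job duration and memory use under load should decide the final value.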
Multiple Workers
Register multiple workers for different job types:
Worker Registration Timing
Workers can be registered before or after start():
Best Practices
1. Keep Handlers Focused
Each worker should do one thing well:
2. Handle Partial Failures
Design for idempotency:
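Because a failed job is retried from the start of the handler, each side effect should be safe to repeat. One common approach is to record completed steps and skip them on later attempts. The sketch below uses an in-memory `Set` as a stand-in for durable storage. `runOnce` and `processOrder` are hypothetical names, not part of Monque.

```typescript
// Stand-in for a durable store of completed steps (for example, a unique key in a database).
const completedSteps = new Set<string>();

function runOnce(jobId: string, step: string, action: () => void): void {
  const key = `${jobId}:${step}`;
  if (completedSteps.has(key)) return; // already done in an earlier attempt
  action();
  completedSteps.add(key); // recorded only after the action succeeds
}

let charges = 0;
let emails = 0;
let emailServiceUp = false;

function processOrder(jobId: string): void {
  runOnce(jobId, "charge", () => {
    charges += 1;
  });
  runOnce(jobId, "email", () => {
    if (!emailServiceUp) throw new Error("email service unavailable");
    emails += 1;
  });
}
```

If the first attempt fails at the email step, a retry skips the charge and only resends the email. A crash between the action and the bookkeeping can still repeat a step, so an idempotency key on the external service is the stronger guarantee where one is available.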
3. Log Strategically
Use job events for centralized logging:
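The idea is to attach logging once, at the event level, instead of repeating it in every handler. The sketch below uses a tiny typed emitter as a stand-in for the scheduler. The event names `job:complete` and `job:fail` and their payloads are assumptions for illustration, so check MonqueEventMap for the events that actually exist.

```typescript
// Hypothetical event map; names and payload fields are assumptions.
type EventMapSketch = {
  "job:complete": { name: string; durationMs: number };
  "job:fail": { name: string; error: Error; willRetry: boolean };
};

type Listener = (payload: any) => void;

// Minimal typed emitter standing in for the scheduler's event API.
class TypedEmitter<M> {
  private listeners = new Map<keyof M, Listener[]>();

  on<K extends keyof M>(event: K, listener: (payload: M[K]) => void): void {
    const list = this.listeners.get(event) ?? [];
    list.push(listener);
    this.listeners.set(event, list);
  }

  emit<K extends keyof M>(event: K, payload: M[K]): void {
    for (const listener of this.listeners.get(event) ?? []) listener(payload);
  }
}

const logLines: string[] = []; // stand-in for a real logger
const events = new TypedEmitter<EventMapSketch>();

// One place for all job logging; handlers stay free of log statements.
events.on("job:complete", ({ name, durationMs }) => {
  logLines.push(`[info] ${name} completed in ${durationMs}ms`);
});
events.on("job:fail", ({ name, error, willRetry }) => {
  logLines.push(`[error] ${name} failed: ${error.message} (retry: ${willRetry})`);
});
```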
Next Steps
- Scheduling - Configure recurring jobs
- Retry & Backoff - Understand failure handling
- Heartbeat Mechanism - Learn about liveness tracking
API Reference
- Monque.register() - Register a worker handler
- Monque.start() - Start the scheduler
- Monque.stop() - Graceful shutdown
- Monque.now() - Immediate job execution
- WorkerOptions - Worker configuration
- MonqueOptions - Scheduler options
- Job - Job document passed to handlers
- MonqueEventMap - Available events