Scaling to Thousands of Jobs

DataQueue is designed to handle high-volume workloads out of the box. This page covers how to tune your setup for thousands (or more) of concurrent jobs, whether you're using PostgreSQL or Redis.

How Throughput Works

When a processor runs, it follows this cycle:

  1. Claim a batch of ready jobs from the database (atomically, using FOR UPDATE SKIP LOCKED in PostgreSQL or Lua scripts in Redis).
  2. Process up to concurrency jobs in parallel from that batch.
  3. Repeat immediately if the batch was full, or wait pollInterval before checking again.

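A rough estimate of a single processor's throughput when it waits pollInterval between batches is:

throughput = batchSize / (avgJobDuration + pollInterval)

For example, with batchSize: 20, pollInterval: 2000ms, and jobs averaging 500ms each, the formula gives 20 / (0.5 s + 2 s) = 8 jobs/second (480/minute). The formula treats a batch as finishing in one avgJobDuration, which holds when concurrency is at least batchSize. With concurrency: 10, the batch runs as two rounds of 10 jobs, so the estimate is closer to 20 / (1 s + 2 s) ≈ 6.7 jobs/second.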
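
Evaluating the formula in code makes it easy to compare settings. The following is a minimal sketch, not part of DataQueue: the ProcessorSettings type and both function names are hypothetical, and the second function assumes a batch runs in rounds of `concurrency` jobs that all take the same time.

```typescript
// Hypothetical helpers for back-of-the-envelope estimates.
// None of these names come from the DataQueue API.
interface ProcessorSettings {
  batchSize: number;
  concurrency: number;
  pollIntervalMs: number;
}

// Direct evaluation of batchSize / (avgJobDuration + pollInterval), in jobs/second.
function formulaThroughput(s: ProcessorSettings, avgJobMs: number): number {
  return s.batchSize / ((avgJobMs + s.pollIntervalMs) / 1000);
}

// Assumption: a batch needs ceil(batchSize / concurrency) rounds,
// and every job in a round takes avgJobMs.
function concurrencyAwareThroughput(s: ProcessorSettings, avgJobMs: number): number {
  const rounds = Math.ceil(s.batchSize / Math.max(1, s.concurrency));
  const batchMs = rounds * avgJobMs;
  return s.batchSize / ((batchMs + s.pollIntervalMs) / 1000);
}

const settings: ProcessorSettings = { batchSize: 20, concurrency: 10, pollIntervalMs: 2000 };
console.log(formulaThroughput(settings, 500)); // 8
console.log(concurrencyAwareThroughput(settings, 500).toFixed(2)); // "6.67"
```

Both numbers describe a processor that pauses for pollInterval after every batch, so they are conservative for a queue with a standing backlog.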

When a full batch is returned, the processor immediately polls again without waiting. This means it can drain backlogs much faster than the formula suggests during peak load.
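
To see how much the immediate re-poll matters, the sketch below simulates draining a fixed backlog under the cycle described above. It is an illustrative model only: the names are hypothetical, claim-query time is ignored, and every job is assumed to take exactly avgJobMs.

```typescript
// Hypothetical model of the claim/process/repeat cycle; not DataQueue code.
interface DrainSettings {
  batchSize: number;
  concurrency: number;
}

// Returns the simulated time (ms) to drain `backlog` jobs that are all ready now.
function simulateDrainMs(backlog: number, s: DrainSettings, avgJobMs: number): number {
  if (s.batchSize < 1 || s.concurrency < 1) {
    throw new Error('batchSize and concurrency must be at least 1');
  }
  let remaining = backlog;
  let elapsedMs = 0;
  while (remaining > 0) {
    // Claim a batch; a full batch means the next poll happens with no wait.
    const claimed = Math.min(s.batchSize, remaining);
    remaining -= claimed;
    // Process the batch in rounds of `concurrency` jobs.
    elapsedMs += Math.ceil(claimed / s.concurrency) * avgJobMs;
    // A partial batch means the backlog is empty, so the loop ends here
    // and the pollInterval wait never adds to the drain time.
  }
  return elapsedMs;
}

const drain = simulateDrainMs(100, { batchSize: 20, concurrency: 10 }, 500);
console.log(drain); // 5000
```

Under these assumptions, 100 queued jobs drain in about 5 seconds (20 jobs/second), because no poll wait occurs until the queue is empty.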

Tuning Processor Settings

Batch Size

batchSize controls how many jobs are claimed per polling cycle.

| Environment | Recommended | Why |
| --- | --- | --- |
| Serverless (Vercel, Lambda) | 1-10 | Functions have execution time limits |
| Long-running server | 10-50 | Larger batches reduce polling overhead |
| High-throughput worker | 50-100 | Maximizes throughput per poll cycle |
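
For serverless environments, the batch has to finish inside the function's execution time limit. One way to pick a batch size is sketched below. The function name and the safetyFactor parameter are assumptions for illustration, and the model assumes jobs run in rounds of `concurrency` with a uniform duration.

```typescript
// Hypothetical sizing helper; not part of the DataQueue API.
// Returns the largest batch that should finish within a fraction of the time limit.
function maxBatchSizeForTimeLimit(
  timeLimitMs: number,
  avgJobMs: number,
  concurrency: number,
  safetyFactor = 0.5, // assumption: spend at most half the limit on job work
): number {
  const budgetMs = timeLimitMs * safetyFactor;
  // Number of full rounds of `concurrency` jobs that fit in the budget.
  const rounds = Math.floor(budgetMs / avgJobMs);
  // Always claim at least one job so the processor makes progress.
  return Math.max(1, rounds * concurrency);
}

// A 10 s function limit, 2 s jobs, 2 jobs in parallel.
console.log(maxBatchSizeForTimeLimit(10_000, 2_000, 2)); // 4
```

The result lands in the 1-10 range recommended for serverless in the table above. Jobs with highly variable durations call for a smaller safetyFactor.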

Concurrency

concurrency controls how many jobs from the batch run in parallel.

  • CPU-bound jobs (image processing, compression): keep concurrency low (1-4) to avoid CPU saturation.
  • IO-bound jobs (API calls, email sending): higher concurrency (5-20) works well since jobs spend most time waiting.
  • Rate-limited APIs: match concurrency to the API's rate limit to avoid throttling.
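
For the rate-limited case, a starting value for concurrency can be derived from the limit and the average job duration. This is a rough sketch with hypothetical names, assuming each job makes a fixed number of API requests spread over its duration.

```typescript
// Hypothetical helper; not part of the DataQueue API.
// With `concurrency` jobs in flight, the request rate is roughly
//   concurrency * requestsPerJob / avgJobSeconds
// so the highest safe concurrency is rateLimit * avgJobSeconds / requestsPerJob.
function maxConcurrencyForRateLimit(
  requestsPerSecond: number,
  avgJobMs: number,
  requestsPerJob = 1,
): number {
  const raw = (requestsPerSecond * avgJobMs) / (1000 * requestsPerJob);
  // Never go below 1, otherwise nothing would run.
  return Math.max(1, Math.floor(raw));
}

// API allows 10 requests/second; each job makes one call and takes ~500 ms.
console.log(maxConcurrencyForRateLimit(10, 500)); // 5
```

If several workers share the same API quota, divide the result by the number of workers.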

Poll Interval

pollInterval controls how often the processor checks for new jobs when idle.

  • 1000ms: Low latency, higher database load. Good for real-time workloads.
  • 5000ms (default): Balanced. Good for most use cases.
  • 10000-30000ms: Gentle on the database. Use when latency tolerance is high.

All three settings are passed together when creating a processor:

const processor = jobQueue.createProcessor(jobHandlers, {
  batchSize: 30,
  concurrency: 10,
  pollInterval: 2000,
});
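
The database load of a given pollInterval is easy to quantify for idle workers. The helper below is a hypothetical sketch that counts claim queries per hour, assuming each idle poll issues one query.

```typescript
// Hypothetical helper; not part of the DataQueue API.
// Assumption: an idle worker issues exactly one claim query per poll.
function idlePollsPerHour(pollIntervalMs: number, workers = 1): number {
  const msPerHour = 60 * 60 * 1000;
  return (workers * msPerHour) / pollIntervalMs;
}

console.log(idlePollsPerHour(5000)); // 720
console.log(idlePollsPerHour(1000, 5)); // 18000
```

The trade-off is pickup latency: on an idle queue, a newly added job can wait up to one full pollInterval before a worker claims it.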

Horizontal Scaling with Multiple Workers

DataQueue supports running multiple processor instances simultaneously -- on the same server, across multiple servers, or in separate containers. No coordination is needed between workers.

How It Works

  • Each processor gets a unique workerId (auto-generated, or set manually).
  • PostgreSQL uses FOR UPDATE SKIP LOCKED to ensure no two workers claim the same job.
  • Redis uses atomic Lua scripts for the same guarantee.
  • Workers can safely run on different machines pointing at the same database.
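
To illustrate the PostgreSQL side, here is the general shape of a claim query built on FOR UPDATE SKIP LOCKED. It is a sketch of the technique, not DataQueue's actual query. The job_queue table and the status values appear in this page, but the locked_by, locked_at, priority, and created_at columns and the buildClaimQuery name are assumptions.

```typescript
// Illustrative only: DataQueue's real claim query and column names may differ.
interface ClaimQuery {
  text: string;
  values: [string, number];
}

function buildClaimQuery(workerId: string, batchSize: number): ClaimQuery {
  const text = `
    UPDATE job_queue
    SET status = 'processing', locked_by = $1, locked_at = now()
    WHERE id IN (
      SELECT id
      FROM job_queue
      WHERE status = 'pending'
      ORDER BY priority DESC, created_at ASC
      LIMIT $2
      -- Rows already locked by another worker's claim are skipped, not waited on.
      FOR UPDATE SKIP LOCKED
    )
    RETURNING id`;
  return { text, values: [workerId, batchSize] };
}

const claim = buildClaimQuery('worker-a', 20);
console.log(claim.values); // [ 'worker-a', 20 ]
```

Because each worker's inner SELECT skips rows that another transaction has locked, two workers running this statement at the same time receive disjoint sets of jobs without any coordination.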

Example: Multiple Workers

// Worker 1 (e.g., on server A)
const processor1 = jobQueue.createProcessor(jobHandlers, {
  workerId: 'worker-a',
  batchSize: 20,
  concurrency: 5,
});
processor1.startInBackground();

// Worker 2 (e.g., on server B)
const processor2 = jobQueue.createProcessor(jobHandlers, {
  workerId: 'worker-b',
  batchSize: 20,
  concurrency: 5,
});
processor2.startInBackground();

Specialized Workers

Use the jobType filter to create workers dedicated to specific job types. This lets you scale different workloads independently:

// Fast worker for lightweight jobs
const emailWorker = jobQueue.createProcessor(jobHandlers, {
  jobType: 'email',
  batchSize: 50,
  concurrency: 20,
  pollInterval: 1000,
});

// Slow worker for heavy jobs
const reportWorker = jobQueue.createProcessor(jobHandlers, {
  jobType: 'report',
  batchSize: 5,
  concurrency: 1,
  pollInterval: 5000,
});

PostgreSQL Scaling

Connection Pool

Each processor instance uses database connections from the pool. The default max is 10, which works for a single processor. If you run multiple processors in the same process, increase the pool size:

const jobQueue = initJobQueue({
  databaseConfig: {
    connectionString: process.env.DATABASE_URL,
    max: 20, // increase for multiple processors
  },
});

If you run processors on separate servers, each has its own pool. Keep the total number of connections across all servers within your database's max_connections setting (100 by default in PostgreSQL; managed providers such as Neon or Supabase set their own limits, so check your plan).
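
A quick way to budget pool sizes across servers is sketched below. The function name and the reservedConnections parameter are hypothetical; the reserve stands for connections used by your application, migrations, and admin tools.

```typescript
// Hypothetical budgeting helper; not part of the DataQueue API.
// Splits the connections left after a reserve evenly across worker servers.
function maxPoolSizePerServer(
  maxConnections: number,
  serverCount: number,
  reservedConnections = 10, // assumption: headroom for non-worker clients
): number {
  if (serverCount < 1) {
    throw new Error('serverCount must be at least 1');
  }
  const available = Math.max(0, maxConnections - reservedConnections);
  return Math.floor(available / serverCount);
}

// 100 connections, 4 worker servers, 20 reserved for the web app.
console.log(maxPoolSizePerServer(100, 4, 20)); // 20
```

The result is an upper bound for the max option shown earlier, not a target. A pool only needs enough connections for the processors and maintenance queries that actually run in that process.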

Table Maintenance

As completed jobs accumulate, the job_queue table grows. This doesn't affect processing speed (claim queries use partial indexes that only cover active jobs), but it does increase storage and slow down full-table queries like getJobs().

Run cleanup regularly:

// Delete completed jobs older than 30 days
await jobQueue.cleanupOldJobs(30);

// Delete old job events
await jobQueue.cleanupOldJobEvents(30);

Cleanup operations are batched internally (1000 rows at a time), so they won't lock the table or time out even with hundreds of thousands of old jobs.
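
The batching idea can be shown with an in-memory stand-in for the table. This is only a sketch of the general technique (delete a bounded chunk, repeat until nothing matches). The JobRow shape and function name are hypothetical, and DataQueue's internal implementation may differ.

```typescript
// In-memory illustration of batched deletion; not DataQueue's implementation.
interface JobRow {
  id: number;
  status: string;
  completedAt: Date | null;
}

function cleanupInBatches(rows: JobRow[], olderThanDays: number, now: Date, batchSize = 1000) {
  const cutoff = now.getTime() - olderThanDays * 24 * 60 * 60 * 1000;
  let remaining = rows;
  let deleted = 0;
  let batches = 0;
  while (true) {
    // Select at most batchSize expired, completed rows (like a LIMIT clause).
    const doomed = new Set<number>();
    for (const row of remaining) {
      if (doomed.size >= batchSize) break;
      const expired = row.completedAt !== null && row.completedAt.getTime() < cutoff;
      if (row.status === 'completed' && expired) doomed.add(row.id);
    }
    if (doomed.size === 0) break; // nothing left to delete
    // Each pass touches a bounded number of rows, so each delete stays short.
    remaining = remaining.filter((row) => !doomed.has(row.id));
    deleted += doomed.size;
    batches += 1;
  }
  return { remaining, deleted, batches };
}
```

With 2,500 expired rows and the default batch size, the loop runs three short passes (1000, 1000, 500) instead of one large delete.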

Run reclaim regularly:

// Reclaim jobs stuck in 'processing' for more than 10 minutes
await jobQueue.reclaimStuckJobs(10);

This recovers jobs from workers that crashed or timed out.

Monitoring Table Size

You can monitor the job_queue table size with this query:

SELECT
  pg_size_pretty(pg_total_relation_size('job_queue')) AS total_size,
  (SELECT count(*) FROM job_queue WHERE status = 'pending') AS pending,
  (SELECT count(*) FROM job_queue WHERE status = 'processing') AS processing,
  (SELECT count(*) FROM job_queue WHERE status = 'completed') AS completed,
  (SELECT count(*) FROM job_queue WHERE status = 'failed') AS failed;

Performance Indexes

DataQueue includes optimized partial indexes out of the box:

  • idx_job_queue_claimable -- speeds up job claiming (pending jobs by priority).
  • idx_job_queue_failed_retry -- speeds up retry scheduling.
  • idx_job_queue_stuck -- speeds up stuck job reclamation.
  • idx_job_queue_cleanup -- speeds up cleanup of old completed jobs.

These are created automatically when you run migrations.

Redis Scaling

Memory Usage

Redis stores everything in memory. Estimate memory usage as:

| Jobs | Approximate Memory |
| --- | --- |
| 1,000 | 1-2 MB |
| 10,000 | 10-20 MB |
| 100,000 | 100-200 MB |

This assumes payloads under 1 KB each. Larger payloads increase memory proportionally.
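
The table works out to roughly 1-2 KB per job, which can be turned into a small estimator. The function below is a hypothetical sketch. It assumes memory scales linearly with payload size above 1 KB, which is a simplification of "proportionally", and it uses 1 MB = 1000 KB to match the table's round numbers.

```typescript
// Hypothetical estimator derived from the table above; not a DataQueue API.
interface MemoryEstimate {
  minMb: number;
  maxMb: number;
}

function estimateRedisMemoryMb(jobCount: number, avgPayloadBytes = 1024): MemoryEstimate {
  // Payloads up to 1 KB use the table's baseline of 1-2 KB per job.
  // Assumption: larger payloads scale the baseline linearly.
  const scale = Math.max(1, avgPayloadBytes / 1024);
  return {
    minMb: (jobCount * 1 * scale) / 1000,
    maxMb: (jobCount * 2 * scale) / 1000,
  };
}

console.log(estimateRedisMemoryMb(100_000)); // { minMb: 100, maxMb: 200 }
console.log(estimateRedisMemoryMb(10_000, 4096)); // { minMb: 40, maxMb: 80 }
```

Treat the output as a planning figure and confirm it against the memory usage your Redis instance actually reports.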

Key Best Practices

  • Enable persistence: Use AOF (Append Only File) for durability. Without it, a Redis restart loses all jobs.
  • Use keyPrefix to isolate multiple queues in the same Redis instance:

const jobQueue = initJobQueue({
  backend: 'redis',
  redisConfig: {
    url: process.env.REDIS_URL,
    keyPrefix: 'myapp:jobs:',
  },
});

  • Run cleanup regularly to free memory from completed jobs. Like PostgreSQL, Redis cleanup is batched internally using cursor-based scanning, so it's safe to run even with a large number of completed jobs.

Payload Best Practices

Keep payloads small. Store references (IDs, URLs) rather than full data:

// Good: small payload with reference
await jobQueue.addJob({
  jobType: 'processImage',
  payload: { imageId: 'img_abc123', bucket: 's3://uploads' },
});

// Avoid: large payload with embedded data
await jobQueue.addJob({
  jobType: 'processImage',
  payload: { imageData: '<base64 string of 5MB image>' }, // too large
});

A good target is under 10 KB per payload. This keeps database queries fast and Redis memory predictable.
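
The 10 KB target can be enforced with a small guard before enqueueing. The helper names below are hypothetical. The size is measured as the UTF-8 byte length of the JSON-serialized payload, which is an approximation of what the backend stores.

```typescript
// Hypothetical guard; not part of the DataQueue API.
const DEFAULT_PAYLOAD_LIMIT_BYTES = 10 * 1024; // the 10 KB target from this page

// UTF-8 byte length of the JSON-serialized payload.
function payloadSizeBytes(payload: unknown): number {
  return new TextEncoder().encode(JSON.stringify(payload)).length;
}

function isPayloadWithinLimit(
  payload: unknown,
  limitBytes = DEFAULT_PAYLOAD_LIMIT_BYTES,
): boolean {
  return payloadSizeBytes(payload) <= limitBytes;
}

const smallPayload = { imageId: 'img_abc123', bucket: 's3://uploads' };
const largePayload = { imageData: 'x'.repeat(20_000) };

console.log(isPayloadWithinLimit(smallPayload)); // true
console.log(isPayloadWithinLimit(largePayload)); // false
```

A check like this could run just before addJob and either reject the job or move the data to external storage, leaving only a reference in the payload.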

Example: Processing 10,000 Jobs per Hour

Here's a configuration that can comfortably process ~10,000 jobs per hour, assuming each job takes about 1 second:

worker.ts
import { initJobQueue } from '@nicnocquee/dataqueue';
import { jobHandlers } from './job-handlers';

const jobQueue = initJobQueue({
  databaseConfig: {
    connectionString: process.env.DATABASE_URL,
    max: 15, // room for processor + maintenance queries
  },
});

const processor = jobQueue.createProcessor(jobHandlers, {
  batchSize: 30,
  concurrency: 10, // 10 jobs in parallel
  pollInterval: 2000,
  onError: (error) => {
    console.error('Processor error:', error);
  },
});

processor.startInBackground();

// Maintenance
setInterval(
  async () => {
    try {
      await jobQueue.reclaimStuckJobs(10);
    } catch (e) {
      console.error(e);
    }
  },
  10 * 60 * 1000,
);

setInterval(
  async () => {
    try {
      await jobQueue.cleanupOldJobs(30);
      await jobQueue.cleanupOldJobEvents(30);
    } catch (e) {
      console.error(e);
    }
  },
  24 * 60 * 60 * 1000,
);

// Graceful shutdown
process.on('SIGTERM', async () => {
  await processor.stopAndDrain(30000);
  await jobQueue.getPool().end(); // wait for the pool to close before exiting
  process.exit(0);
});

With 10 concurrent jobs at ~1s each, a single worker with a backlog sustains roughly 10 jobs/second, or 36,000 jobs/hour, which leaves ample headroom over the 10,000/hour target. For higher throughput, add more worker instances.
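
The same arithmetic can be used to estimate how many workers a target rate needs. This is a rough sketch with hypothetical names. The targetUtilization parameter is an assumption that leaves headroom for polling gaps, retries, and uneven job durations.

```typescript
// Hypothetical capacity-planning helper; not part of the DataQueue API.
function workersNeeded(
  targetJobsPerHour: number,
  concurrency: number,
  avgJobMs: number,
  targetUtilization = 0.7, // assumption: plan for 70% of peak capacity
): number {
  const msPerHour = 60 * 60 * 1000;
  // Peak capacity of one worker when it always has a full backlog.
  const peakPerWorker = (concurrency * msPerHour) / avgJobMs;
  const plannedPerWorker = peakPerWorker * targetUtilization;
  return Math.max(1, Math.ceil(targetJobsPerHour / plannedPerWorker));
}

console.log(workersNeeded(10_000, 10, 1000)); // 1
console.log(workersNeeded(100_000, 10, 1000)); // 4
```

For the configuration above, one worker covers 10,000 jobs/hour, while 100,000 jobs/hour would need about four workers at the assumed 70% utilization.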

Quick Reference

| Scale | Workers | batchSize | concurrency | pollInterval | Notes |
| --- | --- | --- | --- | --- | --- |
| < 100 jobs/hour | 1 | 10 | 3 | 5000ms | Default settings work fine |
| 100-1,000/hour | 1 | 20 | 5 | 3000ms | Single worker is sufficient |
| 1,000-10,000/hour | 1-2 | 30 | 10 | 2000ms | Add a second worker if needed |
| 10,000-100,000/hour | 2-5 | 50 | 15 | 1000ms | Multiple workers recommended |
| 100,000+/hour | 5+ | 50-100 | 20 | 1000ms | Specialized workers per job type |
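
The table can also be encoded as a lookup for use in deployment scripts. The sketch below is hypothetical. It copies the values from the table and assumes that a rate exactly on a boundary (for example 10,000/hour) falls into the higher tier.

```typescript
// Hypothetical lookup built from the quick reference table; not a DataQueue API.
interface SuggestedSettings {
  workers: string;
  batchSize: string;
  concurrency: number;
  pollIntervalMs: number;
}

const TIERS: Array<{ below: number; settings: SuggestedSettings }> = [
  { below: 100, settings: { workers: '1', batchSize: '10', concurrency: 3, pollIntervalMs: 5000 } },
  { below: 1_000, settings: { workers: '1', batchSize: '20', concurrency: 5, pollIntervalMs: 3000 } },
  { below: 10_000, settings: { workers: '1-2', batchSize: '30', concurrency: 10, pollIntervalMs: 2000 } },
  { below: 100_000, settings: { workers: '2-5', batchSize: '50', concurrency: 15, pollIntervalMs: 1000 } },
];

const LARGEST_TIER: SuggestedSettings = {
  workers: '5+',
  batchSize: '50-100',
  concurrency: 20,
  pollIntervalMs: 1000,
};

function suggestSettings(jobsPerHour: number): SuggestedSettings {
  for (const tier of TIERS) {
    if (jobsPerHour < tier.below) return tier.settings;
  }
  return LARGEST_TIER;
}

console.log(suggestSettings(500).batchSize); // "20"
console.log(suggestSettings(250_000).workers); // "5+"
```

These are starting points. Measure actual job durations and database load before settling on final values.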