DataQueueDataQueue
Usage

Long-Running Server

The Process Jobs page covers processing jobs in a serverless environment using cron-triggered API routes. If you're running a long-lived server (Express, Fastify, plain Node.js, etc.), you can instead run the processor continuously in the background and handle lifecycle management yourself.

Starting the Processor in the Background

Use startInBackground() to run the processor as a continuous polling loop. It will check for new jobs every pollInterval milliseconds (default: 5 seconds) and process them automatically.

import { initJobQueue } from '@nicnocquee/dataqueue';
import { jobHandlers } from './job-handlers';

const jobQueue = initJobQueue({
  databaseConfig: {
    connectionString: process.env.PG_DATAQUEUE_DATABASE,
  },
});

const processor = jobQueue.createProcessor(jobHandlers, {
  workerId: `server-${process.pid}`,
  batchSize: 10,
  concurrency: 3,
  pollInterval: 5000, // check for new jobs every 5 seconds
  onError: (error) => {
    // Called when an unexpected error occurs during batch processing.
    // Use this to send errors to your monitoring service.
    console.error('Processor error:', error);
  },
});

processor.startInBackground();

When a full batch is returned (i.e., the number of processed jobs equals batchSize), the processor immediately polls again without waiting, so it can drain a large backlog quickly. Once a batch returns fewer jobs than batchSize, it waits pollInterval before checking again.

Configuration Tips

  • pollInterval -- Lower values (e.g., 1000) reduce latency for new jobs but increase database load. Higher values (e.g., 10000) are gentler on the database but introduce more delay. 5 seconds is a good default.
  • concurrency -- Keep this proportional to your server's resources. If jobs call external APIs with rate limits, keep it low.
  • batchSize -- Larger batches reduce polling overhead but hold a database lock longer during claim. 10-20 is typical.
  • onError -- Always set this in production. Without it, errors default to console.error which is easy to miss.

Graceful Shutdown

When your server receives a termination signal (e.g., SIGTERM from a container orchestrator), you should stop the processor and wait for in-flight jobs to finish before exiting. Use stopAndDrain() for this.

async function shutdown() {
  console.log('Shutting down...');

  // Stop polling and wait for the current batch to finish (up to 30 seconds)
  await processor.stopAndDrain(30000);

  // Close the database connection pool
  // PostgreSQL:
  jobQueue.getPool().end();
  // Redis:
  // jobQueue.getRedisClient().quit();

  console.log('Shutdown complete');
  process.exit(0);
}

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);

stopAndDrain() accepts an optional timeout in milliseconds (default: 30000). If the current batch does not finish within that time, the promise resolves anyway so your process is not stuck indefinitely.

Use stopAndDrain() instead of stop() for graceful shutdown. stop() halts the polling loop immediately without waiting for in-flight jobs, which can leave jobs stuck in the processing state until they are reclaimed.

Scheduling Maintenance Tasks

In a serverless setup, you use cron-triggered API routes for cleanup and reclaim. In a long-running server, you can use setInterval instead.

// Reclaim stuck jobs every 10 minutes
const reclaimInterval = setInterval(
  async () => {
    try {
      const reclaimed = await jobQueue.reclaimStuckJobs(10);
      if (reclaimed > 0) console.log(`Reclaimed ${reclaimed} stuck jobs`);
    } catch (error) {
      console.error('Reclaim error:', error);
    }
  },
  10 * 60 * 1000,
);

// Clean up completed jobs older than 30 days, once per day
const cleanupInterval = setInterval(
  async () => {
    try {
      const deleted = await jobQueue.cleanupOldJobs(30);
      if (deleted > 0) console.log(`Cleaned up ${deleted} old jobs`);

      const deletedEvents = await jobQueue.cleanupOldJobEvents(30);
      if (deletedEvents > 0)
        console.log(`Cleaned up ${deletedEvents} old job events`);
    } catch (error) {
      console.error('Cleanup error:', error);
    }
  },
  24 * 60 * 60 * 1000,
);

Make sure to clear these intervals during shutdown:

async function shutdown() {
  clearInterval(reclaimInterval);
  clearInterval(cleanupInterval);

  await processor.stopAndDrain(30000);

  jobQueue.getPool().end();
  process.exit(0);
}

If you use the wait/token feature (PostgreSQL only), also call expireTimedOutTokens() on an interval to expire tokens that have passed their timeout.

Full Example

Here is a complete Express server that ties everything together:

server.ts
import express from 'express';
import { initJobQueue } from '@nicnocquee/dataqueue';
import { jobHandlers } from './job-handlers';

// --- Initialize the queue ---
const jobQueue = initJobQueue({
  databaseConfig: {
    connectionString: process.env.PG_DATAQUEUE_DATABASE!,
  },
});

// --- Create and start the processor ---
const processor = jobQueue.createProcessor(jobHandlers, {
  workerId: `server-${process.pid}`,
  batchSize: 10,
  concurrency: 3,
  pollInterval: 5000,
  onError: (error) => {
    console.error('Processor error:', error);
  },
});

processor.startInBackground();

// --- Schedule maintenance ---
const reclaimInterval = setInterval(
  async () => {
    try {
      await jobQueue.reclaimStuckJobs(10);
    } catch (e) {
      console.error('Reclaim error:', e);
    }
  },
  10 * 60 * 1000,
);

const cleanupInterval = setInterval(
  async () => {
    try {
      await jobQueue.cleanupOldJobs(30);
      await jobQueue.cleanupOldJobEvents(30);
    } catch (e) {
      console.error('Cleanup error:', e);
    }
  },
  24 * 60 * 60 * 1000,
);

// --- Express app ---
const app = express();
app.use(express.json());

app.post('/jobs', async (req, res) => {
  const { jobType, payload } = req.body;
  const jobId = await jobQueue.addJob({ jobType, payload });
  res.json({ jobId });
});

app.get('/jobs/:id', async (req, res) => {
  const job = await jobQueue.getJob(Number(req.params.id));
  if (!job) return res.status(404).json({ error: 'Not found' });
  res.json(job);
});

const server = app.listen(3000, () => {
  console.log('Server running on port 3000');
});

// --- Graceful shutdown ---
async function shutdown() {
  console.log('Shutting down gracefully...');

  // Stop accepting new HTTP connections
  server.close();

  // Clear maintenance intervals
  clearInterval(reclaimInterval);
  clearInterval(cleanupInterval);

  // Wait for in-flight jobs to finish
  await processor.stopAndDrain(30000);

  // Close the database pool
  jobQueue.getPool().end();

  console.log('Shutdown complete');
  process.exit(0);
}

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);