Long-Running Server
The Process Jobs page covers processing jobs in a serverless environment using cron-triggered API routes. If you're running a long-lived server (Express, Fastify, plain Node.js, etc.), you can instead run the processor continuously in the background and handle lifecycle management yourself.
Starting the Processor in the Background
Use startInBackground() to run the processor as a continuous polling loop. It will check for new jobs every pollInterval milliseconds (default: 5 seconds) and process them automatically.
import { initJobQueue } from '@nicnocquee/dataqueue';
import { jobHandlers } from './job-handlers';
const jobQueue = initJobQueue({
databaseConfig: {
connectionString: process.env.PG_DATAQUEUE_DATABASE,
},
});
const processor = jobQueue.createProcessor(jobHandlers, {
workerId: `server-${process.pid}`,
batchSize: 10,
concurrency: 3,
pollInterval: 5000, // check for new jobs every 5 seconds
onError: (error) => {
// Called when an unexpected error occurs during batch processing.
// Use this to send errors to your monitoring service.
console.error('Processor error:', error);
},
});
processor.startInBackground();
When a full batch is returned (i.e., the number of processed jobs equals batchSize), the processor immediately polls again without waiting, so it can drain a large backlog quickly. Once a batch returns fewer jobs than batchSize, it waits pollInterval before checking again.
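To make this concrete, here is a minimal sketch of that polling loop. It is not the library's actual implementation; claimAndProcessBatch, isStopped, and the parameters are hypothetical stand-ins for the processor's internals.
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
async function pollLoop(
  claimAndProcessBatch: () => Promise<number>, // returns the number of jobs processed
  batchSize: number,
  pollInterval: number,
  isStopped: () => boolean,
) {
  while (!isStopped()) {
    const processed = await claimAndProcessBatch();
    // A full batch suggests a backlog: poll again immediately.
    // A partial or empty batch means we're caught up: wait pollInterval first.
    if (processed < batchSize) {
      await sleep(pollInterval);
    }
  }
}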
Configuration Tips
pollInterval -- Lower values (e.g., 1000) reduce latency for new jobs but increase database load. Higher values (e.g., 10000) are gentler on the database but introduce more delay. 5 seconds is a good default.
concurrency -- Keep this proportional to your server's resources. If jobs call external APIs with rate limits, keep it low.
batchSize -- Larger batches reduce polling overhead but hold a database lock longer during claim. 10-20 is typical.
onError -- Always set this in production. Without it, errors default to console.error, which is easy to miss. See the sketch after this list for one way to forward errors to a monitoring service.
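For the onError case, a common pattern is to forward errors to a monitoring service in addition to logging them. The reportToMonitoring function below is a hypothetical placeholder for whatever error-tracking client you use (Sentry, Datadog, etc.).
// Hypothetical error-reporting helper; replace with your monitoring client.
declare function reportToMonitoring(error: unknown): void;
const processor = jobQueue.createProcessor(jobHandlers, {
  workerId: `server-${process.pid}`,
  batchSize: 10,
  concurrency: 3,
  pollInterval: 5000,
  onError: (error) => {
    // Keep the console log for local debugging, but also surface the error
    // where someone will actually see it.
    console.error('Processor error:', error);
    reportToMonitoring(error);
  },
});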
Graceful Shutdown
When your server receives a termination signal (e.g., SIGTERM from a container orchestrator), you should stop the processor and wait for in-flight jobs to finish before exiting. Use stopAndDrain() for this.
async function shutdown() {
console.log('Shutting down...');
// Stop polling and wait for the current batch to finish (up to 30 seconds)
await processor.stopAndDrain(30000);
// Close the database connection pool
// PostgreSQL:
  await jobQueue.getPool().end();
// Redis:
// jobQueue.getRedisClient().quit();
console.log('Shutdown complete');
process.exit(0);
}
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
stopAndDrain() accepts an optional timeout in milliseconds (default: 30000). If the current batch does not finish within that time, the promise resolves anyway so your process is not stuck indefinitely.
Use stopAndDrain() instead of stop() for graceful shutdown. stop() halts the polling loop immediately without waiting for in-flight jobs, which can leave jobs stuck in the processing state until they are reclaimed.
Scheduling Maintenance Tasks
In a serverless setup, you use cron-triggered API routes for cleanup and reclaim. In a long-running server, you can use setInterval instead.
// Reclaim stuck jobs every 10 minutes
const reclaimInterval = setInterval(
async () => {
try {
const reclaimed = await jobQueue.reclaimStuckJobs(10);
if (reclaimed > 0) console.log(`Reclaimed ${reclaimed} stuck jobs`);
} catch (error) {
console.error('Reclaim error:', error);
}
},
10 * 60 * 1000,
);
// Clean up completed jobs older than 30 days, once per day
const cleanupInterval = setInterval(
async () => {
try {
const deleted = await jobQueue.cleanupOldJobs(30);
if (deleted > 0) console.log(`Cleaned up ${deleted} old jobs`);
const deletedEvents = await jobQueue.cleanupOldJobEvents(30);
if (deletedEvents > 0)
console.log(`Cleaned up ${deletedEvents} old job events`);
} catch (error) {
console.error('Cleanup error:', error);
}
},
24 * 60 * 60 * 1000,
);
Make sure to clear these intervals during shutdown:
async function shutdown() {
clearInterval(reclaimInterval);
clearInterval(cleanupInterval);
await processor.stopAndDrain(30000);
  await jobQueue.getPool().end();
process.exit(0);
}
If you use the wait/token feature (PostgreSQL only), also call expireTimedOutTokens() on an interval to expire tokens that have passed their timeout.
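A minimal sketch, assuming an hourly check is frequent enough for your token timeouts:
// Expire wait/token entries that have passed their timeout, once per hour.
// The one-hour period is an assumption; pick something shorter than your
// shortest token timeout.
const tokenExpiryInterval = setInterval(
  async () => {
    try {
      await jobQueue.expireTimedOutTokens();
    } catch (error) {
      console.error('Token expiry error:', error);
    }
  },
  60 * 60 * 1000,
);
Clear this interval during shutdown as well, alongside the reclaim and cleanup intervals.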
Full Example
Here is a complete Express server that ties everything together:
import express from 'express';
import { initJobQueue } from '@nicnocquee/dataqueue';
import { jobHandlers } from './job-handlers';
// --- Initialize the queue ---
const jobQueue = initJobQueue({
databaseConfig: {
connectionString: process.env.PG_DATAQUEUE_DATABASE!,
},
});
// --- Create and start the processor ---
const processor = jobQueue.createProcessor(jobHandlers, {
workerId: `server-${process.pid}`,
batchSize: 10,
concurrency: 3,
pollInterval: 5000,
onError: (error) => {
console.error('Processor error:', error);
},
});
processor.startInBackground();
// --- Schedule maintenance ---
const reclaimInterval = setInterval(
async () => {
try {
await jobQueue.reclaimStuckJobs(10);
} catch (e) {
console.error('Reclaim error:', e);
}
},
10 * 60 * 1000,
);
const cleanupInterval = setInterval(
async () => {
try {
await jobQueue.cleanupOldJobs(30);
await jobQueue.cleanupOldJobEvents(30);
} catch (e) {
console.error('Cleanup error:', e);
}
},
24 * 60 * 60 * 1000,
);
// --- Express app ---
const app = express();
app.use(express.json());
app.post('/jobs', async (req, res) => {
const { jobType, payload } = req.body;
const jobId = await jobQueue.addJob({ jobType, payload });
res.json({ jobId });
});
app.get('/jobs/:id', async (req, res) => {
const job = await jobQueue.getJob(Number(req.params.id));
if (!job) return res.status(404).json({ error: 'Not found' });
res.json(job);
});
const server = app.listen(3000, () => {
console.log('Server running on port 3000');
});
// --- Graceful shutdown ---
async function shutdown() {
console.log('Shutting down gracefully...');
// Stop accepting new HTTP connections
server.close();
// Clear maintenance intervals
clearInterval(reclaimInterval);
clearInterval(cleanupInterval);
// Wait for in-flight jobs to finish
await processor.stopAndDrain(30000);
// Close the database pool
  await jobQueue.getPool().end();
console.log('Shutdown complete');
process.exit(0);
}
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
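Once the server is running, you can enqueue a job and check its status over HTTP. The job type and payload below are placeholders; use whichever job types your jobHandlers define.
// Enqueue a job via the POST /jobs route
const res = await fetch('http://localhost:3000/jobs', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    jobType: 'send_welcome_email', // placeholder job type
    payload: { to: 'user@example.com' },
  }),
});
const { jobId } = await res.json();
// Check the job's status via GET /jobs/:id
const statusRes = await fetch(`http://localhost:3000/jobs/${jobId}`);
console.log(await statusRes.json());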