Skip to main content

Worker pools

Worker pools are essential for managing concurrent tasks efficiently in web applications, especially when working with CPU-intensive operations or WebAssembly modules that benefit from parallel processing.

Overview

A worker pool manages a collection of Web Workers, distributing tasks among them to maximize CPU utilization while avoiding the overhead of creating and destroying workers for each task.

Implementation

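The following worker pool implementation distributes queued tasks across idle workers, replaces workers that crash, enforces a per-task timeout, and exposes basic statistics (worker counts, queue length, and completed-task totals) through getStats().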

Complete Worker Pool Implementation
/**
* Worker Pool Implementation for Managing Concurrent Tasks
*
* This example demonstrates a robust worker pool that can handle
* multiple concurrent tasks with proper load balancing and error handling.
*/

/**
 * A unit of work queued on the pool, bundled with the callbacks that settle
 * the promise returned to the caller of `execute`.
 */
interface Task<T = any> {
/** Identifier sent to the worker as `taskId` and matched against the `taskId` in its reply. */
id: string;
/** Payload forwarded to the worker in the `data` field of the posted message. */
data: T;
/** Fulfils the caller's promise with the `result` field of the worker's reply. */
resolve: (result: any) => void;
/** Rejects the caller's promise (worker-reported error, timeout, crash, or pool destruction). */
reject: (error: Error) => void;
/** `Date.now()` at creation; compared with the task timeout when the task is dequeued. */
timestamp: number;
}

/** Construction options accepted by `WorkerPool`. */
interface WorkerPoolOptions {
/** Script URL handed to `new Worker(...)` for every worker in the pool. */
workerScript: string;
/** Number of workers to create; when omitted the pool falls back to `navigator.hardwareConcurrency`, then to 4. */
maxWorkers?: number;
/** Timeout in milliseconds, applied both to time spent waiting in the queue and to execution; defaults to 30000 when omitted. */
taskTimeout?: number;
/** Defaults to 3 when omitted. NOTE(review): the pool stores this value but no code path in the class reads it. */
maxRetries?: number;
}

/** Bookkeeping record the pool keeps for each worker it owns. */
interface WorkerInstance {
/** The underlying Web Worker. */
worker: Worker;
/** True while a task is assigned to this worker. */
busy: boolean;
/** The task currently assigned to this worker, or null when idle. */
currentTask: Task | null;
/** Count of replies matched to an assigned task; replies carrying an `error` field are counted too. */
tasksCompleted: number;
}

/**
 * Fixed-size pool of Web Workers that runs queued tasks, one task per worker
 * at a time.
 *
 * Message protocol: the pool posts `{ taskId, data }` to a worker and expects a
 * single reply of the form `{ taskId, result }` or `{ taskId, error }`, where a
 * truthy `error` rejects the task.
 *
 * Failure handling:
 * - A worker that raises an `error` event is terminated and replaced.
 * - A worker whose task exceeds `taskTimeout` is also terminated and replaced,
 *   because it is still executing the abandoned task and could not serve the
 *   next one promptly.
 * - A `postMessage` failure (for example non-cloneable data) rejects only the
 *   affected task and leaves the worker available.
 */
export class WorkerPool {
  private workers: WorkerInstance[] = [];
  private taskQueue: Task[] = [];
  private readonly maxWorkers: number;
  private readonly taskTimeout: number;
  /** Stored from the options; no code path in this class consults it. */
  private readonly maxRetries: number;
  private readonly workerScript: string;
  private isDestroyed = false;
  /** Completed-task counts carried over from workers that were replaced or destroyed. */
  private retiredTasksCompleted = 0;
  /** Execution-timeout timers of in-flight tasks, keyed by task id. */
  private readonly taskTimers = new Map<string, ReturnType<typeof setTimeout>>();

  /**
   * Creates the pool and eagerly starts all of its workers.
   *
   * @param options - Worker script URL plus optional sizing and timeout settings.
   */
  constructor(options: WorkerPoolOptions) {
    this.workerScript = options.workerScript;

    // `navigator` is guarded so that a missing global falls back to 4 instead
    // of throwing a ReferenceError.
    const detectedCores =
      typeof navigator !== 'undefined' ? navigator.hardwareConcurrency : undefined;
    // Clamp to at least one worker: a pool without workers would leave every
    // task waiting in the queue forever.
    this.maxWorkers = Math.max(
      1,
      Math.floor(options.maxWorkers || detectedCores || 4)
    );
    this.taskTimeout = options.taskTimeout || 30000; // 30 seconds
    // `??` keeps an explicit 0 instead of silently turning it into 3.
    this.maxRetries = options.maxRetries ?? 3;

    this.initializeWorkers();
  }

  /**
   * Queues `data` for execution on the next idle worker.
   *
   * @param data - Payload posted to the worker.
   * @returns A promise fulfilled with the worker's `result`. It rejects when
   *   the pool is destroyed, the worker reports an error or crashes, the data
   *   cannot be posted, or the task times out in the queue or during execution.
   */
  async execute<T, R>(data: T): Promise<R> {
    if (this.isDestroyed) {
      throw new Error('Worker pool has been destroyed');
    }

    return new Promise<R>((resolve, reject) => {
      const task: Task<T> = {
        id: this.generateTaskId(),
        data,
        resolve,
        reject,
        timestamp: Date.now(),
      };

      this.taskQueue.push(task);
      this.processQueue();
    });
  }

  /**
   * Returns a snapshot of the pool state.
   *
   * `totalTasksCompleted` includes tasks finished by workers that have since
   * been replaced or destroyed, so it never decreases.
   */
  getStats(): {
    totalWorkers: number;
    busyWorkers: number;
    availableWorkers: number;
    queueLength: number;
    totalTasksCompleted: number;
  } {
    let busyWorkers = 0;
    let liveTasksCompleted = 0;
    for (const instance of this.workers) {
      if (instance.busy) {
        busyWorkers++;
      }
      liveTasksCompleted += instance.tasksCompleted;
    }

    return {
      totalWorkers: this.workers.length,
      busyWorkers,
      availableWorkers: this.workers.length - busyWorkers,
      queueLength: this.taskQueue.length,
      totalTasksCompleted: this.retiredTasksCompleted + liveTasksCompleted,
    };
  }

  /**
   * Rejects every queued and in-flight task, terminates all workers, and
   * marks the pool unusable. Safe to call more than once.
   */
  destroy(): void {
    this.isDestroyed = true;

    // Detach the collections first so callbacks triggered by the rejections
    // below cannot observe a half-torn-down pool.
    const queued = this.taskQueue;
    this.taskQueue = [];
    const instances = this.workers;
    this.workers = [];

    for (const task of queued) {
      task.reject(new Error('Worker pool destroyed'));
    }

    for (const instance of instances) {
      const task = instance.currentTask;
      if (task) {
        this.clearTaskTimer(task.id);
        instance.busy = false;
        instance.currentTask = null;
        task.reject(new Error('Worker terminated'));
      }
      this.retiredTasksCompleted += instance.tasksCompleted;
      instance.worker.terminate();
    }
  }

  /** Starts `maxWorkers` workers. */
  private initializeWorkers(): void {
    for (let i = 0; i < this.maxWorkers; i++) {
      this.createWorker();
    }
  }

  /** Starts one worker, wires its message/error handlers, and adds it to the pool. */
  private createWorker(): void {
    const worker = new Worker(this.workerScript);
    const workerInstance: WorkerInstance = {
      worker,
      busy: false,
      currentTask: null,
      tasksCompleted: 0,
    };

    worker.addEventListener('message', (event) => {
      this.handleWorkerMessage(workerInstance, event);
    });

    worker.addEventListener('error', (error) => {
      this.handleWorkerError(workerInstance, error);
    });

    this.workers.push(workerInstance);
  }

  /**
   * Settles the worker's current task from its reply and frees the worker.
   * Replies that do not match the current task are logged and ignored.
   */
  private handleWorkerMessage(
    workerInstance: WorkerInstance,
    event: MessageEvent
  ): void {
    // Tolerate a null/undefined payload instead of throwing while destructuring.
    const { taskId, result, error } = event.data ?? {};
    const task = workerInstance.currentTask;

    if (!task || task.id !== taskId) {
      console.warn('Received message for unknown task:', taskId);
      return;
    }

    this.clearTaskTimer(task.id);
    workerInstance.busy = false;
    workerInstance.currentTask = null;
    workerInstance.tasksCompleted++;

    if (error) {
      task.reject(new Error(error));
    } else {
      task.resolve(result);
    }

    this.processQueue();
  }

  /** Rejects the crashed worker's task (if any), replaces the worker, and resumes the queue. */
  private handleWorkerError(
    workerInstance: WorkerInstance,
    error: ErrorEvent
  ): void {
    console.error('Worker error:', error);

    const task = workerInstance.currentTask;
    if (task) {
      this.clearTaskTimer(task.id);
      workerInstance.busy = false;
      workerInstance.currentTask = null;
      task.reject(new Error(`Worker error: ${error.message}`));
    }

    this.replaceWorker(workerInstance);
    this.processQueue();
  }

  /**
   * Terminates `failedWorker` and starts a replacement.
   *
   * Does nothing when the worker is no longer part of the pool (a stale event
   * from an already replaced worker, or the pool was destroyed); creating a
   * replacement in that case would grow the pool past `maxWorkers` or leak a
   * worker after `destroy()`.
   */
  private replaceWorker(failedWorker: WorkerInstance): void {
    const index = this.workers.indexOf(failedWorker);
    if (index === -1) {
      return;
    }

    this.workers.splice(index, 1);
    // Keep the retired worker's count so the reported total does not drop.
    this.retiredTasksCompleted += failedWorker.tasksCompleted;
    failedWorker.worker.terminate();

    if (!this.isDestroyed) {
      this.createWorker();
    }
  }

  /**
   * Hands queued tasks to idle workers until the queue is empty or every
   * worker is busy. Tasks that already waited longer than `taskTimeout` are
   * rejected without being sent to a worker.
   */
  private processQueue(): void {
    while (this.taskQueue.length > 0) {
      const idleWorker = this.workers.find((w) => !w.busy);
      if (!idleWorker) {
        return;
      }

      const task = this.taskQueue.shift();
      if (!task) {
        return;
      }

      if (Date.now() - task.timestamp > this.taskTimeout) {
        task.reject(new Error('Task timed out in queue'));
        continue;
      }

      this.dispatch(idleWorker, task);
    }
  }

  /** Assigns `task` to `workerInstance`, arms its execution timeout, and posts it. */
  private dispatch(workerInstance: WorkerInstance, task: Task): void {
    workerInstance.busy = true;
    workerInstance.currentTask = task;

    // Armed before posting so a reply can never arrive without a timer to clear.
    const timer = setTimeout(() => {
      this.taskTimers.delete(task.id);
      if (workerInstance.currentTask !== task) {
        return;
      }
      workerInstance.busy = false;
      workerInstance.currentTask = null;
      task.reject(new Error('Task execution timeout'));
      // The worker is still running the abandoned task; replace it rather
      // than queueing new work behind a job that may never finish.
      this.replaceWorker(workerInstance);
      this.processQueue();
    }, this.taskTimeout);
    this.taskTimers.set(task.id, timer);

    try {
      workerInstance.worker.postMessage({
        taskId: task.id,
        data: task.data,
      });
    } catch (err) {
      // Posting can throw (e.g. the data cannot be cloned). Release the worker
      // immediately instead of leaving it flagged busy until the timeout.
      this.clearTaskTimer(task.id);
      workerInstance.busy = false;
      workerInstance.currentTask = null;
      task.reject(err instanceof Error ? err : new Error(String(err)));
    }
  }

  /** Cancels and forgets the execution-timeout timer of `taskId`, if one is armed. */
  private clearTaskTimer(taskId: string): void {
    const timer = this.taskTimers.get(taskId);
    if (timer !== undefined) {
      clearTimeout(timer);
      this.taskTimers.delete(taskId);
    }
  }

  /** Builds a task id from the current time and a random base-36 suffix. */
  private generateTaskId(): string {
    return `task_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
  }
}

// Example usage:
/*
const pool = new WorkerPool({
workerScript: './data-processor.worker.js',
maxWorkers: 4,
taskTimeout: 10000,
});

try {
const result = await pool.execute({ data: [1, 2, 3, 4, 5] });
console.log('Processing result:', result);
} catch (error) {
console.error('Task failed:', error);
} finally {
pool.destroy();
}
*/

Key Features

1. Load Balancing

  • Automatic distribution of tasks to available workers
  • Queue management for when all workers are busy
  • Prevention of worker starvation

2. Error Handling

  • Worker crash recovery with automatic replacement
  • Task timeout management
  • Graceful degradation when workers fail

3. Resource Management

  • Configurable pool size based on system capabilities
  • Proper cleanup and worker termination
  • Memory usage monitoring (not performed by the example above; measure it in the application if needed)

4. Performance Monitoring

  • Task execution time tracking (not recorded by the example above, which only counts completed tasks)
  • Worker utilization statistics
  • Queue length monitoring

Best Practices

  1. Size the pool appropriately - Usually navigator.hardwareConcurrency or slightly less
  2. Handle worker failures - Always have a recovery strategy for crashed workers
  3. Implement timeouts - Prevent tasks from hanging indefinitely
  4. Monitor performance - Track metrics to optimize pool configuration
  5. Clean up resources - Always call destroy() when the pool is no longer needed

Integration with WebAssembly

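Worker pools are particularly effective when combined with WebAssembly modules for CPU-intensive tasks. Note that the example worker below uses ES module import syntax, so it must either be bundled into a classic script or started as a module worker (new Worker(url, { type: 'module' })); the WorkerPool above calls new Worker(workerScript) without options, which creates a classic worker.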

// Example worker script for WASM processing
// worker-wasm-processor.js
import init, { process_data } from './pkg/my_wasm_module.js';

// Promise for the one-time WASM initialisation. It is null until the first
// task arrives and is reset to null when initialisation fails, so that a later
// task can try again.
let wasmReady = null;

/**
 * Starts WASM initialisation on first use and returns the shared promise.
 *
 * Caching the promise (rather than setting a flag after `await init()`) keeps
 * messages that arrive while initialisation is still in flight from calling
 * `init()` a second time.
 */
function ensureWasmReady() {
  if (wasmReady === null) {
    wasmReady = init().catch((error) => {
      wasmReady = null;
      throw error;
    });
  }
  return wasmReady;
}

/**
 * Handles one `{ taskId, data }` request and always answers with exactly one
 * message: `{ taskId, result }` on success or `{ taskId, error }` on failure.
 */
self.onmessage = async function (e) {
  const { taskId, data } = e.data;

  try {
    await ensureWasmReady();

    const result = process_data(new Uint8Array(data));
    self.postMessage({ taskId, result: Array.from(result) });
  } catch (error) {
    // Thrown values are not guaranteed to be Error objects, and the receiver
    // treats a missing or empty `error` field as success, so always send a
    // non-empty string.
    const message = error instanceof Error ? error.message : String(error);
    self.postMessage({ taskId, error: message || 'Unknown worker error' });
  }
};

Next Steps