Rayon & Thread Parallelism
Rayon provides data parallelism in Rust with minimal overhead, while WebAssembly's threading model enables true parallel execution in web browsers. Understanding both is crucial for high-performance web applications.
Rayon Fundamentals
Parallel Iterators
📁 Reference: parallel-processing.example.rs
use rayon::prelude::*;
// Sequential processing
let data: Vec<i32> = (0..1_000_000).collect();
// Sum as i64 so the sum of a million squares doesn't overflow i32
let sum: i64 = data.iter().map(|&x| x as i64 * x as i64).sum();
// Parallel processing - just add .par_iter()
let parallel_sum: i64 = data.par_iter().map(|&x| x as i64 * x as i64).sum();
// More complex parallel operations
let results: Vec<_> = data
.par_iter()
.filter(|&&x| x % 2 == 0)
.map(|&x| expensive_computation(x))
.collect();
fn expensive_computation(x: i32) -> i32 {
    // Simulate CPU-intensive work (wrapping arithmetic avoids overflow panics in debug builds)
    (0..1000).fold(x, |acc, i| acc.wrapping_add(i.wrapping_mul(x)))
}
Parallel Patterns
📁 Reference: parallel-processing.example.rs
use rayon::prelude::*;
// Parallel map-reduce
fn parallel_word_count(texts: &[String]) -> usize {
texts
.par_iter()
.map(|text| text.split_whitespace().count())
.sum()
}
// Parallel search (partial_cmp().unwrap() panics if the data contains NaN)
fn parallel_find_max(data: &[f64]) -> Option<f64> {
    data.par_iter()
        .copied()
        .max_by(|a, b| a.partial_cmp(b).unwrap())
}
// Parallel sorting
fn parallel_sort_by_key<T, K>(data: &mut [T], key_fn: impl Fn(&T) -> K + Sync)
where
T: Send,
K: Ord,
{
data.par_sort_by_key(key_fn);
}
// Custom parallel reduction
fn parallel_variance(data: &[f64]) -> f64 {
let mean = data.par_iter().sum::<f64>() / data.len() as f64;
let variance = data
.par_iter()
.map(|&x| (x - mean).powi(2))
.sum::<f64>() / data.len() as f64;
variance
}
Thread Pool Configuration
Custom Thread Pools
📁 Reference: parallel-processing.example.rs
use rayon::{ThreadPool, ThreadPoolBuilder};
// Configure global thread pool
fn configure_global_pool() -> Result<(), rayon::ThreadPoolBuildError> {
rayon::ThreadPoolBuilder::new()
.num_threads(4)
.stack_size(2 * 1024 * 1024) // 2MB stack
.thread_name(|i| format!("rayon-worker-{}", i))
.build_global()
}
// Create custom thread pool for specific tasks
fn create_specialized_pool() -> ThreadPool {
ThreadPoolBuilder::new()
.num_threads(2)
.thread_name(|i| format!("compute-{}", i))
.build()
.expect("Failed to create thread pool")
}
// Use custom pool with install()
fn use_custom_pool() {
let pool = create_specialized_pool();
pool.install(|| {
// All rayon operations in this closure use the custom pool
let data: Vec<i32> = (0..1000).collect();
let result: i32 = data.par_iter().map(|x| x * 2).sum();
println!("Result: {}", result);
});
}
// Scoped tasks: every spawn finishes before the scope returns
fn scoped_parallel_work() {
    let data = vec![1, 2, 3, 4, 5];
    let mut results = vec![0; data.len()];
    rayon::scope(|s| {
        // Hand each task a mutable reference to its own output slot so the
        // closures don't all try to borrow `results` mutably at once
        for (slot, &item) in results.iter_mut().zip(&data) {
            s.spawn(move |_| {
                // Expensive computation
                *slot = expensive_computation(item);
            });
        }
    }); // All spawned tasks complete here
    println!("Results: {:?}", results);
}
WebAssembly Integration
WASM Thread Configuration
Browser threads come with prerequisites: the module needs to be compiled with the atomics and bulk-memory target features, the page must be cross-origin isolated so SharedArrayBuffer is available, and Rayon's pool is typically wired to Web Workers via a helper crate such as wasm-bindgen-rayon. The examples below assume that setup is already in place.
📁 Reference: wasm-rayon-integration.example.rs
use wasm_bindgen::prelude::*;
use rayon::prelude::*;
#[wasm_bindgen]
pub struct ParallelProcessor {
thread_pool: Option<rayon::ThreadPool>,
}
#[wasm_bindgen]
impl ParallelProcessor {
#[wasm_bindgen(constructor)]
pub fn new(num_threads: usize) -> Result<ParallelProcessor, JsValue> {
// Enable console_error_panic_hook for better debugging
#[cfg(feature = "console_error_panic_hook")]
console_error_panic_hook::set_once();
// Initialize Rayon with specified thread count
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(num_threads)
.build()
.map_err(|e| JsValue::from_str(&format!("Failed to create thread pool: {}", e)))?;
Ok(ParallelProcessor {
thread_pool: Some(pool),
})
}
#[wasm_bindgen]
pub fn parallel_sum(&self, data: &[i32]) -> i32 {
if let Some(ref pool) = self.thread_pool {
pool.install(|| {
data.par_iter().sum()
})
} else {
data.iter().sum()
}
}
#[wasm_bindgen]
pub fn parallel_map_square(&self, data: &[i32]) -> Vec<i32> {
if let Some(ref pool) = self.thread_pool {
pool.install(|| {
data.par_iter().map(|&x| x * x).collect()
})
} else {
data.iter().map(|&x| x * x).collect()
}
}
}
// Helper for complex parallel operations
// Full implementation: https://github.com/BaDaaS/web-learning/blob/main/examples/rust-wasm/wasm-rayon-integration.example.rs#L97-L145
#[wasm_bindgen]
pub fn parallel_matrix_multiply(
a: &[f64], a_rows: usize, a_cols: usize,
b: &[f64], b_rows: usize, b_cols: usize,
) -> Result<Vec<f64>, JsValue> {
if a_cols != b_rows {
return Err(JsValue::from_str("Matrix dimensions don't match"));
}
let mut result = vec![0.0; a_rows * b_cols];
// Parallel matrix multiplication
result
.par_chunks_mut(b_cols)
.enumerate()
.for_each(|(i, row)| {
for j in 0..b_cols {
let mut sum = 0.0;
for k in 0..a_cols {
sum += a[i * a_cols + k] * b[k * b_cols + j];
}
row[j] = sum;
}
});
Ok(result)
}
Advanced Patterns
Work-Stealing and Load Balancing
📁 Reference: parallel-processing.example.rs
use rayon::prelude::*;
use std::sync::atomic::{AtomicUsize, Ordering};
// Uneven workload distribution
fn parallel_uneven_work(tasks: Vec<usize>) -> Vec<usize> {
let completed_count = AtomicUsize::new(0);
let results: Vec<_> = tasks
.par_iter()
.map(|&task_size| {
// Simulate variable work amounts
let result = (0..task_size).fold(0, |acc, i| acc + i);
let count = completed_count.fetch_add(1, Ordering::Relaxed);
if count % 100 == 0 {
println!("Completed {} tasks", count + 1);
}
result
})
.collect();
results
}
// Custom parallel join for divide-and-conquer
fn parallel_quicksort<T: Ord + Send>(data: &mut [T]) {
if data.len() <= 1 {
return;
}
let pivot_index = partition(data);
let (left, right) = data.split_at_mut(pivot_index);
// Use rayon::join for parallel divide-and-conquer
rayon::join(
|| parallel_quicksort(left),
|| parallel_quicksort(&mut right[1..])
);
}
fn partition<T: Ord>(data: &mut [T]) -> usize {
let len = data.len();
let pivot_index = len / 2;
data.swap(pivot_index, len - 1);
let mut store_index = 0;
for i in 0..len - 1 {
if data[i] <= data[len - 1] {
data.swap(i, store_index);
store_index += 1;
}
}
data.swap(store_index, len - 1);
store_index
}
Pipeline Parallelism
📁 Reference: See worker-pipeline.example.ts for TypeScript implementation
use crossbeam_channel::bounded;
use std::thread;
// Multi-stage parallel pipeline
struct ParallelPipeline<T> {
input_tx: crossbeam_channel::Sender<T>,
output_rx: crossbeam_channel::Receiver<T>,
}
impl<T: Send + 'static> ParallelPipeline<T> {
fn new<F1, F2, F3, U, V>(
stage1: F1,
stage2: F2,
stage3: F3,
buffer_size: usize,
) -> Self
where
F1: Fn(T) -> U + Send + Sync + 'static,
F2: Fn(U) -> V + Send + Sync + 'static,
F3: Fn(V) -> T + Send + Sync + 'static,
U: Send + 'static,
V: Send + 'static,
{
let (input_tx, input_rx) = bounded(buffer_size);
let (stage1_tx, stage1_rx) = bounded(buffer_size);
let (stage2_tx, stage2_rx) = bounded(buffer_size);
let (output_tx, output_rx) = bounded(buffer_size);
// Stage 1: runs concurrently on its own thread
thread::spawn(move || {
while let Ok(item) = input_rx.recv() {
let result = stage1(item);
if stage1_tx.send(result).is_err() {
break;
}
}
});
// Stage 2: runs concurrently on its own thread
thread::spawn(move || {
while let Ok(item) = stage1_rx.recv() {
let result = stage2(item);
if stage2_tx.send(result).is_err() {
break;
}
}
});
// Stage 3: runs concurrently on its own thread
thread::spawn(move || {
while let Ok(item) = stage2_rx.recv() {
let result = stage3(item);
if output_tx.send(result).is_err() {
break;
}
}
});
ParallelPipeline { input_tx, output_rx }
}
fn process(&self, item: T) -> Result<(), crossbeam_channel::SendError<T>> {
self.input_tx.send(item)
}
fn try_recv(&self) -> Result<T, crossbeam_channel::TryRecvError> {
self.output_rx.try_recv()
}
}
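A quick usage sketch for the pipeline above (the stage closures, item count, and buffer size are illustrative): with three i32 -> i32 stages, T = U = V = i32 satisfies the bounds on new().
fn pipeline_demo() {
    let pipeline = ParallelPipeline::new(
        |x: i32| x + 1, // stage 1
        |x: i32| x * 2, // stage 2
        |x: i32| x - 3, // stage 3
        16,             // per-stage channel capacity
    );
    // Feed work into the first stage
    for i in 0..10 {
        pipeline.process(i).expect("pipeline input closed");
    }
    // Collect results as they emerge from the final stage
    let mut collected = Vec::new();
    while collected.len() < 10 {
        match pipeline.try_recv() {
            Ok(result) => collected.push(result),
            Err(_) => std::thread::yield_now(), // items may still be in flight
        }
    }
    println!("Pipeline output: {:?}", collected);
}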
Performance Optimization
Memory Access Patterns
📁 Reference: parallel-processing.example.rs
use rayon::prelude::*;
// Cache-friendly parallel processing
fn cache_friendly_sum(matrix: &[Vec<i32>]) -> i32 {
// Row-major access (cache-friendly)
matrix
.par_iter()
.map(|row| row.iter().sum::<i32>())
.sum()
}
// NUMA-aware processing
fn numa_aware_processing(data: &[f64]) -> Vec<f64> {
    // One chunk per worker thread; clamp to 1 so tiny inputs don't panic
    let chunk_size = (data.len() / rayon::current_num_threads()).max(1);
    data.par_chunks(chunk_size)
        // flat_map_iter takes a sequential iterator per chunk, keeping each
        // chunk's memory accesses local to the thread that processes it
        .flat_map_iter(|chunk| chunk.iter().map(|&x| expensive_math_operation(x)))
        .collect()
}
fn expensive_math_operation(x: f64) -> f64 {
x.sin().cos().tan().sqrt()
}
// False sharing avoidance
use std::sync::atomic::{AtomicU64, Ordering};
struct PaddedCounter {
value: AtomicU64,
_padding: [u8; 64 - 8], // Cache line padding
}
fn avoid_false_sharing(data: &[i32]) -> u64 {
    let num_threads = rayon::current_num_threads();
    let counters: Vec<PaddedCounter> = (0..num_threads)
        .map(|_| PaddedCounter {
            value: AtomicU64::new(0),
            _padding: [0; 56],
        })
        .collect();
    // Round the chunk size up so there is at most one chunk per counter
    let chunk_size = ((data.len() + num_threads - 1) / num_threads).max(1);
    data.par_chunks(chunk_size)
        .enumerate()
        .for_each(|(chunk_id, chunk)| {
            let local_sum: u64 = chunk.iter().map(|&x| x as u64).sum();
            counters[chunk_id].value.store(local_sum, Ordering::Relaxed);
        });
counters
.iter()
.map(|counter| counter.value.load(Ordering::Relaxed))
.sum()
}
📁 Complete Example: See run_all_examples() for a comprehensive demonstration of all patterns.
Use .par_iter() for CPU-bound operations on large datasets. Rayon's work-stealing scheduler automatically balances load across available cores.
Don't use Rayon for I/O-bound operations or very small datasets. The overhead of parallelization can exceed the benefits for small workloads.
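As a rough sketch of that granularity trade-off, a function can take a sequential path for small inputs and use with_min_len() to keep each parallel task reasonably large; the 10_000-element threshold below is illustrative, not a tuned value.
use rayon::prelude::*;
fn sum_of_squares(data: &[i64]) -> i64 {
    // Illustrative cutoff; measure on your own workload
    const PARALLEL_THRESHOLD: usize = 10_000;
    if data.len() < PARALLEL_THRESHOLD {
        // Small input: dispatching to the pool costs more than it saves
        data.iter().map(|&x| x * x).sum()
    } else {
        data.par_iter()
            .with_min_len(PARALLEL_THRESHOLD) // don't split below this many items per task
            .map(|&x| x * x)
            .sum()
    }
}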
Additional Examples
📁 Advanced Examples:
- WasmMatrixProcessor - High-performance matrix operations
- WasmImageProcessor - Parallel image processing
- WasmBatchProcessor - Memory-efficient batch operations
- Performance Benchmarking - Comparing sequential vs parallel performance
WebAssembly Considerations
Thread Pool Sizing
📁 Reference: wasm-rayon-integration.example.rs
// JavaScript side - detecting available cores
import init, { ParallelProcessor } from './pkg/my_wasm_module.js';
const numCores = navigator.hardwareConcurrency || 4;
// Initialize WASM with appropriate thread count
async function initializeWasm() {
await init();
// Reserve one core for main thread
const workerThreads = Math.max(1, numCores - 1);
try {
const processor = new ParallelProcessor(workerThreads);
return processor;
} catch (error) {
console.warn('Failed to create parallel processor:', error);
// Fallback to single-threaded
return new ParallelProcessor(1);
}
}
// Usage with proper error handling
async function processData(data) {
const processor = await initializeWasm();
try {
const result = processor.parallel_sum(data);
return result;
} catch (error) {
console.error('Processing failed:', error);
throw error;
}
}
Memory Management
📁 Reference: wasm-rayon-integration.example.rs
use wasm_bindgen::prelude::*;
use rayon::prelude::*;
#[wasm_bindgen]
pub struct MemoryEfficientProcessor {
buffer: Vec<f64>,
scratch_space: Vec<f64>,
}
#[wasm_bindgen]
impl MemoryEfficientProcessor {
#[wasm_bindgen(constructor)]
pub fn new(max_size: usize) -> Self {
Self {
buffer: Vec::with_capacity(max_size),
scratch_space: Vec::with_capacity(max_size),
}
}
#[wasm_bindgen]
pub fn process_inplace(&mut self, data: &[f64]) -> Vec<f64> {
// Reuse allocated buffers to avoid GC pressure
self.buffer.clear();
self.buffer.extend_from_slice(data);
self.scratch_space.clear();
self.scratch_space.resize(data.len(), 0.0);
// Parallel processing with pre-allocated memory
self.buffer
.par_chunks_mut(1024) // Process in chunks
.zip(self.scratch_space.par_chunks_mut(1024))
.for_each(|(input_chunk, output_chunk)| {
for (input, output) in input_chunk.iter().zip(output_chunk.iter_mut()) {
*output = input.sqrt() * 2.0;
}
});
self.scratch_space.clone()
}
}
How would you implement a parallel map-reduce operation in Rust that processes a large dataset while minimizing memory allocations and maximizing cache efficiency?
Strong candidates should mention:
- Chunking strategy: Divide data into cache-friendly chunks (typically 4KB-64KB)
- Rayon parallel iterators: Use .par_chunks() or .par_chunks_mut() for memory locality
- Local reduction: Perform reduction within each thread's chunk before combining results
- Memory pre-allocation: Reuse buffers and avoid allocations in hot loops
- NUMA awareness: Consider thread affinity and memory placement on multi-socket systems
They should also understand the trade-offs between parallelism overhead and the size of work units.
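A minimal sketch of the chunked map-reduce those points describe, assuming a simple sum of squared values as the stand-in workload (the 8_192-element chunk size is illustrative): each thread folds its own chunk sequentially, and only one partial result per chunk is combined at the end.
use rayon::prelude::*;
fn chunked_map_reduce(data: &[f64]) -> f64 {
    // Illustrative chunk size; pick one that fits comfortably in cache
    const CHUNK: usize = 8_192;
    data.par_chunks(CHUNK)
        .map(|chunk| {
            // Local reduction: sequential, cache-friendly, allocation-free
            chunk.iter().map(|&x| x * x).sum::<f64>()
        })
        .sum() // combine one partial result per chunk
}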
Signals of Mastery
- Understands work-stealing vs thread pools
- Can optimize memory access patterns for parallel code
- Knows when to use Rayon vs manual threading
- Handles WebAssembly thread initialization properly
- Implements efficient parallel algorithms with proper load balancing
Red Flags
- Uses parallel iterators for small datasets or I/O operations
- Ignores false sharing and cache line effects
- Doesn't handle WebAssembly threading initialization errors
- Creates excessive memory allocations in parallel code
- Uses blocking operations inside parallel iterators
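The last red flag is worth a small illustration. The file-reading workload below is hypothetical; the point is structural: keep blocking I/O out of the pool and hand Rayon only the CPU-bound part.
use rayon::prelude::*;
use std::fs;
// Anti-pattern: blocking file I/O inside a parallel iterator parks Rayon's
// worker threads on the disk and starves other CPU-bound work
fn word_counts_bad(paths: &[String]) -> Vec<usize> {
    paths
        .par_iter()
        .map(|path| {
            let text = fs::read_to_string(path).unwrap_or_default(); // blocks a worker
            text.split_whitespace().count()
        })
        .collect()
}
// Better: do the blocking I/O outside the pool (sequentially, on dedicated I/O
// threads, or with async), then parallelize only the CPU-bound counting
fn word_counts_better(paths: &[String]) -> Vec<usize> {
    let texts: Vec<String> = paths
        .iter()
        .map(|path| fs::read_to_string(path).unwrap_or_default())
        .collect();
    texts
        .par_iter()
        .map(|text| text.split_whitespace().count())
        .collect()
}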