Rayon & Thread Parallelism

Rayon provides data parallelism in Rust with minimal overhead, while WebAssembly's threading model (Web Workers sharing memory through a SharedArrayBuffer) enables true parallel execution in web browsers. Understanding both is crucial for high-performance web applications.

Rayon Fundamentals

Parallel Iterators

📁 Reference: parallel-processing.example.rs

use rayon::prelude::*;

// Sequential processing (i64 keeps the sum of squares from overflowing)
let data: Vec<i64> = (0..1_000_000).collect();
let sum: i64 = data.iter().map(|x| x * x).sum();

// Parallel processing - just swap iter() for par_iter()
let parallel_sum: i64 = data.par_iter().map(|x| x * x).sum();

// More complex parallel operations
let results: Vec<_> = data
    .par_iter()
    .filter(|&&x| x % 2 == 0)
    .map(|&x| expensive_computation(x))
    .collect();

fn expensive_computation(x: i64) -> i64 {
    // Simulate CPU-intensive work
    (0..1000).fold(x, |acc, i| acc + i * x)
}

Parallel Patterns

📁 Reference: parallel-processing.example.rs

use rayon::prelude::*;

// Parallel map-reduce
fn parallel_word_count(texts: &[String]) -> usize {
    texts
        .par_iter()
        .map(|text| text.split_whitespace().count())
        .sum()
}

// Parallel search (total_cmp gives a total order over floats,
// so this cannot panic on NaN the way partial_cmp().unwrap() would)
fn parallel_find_max(data: &[f64]) -> Option<f64> {
    data.par_iter()
        .copied()
        .max_by(|a, b| a.total_cmp(b))
}

// Parallel sorting
fn parallel_sort_by_key<T, K>(data: &mut [T], key_fn: impl Fn(&T) -> K + Sync)
where
    T: Send,
    K: Ord,
{
    data.par_sort_by_key(key_fn);
}

// Custom parallel reduction: two parallel passes, one for the mean
// and one for the squared deviations
fn parallel_variance(data: &[f64]) -> f64 {
    let mean = data.par_iter().sum::<f64>() / data.len() as f64;

    data.par_iter()
        .map(|&x| (x - mean).powi(2))
        .sum::<f64>()
        / data.len() as f64
}

Thread Pool Configuration

Custom Thread Pools

📁 Reference: parallel-processing.example.rs

use rayon::prelude::*;
use rayon::{ThreadPool, ThreadPoolBuilder};

// Configure the global thread pool (may only succeed once per process)
fn configure_global_pool() -> Result<(), rayon::ThreadPoolBuildError> {
    ThreadPoolBuilder::new()
        .num_threads(4)
        .stack_size(2 * 1024 * 1024) // 2 MB stack per worker
        .thread_name(|i| format!("rayon-worker-{}", i))
        .build_global()
}

// Create a custom thread pool for specific tasks
fn create_specialized_pool() -> ThreadPool {
    ThreadPoolBuilder::new()
        .num_threads(2)
        .thread_name(|i| format!("compute-{}", i))
        .build()
        .expect("Failed to create thread pool")
}

// Use the custom pool with install()
fn use_custom_pool() {
    let pool = create_specialized_pool();

    pool.install(|| {
        // All Rayon operations in this closure use the custom pool
        let data: Vec<i32> = (0..1000).collect();
        let result: i32 = data.par_iter().map(|x| x * 2).sum();
        println!("Result: {}", result);
    });
}

// Scoped tasks: everything spawned is joined before scope() returns
fn scoped_parallel_work() {
    let data = vec![1, 2, 3, 4, 5];
    let mut results = vec![0; data.len()];

    rayon::scope(|s| {
        // Hand each task a disjoint &mut slot; capturing a shared
        // `&mut results` in every closure would not compile
        for (slot, &item) in results.iter_mut().zip(&data) {
            s.spawn(move |_| {
                *slot = expensive_computation(item);
            });
        }
    }); // All spawned tasks complete here

    println!("Results: {:?}", results);
}

WebAssembly Integration

WASM Thread Configuration

📁 Reference: wasm-rayon-integration.example.rs

use wasm_bindgen::prelude::*;
use rayon::prelude::*;

#[wasm_bindgen]
pub struct ParallelProcessor {
    thread_pool: Option<rayon::ThreadPool>,
}

#[wasm_bindgen]
impl ParallelProcessor {
    #[wasm_bindgen(constructor)]
    pub fn new(num_threads: usize) -> Result<ParallelProcessor, JsValue> {
        // Enable console_error_panic_hook for better debugging
        #[cfg(feature = "console_error_panic_hook")]
        console_error_panic_hook::set_once();

        // Initialize Rayon with the specified thread count
        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(num_threads)
            .build()
            .map_err(|e| JsValue::from_str(&format!("Failed to create thread pool: {}", e)))?;

        Ok(ParallelProcessor {
            thread_pool: Some(pool),
        })
    }

    #[wasm_bindgen]
    pub fn parallel_sum(&self, data: &[i32]) -> i32 {
        if let Some(ref pool) = self.thread_pool {
            pool.install(|| data.par_iter().sum())
        } else {
            data.iter().sum()
        }
    }

    #[wasm_bindgen]
    pub fn parallel_map_square(&self, data: &[i32]) -> Vec<i32> {
        if let Some(ref pool) = self.thread_pool {
            pool.install(|| data.par_iter().map(|&x| x * x).collect())
        } else {
            data.iter().map(|&x| x * x).collect()
        }
    }
}

// Helper for complex parallel operations
// Full implementation: https://github.com/BaDaaS/web-learning/blob/main/examples/rust-wasm/wasm-rayon-integration.example.rs#L97-L145
#[wasm_bindgen]
pub fn parallel_matrix_multiply(
    a: &[f64], a_rows: usize, a_cols: usize,
    b: &[f64], b_rows: usize, b_cols: usize,
) -> Result<Vec<f64>, JsValue> {
    if a_cols != b_rows {
        return Err(JsValue::from_str("Matrix dimensions don't match"));
    }

    let mut result = vec![0.0; a_rows * b_cols];

    // Each output row is independent, so rows parallelize cleanly
    result
        .par_chunks_mut(b_cols)
        .enumerate()
        .for_each(|(i, row)| {
            for j in 0..b_cols {
                let mut sum = 0.0;
                for k in 0..a_cols {
                    sum += a[i * a_cols + k] * b[k * b_cols + j];
                }
                row[j] = sum;
            }
        });

    Ok(result)
}
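
A browser caveat: Rayon cannot spawn OS threads inside WebAssembly on its own. Browser builds typically go through the wasm-bindgen-rayon crate, which backs Rayon's workers with Web Workers over shared memory, and that in turn requires a cross-origin isolated page (COOP/COEP headers) so SharedArrayBuffer is available. A minimal sketch, assuming wasm-bindgen-rayon is a dependency (the exported function sum_of_squares is illustrative):

use rayon::prelude::*;
use wasm_bindgen::prelude::*;

// Re-export wasm-bindgen-rayon's initializer; JavaScript must
// `await initThreadPool(n)` once before calling any parallel export
pub use wasm_bindgen_rayon::init_thread_pool;

#[wasm_bindgen]
pub fn sum_of_squares(data: &[i32]) -> i32 {
    data.par_iter().map(|&x| x * x).sum()
}

On the JavaScript side, await initThreadPool(navigator.hardwareConcurrency) must complete before sum_of_squares is first called.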

Advanced Patterns

Work-Stealing and Load Balancing

📁 Reference: parallel-processing.example.rs

use rayon::prelude::*;
use std::sync::atomic::{AtomicUsize, Ordering};

// Uneven workload distribution: work stealing keeps cores busy even
// when individual tasks vary widely in cost
fn parallel_uneven_work(tasks: Vec<usize>) -> Vec<usize> {
    let completed_count = AtomicUsize::new(0);

    tasks
        .par_iter()
        .map(|&task_size| {
            // Simulate variable work amounts
            let result = (0..task_size).fold(0, |acc, i| acc + i);

            let count = completed_count.fetch_add(1, Ordering::Relaxed);
            if count % 100 == 0 {
                println!("Completed {} tasks", count + 1);
            }

            result
        })
        .collect()
}

// Custom parallel join for divide-and-conquer
fn parallel_quicksort<T: Ord + Send>(data: &mut [T]) {
    if data.len() <= 1 {
        return;
    }

    let pivot_index = partition(data);
    let (left, right) = data.split_at_mut(pivot_index);

    // rayon::join runs both halves in parallel; the pivot at right[0]
    // is already in its final position and is excluded from recursion
    rayon::join(
        || parallel_quicksort(left),
        || parallel_quicksort(&mut right[1..]),
    );
}

// Lomuto-style partition: returns the pivot's final index
fn partition<T: Ord>(data: &mut [T]) -> usize {
    let len = data.len();
    let pivot_index = len / 2;
    data.swap(pivot_index, len - 1);

    let mut store_index = 0;
    for i in 0..len - 1 {
        if data[i] <= data[len - 1] {
            data.swap(i, store_index);
            store_index += 1;
        }
    }
    data.swap(store_index, len - 1);
    store_index
}

Pipeline Parallelism

📁 Reference: See worker-pipeline.example.ts for TypeScript implementation

use crossbeam_channel::bounded;
use std::thread;

// Multi-stage pipeline: each stage runs on its own thread, so the
// stages execute concurrently even though each stage is sequential
struct ParallelPipeline<T> {
    input_tx: crossbeam_channel::Sender<T>,
    output_rx: crossbeam_channel::Receiver<T>,
}

impl<T: Send + 'static> ParallelPipeline<T> {
    fn new<F1, F2, F3, U, V>(
        stage1: F1,
        stage2: F2,
        stage3: F3,
        buffer_size: usize,
    ) -> Self
    where
        F1: Fn(T) -> U + Send + Sync + 'static,
        F2: Fn(U) -> V + Send + Sync + 'static,
        F3: Fn(V) -> T + Send + Sync + 'static,
        U: Send + 'static,
        V: Send + 'static,
    {
        let (input_tx, input_rx) = bounded(buffer_size);
        let (stage1_tx, stage1_rx) = bounded(buffer_size);
        let (stage2_tx, stage2_rx) = bounded(buffer_size);
        let (output_tx, output_rx) = bounded(buffer_size);

        // Stage 1 thread
        thread::spawn(move || {
            while let Ok(item) = input_rx.recv() {
                if stage1_tx.send(stage1(item)).is_err() {
                    break;
                }
            }
        });

        // Stage 2 thread
        thread::spawn(move || {
            while let Ok(item) = stage1_rx.recv() {
                if stage2_tx.send(stage2(item)).is_err() {
                    break;
                }
            }
        });

        // Stage 3 thread
        thread::spawn(move || {
            while let Ok(item) = stage2_rx.recv() {
                if output_tx.send(stage3(item)).is_err() {
                    break;
                }
            }
        });

        ParallelPipeline { input_tx, output_rx }
    }

    fn process(&self, item: T) -> Result<(), crossbeam_channel::SendError<T>> {
        self.input_tx.send(item)
    }

    fn try_recv(&self) -> Result<T, crossbeam_channel::TryRecvError> {
        self.output_rx.try_recv()
    }
}
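
A short usage sketch of the pipeline above; the stage closures, item count, and buffer size are all illustrative:

fn main() {
    // Stages must round-trip back to T: i32 -> f64 -> f64 -> i32
    let pipeline = ParallelPipeline::new(
        |x: i32| x as f64,         // stage 1: widen
        |x: f64| x * 2.0,          // stage 2: transform
        |x: f64| x.round() as i32, // stage 3: narrow back to T
        16,
    );

    for i in 0..5 {
        pipeline.process(i).expect("pipeline input closed");
    }

    // try_recv() never blocks, so poll until every result arrives;
    // a production caller might prefer crossbeam's blocking recv()
    let mut received = 0;
    while received < 5 {
        if let Ok(value) = pipeline.try_recv() {
            println!("got {}", value);
            received += 1;
        } else {
            std::thread::sleep(std::time::Duration::from_millis(1));
        }
    }
}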

Performance Optimization

Memory Access Patterns

📁 Reference: parallel-processing.example.rs

use rayon::prelude::*;

// Cache-friendly parallel processing
fn cache_friendly_sum(matrix: &[Vec<i32>]) -> i32 {
    // Row-major access (cache-friendly)
    matrix
        .par_iter()
        .map(|row| row.iter().sum::<i32>())
        .sum()
}

// Locality-friendly chunking. Rayon itself is not NUMA-aware: coarse
// chunks improve cache locality, but true NUMA placement requires
// OS-level thread and memory affinity.
fn numa_aware_processing(data: &[f64]) -> Vec<f64> {
    // .max(1) guards against a zero chunk size on small inputs
    let chunk_size = (data.len() / rayon::current_num_threads()).max(1);

    data.par_chunks(chunk_size)
        // flat_map_iter: the inner iterator is sequential, so each
        // chunk is processed entirely on one worker thread
        .flat_map_iter(|chunk| chunk.iter().map(|&x| expensive_math_operation(x)))
        .collect()
}

fn expensive_math_operation(x: f64) -> f64 {
    x.sin().cos().tan().sqrt()
}

// False sharing avoidance
use std::sync::atomic::{AtomicU64, Ordering};

struct PaddedCounter {
    value: AtomicU64,
    _padding: [u8; 64 - 8], // Pad each counter to a full cache line
}

fn avoid_false_sharing(data: &[i32]) -> u64 {
    let num_threads = rayon::current_num_threads();
    let counters: Vec<PaddedCounter> = (0..num_threads)
        .map(|_| PaddedCounter {
            value: AtomicU64::new(0),
            _padding: [0; 56],
        })
        .collect();

    // Round the chunk size up so at most num_threads chunks exist;
    // truncating division could yield one more chunk than counters
    let chunk_size = data.len().div_ceil(num_threads).max(1);

    data.par_chunks(chunk_size)
        .enumerate()
        .for_each(|(chunk_id, chunk)| {
            let local_sum: u64 = chunk.iter().map(|&x| x as u64).sum();
            counters[chunk_id].value.store(local_sum, Ordering::Relaxed);
        });

    counters
        .iter()
        .map(|counter| counter.value.load(Ordering::Relaxed))
        .sum()
}

📁 Complete Example: See run_all_examples() for a comprehensive demonstration of all patterns.

Rayon Best Practices

Use .par_iter() for CPU-bound operations on large datasets. Rayon's work-stealing scheduler automatically balances load across available cores.

Common Pitfalls

Don't use Rayon for I/O-bound operations or very small datasets. The overhead of parallelization can exceed the benefits for small workloads.
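
One mitigation is a sequential fallback for small inputs plus Rayon's with_min_len, which caps how finely an indexed parallel iterator may be split. A minimal sketch; the 10,000-element cutoff and 1,024-item floor are illustrative starting points to be tuned by measurement:

use rayon::prelude::*;

fn sum_squares(data: &[i64]) -> i64 {
    // Small input: the sequential path avoids scheduling overhead entirely
    if data.len() < 10_000 {
        return data.iter().map(|x| x * x).sum();
    }

    data.par_iter()
        .with_min_len(1024) // never split below 1,024 items per task
        .map(|x| x * x)
        .sum()
}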

WebAssembly Considerations

Thread Pool Sizing

📁 Reference: wasm-rayon-integration.example.rs

// JavaScript side - detecting available cores
const numCores = navigator.hardwareConcurrency || 4;

// Initialize WASM with an appropriate thread count
import init, { ParallelProcessor } from './pkg/my_wasm_module.js';

async function initializeWasm() {
    await init();

    // Reserve one core for the main thread
    const workerThreads = Math.max(1, numCores - 1);

    try {
        return new ParallelProcessor(workerThreads);
    } catch (error) {
        console.warn('Failed to create parallel processor:', error);
        // Fall back to single-threaded
        return new ParallelProcessor(1);
    }
}

// Usage with proper error handling
async function processData(data) {
    const processor = await initializeWasm();

    try {
        return processor.parallel_sum(data);
    } catch (error) {
        console.error('Processing failed:', error);
        throw error;
    }
}

// Usage with proper error handling
async function processData(data) {
const processor = await initializeWasm();

try {
const result = processor.parallel_sum(data);
return result;
} catch (error) {
console.error('Processing failed:', error);
throw error;
}
}

Memory Management

📁 Reference: wasm-rayon-integration.example.rs

use rayon::prelude::*;
use wasm_bindgen::prelude::*;

#[wasm_bindgen]
pub struct MemoryEfficientProcessor {
    buffer: Vec<f64>,
    scratch_space: Vec<f64>,
}

#[wasm_bindgen]
impl MemoryEfficientProcessor {
    #[wasm_bindgen(constructor)]
    pub fn new(max_size: usize) -> Self {
        Self {
            buffer: Vec::with_capacity(max_size),
            scratch_space: Vec::with_capacity(max_size),
        }
    }

    #[wasm_bindgen]
    pub fn process_inplace(&mut self, data: &[f64]) -> Vec<f64> {
        // Reuse allocated buffers to avoid churn in linear memory
        self.buffer.clear();
        self.buffer.extend_from_slice(data);

        self.scratch_space.clear();
        self.scratch_space.resize(data.len(), 0.0);

        // Parallel processing with pre-allocated memory
        self.buffer
            .par_chunks(1024) // input is read-only, chunked for locality
            .zip(self.scratch_space.par_chunks_mut(1024))
            .for_each(|(input_chunk, output_chunk)| {
                for (input, output) in input_chunk.iter().zip(output_chunk.iter_mut()) {
                    *output = input.sqrt() * 2.0;
                }
            });

        // The clone hands an owned Vec across the wasm-bindgen boundary;
        // the scratch buffer itself stays allocated for the next call
        self.scratch_space.clone()
    }
}

Interview Question

How would you implement a parallel map-reduce operation in Rust that processes a large dataset while minimizing memory allocations and maximizing cache efficiency?

Expected Response

Strong candidates should mention:

  • Chunking strategy: Divide data into cache-friendly chunks (typically 4KB-64KB)
  • Rayon parallel iterators: Use .par_chunks() or .par_chunks_mut() for memory locality
  • Local reduction: Perform reduction within each thread's chunk before combining results
  • Memory pre-allocation: Reuse buffers and avoid allocations in hot loops
  • NUMA awareness: Consider thread affinity and memory placement on multi-socket systems

They should also understand the trade-offs between parallelism overhead and the size of work units.
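
A minimal sketch of such an answer, combining par_chunks for cache locality with a per-chunk local reduction before the final combine (the chunk size is an illustrative value in the 4KB-64KB range suggested above):

use rayon::prelude::*;

// Map-reduce over a large slice: each chunk is reduced sequentially
// on one worker, and only the per-chunk partials are combined
fn parallel_sum_of_squares(data: &[f64]) -> f64 {
    data.par_chunks(8 * 1024) // 8,192 f64s = 64 KB per work unit
        .map(|chunk| chunk.iter().map(|&x| x * x).sum::<f64>())
        .sum()
}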

Signals of Mastery

  • Understands work-stealing vs thread pools
  • Can optimize memory access patterns for parallel code
  • Knows when to use Rayon vs manual threading
  • Handles WebAssembly thread initialization properly
  • Implements efficient parallel algorithms with proper load balancing

Red Flags

  • Uses parallel iterators for small datasets or I/O operations
  • Ignores false sharing and cache line effects
  • Doesn't handle WebAssembly threading initialization errors
  • Creates excessive memory allocations in parallel code
  • Uses blocking operations inside parallel iterators