```mermaid
flowchart LR
  start((Start)) --> one["`Create<br>a cluster`"]
  one --> two
  subgraph two[Prepare the session]
    direction TB
    copy[Copy objects]~~~eval[Evaluate<br>expressions]
    eval~~~seed[Set seed]
  end
  two --> three[Do your<br>call]
  three --> four[Stop the<br>cluster]
```
The parallel R package
This content was originally published in the book “Applied HPC with R” by George G. Vega Yon, Ph.D. You can find the book at https://book-hpc.ggvy.cl
Although R was not built for parallel computing, multiple ways of parallelizing your R code exist. One of these is the parallel package. This R package, shipped with base R, provides various functions to parallelize R code using embarrassingly parallel computing, i.e., a divide-and-conquer-type strategy. The basic idea is to start multiple R sessions (usually called child processes), connect the main session to them, and send them instructions. This section goes over a common workflow for working with R's parallel package.
Parallel workflow
(Usually) We do the following:

1. Create a PSOCK/FORK (or other) cluster using `makePSOCKcluster`/`makeForkCluster` (or `makeCluster`). How many child processes you start will depend on how many threads your computer has. A rule of thumb is to use `parallel::detectCores() - 1` cores (so you leave one free for the rest of your computer).
2. Copy/prepare each R session (if you are using a PSOCK cluster):
   - Copy objects with `clusterExport`. These would be all the objects that you need in the child sessions.
   - Pass expressions with `clusterEvalQ`. This would include loading R packages and other code into the other sessions.
   - Set a seed (if you are doing something that involves randomness).
3. Do your call: `parApply`, `parLapply`, etc.
4. Stop the cluster with `stopCluster`.
As we detail below, step 2 will depend on the type of cluster you are using. If you are using a Socket connection (PSOCK cluster), the spawned R sessions will be completely fresh (no data or R packages pre-loaded), whereas a Fork connection (FORK cluster) copies the current R session, including all objects and loaded packages.
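To see what this difference means in practice, here is a minimal sketch (the object name `big_object` is made up for illustration). With a PSOCK cluster, any object the children need must be shipped over explicitly:

```r
library(parallel)

big_object <- runif(10) # Lives only in the main session

# PSOCK: child sessions start empty, so objects must be exported explicitly
cl <- makePSOCKcluster(2L)

# Without this clusterExport() call, the parLapply() below would fail with
# "object 'big_object' not found" in the child sessions.
clusterExport(cl, "big_object")

ans <- parLapply(cl, 1:2, function(i) sum(big_object))
stopCluster(cl)
```

With a FORK cluster (on macOS/Linux), the `clusterExport` call would be unnecessary, since the children inherit `big_object` from the master.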
Types of clusters: PSOCK

- Can be created with `makePSOCKcluster`.
- Creates brand-new R sessions (so nothing is inherited from the master), e.g.,

  ```r
  # This creates a cluster with 4 R sessions
  cl <- makePSOCKcluster(4)
  ```

- Child sessions are connected to the master session via socket connections.
- Can be created outside the current computer, i.e., across multiple computers!
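Because PSOCK workers are just R sessions reached over sockets, the first argument of `makePSOCKcluster` can be a vector of host names instead of a count. A small sketch, which runs both workers on the local machine:

```r
library(parallel)

# The first argument can be a vector of host names instead of a number.
# Here both workers run on this machine; replacing "localhost" with the
# names of other ssh-reachable machines (with R installed) would spread
# the workers across computers.
cl  <- makePSOCKcluster(c("localhost", "localhost"))
ans <- parSapply(cl, 1:2, function(i) i^2)
stopCluster(cl)
```

Remote hosts typically require passwordless SSH access and a compatible R installation on each machine.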
Types of clusters: Fork

- Can be created with `makeForkCluster`.
- Uses OS-level forking.
- Copies the current R session locally (so everything is inherited from the master up to that point).
- Data is only duplicated if it is altered (the OS uses copy-on-write memory pages).
- Not available on Windows.
Other types are available via the function makeCluster from the snow R package (Simple Network of Workstations). These include MPI (Message Passing Interface) clusters and Slurm (Socket) clusters.
A template program
The following code chunk shows a template for using the parallel package in R. You can copy this and comment out the bits that you don't need:
```r
library(parallel)

# 1. CREATING A CLUSTER ----------------
nnodes <- 4L # Could be less or more!
cl     <- makePSOCKcluster(nnodes)

# 2. PREPARING THE CLUSTER -------------
# Mostly if using PSOCK
clusterEvalQ(cl, {
  library(...) # Loading the necessary packages
  source(...)  # Source additional scripts
})

# Always if you are doing random numbers
clusterSetRNGStream(cl, 123)

# 3. DO YOUR CALL ----------------------
ans <- parLapply(
  cl,
  ... long list to iterate ...,
  function(x) {
    ...
  },
  ... further arguments ...
)

# 4. STOP THE CLUSTER ------------------
stopCluster(cl)
```

Generally, the `... long list to iterate ...` will be a vector or another list that contains either data (e.g., individual datasets), a sequence of numbers (e.g., from 1 to 1,000), a list of file paths (if you were processing files individually), or directly a short sequence of numbers from 1 to the number of nodes (the least common application).
When calling parLapply or parSapply (the parallel versions of lapply and sapply respectively), the function call will automatically split the iterations across nodes using the splitIndices function. Here is an example of what happens under the hood:
```r
# Distributing 9 iterations across two cores
(n_iterations <- parallel::splitIndices(nx = 9, ncl = 2))
```

```
[[1]]
[1] 1 2 3 4

[[2]]
[1] 5 6 7 8 9
```
This means that the first R session will get 4 jobs, whereas the second R session will get 5. This way, each spawned R session (child session) gets a similar number of iterations.
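To make the chunking concrete, here is a sketch of what the split looks like on actual data (the vector `x` is made up for illustration); each child session would process one chunk:

```r
library(parallel)

x <- runif(9) # Nine hypothetical "iterations" worth of data

# Split the indices the same way parLapply would...
idx <- splitIndices(nx = length(x), ncl = 2)

# ...and turn them into one chunk of data per child session
chunks <- lapply(idx, function(i) x[i])

# Each child would then process its chunk; serially, that is just:
partial <- lapply(chunks, sum)

# Combining the partial results matches processing everything at once
all.equal(sum(unlist(partial)), sum(x))
```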
Examples
The following three examples are simple applications of the package in which we explicitly run as many replications as there are threads in the cluster. In practice, the number of replicates will be a function of the data.
Ex 1: Parallel RNG with makePSOCKcluster
Using more threads than your computer has cores is never a good idea. As a rule of thumb, clusters should be created using `parallel::detectCores() - 1` cores (so you leave one free for the rest of your computer).
```r
# 1. CREATING A CLUSTER
library(parallel)
nnodes <- 4L
cl     <- makePSOCKcluster(nnodes)

# 2. PREPARING THE CLUSTER
clusterSetRNGStream(cl, 123) # Equivalent to `set.seed(123)`

# 3. DO YOUR CALL
ans <- parSapply(cl, 1:nnodes, function(x) runif(1e3))
(ans0 <- var(ans))
```

```
              [,1]          [,2]          [,3]          [,4]
[1,]  0.0861888293 -0.0001633431  5.939143e-04 -3.672845e-04
[2,] -0.0001633431  0.0853841838  2.390790e-03 -1.462154e-04
[3,]  0.0005939143  0.0023907904  8.114219e-02 -4.714618e-06
[4,] -0.0003672845 -0.0001462154 -4.714618e-06  8.467722e-02
```
Making sure it is reproducible
```r
# I want to get the same!
clusterSetRNGStream(cl, 123)
ans1 <- var(parSapply(cl, 1:nnodes, function(x) runif(1e3)))

# 4. STOP THE CLUSTER
stopCluster(cl)

all.equal(ans0, ans1) # All equal!
```

```
[1] TRUE
```
Ex 2: Parallel RNG with makeForkCluster
In the case of `makeForkCluster`, the child sessions inherit the master's objects, so there is no need to export them:
```r
# 1. CREATING A CLUSTER
library(parallel)

# The fork cluster will copy the -nsims- object
nsims  <- 1e3
nnodes <- 4L
cl     <- makeForkCluster(nnodes)

# 2. PREPARING THE CLUSTER
clusterSetRNGStream(cl, 123)

# 3. DO YOUR CALL
ans <- do.call(cbind, parLapply(cl, 1:nnodes, function(x) {
  runif(nsims) # Look! we use the nsims object!
               # This would have failed with makePSOCKcluster
               # if we didn't copy -nsims- first.
}))

(ans0 <- var(ans))
```

```
              [,1]          [,2]          [,3]          [,4]
[1,]  0.0861888293 -0.0001633431  5.939143e-04 -3.672845e-04
[2,] -0.0001633431  0.0853841838  2.390790e-03 -1.462154e-04
[3,]  0.0005939143  0.0023907904  8.114219e-02 -4.714618e-06
[4,] -0.0003672845 -0.0001462154 -4.714618e-06  8.467722e-02
```
Again, we want to make sure this is reproducible
```r
# Same sequence with same seed
clusterSetRNGStream(cl, 123)
ans1 <- var(do.call(cbind, parLapply(cl, 1:nnodes, function(x) runif(nsims))))

ans0 - ans1 # A matrix of zeros
```

```
     [,1] [,2] [,3] [,4]
[1,]    0    0    0    0
[2,]    0    0    0    0
[3,]    0    0    0    0
[4,]    0    0    0    0
```

```r
# 4. STOP THE CLUSTER
stopCluster(cl)
```

Ex 3: Parallel RNG with mclapply (Forking on the fly)
In the case of mclapply, the forking (cluster creation) is done on the fly!
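Two practical notes before the example (not shown in the original code): `mclapply` decides how many processes to fork from its `mc.cores` argument, and for reproducible parallel draws R's parallel documentation recommends selecting the "L'Ecuyer-CMRG" generator before setting the seed. A brief sketch (the specific draws, `runif(5)`, are made up):

```r
library(parallel)

# For reproducible parallel draws with mclapply, R's parallel docs
# recommend selecting the L'Ecuyer-CMRG generator before seeding.
RNGkind("L'Ecuyer-CMRG")
set.seed(123)

# mc.cores controls how many processes are forked on the fly
# (on Windows mc.cores must be 1, i.e., serial execution).
draws <- mclapply(1:4, function(x) runif(5), mc.cores = 2L)
```

Re-running the `set.seed(123)` and `mclapply` lines yields identical draws.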
```r
# 1. CREATING A CLUSTER
library(parallel)

# The fork will copy the -nsims- object
nsims  <- 1e3
nnodes <- 4L
# cl <- makeForkCluster(nnodes) # mclapply does it on the fly

# 2. PREPARING THE CLUSTER
set.seed(123)

# 3. DO YOUR CALL
ans <- do.call(cbind, mclapply(1:nnodes, function(x) runif(nsims)))

(ans0 <- var(ans))
```

```
             [,1]        [,2]         [,3]         [,4]
[1,]  0.085384184 0.002390790  0.006576204 -0.003998278
[2,]  0.002390790 0.081142190  0.001846963  0.001476244
[3,]  0.006576204 0.001846963  0.085175347 -0.002807348
[4,] -0.003998278 0.001476244 -0.002807348  0.082425477
```
Once more, we want to make sure this is reproducible
```r
# Same sequence with same seed
set.seed(123)
ans1 <- var(do.call(cbind, mclapply(1:nnodes, function(x) runif(nsims))))

ans0 - ans1 # A matrix of zeros
```

```
     [,1] [,2] [,3] [,4]
[1,]    0    0    0    0
[2,]    0    0    0    0
[3,]    0    0    0    0
[4,]    0    0    0    0
```

```r
# 4. STOP THE CLUSTER
# stopCluster(cl) # No need to do this anymore
```

Exercise: Overhead costs
Compare the timing of taking the sum of 100 numbers when parallelized versus not. For the unparallelized (serial) version, use the following:
```r
set.seed(123)
x <- runif(n = 100)

serial_sum <- function(x) {
  x_sum <- sum(x)
  return(x_sum)
}
```

For the parallelized version, follow this outline:
```r
library(parallel)

set.seed(123)
x <- runif(n = 100)

parallel_sum <- function(x) {
  # Set the number of cores to use

  # Make a cluster and export the x variable to it

  # Use the "split" function to divide x into as many chunks
  # as the number of cores

  # Calculate partial sums doing something like:
  partial_sums <- parallel::parSapply(cl, x_split, sum)

  # Stop the cluster

  # Add and return the partial sums
}
```

Compare the timing of the two approaches:
```r
microbenchmark::microbenchmark(
  serial   = serial_sum(x),
  parallel = parallel_sum(x),
  times    = 10,
  unit     = "relative"
)
```