pbdMPI - Parallel and Big Data interface to MPI
Last updated on 2024-02-06
Estimated time: 12 minutes
Overview
Questions
- What types of functions does MPI provide?
Objectives
- Demonstrate some of the functionality of the pbdMPI bindings to the Message Passing Interface (MPI)
Introduction
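pbdMPI programs are written in the Single Program, Multiple Data (SPMD) style: every MPI rank runs the same script, ranks differ only in their rank number, and they cooperate through collective operations. The scripts below are meant to be saved to files and launched in batch mode with an MPI launcher such as mpirun or mpiexec.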
Hello World in Serial
R
library( pbdMPI, quiet = TRUE )
text = paste( "Hello, world from", comm.rank() )
print( text )
finalize()
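To run it, hand the saved script to your MPI launcher. A sketch, assuming an mpiexec-style launcher and the hypothetical file name hello_serial.R:
R
## Run on a single rank:
## Rscript hello_serial.R
## Run in parallel with, e.g., 2 ranks:
## mpiexec -np 2 Rscript hello_serial.R
Without the launcher the script still works, reporting rank 0.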
Rank
R
library( pbdMPI, quiet = TRUE )
my.rank <- comm.rank()
comm.print( my.rank, all.rank = TRUE )
finalize()
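Ranks are numbered from 0; the companion comm.size() returns the total number of ranks. A minimal sketch combining the two:
R
library( pbdMPI, quiet = TRUE )
comm.cat( "rank", comm.rank(), "of", comm.size(), "\n", all.rank = TRUE )
finalize()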
Hello World in Parallel
R
library( pbdMPI, quiet = TRUE )
print( "Hello, world print" )
comm.print( "Hello, world comm.print" )
comm.print( "Hello from all", all.rank = TRUE, quiet = TRUE )
finalize()
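Note the contrast: print() executes on every rank, so its lines can arrive interleaved, whereas comm.print() prints from rank 0 only by default. Setting all.rank = TRUE prints from every rank in rank order, and quiet = TRUE drops the rank labels.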
Map Reduce
R
library( pbdMPI, quiet = TRUE )
## Your "Map" code
n = comm.rank() + 1
## Now "Reduce" but give the result to all
all_sum = allreduce( n ) # Sum is default
text = paste( "Hello: n is", n, "sum is", all_sum )
comm.print( text, all.rank = TRUE )
finalize()
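allreduce() accepts other reduction operations, and its one-sided sibling reduce() delivers the result only to rank 0. A small sketch under the same setup, using the standard MPI "max" operation:
R
library( pbdMPI, quiet = TRUE )
n = comm.rank() + 1
comm.print( allreduce( n, op = "max" ) ) # maximum of n over all ranks
comm.print( reduce( n ) )                # sum, available on rank 0 only
finalize()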
Calculate Pi
R
### Compute pi by simulation
library( pbdMPI, quiet = TRUE )
comm.set.seed( seed = 1234567, diff = TRUE )
my.N = 1e7 %/% comm.size()
my.X = matrix( runif( my.N * 2 ), ncol = 2 )
my.r = sum( rowSums( my.X^2 ) <= 1 )
r = allreduce( my.r )
PI = 4*r / ( my.N * comm.size() )
comm.print( PI )
finalize()
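The points are uniform in the unit square, so the fraction with x^2 + y^2 <= 1 (the quarter circle) estimates pi / 4. Each rank counts its own hits, allreduce() sums the counts, and every rank arrives at the same estimate; comm.set.seed() with diff = TRUE gives each rank an independent random stream.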
Broadcast
R
library( pbdMPI, quiet = TRUE )
if ( comm.rank() == 0 ) {
  x = matrix( 1:4, nrow = 2 )
} else {
  x = NULL
}
y = bcast( x )
comm.print( y, all.rank = TRUE )
comm.print( x, all.rank = TRUE )
finalize()
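bcast() copies the object from rank 0 (the default source rank) to all ranks: after the call, y holds the matrix everywhere, while x remains NULL except on rank 0, as the two prints show.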
Gather Unequal
R
library( pbdMPI, quiet = TRUE )
comm.set.seed( seed = 1234567, diff = TRUE )
my_rank = comm.rank( )
n = sample( 1:10, size = my_rank + 1 )
comm.print( n, all.rank = TRUE )
gt = gather( n )
obj_len = gather( length( n ) )
comm.cat( "gathered unequal size objects. lengths =", obj_len, "\n" )
comm.print( unlist( gt ), all.rank = TRUE )
finalize( )
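gather() delivers the combined list only to rank 0 by default; allgather() returns it on every rank. A minimal sketch:
R
library( pbdMPI, quiet = TRUE )
n = comm.rank() + 1
ag = allgather( n ) # every rank receives the full list
comm.print( unlist( ag ), all.rank = TRUE )
finalize()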
Gather Named
R
library( pbdMPI, quiet = TRUE )
comm.set.seed( seed = 1234567, diff = TRUE )
my_rank = comm.rank()
n = sample( 1:10, size = my_rank + 1 )
names(n) = paste0("a", 1:(my_rank + 1))
comm.print(n, all.rank = TRUE)
gt = gather( n )
comm.print( unlist( gt ), all.rank = TRUE )
finalize()
Chunk
R
library( pbdMPI, quiet = TRUE )
my.rank = comm.rank( )
k = comm.chunk( 10 )
comm.cat( my.rank, ":", k, "\n", all.rank = TRUE, quiet = TRUE )
k = comm.chunk( 10, form = "vector" )
comm.cat( my.rank, ":", k, "\n", all.rank = TRUE, quiet = TRUE )
k = comm.chunk( 10, form = "vector", type = "equal" )
comm.cat( my.rank, ":", k, "\n", all.rank = TRUE, quiet = TRUE )
finalize( )
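comm.chunk() splits N items across the ranks, returning only the local piece: by default just the item count, and with form = "vector" the item indices themselves. The default type balances the split so the counts differ by at most one, while type = "equal" uses equal chunk sizes instead, which can leave the last rank(s) with a short piece.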
Timing
R
library( pbdMPI, quiet = TRUE )
comm.set.seed( seed = 1234567, diff = TRUE )
test = function( timed )
{
  ltime = system.time( timed )[ 3 ]
  mintime = allreduce( ltime, op = "min" )
  maxtime = allreduce( ltime, op = "max" )
  meantime = allreduce( ltime, op = "sum" ) / comm.size()
  return( data.frame( min = mintime, mean = meantime, max = maxtime ) )
}
# generate 10,000,000 random normal values (total)
times = test( rnorm( 1e7/comm.size() ) ) # ~76 MiB of data
comm.print( times )
finalize()
Are there bindings to MPI_wtime()?
Covariance
R
library( pbdMPI, quiet = TRUE )
comm.set.seed( seed = 1234567, diff = TRUE )
## Generate 10 rows and 3 columns of data per process
my.X = matrix( rnorm(10*3), ncol = 3 )
## Compute mean
N = allreduce( nrow( my.X ), op = "sum" )
mu = allreduce( colSums( my.X ) / N, op = "sum" )
## Sweep out mean and compute crossproducts sum
my.X = sweep( my.X, STATS = mu, MARGIN = 2 )
Cov.X = allreduce( crossprod( my.X ), op = "sum" ) / ( N - 1 )
comm.print( Cov.X )
finalize()
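Only a scalar, a length-3 vector, and a 3 x 3 matrix are ever communicated: each allreduce() turns a local partial sum into its global counterpart, so Cov.X matches what cov() would return on the full N-row data set.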
Matrix reduction
R
library( pbdMPI, quiet = TRUE )
x <- matrix( 10*comm.rank() + (1:6), nrow = 2 )
comm.print( x, all.rank = TRUE )
z <- reduce( x ) # knows it's a matrix
comm.print( z, all.rank = TRUE )
finalize()
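Reductions act elementwise and preserve shape, so z on rank 0 is the elementwise sum of the x matrices from all ranks; the all.rank print shows that the other ranks receive NULL from the one-sided reduce().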
Ordinary Least Squares
R
### Least squares fit via the Normal Equations (see lm.fit for a better way)
library( pbdMPI, quiet = TRUE )
comm.set.seed( seed = 12345, diff = TRUE )
## 10 rows and 3 columns of data per process
my.X = matrix( rnorm(10*3), ncol = 3 )
my.y = matrix( rnorm(10*1), ncol = 1 )
## Form the Normal Equations components
my.Xt = t( my.X )
XtX = allreduce( my.Xt %*% my.X, op = "sum" )
Xty = allreduce( my.Xt %*% my.y, op = "sum" )
## Everyone solve the Normal Equations
ols = solve( XtX, Xty )
comm.print( ols )
finalize()
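As a cross-check, here is a hedged sketch that regenerates the same data from the same seed, gathers the row blocks to rank 0, and solves the full least squares problem serially; it should agree with the allreduce-based solution above.
R
library( pbdMPI, quiet = TRUE )
comm.set.seed( seed = 12345, diff = TRUE )
my.X = matrix( rnorm(10*3), ncol = 3 )
my.y = matrix( rnorm(10*1), ncol = 1 )
gX = gather( my.X ) # list of row blocks on rank 0, NULL elsewhere
gy = gather( my.y )
if ( comm.rank() == 0 ) {
  X = do.call( rbind, gX )
  y = do.call( rbind, gy )
  print( qr.solve( X, y ) ) # serial least squares on the full data
}
finalize()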
QR Decomposition
R
library( pbdMPI, quiet = TRUE )
library( cop, quiet = TRUE ) # provides qr_allreduce()
rank = comm.rank()
size = comm.size()
rows = 3
cols = 3
xb = matrix((1:(rows*cols*size))^2, ncol = cols) # a full matrix
xa = xb[(1:rows) + rank*rows, ] # split by row blocks
comm.print(xa, all.rank = TRUE)
comm.print(xb)
## compute usual QR from full matrix
rb = qr.R(qr(xb))
comm.print(rb)
## compute QR from gathered local QRs
rloc = qr.R(qr(xa)) # compute local QRs
rra = allgather(rloc) # gather them into a list
rra = do.call(rbind, rra) # rbind list elements
comm.print(rra) # print combined local QRs
ra = qr.R(qr(rra)) # QR the combined local QRs
comm.print(ra)
## use cop package to do it again via qr_allreduce
ra = qr_allreduce(xa)
comm.print(ra)
finalize()
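This works because the QR of the stacked local R factors has, up to signs, the same R as the QR of the full matrix, so only the small R factors need to be communicated; qr_allreduce() from cop packages this reduction into a single collective call.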
Collective communication for a one dimensional domain decomposition
R
## Splits the world communicator into two sets of smaller communicators and
## demonstrates how a sum collective works
library(pbdMPI)
.pbd_env$SPMD.CT # show the default SPMD control parameters
comm_world = .pbd_env$SPMD.CT$comm # default communicator
my_rank = comm.rank(comm_world) # my default rank in world communicator
comm_new = 5L # new communicators can be 5 and up (0-4 are taken)
row_color = my_rank %/% 2L # set new partition colors and split accordingly
comm.split(comm_world, color = row_color, key = my_rank, newcomm = comm_new)
barrier()
my_newrank = comm.rank(comm_new)
comm.cat("comm_world:", comm_world, "comm_new", comm_new, "row_color:",
row_color, "my_rank:", my_rank, "my_newrank", my_newrank, "\n",
all.rank = TRUE)
x = my_rank + 1
comm.cat("x", x, "\n", all.rank = TRUE, comm = comm_world)
xa = allreduce(x, comm = comm_world)
xb = allreduce(x, comm = comm_new)
comm.cat("xa", xa, "xb", xb, "\n", all.rank = TRUE, comm = comm_world)
comm.free(comm_new)
finalize()
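Because row_color = my_rank %/% 2 places consecutive pairs of ranks in the same new communicator, xa is the sum of x over all world ranks while xb sums only the two ranks that share a row color, which the final print makes visible.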
Collective communication for a two dimensional domain decomposition
R
## Run with:
## mpiexec -np 32 Rscript comm_split8x4.R
##
## Splits a 32-rank communicator into 4 row communicators of size 8 and,
## orthogonal to them, 8 column communicators of size 4. Prints the rank
## assignments and demonstrates how sum collectives work in each set of
## communicators.
##
## Useful for row or column operations on tile-distributed matrices, or for
## any two nested levels of distributed parallelism. Note that the pbdDMAT
## package already provides such matrix operations, powered by ScaLAPACK.
##
library(pbdMPI)
ncol = 8
nrow = 4
if(comm.size() != ncol*nrow) stop("Must run with -np 32")
## Get world communicator rank
comm_w = .pbd_env$SPMD.CT$comm # world communicator (normally assigned 0)
rank_w = comm.rank(comm_w) # world rank
## Split comm_w into ncol communicators of size nrow
comm_c = 5L # assign them a number
color_c = rank_w %/% nrow # ranks of same color are in same communicator
comm.split(comm_w, color = color_c, key = rank_w, newcomm = comm_c)
## Split comm_w into nrow communicators of size ncol
comm_r = 6L # assign them a number
color_r = rank_w %% nrow # make these orthogonal to the row communicators
comm.split(comm_w, color = color_r, key = rank_w, newcomm = comm_r)
## Print the resulting communicator colors and ranks
comm.cat(comm.rank(comm = comm_w),
paste0("(", color_r, ":", comm.rank(comm = comm_r), ")"),
paste0("(", color_c, ":", comm.rank(comm = comm_c), ")"),
"\n", all.rank = TRUE, quiet = TRUE, comm = comm_w)
## Print sums of rank numbers across each communicator to illustrate collectives
x = comm.rank(comm_w)
w = allreduce(x, op = "sum", comm = comm_w)
comm.cat(" ", w, all.rank = TRUE, quiet = TRUE)
comm.cat("\n", quiet = TRUE)
r = allreduce(x, op = "sum", comm = comm_r)
comm.cat(" ", r, all.rank = TRUE, quiet = TRUE)
comm.cat("\n", quiet = TRUE)
c = allreduce(x, op = "sum", comm = comm_c)
comm.cat(" ", c, all.rank = TRUE, quiet = TRUE)
comm.cat("\n", quiet = TRUE)
#comm.free(comm_c)
#comm.free(comm_r)
finalize()
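With 32 ranks the world sum is 0 + 1 + ... + 31 = 496 on every rank, while each row communicator sums only its own 8 world ranks and each column communicator its own 4, so the second and third output lines vary by group.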