5  The Different Flavors of Parallelism

5.2 Lowest Level: SIMD

Recall SIMD, "Single Instruction Multiple Data": the idea that a processor can apply a single instruction to multiple data values simultaneously, provided the data is suitably structured. SIMD is parallelism within a single core.

5.2.1 High Level Idea of SIMD

Calculations can occur in parallel in the processor if there is sufficient structure in the computation.

5.2.2 How to do SIMD

The simplest way to get SIMD is simply to make sure that your values are aligned. If they are, then LLVM’s autovectorizer pass has a good chance of vectorizing your code automatically. (In the world of computing, “SIMD” is essentially synonymous with vectorization, since it takes scalar operations and instead computes on small vectors. This is not to be confused with “vectorization” in the Python/R/MATLAB sense, which is a programming style that prefers calling C-defined primitive functions, like broadcast or matrix multiplication.)
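For example, a simple summation loop over contiguous Float64 data is a prime candidate for the autovectorizer. Here is a minimal sketch (the function name is ours and the exact IR you get depends on your Julia version and CPU):

using InteractiveUtils

# A loop of independent, contiguous Float64 operations: the autovectorizer
# can turn the scalar adds into <N x double> SIMD adds.
function mysum(x::Vector{Float64})
    s = 0.0
    @inbounds @simd for i in eachindex(x)
        s += x[i]
    end
    return s
end

@code_llvm mysum(rand(1000))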

You can check for auto-vectorization inside of the LLVM IR by looking for statements like:

%wide.load24 = load <4 x double>, <4 x double> addrspace(13)* %46, align 8
; └
; ┌ @ float.jl:395 within `+'
%47 = fadd <4 x double> %wide.load, %wide.load24

which means that 4 additions are happening simultaneously. The amount of vectorization is heavily dependent on your architecture. The ancient form of SIMD, the SSE(2) instructions, required that your data was aligned. Modern instruction sets give a bit more leeway, but it still generally holds that you should make sure the data you’re trying to SIMD over is aligned. Thus there can be major differences in computing with a struct-of-arrays format instead of an array-of-structs format. For example:

struct MyComplex
  real::Float64
  imag::Float64
end
arr = [MyComplex(rand(),rand()) for i in 1:100]
100-element Vector{MyComplex}:
 MyComplex(0.5171425799729418, 0.5389833329835383)
 MyComplex(0.9273852992839166, 0.6711676516426572)
 MyComplex(0.9188839358734163, 0.69566633863532)
 MyComplex(0.5149805563839515, 0.34684634193884556)
 MyComplex(0.5025762934009232, 0.9317528981610566)
 MyComplex(0.6182965305758257, 0.4714678558437191)
 MyComplex(0.9637130781038232, 0.35381659885390127)
 MyComplex(0.05303934651739051, 0.5283571896609601)
 MyComplex(0.7640379612452078, 0.030918641696845883)
 MyComplex(0.7879963381227947, 0.877727975786527)
 MyComplex(0.40923630065090066, 0.2841032996700653)
 MyComplex(0.5310039504202523, 0.3809548581888238)
 MyComplex(0.28644668740025714, 0.040403772416303)
 ⋮
 MyComplex(0.966889263377449, 0.225967114853175)
 MyComplex(0.7583992061643928, 0.8376264040502798)
 MyComplex(0.4950710312486035, 0.38216405552956967)
 MyComplex(0.5577584208258319, 0.03476798870449893)
 MyComplex(0.2612858158174869, 0.9293738045669759)
 MyComplex(0.16877190046116053, 0.6768799829602348)
 MyComplex(0.610591500686496, 0.1714927381851986)
 MyComplex(0.025248616089485476, 0.8845033232694497)
 MyComplex(0.6623917047313289, 0.45970378019654157)
 MyComplex(0.22731260398681374, 0.38286776865734873)
 MyComplex(0.01214026543643354, 0.15850002116286088)
 MyComplex(0.17568792291939672, 0.7317199034423749)

is represented in memory as

[real1,imag1,real2,imag2,...]

while the struct of arrays format is

struct MyComplexes
  real::Vector{Float64}
  imag::Vector{Float64}
end
arr2 = MyComplexes(rand(100),rand(100))
MyComplexes([0.5240456439259698, 0.7641990890595322, 0.3272012223563314, 0.32665635557581374, 0.6208235049962002, 0.017636574146488226, 0.6754741671723067, 0.6606017902042781, 0.9031998261731529, 0.7511761900676958  …  0.35946892646270157, 0.8366647043642217, 0.80495314181679, 0.17012580793194387, 0.4266440935361443, 0.8248150023004187, 0.7475526145122053, 0.6682331655066719, 0.7488365584042757, 0.6216662994614262], [0.5147671464058595, 0.4030123955856699, 0.8925325487199312, 0.8974490320038392, 0.26696452517979885, 0.15703223245821918, 0.5753006191470085, 0.18369838826811646, 0.1616695840779412, 0.9874418694199811  …  0.9053886752455029, 0.23022308636435107, 0.6651321483834207, 0.6417777842027098, 0.030151577749300107, 0.03298300488010508, 0.58854566946127, 0.7105727582002626, 0.9149637304123415, 0.5216242759285099])

Now let’s check what happens when we perform a reduction:

using InteractiveUtils
Base.:+(x::MyComplex,y::MyComplex) = MyComplex(x.real+y.real,x.imag+y.imag)
Base.:/(x::MyComplex,y::Int) = MyComplex(x.real/y,x.imag/y)
average(x::Vector{MyComplex}) = sum(x)/length(x)
@code_llvm average(arr)
;  @ In[5]:4 within `average`
define 
void @julia_average_1306([2 x double]* noalias nocapture noundef nonnull sret([2 x double]) align 8 dereferenceable(16) %0, {}* noundef nonnull align 16 dereferenceable(40) %1) #0 {
top:
  %2 = alloca [4 x {}*], align 8
  %3 = alloca <2 x double>, align 16
  %tmpcast = bitcast <2 x double>* %3 to [2 x double]*
  %.sub = getelementptr inbounds [4 x {}*], [4 x {}*]* %2, i64 0, i64 0
; ┌ @ reducedim.jl:994 within `sum`
; │┌ @ reducedim.jl:994 within `#sum#808`
; ││┌ @ reducedim.jl:998 within `_sum`
; │││┌ @ reducedim.jl:998 within `#_sum#810`
; ││││┌ @ reducedim.jl:999 within `_sum`
; │││││┌ @ reducedim.jl:999 within `#_sum#811`
; ││││││┌ @ reducedim.jl:357 within `mapreduce`
; │││││││┌ @ reducedim.jl:357 within `#mapreduce#801`
; ││││││││┌ @ reducedim.jl:365 within `_mapreduce_dim`
; │││││││││┌ @ reduce.jl:424 within `_mapreduce`
; ││││││││││┌ @ indices.jl:486 within `LinearIndices`
; │││││││││││┌ @ abstractarray.jl:98 within `axes`
; ││││││││││││┌ @ array.jl:149 within `size`
               %4 = bitcast {}* %1 to { i8*, i64, i16, i16, i32 }*
               %5 = getelementptr inbounds { i8*, i64, i16, i16, i32 }, { i8*, i64, i16, i16, i32 }* %4, i64 0, i32 1
               %6 = load i64, i64* %5, align 8
; ││││││││││└└└
; ││││││││││ @ reduce.jl:426 within `_mapreduce`
            switch i64 %6, label %L14 [
    i64 0, label %L8
    i64 1, label %L12
  ]

L8:                                               ; preds = %top
; ││││││││││ @ reduce.jl:427 within `_mapreduce`
            store {}* inttoptr (i64 140224218384336 to {}*), {}** %.sub, align 8
            %7 = getelementptr inbounds [4 x {}*], [4 x {}*]* %2, i64 0, i64 1
            store {}* inttoptr (i64 140224214304000 to {}*), {}** %7, align 8
            %8 = getelementptr inbounds [4 x {}*], [4 x {}*]* %2, i64 0, i64 2
            store {}* %1, {}** %8, align 8
            %9 = getelementptr inbounds [4 x {}*], [4 x {}*]* %2, i64 0, i64 3
            store {}* inttoptr (i64 140224235746528 to {}*), {}** %9, align 8
            %10 = call nonnull {}* @ijl_invoke({}* inttoptr (i64 140224233214240 to {}*), {}** nonnull %.sub, i32 4, {}* inttoptr (i64 140224414853696 to {}*))
            call void @llvm.trap()
            unreachable

L12:                                              ; preds = %top
; ││││││││││ @ reduce.jl:429 within `_mapreduce`
; ││││││││││┌ @ essentials.jl:13 within `getindex`
             %11 = bitcast {}* %1 to <2 x double>**
             %12 = load <2 x double>*, <2 x double>** %11, align 8
             %13 = load <2 x double>, <2 x double>* %12, align 1
             br label %L46

L14:                                              ; preds = %top
; ││││││││││└
; ││││││││││ @ reduce.jl:431 within `_mapreduce`
; ││││││││││┌ @ int.jl:83 within `<`
             %14 = icmp ugt i64 %6, 15
; ││││││││││└
            br i1 %14, label %L42, label %L16

L16:                                              ; preds = %L14
; ││││││││││ @ reduce.jl:433 within `_mapreduce`
; ││││││││││┌ @ essentials.jl:13 within `getindex`
             %15 = bitcast {}* %1 to [2 x double]**
             %16 = load [2 x double]*, [2 x double]** %15, align 8
             %17 = bitcast [2 x double]* %16 to <2 x double>*
             %18 = load <2 x double>, <2 x double>* %17, align 1
; ││││││││││└
; ││││││││││ @ reduce.jl:434 within `_mapreduce`
; ││││││││││┌ @ essentials.jl:13 within `getindex`
             %.sroa.028.0..sroa_idx = getelementptr inbounds [2 x double], [2 x double]* %16, i64 1, i64 0
             %19 = bitcast double* %.sroa.028.0..sroa_idx to <2 x double>*
             %20 = load <2 x double>, <2 x double>* %19, align 1
; ││││││││││└
; ││││││││││ @ reduce.jl:435 within `_mapreduce`
; ││││││││││┌ @ reduce.jl:24 within `add_sum`
; │││││││││││┌ @ In[5]:2 within `+` @ float.jl:408
              %21 = fadd <2 x double> %18, %20
; ││││││││││└└
; ││││││││││ @ reduce.jl:436 within `_mapreduce`
; ││││││││││┌ @ int.jl:83 within `<`
             %.not6482 = icmp ugt i64 %6, 2
; ││││││││││└

            br i1 %.not6482, label %L33, label %L46

L33:                                              ; preds = %L33, %L16
            %value_phi285 = phi i64 [ %23, %L33 ], [ 2, %L16 ]
            %22 = phi <2 x double> [ %26, %L33 ], [ %21, %L16 ]
; ││││││││││ @ reduce.jl:437 within `_mapreduce`
; ││││││││││┌ @ int.jl:87 within `+`
             %23 = add nuw nsw i64 %value_phi285, 1
; ││││││││││└
; ││││││││││┌ @ essentials.jl:13 within `getindex`
             %.sroa.0.0..sroa_idx = getelementptr inbounds [2 x double], [2 x double]* %16, i64 %value_phi285, i64 0
             %24 = bitcast double* %.sroa.0.0..sroa_idx to <2 x double>*
             %25 = load <2 x double>, <2 x double>* %24, align 1
; ││││││││││└
; ││││││││││ @ reduce.jl:438 within `_mapreduce`
; ││││││││││┌ @ reduce.jl:24 within `add_sum`
; │││││││││││┌ @ In[5]:2 within `+` @ float.jl:408
              %26 = fadd <2 x double> %22, %25
; ││││││││││└└
; ││││││││││ @ reduce.jl:436 within `_mapreduce`
; ││││││││││┌ @ int.jl:83 within `<`
             %exitcond.not = icmp eq i64 %23, %6
; ││││││││││└
            br i1 %exitcond.not, label %L46, label %L33

L42:                                              ; preds = %L14
; ││││││││││ @ reduce.jl:442 within `_mapreduce`
; ││││││││││┌ @ reduce.jl:272 within `mapreduce_impl`
             call void @j_mapreduce_impl_1308([2 x double]* noalias nocapture noundef nonnull sret([2 x double]) %tmpcast, {}* nonnull %1, i64 signext 1, i64 signext %6, i64 signext 1024) #0
; └└└└└└└└└└└
  %27 = load <2 x double>, <2 x double>* %3, align 16
; ┌ @ essentials.jl:10 within `length`
   %.pre = load i64, i64* %5, align 8
   br label %L46

L46:                                              ; preds = %L42, %L33, %L16, %L12
   %28 = phi i64 [ %.pre, %L42 ], [ 1, %L12 ], [ %6, %L16 ], [ %6, %L33 ]
   %29 = phi <2 x double> [ %27, %L42 ], [ %13, %L12 ], [ %21, %L16 ], [ %26, %L33 ]
; └
; ┌ @ In[5]:3 within `/` @ promotion.jl:413
; │┌ @ promotion.jl:381 within `promote`
; ││┌ @ promotion.jl:358 within `_promote`
; │││┌ @ number.jl:7 within `convert`
; ││││┌ @ float.jl:159 within `Float64`
       %30 = sitofp i64 %28 to double
; │└└└└
; │ @ In[5]:3 within `/` @ promotion.jl:413 @ float.jl:411
   %31 = insertelement <2 x double> poison, double %30, i64 0
   %32 = shufflevector <2 x double> %31, <2 x double> poison, <2 x i32> zeroinitializer
   %33 = fdiv <2 x double> %29, %32
; └
  %34 = bitcast [2 x double]* %0 to <2 x double>*
  store <2 x double> %33, <2 x double>* %34, align 8
  ret void
}

What this is doing is packing values into small vectors and then parallelizing the operations on those vectors by calling specific vector-parallel instructions. Keep this in mind.
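For comparison, here is the analogous reduction over the struct-of-arrays layout defined above (this method definition is our addition, not part of the original example). Each field is a plain contiguous Vector{Float64}, which is the friendliest possible layout for the autovectorizer:

# Struct-of-arrays version of the same reduction: each field is a contiguous
# Vector{Float64}, so `sum` vectorizes over plain double-precision data.
average(x::MyComplexes) = MyComplex(sum(x.real)/length(x.real),
                                    sum(x.imag)/length(x.imag))
average(arr2)
@code_llvm average(arr2)  # compare the generated IR against the array-of-structs version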

5.2.3 Explicit SIMD

The preceding was all a form of loop-level parallelism known as loop vectorization. It’s simply easier for compilers to reason at the loop level, prove that iterations are independent, and automatically generate SIMD code from that. Looping is not strictly necessary: compilers can also produce SIMD code from straight-line code through a process known as SLP (superword-level parallelism) vectorization, but the results are far from optimal and the analysis is expensive, meaning that it’s usually not a pass used by default.

If you want to pack the vectors yourself, primitives for doing so from within Julia are available in SIMD.jl. This is for “real” performance warriors. For example:

using SIMD
v = Vec{4,Float64}((1,2,3,4))
@show v+v # basic arithmetic is supported
@show sum(v) # basic reductions are supported
v + v = <4 x Float64>[2.0, 4.0, 6.0, 8.0]
sum(v) = 10.0
10.0
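For instance, here is a hand-rolled SIMD sum using SIMD.jl’s vload. This is a minimal sketch of our own, assuming the array length is a multiple of 4; a real implementation would need a scalar loop for the remainder:

using SIMD

function simd_sum(x::Vector{Float64})
    acc = Vec{4,Float64}((0.0, 0.0, 0.0, 0.0))
    @inbounds for i in 1:4:length(x)
        acc += vload(Vec{4,Float64}, x, i)   # load x[i:i+3] into a SIMD register
    end
    return sum(acc)                          # horizontal reduction over the 4 lanes
end

simd_sum(rand(400))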

Using this you can pull apart code and force the usage of SIMD vectors. One library which makes great use of this is LoopVectorization.jl. However, one word of “caution”:

Most performance optimization is not trying to do something really good for performance. Most performance optimization is trying to not do something that is actively bad for performance.

5.2.4 Summary of SIMD

  • Communication in SIMD is due to locality: if things are local, the processor can automatically set up the operations.
  • There’s no real worry about “getting it wrong”: you cannot overwrite pieces from different parts of the arithmetic unit, and if SIMD is unsafe then it just won’t auto-vectorize.
  • Suitable for operations measured in ns.

5.3 Next Level Up: Multithreading

Last time we briefly went over multithreading and described how every process has multiple threads which share a single heap, and when multiple threads are executed simultaneously we have multithreaded parallelism. Note that you can have multiple threads which aren’t executed simultaneously, like in the case of I/O operations, and this is an example of concurrency without parallelism and is commonly referred to as green threads.

Last time we described a simple multithreaded program and noticed that multithreading has an overhead cost of around 50ns-100ns. This is due to the construction of the new stack (among other things) each time a new computational thread is spun up. This means that, unlike SIMD, some thought needs to be put into when to perform multithreading: it’s not always a good idea. The computation needs to be expensive enough for this overhead to be counter-balanced.

One abstraction that was glossed over was the memory access style. Before, we were considering a single shared heap, i.e. a UMA (uniform memory access) style:

However, this is not the case for all shared memory devices. For example, compute nodes in an HPC cluster tend to be “dual Xeon” or “quad Xeon”, where each Xeon processor is itself a multicore processor. Each processor accesses its own local caches and memory banks, and thus one has to be aware that this is set up in a NUMA (non-uniform memory access) manner:

where there is a cache that is closer to a given processor and a cache that is further away. Care should be taken to localize the computation per thread; otherwise a cost associated with the memory sharing will be incurred (though all sharing will still be automatic).

In this sense, interthread communication is naturally done through the heap: if you want other threads to be able to touch a value, then you can simply place it on the heap and then it’ll be available. We saw this last time by how overlapping computations can re-use the same heap-based caches, meaning that care needs to be taken with how one writes into a dynamically-allocated array.

A simple example demonstrates this. First, let’s make sure we have multithreading enabled:

using Base.Threads
Threads.nthreads() # should not be 1
1
using BenchmarkTools
acc = 0
@threads for i in 1:10_000
    global acc
    acc += 1
end
acc
10000

With more than one thread, the accumulated value will typically come out lower than 10000 (the output above happens to use a single thread, so the race does not manifest). The reason is that the update is a separate read and a separate write: a thread reads acc, increments it, and writes it back, but other threads may have written in between, so their increments get overwritten. The result is that the total summation is lower than the true value because of this clashing. We can prevent this by only allowing one thread to update the shared variable at a time. One abstraction for doing this is atomics:

acc = Atomic{Int64}(0)
@threads for i in 1:10_000
    atomic_add!(acc, 1)
end
acc
Atomic{Int64}(10000)

When an atomic add is being done, all other threads wishing to do the same computation are blocked. This of course can have a massive effect on performance since atomic computations are not parallel.
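A common alternative, not benchmarked in the examples here, is to avoid contention entirely by giving each thread its own accumulator and combining the partial results at the end. A minimal sketch of our own, using the :static schedule (available on recent Julia versions) so that threadid() is stable within each chunk:

using Base.Threads

function sum_partials(n)
    partials = zeros(Int, nthreads())   # one slot per thread
    @threads :static for i in 1:n
        partials[threadid()] += 1       # no lock or atomic needed
    end
    return sum(partials)                # combine serially at the end
end

sum_partials(10_000)  # returns 10000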

Julia also exposes a lower level of heap control in threading using locks:

const acc_lock = Ref{Int64}(0)
const splock = SpinLock()
function f1()
    @threads for i in 1:10_000
        lock(splock)
        acc_lock[] += 1
        unlock(splock)
    end
end
const rsplock = ReentrantLock()
function f2()
    @threads for i in 1:10_000
        lock(rsplock)
        acc_lock[] += 1
        unlock(rsplock)
    end
end
acc2 = Atomic{Int64}(0)
function g()
  @threads for i in 1:10_000
      atomic_add!(acc2, 1)
  end
end
const acc_s = Ref{Int64}(0)
function h()
  global acc_s
  for i in 1:10_000
      acc_s[] += 1
  end
end
@btime f1()
  157.285 μs (7 allocations: 640 bytes)

SpinLock is non-reentrant, i.e. a thread that already holds the lock will block itself if it tries to lock again. Therefore it has to be used with caution (every lock must be paired with exactly one unlock, and the locked region must not lock again), but it’s fast. ReentrantLock alleviates those concerns, but trades off a bit of performance:

@btime f2()
  166.921 μs (7 allocations: 640 bytes)

But if you can use atomics, they will be faster:

@btime g()
  46.660 μs (7 allocations: 640 bytes)

and if your computation is actually serial, then use serial code:

@btime h()
  2.329 ns (0 allocations: 0 bytes)

Why is this so fast? Check the code:

@code_llvm h()
;  @ In[10]:25 within `h`
define void @julia_h_1791() #0 {
top:
  %.promoted = load i64, i64* inttoptr (i64 140224426130176 to i64*), align 256
;  @ In[10]:27 within `h`
  %0 = add i64 %.promoted, 10000
;  @ In[10]:28 within `h`
; ┌ @ Base.jl within `setproperty!`
   store i64 %0, i64* inttoptr (i64 140224426130176 to i64*), align 256
; └
;  @ In[10]:29 within `h`
  ret void
}

The compiler has folded the whole loop into a single addition of 10000. So to get a proper timing, let’s make the size mutable:

const len = Ref{Int}(10_000)
function h2()
  global acc_s
  global len
  for i in 1:len[]
      acc_s[] += 1
  end
end
@btime h2()
  2.393 ns (0 allocations: 0 bytes)
@code_llvm h2()
;  @ In[15]:2 within `h2`
define void @julia_h2_1798() #0 {
top:
;  @ In[15]:5 within `h2`
; ┌ @ refvalue.jl:56 within `getindex`
; │┌ @ Base.jl:37 within `getproperty`
    %0 = load i64, i64* inttoptr (i64 140224426639520 to i64*), align 32
; └└
; ┌ @ range.jl:5 within `Colon`
; │┌ @ range.jl:397 within `UnitRange`
; ││┌ @ range.jl:404 within `unitrange_last`
     %.inv = icmp sgt i64 %0, 0
; └└└
  br i1 %.inv, label %L18.preheader, label %L34

L18.preheader:                                    ; preds = %top
  %.promoted = load i64, i64* inttoptr (i64 140224426130176 to i64*), align 256
;  @ In[15]:7 within `h2`
  %1 = add i64 %.promoted, %0
;  @ In[15]:6 within `h2`
; ┌ @ Base.jl within `setproperty!`
   store i64 %1, i64* inttoptr (i64 140224426130176 to i64*), align 256
; └
;  @ In[15]:7 within `h2`
  br label %L34

L34:                                              ; preds = %L18.preheader, %top
  ret void
}

It’s still optimizing it!

non_const_len = 10000
function h3()
  global acc_s
  global non_const_len
  len2::Int = non_const_len
  for i in 1:len2
      acc_s[] += 1
  end
end
@btime h3()
  99.019 ns (0 allocations: 0 bytes)

Note that what is shown here is a type declaration: a::T = ... forces a to be of type T throughout the whole function (assignments to it are converted to T). By giving the compiler this information, I am able to use the non-constant global in a type-stable manner.
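An alternative sketch (our addition) is to avoid the non-constant global entirely by passing the length as an argument, which is type-stable without any annotation. Note that, as with h2, the compiler can still collapse the loop into a single addition, so this mostly measures call overhead:

function h4(n::Int)
  # n is a runtime argument with a concrete type, so the loop is type-stable
  for i in 1:n
      acc_s[] += 1
  end
end
@btime h4(10_000)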

One last thing to note about multithreaded computations, and parallel computations in general, is that one cannot assume that the parallelized computation is computed in any given order. For example, the following will have a quasi-random ordering:

const a2 = zeros(nthreads()*10)
const acc_lock2 = Ref{Int64}(0)
const splock2 = SpinLock()
function f_order()
    @threads for i in 1:length(a2)
        lock(splock2)
        acc_lock2[] += 1
        a2[i] = acc_lock2[]
        unlock(splock2)
    end
end
f_order()
a2
10-element Vector{Float64}:
  1.0
  2.0
  3.0
  4.0
  5.0
  6.0
  7.0
  8.0
  9.0
 10.0

Note that @threads divides the iteration range into contiguous chunks, one per thread (here, groups of 10). Within a chunk the writes are sequential, but which thread’s chunk gets through the lock first is effectively random, so across threads the ordering is not deterministic. (With a single thread, as in the output above, the result is simply sequential.)

5.3.1 The Dining Philosophers Problem

A classic tale in parallel computing is the dining philosophers problem. In this case, there are N philosophers at a table who all want to eat at the same time, following the same rules. Each philosopher must alternately think and eat. To eat, a philosopher needs both the fork on their left and the fork on their right, and cannot start eating until they have both. The problem is how to set up a concurrent algorithm that will not cause any philosopher to starve.

The difficulty is a situation known as deadlock. For example, if each philosopher is told to grab the right fork when it’s available, then the left fork, and to put down the forks after eating, then they will all grab the right fork and none will ever eat because they will all be waiting on the left fork. This is analogous to two blocked computations each waiting on the other to finish. Thus, when using blocking structures, one needs to be careful about deadlock! A concrete sketch of this pattern, and the standard fix, is shown below.
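Here is a small sketch (our own, not from the original text) of how deadlock arises with two locks and how a global lock-ordering rule avoids it:

using Base.Threads

const fork1 = ReentrantLock()
const fork2 = ReentrantLock()

# Deadlock-prone pattern: one task locks fork1 then fork2, another locks
# fork2 then fork1. If each acquires its first fork before the other
# releases, both block forever waiting on their second fork.

# Standard fix: impose a global acquisition order, so every task always
# locks fork1 before fork2 and a cycle of waiting can never form.
function eat_safely(action)
    lock(fork1) do
        lock(fork2) do
            action()
        end
    end
end

eat_safely(() -> println("eating"))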

5.3.2 Two Programming Models: Loop-Level Parallelism and Task-Based Parallelism

As described in the previous lecture, one can also use Threads.@spawn to do multithreading in Julia v1.3+. The same factors all apply: locks, atomics, and so on. This is a case of one parallelism construct having two alternative programming models. Threads.@spawn represents task-based parallelism, while Threads.@threads represents loop-level parallelism, i.e. a parallel iterator model. Loop-based parallelization models are very high level and, assuming every iteration is independent, require almost no code change. Task-based parallelism is a more expressive parallelism model, but usually requires modifying the code to be explicitly written as a set of parallelizable tasks, as sketched below. Note that in the case of Julia, Threads.@threads is implemented on top of Threads.@spawn’s task model.
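For example, a minimal task-based sketch (our own) of a reduction splits the work into tasks explicitly and fetches their results:

using Base.Threads

function tasked_sum(x)
    mid = length(x) ÷ 2
    t = Threads.@spawn sum(@view x[1:mid])   # schedule the first half as a task
    s2 = sum(@view x[mid+1:end])             # compute the second half here
    return fetch(t) + s2                     # wait for the task and combine
end

tasked_sum(rand(1_000_000))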

5.3.3 Summary of Multithreading

  • Communication in multithreading is done on the heap. Locks and atomics allow for a form of safe message passing.
  • 50ns-100ns of overhead. Suitable for 1μs calculations.
  • Be careful of ordering and heap-allocated values.

5.4 GPU Computing

GPUs are not fast. In fact, the problem with GPUs is that each processor is slow. However, GPUs have a lot of cores… like thousands.

An RTX2080, a standard “gaming” GPU (not even the ones in the cluster), has 2944 cores. However, not only are GPUs slow, but they also need to be programmed in a style that is SPMD, which stands for Single Program Multiple Data. This means that every single thread must be running the same program but on different pieces of data. Exactly the same program. If you have

if a > 1
  # Do something
else
  # Do something else
end

where some of the data goes down one branch and other data goes down the other branch, every thread in the same execution group will effectively run both branches (performing masked-off “fake” computations while on the branch it doesn’t need). This means that GPU tasks should be “very parallel” with as few conditionals as possible.

5.4.1 GPU Memory

GPUs themselves are shared memory devices, meaning they have a heap that is shared amongst all threads. However, GPUs are heavily in the NUMA camp, where different blocks of the GPU have much faster access to certain parts of the memory. Additionally, this heap is disconnected from the standard processor, so data must be passed to the GPU and data must be returned.

GPU memory size is relatively small compared to CPUs. For example, the RTX2080Ti has 11GB of RAM. Thus one needs to be doing computations that are memory compact, such as matrix multiplication, where the compute cost is O(n^3) while the memory cost is only O(n^2), so computation time grows faster than memory requirements.

5.4.2 Note on GPU Hardware

Standard GPU hardware “for gaming”, like the RTX2070, is just as fast as higher end GPU hardware for Float32. Higher end hardware, like the Tesla line, adds more memory, memory safety (ECC), and fast Float64 support. However, these require being in a server since they have alternative cooling strategies, making them a higher end product.

5.4.3 SPMD Kernel Generation GPU Computing Models

The core programming models for GPU computing are SPMD kernel compilers, of which the most well-known is CUDA. CUDA is a C++-like programming language which compiles to .ptx kernels, and GPU execution on NVIDIA GPUs is done by “all streams” of a GPU concurrently executing the kernel (generally, without going into more detail, you can think of “all streams” as just meaning “all cores”; more detailed views of GPU execution will come later).

.ptx CUDA kernels can be compiled from LLVM IR, and thus, since Julia is a programming language which emits LLVM IR for all of its operations, native Julia programs are compatible with compilation to CUDA. The helper functions to enable this separate compilation path are provided by CUDA.jl. Let’s take a look at a basic CUDA.jl kernel-generating example:

using CUDA

N = 2^20
x_d = CUDA.fill(1.0f0, N)  # a vector stored on the GPU filled with 1.0 (Float32)
y_d = CUDA.fill(2.0f0, N)  # a vector stored on the GPU filled with 2.0

function gpu_add2!(y, x)
    index = threadIdx().x    # this example only requires linear indexing, so just use `x`
    stride = blockDim().x
    for i = index:stride:length(y)
        @inbounds y[i] += x[i]
    end
    return nothing
end

fill!(y_d, 2)
@cuda threads=256 gpu_add2!(y_d, x_d)
all(Array(y_d) .== 3.0f0)
true

The key to understanding the SPMD kernel approach is the index = threadIdx().x and stride = blockDim().x portions.

The way kernels are expected to run in parallel is that each invocation is given a specific block of the computation and is expected to work only on that small block of the input. This kernel is then launched on many separate threads on the GPU, making each CUDA core simultaneously compute its own block. Thus, as a user of such an SPMD programming model, you never specify the computation globally but instead simply specify how chunks should behave, giving the compiler the leeway to determine the optimal global execution.
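For instance, a grid-stride variant (following the pattern from the CUDA.jl introduction) extends the kernel above to multiple blocks, so that the grid as a whole covers the array. The launch configuration below assumes the y_d and x_d arrays defined earlier:

function gpu_add3!(y, x)
    index = (blockIdx().x - 1) * blockDim().x + threadIdx().x  # global thread index
    stride = gridDim().x * blockDim().x                        # total threads in the grid
    for i = index:stride:length(y)
        @inbounds y[i] += x[i]
    end
    return nothing
end

numblocks = ceil(Int, length(y_d)/256)
fill!(y_d, 2)
@cuda threads=256 blocks=numblocks gpu_add3!(y_d, x_d)
all(Array(y_d) .== 3.0f0)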

5.4.4 Array-Based GPU Computing Models

The simplest version of GPU computing is the array-based programming model.

A = rand(100,100); B = rand(100,100)
using CUDA
# Pass to the GPU
cuA = cu(A); cuB = cu(B)
cuC = cuA*cuB
# Pass to the CPU
C = Array(cuC)
100×100 Matrix{Float32}:
 25.0564  28.1081  24.5791  24.2118  …  29.1066  25.3427  26.785   26.8159
 23.8365  26.9323  22.1268  24.394      25.1406  24.2087  23.0141  24.5436
 23.8291  27.5819  20.8153  24.5719     26.0579  24.7479  24.5362  24.5483
 22.3662  25.2902  20.4605  22.7104     23.514   22.1021  23.3403  23.0844
 24.8252  29.7588  22.9885  28.0034     28.7318  26.3997  26.019   27.2803
 23.8504  25.0349  20.1403  22.2076  …  24.6679  21.7806  21.7851  22.3799
 20.0263  22.023   17.9829  20.6622     21.3617  19.9235  21.8351  21.5375
 22.3813  25.5924  19.3125  22.7719     24.4725  23.848   23.5627  23.3131
 25.0231  29.8407  23.5434  25.3326     26.7342  24.6092  25.2959  24.4861
 22.874   26.0299  20.9581  23.9912     25.9096  23.7     23.8586  24.6916
 22.7543  26.2432  20.4097  25.0279  …  27.3809  24.721   24.4355  26.0748
 25.4256  30.0091  22.1195  25.6505     26.6411  24.233   25.3694  25.9158
 24.6519  27.243   21.2237  24.1523     26.9574  25.4418  23.3858  24.1972
  ⋮                                  ⋱                             
 22.2877  27.4741  21.8779  25.6035     26.5338  23.4227  25.7808  24.7436
 25.7797  28.7837  23.0472  26.992      26.7406  25.9313  27.3336  27.8026
 23.9616  25.2329  20.9843  23.1032  …  25.5     24.5971  23.6548  26.0905
 23.9671  28.4682  21.4784  26.8163     26.3495  25.1586  25.6829  25.9901
 24.8729  29.1563  20.8062  25.7028     28.3692  25.0772  23.9394  28.236
 24.1732  29.4735  21.1834  25.9207     25.0506  25.0781  24.6209  25.716
 24.6369  28.7742  22.0081  26.1276     28.1415  27.08    26.6875  26.4206
 25.5436  26.6702  21.8379  24.6884  …  25.2922  24.5365  24.3202  25.257
 23.8022  27.3092  21.9259  23.1253     26.5923  23.4442  23.6898  24.4027
 25.0953  28.9557  23.7193  25.9754     28.8757  25.4649  25.8137  27.0875
 22.7922  26.9324  20.2291  23.9017     26.5135  22.9873  23.975   24.1223
 21.9951  26.0003  21.9184  21.6673     23.6457  22.287   22.3392  24.2492

Let’s see the transfer times:

@btime cu(A)
  8.144 μs (8 allocations: 39.30 KiB)
100×100 CuArray{Float32, 2, CUDA.Mem.DeviceBuffer}:
 0.795979   0.820715   0.804492   …  0.173238   0.602618  0.325205
 0.248617   0.817344   0.31537       0.99921    0.353251  0.357586
 0.562645   0.590865   0.612691      0.835189   0.317652  0.613724
 0.0302437  0.108246   0.134883      0.756453   0.844707  0.275566
 0.803271   0.454746   0.0568371     0.476437   0.292629  0.436834
 0.528203   0.965217   0.3534     …  0.284256   0.51717   0.149346
 0.0373348  0.664591   0.305146      0.37815    0.915737  0.26873
 0.161136   0.870413   0.747152      0.950921   0.273694  0.240076
 0.948914   0.538506   0.163413      0.087484   0.387942  0.0266194
 0.232321   0.729338   0.874976      0.342558   0.171119  0.669174
 0.749575   0.989877   0.585643   …  0.841039   0.372072  0.480976
 0.13611    0.0182909  0.993907      0.772716   0.208189  0.724729
 0.738871   0.168995   0.370926      0.902453   0.496412  0.624321
 ⋮                                ⋱                       
 0.850467   0.865298   0.566511      0.484169   0.133182  0.745015
 0.424948   0.767975   0.0608416     0.381281   0.962849  0.0890435
 0.163066   0.660003   0.823241   …  0.491438   0.353012  0.819618
 0.989283   0.663811   0.23109       0.359999   0.718979  0.0412983
 0.658782   0.359017   0.475921      0.643057   0.38225   0.922345
 0.60349    0.128136   0.304796      0.0785167  0.960574  0.403708
 0.904782   0.669525   0.633114      0.594589   0.381224  0.483661
 0.716692   0.930138   0.359974   …  0.739195   0.365376  0.108808
 0.253205   0.974106   0.696407      0.876903   0.518735  0.241752
 0.730507   0.961051   0.208908      0.767898   0.723701  0.709793
 0.803944   0.900472   0.60208       0.988069   0.471301  0.870057
 0.331299   0.979528   0.293971      0.121811   0.331233  0.993614
@btime Array(cuC)
  12.888 μs (3 allocations: 39.12 KiB)
100×100 Matrix{Float32}:
 25.0564  28.1081  24.5791  24.2118  …  29.1066  25.3427  26.785   26.8159
 23.8365  26.9323  22.1268  24.394      25.1406  24.2087  23.0141  24.5436
 23.8291  27.5819  20.8153  24.5719     26.0579  24.7479  24.5362  24.5483
 22.3662  25.2902  20.4605  22.7104     23.514   22.1021  23.3403  23.0844
 24.8252  29.7588  22.9885  28.0034     28.7318  26.3997  26.019   27.2803
 23.8504  25.0349  20.1403  22.2076  …  24.6679  21.7806  21.7851  22.3799
 20.0263  22.023   17.9829  20.6622     21.3617  19.9235  21.8351  21.5375
 22.3813  25.5924  19.3125  22.7719     24.4725  23.848   23.5627  23.3131
 25.0231  29.8407  23.5434  25.3326     26.7342  24.6092  25.2959  24.4861
 22.874   26.0299  20.9581  23.9912     25.9096  23.7     23.8586  24.6916
 22.7543  26.2432  20.4097  25.0279  …  27.3809  24.721   24.4355  26.0748
 25.4256  30.0091  22.1195  25.6505     26.6411  24.233   25.3694  25.9158
 24.6519  27.243   21.2237  24.1523     26.9574  25.4418  23.3858  24.1972
  ⋮                                  ⋱                             
 22.2877  27.4741  21.8779  25.6035     26.5338  23.4227  25.7808  24.7436
 25.7797  28.7837  23.0472  26.992      26.7406  25.9313  27.3336  27.8026
 23.9616  25.2329  20.9843  23.1032  …  25.5     24.5971  23.6548  26.0905
 23.9671  28.4682  21.4784  26.8163     26.3495  25.1586  25.6829  25.9901
 24.8729  29.1563  20.8062  25.7028     28.3692  25.0772  23.9394  28.236
 24.1732  29.4735  21.1834  25.9207     25.0506  25.0781  24.6209  25.716
 24.6369  28.7742  22.0081  26.1276     28.1415  27.08    26.6875  26.4206
 25.5436  26.6702  21.8379  24.6884  …  25.2922  24.5365  24.3202  25.257
 23.8022  27.3092  21.9259  23.1253     26.5923  23.4442  23.6898  24.4027
 25.0953  28.9557  23.7193  25.9754     28.8757  25.4649  25.8137  27.0875
 22.7922  26.9324  20.2291  23.9017     26.5135  22.9873  23.975   24.1223
 21.9951  26.0003  21.9184  21.6673     23.6457  22.287   22.3392  24.2492

The cost of transferring is about 20μs-50μs in each direction, meaning that one needs to be doing operations that cost at least 200μs for GPUs to break even. A good rule of thumb is that GPU computations should take at least a millisecond, or GPU memory should be re-used.

5.4.5 Summary of GPUs

  • GPU cores are slow
  • GPUs are SPMD
  • GPUs are generally used for linear algebra
  • Suitable for SPMD 1ms computations

5.5 Xeon Phi Accelerators and OpenCL

Other architectures exist to keep in mind. Xeon Phis were a now-defunct accelerator line that used x86 (standard processor) cores as the base, with many dozens of them per card; for example, the Knights Landing series had up to 72 cores (288 hardware threads) per accelerator card. These cores were all clocked down, meaning each was slower than a standard CPU core, but there were fewer restrictions on SPMD (though SPMD-like computations were still preferred in order to make heavy use of SIMD). However, because machine learning essentially only needs linear algebra, and linear algebra is faster on architectures that restrict to SPMD, this approach failed. These devices can still be found on many high end clusters.

One alternative to CUDA is OpenCL which supports alternative architectures such as the Xeon Phi at the same time that it supports GPUs. However, one of the issues with OpenCL is that its BLAS implementation currently does not match the speed of CuBLAS, which makes NVIDIA-specific libraries still the king of machine learning and most scientific computing.

5.6 TPU Computing

TPUs are tensor processing units, Google’s accelerator technology. They are essentially just “tensor operation compilers”, where “tensor” in computer science speak simply means higher dimensional linear algebra. To do this, they internally utilize the BFloat16 type, a 16-bit floating point number with the same exponent size as a Float32 but only an 8-bit significand. This means that computations are highly prone to catastrophic cancellation. This computational device only works because BFloat16 arithmetic is paired with fused multiply-add (FMA) primitives that accumulate with 32-bit-like accuracy, and thus computations which are dominated by dot products (linear algebra) end up okay. Thus this is simply a GPU-like device which has gone further to completely specialize in linear algebra.
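To see the precision loss concretely, here is a small sketch of our own using the BFloat16s.jl package (an assumption: it provides a software BFloat16 type for experimentation):

using BFloat16s

# BFloat16 keeps Float32's 8-bit exponent but only ~8 bits of significand,
# so addends much smaller than 1/256 of the leading term vanish entirely.
a = BFloat16(1.0) + BFloat16(2f0^-9)
a == BFloat16(1.0)   # true: the small addend was rounded away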

5.7 Multiprocessing (Distributed Computing)

While multithreading computes with multiple threads, multiprocessing computes with multiple independent processes. Processes share no memory, neither heap nor stack, and thus this mode of computing also allows for distributed computation, i.e. the case where processes may be on separate computing hardware. However, even if they are on the same hardware, the lack of a shared address space means that multiprocessing has to do message passing, i.e. send data from one process to the other.

5.7.1 Distributed Tasks with Explicit Memory Handling: The Master-Worker Model

Given the amount of control over data handling, there are many different models for distributed computing. The simplest, the one that Julia’s Distributed Standard Library defaults to, is the master-worker model. The master-worker model has one process, deemed the master, which controls the worker processes.

Here we can start by adding some new worker processes:

using Distributed
addprocs(4)

This adds 4 worker processes for the master to control. The simplest computations are those where the master gives a worker a job and the worker returns the result afterwards. For example, a pmap operation or a @distributed loop gives the worker a function to execute, along with the data, and the worker then computes and returns the result.

At a lower level, this is done by spawning jobs with Distributed.@spawnat, or by using remotecall and fetching the result. ParallelDataTransfer.jl gives an extended set of primitive message passing operations. For example, we can explicitly tell the system to compute a function f on a remote process like:

@everywhere f(x) = x.^2 # Define this function on all processes
t = remotecall(f,2,randn(10))

remotecall is a non-blocking operation that returns a Future. To access the result, one uses the blocking operation fetch:

xsq = fetch(t)
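When the result is needed immediately, the two steps can be combined with remotecall_fetch, which calls the function on the worker and blocks until the value comes back:

xsq = remotecall_fetch(f, 2, randn(10))   # run f on worker 2 and wait for the result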

5.7.2 Distributed Tasks with Implicit Memory Handling: Distributed Task-Based Parallelism

Another popular programming model for distributed computation is task-based parallelism where all of the memory handling is implicit. Since, unlike the shared memory case, data transfers are required for processes to share heap-allocated values, distributed task-based parallelism libraries want a global view of the whole computation in order to build a sophisticated schedule that accounts for where data lives and when transfers will occur. Because of this, these libraries tend to require the entire computational graph up front, so they can restructure it as necessary with their own data transfer steps spliced into the compute. Examples of this kind of framework are:

  • Tensorflow
  • dask (“distributed tasks”)
  • Dagger.jl

Using these kinds of libraries requires building a directed acyclic graph (DAG). For example, the following showcases how to use Dagger.jl to represent a bunch of summations:

using Dagger

add1(value) = value + 1
add2(value) = value + 2
combine(a...) = sum(a)

p = delayed(add1)(4)
q = delayed(add2)(p)
r = delayed(add1)(3)
s = delayed(combine)(p, q, r)

@assert collect(s) == 16

Once the global computation is specified, commands like collect are used to instantiate the graph on given input data, which then run the computation in a (potentially) distributed manner, depending on internal scheduler heuristics.

5.7.3 Distributed Array-Based Parallelism: SharedArrays, Elemental, and DArrays

Because array operations are a standard way to compute in scientific computing, there are higher level primitives to help with message passing. A SharedArray is an array which acts like a shared memory device: its data lives in memory shared by all processes on the same machine, so every process can read and write it directly (this only works for processes on one node, and concurrent writes should be treated with a performance and correctness caution). DistributedArrays.jl provides a parallel array type which has local blocks per process and can be used for writing higher level abstractions with explicit message passing. Because it is currently missing high-level parallel linear algebra, the recommended tool for distributed linear algebra is Elemental.jl.
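As a small sketch (our own, assuming the workers added earlier in this section), each worker can fill its portion of a SharedArray directly since the memory is visible to all local processes:

using Distributed, SharedArrays

S = SharedArray{Float64}(10)   # backed by memory shared across local workers
@sync @distributed for i in 1:10
    S[i] = myid()              # record which worker wrote each entry
end
S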

5.7.4 MapReduce, Hadoop, and Spark: The Map-Reduce Model

Many data-parallel operations work by mapping a function f onto each piece of data and then reducing it. For example, the sum of squares maps the function x -> x^2 onto each value, and then these values are reduced by performing a summation. MapReduce was a Google framework in the 2000’s built around this as the parallel computing concept, and current data-handling frameworks, like Hadoop and Spark, continue this as the core distributed programming model.

In Julia, the mapreduce function performs serial map-reduce operations (it also works on GPU arrays). However, it does not auto-distribute. For distributed map-reduce programming, the @distributed for-loop macro can be used. For example, the sum of squares of random numbers is:

@distributed (+) for i in 1:1000
  rand()^2
end

One can see that computing summary statistics is easily done in this framework, which is why it was widely adopted among “big data” communities.

@distributed uses a static scheduler. The dynamic scheduling equivalent is pmap:

pmap(i->rand()^2,1:100)

which will dynamically allocate jobs to processes as they declare that they have finished their previous jobs. This has the same performance trade-off as Threads.@threads vs Threads.@spawn.

5.7.5 MPI: The Distributed SPMD Model

The main way to do high-performance multiprocessing is MPI, which is an old distributed computing interface from the C/Fortran days. Julia has access to the MPI programming model through MPI.jl. The programming model for MPI is that every computer is running the same program, and synchronization is performed by blocking communication. For example, let’s look at the following:

using MPI
MPI.Init()

comm = MPI.COMM_WORLD
rank = MPI.Comm_rank(comm)
size = MPI.Comm_size(comm)

dst = mod(rank+1, size)
src = mod(rank-1, size)

N = 4

send_mesg = Array{Float64}(undef, N)
recv_mesg = Array{Float64}(undef, N)

fill!(send_mesg, Float64(rank))

rreq = MPI.Irecv!(recv_mesg, src,  src+32, comm)

print("$rank: Sending   $rank -> $dst = $send_mesg\n")
sreq = MPI.Isend(send_mesg, dst, rank+32, comm)

stats = MPI.Waitall!([rreq, sreq])

print("$rank: Received $src -> $rank = $recv_mesg\n")

MPI.Barrier(comm)
> mpiexecjl -n 3 julia examples/04-sendrecv.jl
1: Sending   1 -> 2 = [1.0, 1.0, 1.0, 1.0]
0: Sending   0 -> 1 = [0.0, 0.0, 0.0, 0.0]
1: Received 0 -> 1 = [0.0, 0.0, 0.0, 0.0]
2: Sending   2 -> 0 = [2.0, 2.0, 2.0, 2.0]
0: Received 2 -> 0 = [2.0, 2.0, 2.0, 2.0]
2: Received 1 -> 2 = [1.0, 1.0, 1.0, 1.0]

Let’s investigate this a little bit. Think about having two computers run this line-by-line side by side. They will both locally build arrays, and then call MPI.Irecv!, which is an asynchronous non-blocking call to listen for a message from a given rank (a rank is the ID for a given process). Then they call their sreq = MPI.Isend function, which is an asynchronous non-blocking call to send a message send_mesg to the chosen rank. When the expected message is found, MPI.Irecv! will then run on its green thread and finish, updating the recv_mesg with the information from the message. However, in order to make sure all of the messages are received, we have added in a blocking operation MPI.Waitall!([rreq, sreq]), which will block all further execution on the given rank until both its rreq and sreq tasks are completed. After that is done, each given rank will have its updated data, and the script will continue on all ranks.

This model is thus very asynchronous and allows for many different computers to run one highly parallelized program, managing the data transmissions in a sparse way without a single computer in charge of the whole computation. However, it can be prone to deadlock, since errors in the program may, for example, require rank 1 to receive a message from rank 2 before continuing, while rank 2 won’t continue the program until it receives a message from rank 1. For this reason, while MPI has been the most successful large-scale distributed computing model and almost all major high-performance computing (HPC) cluster competitions have been won by codes utilizing the MPI model, the MPI model is nowadays considered a last resort due to these safety issues.

5.7.6 Summary of Multiprocessing

  • Cost is hardware dependent: only suitable for 1ms or higher depending on the connections through which the messages are being passed and the topology of the network.
  • The Master-worker programming model is Julia’s Distributed model
  • The Map-reduce programming model is a common data-handling model
  • Array-based distributed computations are another abstraction, used in all forms of parallelism.
  • MPI is a SPMD model of distributed computing, where each process is completely independent and one just controls the memory handling.

5.8 The Bait-and-switch: Parallelism is about Programming Models

While this looked like a lecture about parallel programming at the different levels and types of hardware, this wide overview showcases that the real underlying commonality within parallel programming is in the parallel programming models, of which there are not too many. There are:

  • Map-reduce parallelism models. pmap, MapReduce (Hadoop/Spark)
    • Pros: Easy to use
    • Cons: Requires that your program is specifically only mapping functions f and reducing them. That said, many data science operations like mean, variance, maximum, etc. can be represented as map-reduce calls, which lead to the popularity of these approaches for “big data” operations.
  • Array-based parallelism models. SIMD (at the compiler level), CuArray, DistributedArray, PyTorch tensors, …
    • Pros: Easy to use, can have very fast library implementations for specific functions
    • Cons: Less control and restricted to specific functions implemented by the library. Parallelism matches the data structure, so it requires the user to be careful and know the best way to split the data.
  • Loop-based parallelism models. Threads.@threads, @distributed, OpenMP, MATLAB’s parfor, Chapel’s iterator parallelism
    • Pros: Easy to use, almost no code change can make existing loops parallelized
    • Cons: Refined operations, like locking and sharing data, can be awkward to write. Less control over fine details like scheduling, meaning less opportunities to optimize.
  • Task-based parallelism models with implicit distributed data handling. Threads.@spawn, Dagger.jl, TensorFlow, dask
    • Pros: Relatively high level, low risk of errors since parallelism is mostly handled for the user. User simply describes which functions to call in what order.
    • Cons: When used on distributed systems, implicit data handling is hard, meaning it’s generally not as efficient if you don’t optimize the code yourself or help the optimizer, and these require specific programming constructs for building the computational graph. Note this is only a downside for distributed data parallelism, whereas when applied to shared memory systems these aspects no longer require handling by the task scheduler.
  • Task-based parallelism models with explicit data handling. Distributed.@spawn
    • Pros: Allows for control over what compute hardware will have specific pieces of data and allows for transferring data manually.
    • Cons: Requires transferring data manually. All computations are managed by a single process/computer/node and thus it can have some issues scaling to extreme (1000+ node) computing situations.
  • SPMD kernel parallelism models. CUDA, MPI, KernelAbstractions.jl
    • Pros: Reduces the problem for the user to only specify what happens in small chunks of the problem. Works on accelerator hardware like GPUs, TPUs, and beyond.
    • Cons: Only works for computations that can be represented block-wise, and relies on the compiler to generate good code.

In this sense, the different parallel programming “languages” and features are much more similar than they are all different, falling into similar categories.