Julia's Parallel Processing
Get to know a range of methods for parallel processing in Julia.
Julia is a relatively new, free, and open-source programming language. It has a syntax similar to that of other popular programming languages such as MATLAB and Python, but it boasts being able to achieve C-like speeds.
While serial Julia code can be fast, sometimes even more speed is desired. In many cases, writing parallel code can further reduce run time. Parallel code takes advantage of the multiple CPU cores included in modern computers, allowing multiple computations to run at the same time, or in parallel.
Julia provides two methods for writing parallel CPU code: multi-threading and distributed computing. This post will cover the basics of how to use these two methods of parallel processing.
This post assumes you already have Julia installed. If you haven't yet, check out our earlier post on how to install Julia.
Multi-Threading
First, let's learn about multi-threading.
To enable multi-threading, you must start Julia in one of two ways:
- Set the environment variable JULIA_NUM_THREADS to the number of threads Julia should use, and then start Julia. For example, JULIA_NUM_THREADS=4.
- Run Julia with the --threads (or -t) command line argument. For example, julia --threads 4 or julia -t 4.
After starting Julia (either with or without specifying the number of threads), the Threads module will be loaded. We can check the number of threads Julia has available:
julia> Threads.nthreads()
4
The simplest way to start writing parallel code is just to use the Threads.@threads macro. Inserting this macro before a for loop will cause the iterations of the loop to be split across the available threads, which will then operate in parallel. For example:
Threads.@threads for i = 1:10
    func(i)
end
Without Threads.@threads, first func(1) will run, then func(2), and so on. With the macro, and assuming we started Julia with four threads, first func(1), func(4), func(7), and func(9) will run in parallel. Then, when a thread's iteration finishes, it will start another iteration (assuming the loop is not done yet), regardless of whether the other threads have finished their iterations yet. Therefore, this loop will theoretically finish 10 iterations in the time it takes a single thread to do 3.
Note that Threads.@threads is blocking, meaning code after the threaded for loop will not run until the loop has finished.
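As a small runnable sketch of a threaded loop, the following fills an array with squares. The names square_all and out are hypothetical and chosen only for illustration. Each iteration writes to its own slot of the output array, so no two threads touch the same memory location.

```julia
# Hypothetical helper: square every element of `xs` using a threaded loop.
function square_all(xs)
    out = similar(xs)
    # Each iteration writes only to `out[i]`, so iterations are independent.
    Threads.@threads for i in eachindex(xs)
        out[i] = xs[i]^2
    end
    # `Threads.@threads` blocks, so `out` is fully populated at this point.
    return out
end

squares = square_all(collect(1:10))
```

Because the loop is blocking, the function can safely return out right after the loop ends.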
Julia also provides another macro for multi-threading: Threads.@spawn. This macro is more flexible than Threads.@threads because it can be used to run any code on a thread, not just for loops. But let's illustrate how to use Threads.@spawn by implementing the behavior of Threads.@threads:
# Function for splitting up `x` as evenly as possible
# across `np` partitions.
function partition(x, np)
    (len, rem) = divrem(length(x), np)
    Base.Generator(1:np) do p
        i1 = firstindex(x) + (p - 1) * len
        i2 = i1 + len - 1
        if p <= rem
            i1 += p - 1
            i2 += p
        else
            i1 += rem
            i2 += rem
        end
        chunk = x[i1:i2]
    end
end
N = 10
chunks = partition(1:N, Threads.nthreads())
tasks = map(chunks) do chunk
    Threads.@spawn for i in chunk
        func(i)
    end
end
wait.(tasks)
Let's walk through this code, assuming Threads.nthreads() == 4:
- First, we split the 10 iterations as evenly as possible across the 4 threads using partition. So, chunks ends up being [1:3, 4:6, 7:8, 9:10]. (We could have hard-coded the partitioning, but now you have a nice partition function that can work with more complicated partitionings!)
- Then, for each chunk, we create a Task via Threads.@spawn that will call func on each element of the chunk. This Task will be scheduled to run on an available thread. tasks contains a reference to each of these spawned Tasks.
- Finally, we wait for the Tasks to finish with the wait function.
To reemphasize, note that Threads.@spawn creates a Task; it does not wait for the task to run. As such, it is non-blocking, and program execution continues as soon as the Task is returned. The code wrapped in the task will also run, but in parallel, on a separate thread. This behavior is illustrated below:
julia> Threads.@spawn (sleep(2); println("Spawned task finished"))
Task (runnable) @0x00007fdd4b10dc30
julia> 1 + 1 # This code executes without waiting for the above task to finish
2
julia> Spawned task finished # Prints 2 seconds after spawning the above task
julia>
Spawned tasks can also return data. While wait just waits for a task to finish, fetch waits for a task and then obtains the result:
julia> task = Threads.@spawn (sleep(2); 1 + 1)
Task (runnable) @0x00007fdd4a5e28b0
julia> fetch(task)
2
Thread Safety
When using multi-threading, memory is shared across threads. If one thread writes to a memory location while another thread reads from or writes to that same location, the result is a race condition with unpredictable results. To illustrate:
julia> s = 0;
julia> Threads.@threads for i = 1:1000000
           global s += i
       end
julia> s
19566554653 # Should be 500000500000
There are two methods we can use to avoid the race condition. The first involves using a lock:
julia> s = 0; l = ReentrantLock();
julia> Threads.@threads for i = 1:1000000
           lock(l) do
               global s += i
           end
       end
julia> s
500000500000
In this case, the addition can only occur on a given thread once that thread holds the lock. If a thread does not hold the lock, it must wait for whatever thread controls it to release the lock before it can run the code within the lock block.
Using a lock in this example is suboptimal, however, as it eliminates all parallelism because only one thread can hold the lock at any given moment. (In other examples, however, using a lock works great, particularly when only a small portion of the code depends on the lock.)
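As a hedged sketch of a case where a lock fits well, the loop below does its (simulated) expensive work outside the lock and only holds the lock for a short update of a shared vector. The function slow_square and the variable results are hypothetical names introduced for illustration.

```julia
# Hypothetical stand-in for an expensive computation that is safe to run in parallel.
slow_square(i) = (sleep(0.01); i^2)

results = Int[]
l = ReentrantLock()

Threads.@threads for i in 1:20
    y = slow_square(i)      # Runs in parallel; no lock is needed here.
    lock(l) do
        push!(results, y)   # Only this short update is serialized.
    end
end

# The order in which iterations finish is not deterministic, so sort before use.
sort!(results)
```

Most of each iteration's time is spent outside the lock, so the threads rarely wait on each other.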
The other way to eliminate the race condition is to use task-local buffers:
julia> s = 0; chunks = partition(1:1000000, Threads.nthreads());
julia> tasks = map(chunks) do chunk
           Threads.@spawn begin
               x = 0
               for i in chunk
                   x += i
               end
               x
           end
       end;
julia> thread_sums = fetch.(tasks);
julia> for i in thread_sums
           s += i
       end
julia> s
500000500000
In this example, each spawned task has its own x that stores the sum of the values just in the task's chunk of data. In particular, none of the tasks modify s. Then, once each task has computed its sum, the intermediate values are summed and stored in s in a single-threaded manner.
Using task-local buffers works better for this example than using a lock because most of the parallelism is preserved.
(Note that it used to be advised to manage task-local buffers using the threadid function. However, doing so does not guarantee each task uses its own buffer. Therefore, the method demonstrated in the above example is now advised.)
Packages for Quickly Utilizing Multi-Threading
In addition to writing your own multi-threaded code, there exist packages that utilize multi-threading. Two such examples are ThreadsX.jl and ThreadTools.jl.
ThreadsX.jl provides multi-threaded implementations of several common functions such as sum and sort, while ThreadTools.jl provides tmap, a multi-threaded version of map.
These packages can be great for quickly boosting performance without having to figure out multi-threading on your own.
Distributed Computing
Besides multi-threading, Julia also provides for distributed computing, or splitting work across multiple Julia processes.
There are two ways to start multiple Julia processes:
- Load the Distributed standard library package with using Distributed and then use addprocs. For example, addprocs(2) to add two additional Julia processes (for a total of three).
- Run Julia with the -p command line argument. For example, julia -p 2 to start Julia with three total Julia processes. (Note that running Julia with -p will implicitly load Distributed.)
Added processes are known as worker processes, while the original process is the main process. Each process has an id: the main process has id 1, and worker processes have id 2, 3, etc.
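The process ids can be inspected directly. The sketch below assumes a single machine and uses the Distributed functions myid, workers, and nprocs; the variable names are chosen only for illustration.

```julia
using Distributed

addprocs(2)                 # Add two worker processes.
main_id = myid()            # Id of the process running this code.
worker_ids = workers()      # Ids of the worker processes.
total = nprocs()            # Main process plus all workers.
```

When run from the main process, main_id is 1 and worker_ids does not contain 1.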
By default, code runs on the main process. To run code on a worker, we need to explicitly give code to that worker. We can do so with remotecall_fetch, which takes as inputs a function to run, the process id to run the function on, and the input arguments and keyword arguments the function needs. Here are some examples:
# Create a zero-argument anonymous function to run on worker 2.
julia> remotecall_fetch(2) do
           println("Done")
       end
From worker 2: Done
# Create a two-argument anonymous function to run on worker 2.
julia> remotecall_fetch((a, b) -> a + b, 2, 1, 2)
3
# Run `sum([1 3; 2 4]; dims = 1)` on worker 3.
julia> remotecall_fetch(sum, 3, [1 3; 2 4]; dims = 1)
1×2 Matrix{Int64}:
 3  7
If you don't need to wait for the result immediately, use remotecall instead of remotecall_fetch. This will create a Future that you can later wait on or fetch (similarly to a Task spawned with Threads.@spawn).
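A minimal sketch of the non-blocking pattern follows. It assumes a single machine and adds one worker if none exists yet; the variable names are illustrative only.

```julia
using Distributed

# Make sure at least one worker exists (assumption: running on a single machine).
nprocs() == 1 && addprocs(1)
w = first(workers())

future = remotecall(+, w, 1, 2)   # Returns immediately with a Future.
local_result = 10 * 2             # The main process keeps working in the meantime.
remote_result = fetch(future)     # Blocks until the worker's result arrives.
```

Here fetch is the only call that waits on the worker, so any work placed between remotecall and fetch overlaps with the remote computation.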
Separate Memory Spaces
One significant difference between multi-threading and distributed processing is that memory is shared in multi-threading, while each distributed process has its own separate memory space. This has several important implications:
- To use a package on a given worker, it must be loaded on that worker, not just on the main process. To illustrate:
julia> using LinearAlgebra
julia> I
UniformScaling{Bool}
true*I
julia> remotecall_fetch(() -> I, 2)
ERROR: On worker 2:
UndefVarError: `I` not defined
To avoid the error, we could use @everywhere using LinearAlgebra to load LinearAlgebra on all processes.
- Similarly to the previous point, functions defined on one process are not available on other processes. Prepend a function definition with @everywhere to allow using the function on all processes:
julia> @everywhere function myadd(a, b)
           a + b
       end;
julia> myadd(1, 2)
3
# This would error without `@everywhere` above.
julia> remotecall_fetch(myadd, 2, 3, 4)
7
- Global variables are not shared, even if defined everywhere with @everywhere:
julia> @everywhere x = [0];
julia> remotecall_fetch(2) do
           x[1] = 2
       end;
# `x` was modified on worker 2.
julia> remotecall_fetch(() -> x, 2)
1-element Vector{Int64}:
 2
# `x` was not modified on worker 3.
julia> remotecall_fetch(() -> x, 3)
1-element Vector{Int64}:
 0
- If needed, an array of data can be shared across processes by using a SharedArray, provided by the SharedArrays standard library package:
julia> @everywhere using SharedArrays
# We don't need `@everywhere` when defining a `SharedArray`.
julia> x = SharedArray{Int,1}(1)
1-element SharedVector{Int64}:
 0
julia> remotecall_fetch(2) do
           x[1] = 2
       end;
julia> remotecall_fetch(() -> x, 2)
1-element SharedVector{Int64}:
 2
julia> remotecall_fetch(() -> x, 3)
1-element SharedVector{Int64}:
 2
Now, a note about command line arguments. When adding worker processes with -p, those processes are spawned with the same command line arguments as the main Julia process. With addprocs, however, each added process is started with no command line arguments. Below is an example of where this behavior might cause some confusion:
$ JULIA_NUM_THREADS=4 julia --banner=no -t 1
julia> Threads.nthreads()
1
julia> using Distributed
julia> addprocs(1);
julia> remotecall_fetch(Threads.nthreads, 2)
4
In this situation, the environment variable JULIA_NUM_THREADS is set (for example, because normally we run Julia with four threads). But in this particular case we want to run Julia with just one thread, so we pass -t 1. Then we add a process, but it turns out that process has four threads, not one! This is because the added process still sees the environment variable but was given no command line arguments. To use just one thread for the added process, we would need to use the exeflags keyword argument to addprocs:
addprocs(1; exeflags = ["-t 1"])
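To confirm that a flag reached the worker, a quick check along these lines can be run. This is a sketch under assumptions: it spells the flag as --threads=1 and passes it as a backtick Cmd, which are choices made here for illustration, and the variable names are hypothetical.

```julia
using Distributed

# Pass the flag as a `Cmd` so it reaches the worker as a normal command line argument.
pid = only(addprocs(1; exeflags = `--threads=1`))

# Ask the new worker how many threads it was started with.
worker_threads = remotecall_fetch(Threads.nthreads, pid)
```

If the flag was applied, worker_threads is 1 regardless of the value of JULIA_NUM_THREADS.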
As a final note, if needed, processes can be removed with rmprocs, which removes the processes associated with the provided worker ids.
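A short sketch of adding and then removing workers, assuming a single machine; the variable names are illustrative only.

```julia
using Distributed

pids = addprocs(2)        # Ids of the newly added workers.
n_before = nprocs()       # Process count including the new workers.
rmprocs(pids)             # By default, waits for the workers to shut down.
n_after = nprocs()        # Process count after removal.
```

Keeping the ids returned by addprocs makes it easy to remove exactly the workers that were added, leaving any others untouched.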
Summary
In this post, we have provided an introduction to parallel processing in Julia. We discussed the basics of both multi-threading and distributed computing, how to use them in Julia, and some things to watch out for.
As a parting piece of advice, when choosing whether to use multi-threading or distributed processing, choose multi-threading unless you have a specific need for multiple processes with distinct memory spaces. Multi-threading has lower overhead and generally is easier to use.
How do you use parallel processing in your code? Let us know in the comments below!
Additional Links
Multi-Threading
- Official Julia documentation on multi-threading.
Multi-processing and Distributed Computing
- Official Julia documentation on distributed computing.