Julia's Parallel Processing

Julia's Parallel Processing

Get to know a range of methods for parallel processing in Julia.

Julia is a relatively new, free, and open-source programming language. It has a syntax similar to that of other popular programming languages such as MATLAB and Python, but it boasts being able to achieve C-like speeds.

While serial Julia code can be fast, sometimes even more speed is desired. In many cases, writing parallel code can further reduce run time. Parallel code takes advantage of the multiple CPU cores included in modern computers, allowing multiple computations to run at the same time, or in parallel.

Julia provides two methods for writing parallel CPU code: multi-threading and distributed computing. This post will cover the basics of how to use these two methods of parallel processing.

This post assumes you already have Julia installed. If you haven't yet, check out our earlier post on how to install Julia.

Multi-Threading

First, let's learn about multi-threading.

To enable multi-threading, you must start Julia in one of two ways:

  1. Set the environment variable JULIA_NUM_THREADS to the number of threads Julia should use, and then start Julia. For example, JULIA_NUM_THREADS=4.

  2. Run Julia with the --threads (or -t) command line argument. For example, julia --threads 4 or julia -t 4.

After starting Julia (either with or without specifying the number of threads), the Threads module will be loaded. We can check the number of threads Julia has available:

julia> Threads.nthreads()
4

The simplest way to start writing parallel code is just to use the Threads.@threads macro. Inserting this macro before a for loop will cause the iterations of the loop to be split across the available threads, which will then operate in parallel. For example:

Threads.@threads for i = 1:10
    func(i)
end

Without Threads.@threads, first func(1) will run, then func(2), and so on. With the macro, and assuming we started Julia with four threads, first func(1), func(4), func(7), and func(9) will run in parallel. Then, when a thread's iteration finishes, it will start another iteration (assuming the loop is not done yet), regardless of whether the other threads have finished their iterations yet. Therefore, this loop will theoretically finish 10 iterations in the time it takes a single thread to do 3.

Note that Threads.@threads is blocking, meaning code after the threaded for loop will not run until the loop has finished.

Image of threaded for loop

threads_for

Julia also provides another macro for multi-threading: Threads.@spawn. This macro is more flexible than Threads.@threads because it can be used to run any code on a thread, not just for loops. But let's illustrate how to use Threads.@spawn by implementing the behavior of Threads.@threads:

# Function for splitting up `x` as evenly as possible
# across `np` partitions.
function partition(x, np)
    (len, rem) = divrem(length(x), np)
    Base.Generator(1:np) do p
        i1 = firstindex(x) + (p - 1) * len
        i2 = i1 + len - 1
        if p <= rem
            i1 += p - 1
            i2 += p
        else
            i1 += rem
            i2 += rem
        end
        chunk = x[i1:i2]
    end
end
N = 10
chunks = partition(1:10, Threads.nthreads())
tasks = map(chunks) do chunk
    Threads.@spawn for i in chunk
        func(i)
    end
end
wait.(tasks)

Let's walk through this code, assuming Threads.nthreads() == 4:

  • First, we split the 10 iterations evenly across the 4 threads using partition. So, chunks ends up being [1:3, 4:6, 7:8, 9:10]. (We could have hard-coded the partitioning, but now you have a nice partition function that can work with more complicated partitionings!)

  • Then, for each chunk, we create a Task via Threads.@spawn that will call func on each element of the chunk. This Task will be scheduled to run on an available thread. tasks contains a reference to each of these spawned Tasks.

  • Finally, we wait for the Tasks to finish with the wait function.

To reemphasize, note that Threads.@spawn creates a Task; it does not wait for the task to run. As such, it is non-blocking, and program execution continues as soon as the Task is returned. The code wrapped in the task will also run, but in parallel, on a separate thread. This behavior is illustrated below:

julia> Threads.@spawn (sleep(2); println("Spawned task finished"))
Task (runnable) @0x00007fdd4b10dc30

julia> 1 + 1 # This code executes without waiting for the above task to finish
2

julia> Spawned task finished # Prints 2 seconds after spawning the above task
julia>

Spawned tasks can also return data. While wait just waits for a task to finish, fetch waits for a task and then obtains the result:

julia> task = Threads.@spawn (sleep(2); 1 + 1)
Task (runnable) @0x00007fdd4a5e28b0

julia> fetch(task)
2

Thread Safety

When using multi-threading, memory is shared across threads. If a thread writes to a memory location that is written to or read from another thread, that will lead to a race condition with unpredictable results. To illustrate:

julia> s = 0;

julia> Threads.@threads for i = 1:1000000
           global s += i
       end

julia> s
19566554653 # Should be 500000500000

Race condition

race_condition

There are two methods we can use to avoid the race condition. The first involves using a lock:

julia> s = 0; l = ReentrantLock();

julia> Threads.@threads for i = 1:1000000
           lock(l) do
               global s += i
           end
       end

julia> s
500000500000

In this case, the addition can only occur on a given thread once that thread holds the lock. If a thread does not hold the lock, it must wait for whatever thread controls it to release the lock before it can run the code within the lock block.

Using a lock in this example is suboptimal, however, as it eliminates all parallelism because only one thread can hold the lock at any given moment. (In other examples, however, using a lock works great, particularly when only a small portion of the code depends on the lock.)

The other way to eliminate the race condition is to use task-local buffers:

julia> s = 0; chunks = partition(1:1000000, Threads.nthreads());

julia> tasks = map(chunks) do chunk
           Threads.@spawn begin
               x = 0
               for i in chunk
                   x += i
               end
               x
           end
       end;

julia> thread_sums = fetch.(tasks);

julia> for i in thread_sums
           s += i
       end

julia> s
500000500000

In this example, each spawned task has its own x that stores the sum of the values just in the task's chunk of data. In particular, none of the tasks modify s. Then, once each task has computed its sum, the intermediate values are summed and stored in s in a single-threaded manner.

Using task-local buffers works better for this example than using a lock because most of the parallelism is preserved.

(Note that it used to be advised to manage task-local buffers using the threadid function. However, doing so does not guarantee each task uses its own buffer. Therefore, the method demonstrated in the above example is now advised.)

Packages for Quickly Utilizing Multi-Threading

In addition to writing your own multi-threaded code, there exist packages that utilize multi-threading. Two such examples are ThreadsX.jl and ThreadTools.jl.

ThreadsX.jl provides multi-threaded implementations of several common functions such as sum and sort, while ThreadTools.jl provides tmap, a multi-threaded version of map.

These packages can be great for quickly boosting performance without having to figure out multi-threading on your own.

Distributed Computing

Besides multi-threading, Julia also provides for distributed computing, or splitting work across multiple Julia processes.

There are two ways to start multiple Julia processes:

  1. Load the Distributed standard library package with using Distributed and then use addprocs. For example, addprocs(2) to add two additional Julia processes (for a total of three).

  2. Run Julia with the -p command line argument. For example, julia -p 2 to start Julia with three total Julia processes. (Note that running Julia with -p will implicitly load Distributed.)

Added processes are known as worker processes, while the original process is the main process. Each process has an id: the main process has id 1, and worker processes have id 2, 3, etc.

By default, code runs on the main process. To run code on a worker, we need to explicitly give code to that worker. We can do so with remotecall_fetch, which takes as inputs a function to run, the process id to run the function on, and the input arguments and keyword arguments the function needs. Here are some examples:

# Create a zero-argument anonymous function to run on worker 2.
julia> remotecall_fetch(2) do
           println("Done")
       end
      From worker 2:    Done

# Create a two-argument anonymous function to run on worker 2.
julia> remotecall_fetch((a, b) -> a + b, 2, 1, 2)
3

# Run `sum([1 3; 2 4]; dims = 1)` on worker 3.
julia> remotecall_fetch(sum, 3, [1 3; 2 4]; dims = 1)
1x2 Matrix{Int64}:
 3  7

If you don't need to wait for the result immediately, use remotecall instead of remotecall_fetch. This will create a Future that you can later wait on or fetch (similarly to a Task spawned with Threads.@spawn).

Super computer

super_computer

Separate Memory Spaces

One significant difference between multi-threading and distributed processing is that memory is shared in multi-threading, while each distributed process has its own separate memory space. This has several important implications:

  • To use a package on a given worker, it must be loaded on that worker, not just on the main process. To illustrate:

      julia> using LinearAlgebra
    
      julia> I
      UniformScaling{Bool}
      true*I
    
      julia> remotecall_fetch(() -> I, 2)
      ERROR: On worker 2:
      UndefVarError: `I` not defined
    

    To avoid the error, we could use @everywhere using LinearAlgebra to load LinearAlgebra on all processes.

  • Similarly to the previous point, functions defined on one process are not available on other processes. Prepend a function definition with @everywhere to allow using the function on all processes:

      julia> @everywhere function myadd(a, b)
                 a + b
             end;
    
      julia> myadd(1, 2)
      3
    
      # This would error without `@everywhere` above.
      julia> remotecall_fetch(myadd, 2, 3, 4)
      7
    
  • Global variables are not shared, even if defined everywhere with @everywhere:

      julia> @everywhere x = [0];
    
      julia> remotecall_fetch(2) do
                 x[1] = 2
             end;
    
      # `x` was modified on worker 2.
      julia> remotecall_fetch(() -> x, 2)
      1-element Vector{Int64}:
       2
    
      # `x` was not modified on worker 3.
      julia> remotecall_fetch(() -> x, 3)
      1-element Vector{Int64}:
       0
    

    If needed, an array of data can be shared across processes by using a SharedArray, provided by the SharedArrays standard library package:

      julia> @everywhere using SharedArrays
    
      # We don't need `@everywhere` when defining a `SharedArray`.
      julia> x = SharedArray{Int,1}(1)
      1-element SharedVector{Int64}:
       0
    
      julia> remotecall_fetch(2) do
                 x[1] = 2
             end;
    
      julia> remotecall_fetch(() -> x, 2)
      1-element SharedVector{Int64}:
       2
    
      julia> remotecall_fetch(() -> x, 3)
      1-element SharedVector{Int64}:
       2
    

Now, a note about command line arguments. When adding worker processes with -p, those processes are spawned with the same command line arguments as the main Julia process. With addprocs, however, each of those added processes are started with no command line arguments. Below is an example of where this behavior might cause some confusion:

$ JULIA_NUM_THREADS=4 julia --banner=no -t 1
julia> Threads.nthreads()
1

julia> using Distributed

julia> addprocs(1);

julia> remotecall_fetch(Threads.nthreads, 2)
4

In this situation, we have the environment variable JULIA_NUM_THREADS (for example, because normally we run Julia with four threads). But in this particular case we want to run Julia with just one thread, so we set -t 1. Then we add a process, but it turns out that process has four threads, not one! This is because the environment variable was set, but no command line arguments were given to the added process. To use just one thread for the added process, we would need to use the exeflags keyword argument to addprocs:

addprocs(1; exeflags = ["-t 1"])

As a final note, if needed, processes can be removed with rmprocs, which removes the processes associated with the provided worker ids.

Summary

In this post, we have provided an introduction to parallel processing in Julia. We discussed the basics of both multi-threading and distributed computing, how to use them in Julia, and some things to watch out for.

As a parting piece of advice, when choosing whether to use multi-threading or distributed processing, choose multi-threading unless you have a specific need for multiple processes with distinct memory spaces. Multi-threading has lower overhead and generally is easier to use.

How do you use parallel processing in your code? Let us know in the comments below!

Additional Links