并行计算
对于多线程和并行计算的新手来说,首先了解Jullia所提供的 不同层级并行是非常有用的。这里我们主要将其分成三类:
- Julia 协程(绿色线程)
- 多线程
- 多核心或分布式处理
我们首先考虑 Julia 任务 [Task(也就是协程)]
共享数组使用系统共享内存将数组映射到多个进程上,尽管和 DArray
有点像,但其实际表现有很大不同。在 DArray
中,每个进程可以访问数据中的一块,但任意两个进程都不能共享同一块数据,而对于 SharedArray
,每个进程都可以访问整个数组。如果你想在一台机器上,让一大块数据能够被多个进程访问到,那么 SharedArray
是个不错的选择。
共享数组由 SharedArray
提供,必须在所有相关的 worker 中都显式地加载。
对 SharedArray
索引(访问和复制)操作就跟普通的数组一样,由于底层的内存对本地的进程是可见的,索引的效率很高,因此大多数单进程上的算法对 SharedArray
来说都是适用的,除非某些算法必须使用 Array
类型(此时可以通过调用 sdata
来获取 SharedArray
数组)。对于其它类型的 AbstractArray
类型数组来说,sdata
仅仅会返回数组本身,因此,可以放心地使用 sdata
对任意类型的 Array
进行操作。
共享数组可以通过以下形式构造:
SharedArray{T,N}(dims::NTuple; init=false, pids=Int[])
上面的代码会创建一个 N 维,类型为 T
,大小为 dims
的共享数组,通过 pids
指定可见的进程。与分布式数组不同的是,只有通过 pids
指定的 worker 才可见。
如果提供了一个类型为 initfn(S::SharedArray)
的 init
函数,那么所有相关的 worker 都会调用它。你可以让每个 worker 都在共享数组不同的地方执行 init
函数,从而实现并行初始化。
下面是个例子:
julia> using Distributed
julia> addprocs(3)
3-element Array{Int64,1}:
2
3
4
julia> @everywhere using SharedArrays
julia> S = SharedArray{Int,2}((3,4), init = S -> S[localindices(S)] = myid())
3×4 SharedArray{Int64,2}:
2 2 3 4
2 3 3 4
2 3 4 4
julia> S[3,2] = 7
7
julia> S
3×4 SharedArray{Int64,2}:
2 2 3 4
2 3 3 4
2 7 4 4
SharedArrays.localindices
提供了一个以为的切片,可以很方便地用来将 task 分配到各个进程上。当然你可以按你想要的方式做区分:
julia> S = SharedArray{Int,2}((3,4), init = S -> S[indexpids(S):length(procs(S)):length(S)] = myid())
3×4 SharedArray{Int64,2}:
2 2 2 2
3 3 3 3
4 4 4 4
由于所有的进程都能够访问底层的数据,因此一定要小心避免出现冲突:
@sync begin
for p in procs(S)
@async begin
remotecall_wait(fill!, p, S, p)
end
end
end
上面的代码会导致不确定的结果,因为每个进程都将整个数组赋值为其 pid
,从而导致最后一个执行完成的进程会保留其 pid
。
考虑更复杂的一种情况:
q[i,j,t+1] = q[i,j,t] + u[i,j,t]
这个例子中,如果首先将任务用按照一维的索引作区分,那么就会出问题:如果 q[i,j,t]
位于分配给某个 worker 的最后一个位置,而 q[i,j,t+1]
位于下一个 worker 的开始位置,那么后面这个 worker 开始计算的时候,可能 q[i,j,t]
还没有准备好,这时候,更好的做法是,手动分区,比如可以定义一个函数,按照 (irange,jrange)
给每个 worker 分配任务。
julia> @everywhere function myrange(q::SharedArray)
idx = indexpids(q)
if idx == 0 # This worker is not assigned a piece
return 1:0, 1:0
end
nchunks = length(procs(q))
splits = [round(Int, s) for s in range(0, stop=size(q,2), length=nchunks+1)]
1:size(q,1), splits[idx]+1:splits[idx+1]
end
然后定义计算内核:
julia> @everywhere function advection_chunk!(q, u, irange, jrange, trange)
@show (irange, jrange, trange) # display so we can see what's happening
for t in trange, j in jrange, i in irange
q[i,j,t+1] = q[i,j,t] + u[i,j,t]
end
q
end
然后定义一个 wrapper:
julia> @everywhere advection_shared_chunk!(q, u) =
advection_chunk!(q, u, myrange(q)..., 1:size(q,3)-1)
接下来,比较三个不同的版本,第一个是单进程版本:
julia> advection_serial!(q, u) = advection_chunk!(q, u, 1:size(q,1), 1:size(q,2), 1:size(q,3)-1);
然后是使用 @distributed
:
julia> function advection_parallel!(q, u)
for t = 1:size(q,3)-1
@sync @distributed for j = 1:size(q,2)
for i = 1:size(q,1)
q[i,j,t+1]= q[i,j,t] + u[i,j,t]
end
end
end
q
end;
最后是使用分区:
julia> function advection_shared!(q, u)
@sync begin
for p in procs(q)
@async remotecall_wait(advection_shared_chunk!, p, q, u)
end
end
q
end;
如果创建好了 SharedArray
之后,计算这些函数的执行时间,那么可以得到以下结果(用 julia -p 4
启动):
julia> q = SharedArray{Float64,3}((500,500,500));
julia> u = SharedArray{Float64,3}((500,500,500));
先执行一次以便 JIT 编译,然后用 @time
宏测试其第二次执行的时间:
julia> @time advection_serial!(q, u);
(irange,jrange,trange) = (1:500,1:500,1:499)
830.220 milliseconds (216 allocations: 13820 bytes)
julia> @time advection_parallel!(q, u);
2.495 seconds (3999 k allocations: 289 MB, 2.09% gc time)
julia> @time advection_shared!(q,u);
From worker 2: (irange,jrange,trange) = (1:500,1:125,1:499)
From worker 4: (irange,jrange,trange) = (1:500,251:375,1:499)
From worker 3: (irange,jrange,trange) = (1:500,126:250,1:499)
From worker 5: (irange,jrange,trange) = (1:500,376:500,1:499)
238.119 milliseconds (2264 allocations: 169 KB)
这里 advection_shared!
最大的优势在于,最小程度地降低了 woker 之间的通信,从而让每个 worker 能针对被分配的部分持续地计算一段时间。
共享数组与分布式垃圾回收
和远程引用一样,共享数组也依赖于创建节点上的垃圾回收来释放所有参与的 worker 上的引用。因此,创建大量生命周期比较短的数组,并尽可能快地显式 finilize 这些对象,代码会更高效,这样与之对用的内存和文件句柄都会更快地释放。
集群管理器
Julia 通过集群管理器实现对多个进程(所构成的逻辑上的集群)的启动,管理以及网络通信。一个 ClusterManager
负责:
- 在一个集群环境中启动 worker 进程
- 管理每个 worker 生命周期内的事件
- (可选),提供数据传输
一个 Julia 集群由以下特点:
- 初始进程,称为
master
,其id
为 1 - 只有 master 进程可以增加或删除 worker 进程
- 所有进程之间都可以直接通信
worker 之间的连接(用的是内置的 TCP/IP 传输)按照以下方式进行:
master 进程对一个
ClusterManager
对象调用addprocs
每个 worker 监听一个端口,然后将其 host 和 port 信息传给
stdout
集群管理器捕获
stdout
中每个 worker 的信息,并提供给 master 进程master 进程解析信息并与相应的 worker 建立 TCP/IP 连接
每个 worker 都会被通知集群中的其它 worker
每个 worker 与
id
小于自己的 worker 连接这样,一个网络就建立了,从而,每个 worker 都可以与其它 worker 建立连接
尽管默认的传输层使用的是 TCPSocket
,对于一个自定义的集群管理器来说,完全可以使用其它传输方式。
Julia 提供了两种内置的集群管理器:
LocalManager
,调用addprocs()
或addprocs(np::Integer)
时会用到。SSHManager
,调用addprocs(hostnames::Array)
时,传递一个 hostnames 的列表。
LocalManager
用来在同一个 host 上启动多个 worker,从而利用多核/多处理器硬件。
因此,一个最小的集群管理器需要:
[addprocs(manager::FooManager)
](http://127.0.0.5/@ref addprocs) 需要 FooManager
实现:
function launch(manager::FooManager, params::Dict, launched::Array, c::Condition)
[...]
end
function manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
[...]
end
作为一个例子,我们来看下 LocalManager
是怎么实现的:
struct LocalManager <: ClusterManager
np::Integer
end
function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition)
[...]
end
function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
[...]
end
launch
方法接收以下参数:
manager::ClusterManager
: 调用addprocs
时所用到的集群管理器params::Dict
: 所有的关键字参数都会传递到addprocs
中launched::Array
: 用来存储一个或多个WorkerConfig
c::Condition
: 在 workers 启动后被通知的条件变量
launch
会在一个异步的task中调用,该 task 结束之后,意味着所有请求的 worker 都已经启动好了。因此,launch
函数必须在所有 worker 启动之后,尽快退出。
新启动的 worker 之间采用的是多对多的连接方式。在命令行中指定参数 --worker[=<cookie>]
会让所有启动的进程把自己当作 worker,然后通过 TCP/IP 构建连接。
集群中所有的 worker 默认使用同一个 master 的 [cookie](http://127.0.0.5/@ref man-cluster-cookie)。如果 cookie 没有指定,(比如没有通过 --worker
指定),那么 worker 会尝试从它的标准输入中读取。LocalManager
和 SSHManager
都是通过标准输入来将 cookie 传递给新启动的 worker。
默认情况下,一个 worker 会监听从 getipaddr()
函数返回的地址上的一个开放端口。若要指定监听的地址,可以通过额外的参数 --bind-to bind_addr[:port]
指定,这对于多 host 的情况来说很方便。
对于非 TCP/IP 传输,可以选择 MPI 作为一种实现,此时一定不要指定 --worker
参数,另外,新启动的 worker 必须调用 init_worker(cookie)
之后再使用并行的结构体。
对于每个已经启动的 worker,launch
方法必须往 launched
中添加一个 WorkerConfig
对象(相应的值已经初始化)。
mutable struct WorkerConfig
# Common fields relevant to all cluster managers
io::Union{IO, Nothing}
host::Union{AbstractString, Nothing}
port::Union{Integer, Nothing}
# Used when launching additional workers at a host
count::Union{Int, Symbol, Nothing}
exename::Union{AbstractString, Cmd, Nothing}
exeflags::Union{Cmd, Nothing}
# External cluster managers can use this to store information at a per-worker level
# Can be a dict if multiple fields need to be stored.
userdata::Any
# SSHManager / SSH tunnel connections to workers
tunnel::Union{Bool, Nothing}
bind_addr::Union{AbstractString, Nothing}
sshflags::Union{Cmd, Nothing}
max_parallel::Union{Integer, Nothing}
# Used by Local/SSH managers
connect_at::Any
[...]
end
WorkerConfig
中的大多数字段都是内置的集群管理器会用到,对于自定义的管理器,通常只需要指定 io
或 host
/port
:
如果指定了
io
,那么就会用来读取 host/port 信息。每个 worker 会在启动时打印地址和端口,这样 worker 就可以自由监听可用的端口,而不必手动配置 worker 的端口。如果
io
没有指定,那么host
和port
就会用来连接。count
,exename
和exeflags
用于从一个 worker 上启动额外的 worker。例如,一个集群管理器可能对每个节点都只启动一个 worker,然后再用它来启动额外的 worker。count
可以是一个整数n
,用来指定启动n
个 workercount
还可以是:auto
,用来启动跟那台机器上 CPU 个数(逻辑上的核的个数)相同的 workerexename
是julia
可执行文件的全路径exeflags
应该设置成传递给将要启动的 worker 命令行参数
tunnel
,bind_addr
,sshflags
和max_parallel
会在从 worker 与 master 进程建立 ssh 隧道时用到userdata
用来提供给自定义集群管理器存储自己的 worker 相关的信息
manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
会在一个 worker 生命周期中的不同时刻被调用,其中 op 的值可能是:
:register
/:deregister
,从 Julia 的 worker 池子中添加/删除一个 worker:interrupt
,当interrupt(workers)
被调用是,此时,ClusterManager
应该给相应的 worker 发送终端信号:finalize
,用于清理操作。
自定义集群管理器的传输方式
将默认的 TCP/IP 多对多 socket 连接替换成一个自定义的传输层需要做很多工作。每个 Julia 进程都有与其连接的 worker 数量相同的通信 task。例如,在一个有 32 个进程的多对多集群中:
每个进程都有31个通信task
每个 task 在一个消息处理循环中从一个远端 worker 读取所有的输入信息
每个消息处理循环等待一个
IO
对象(比如,在默认实现中是一个TCPSocket
),然后读取整个信息,处理,等待下一个发送消息则可以直接在任意 Julia task 中完成,而不只是通信 task,同样,也是通过相应的
IO
对象
要替换默认的传输方式,需要新的实现能够在远程 worker 之间建立连接,同时提供一个可以用来被消息处理循环等待的 IO
对象。集群管理器的回调函数需要实现如下函数:
connect(manager::FooManager, pid::Integer, config::WorkerConfig)
kill(manager::FooManager, pid::Int, config::WorkerConfig)
默认的实现(使用的是 TCP/IP socket)是 connect(manager::ClusterManager, pid::Integer, config::WorkerConfig)
。
connect
需要返回一对 IO
对象,一个用于从 pid
worker 读取数据,另一个用于往 pid
写数据。自定义的集群管理器可以用内存中的 BUfferStream
作为一个管道将自定义的(很可能是非 IO
的)传输与 Julia 内置的并行基础设施衔接起来。
BufferStream
是一个内存中的 IOBuffer
,其表现很像 IO
,就是一个流(stream),可以异步地处理。
在 Examples repository 的 clustermanager/0mq
目录中,包含一个使用 ZeroMQ 连接 Julia worker 的例子,用的是星型拓补结构。需要注意的是:Julia 的进程仍然是逻辑上相互连接的,任意 worker 都可以与其它 worker 直接相连而无需感知到 0MQ 作为传输层的存在。
在使用自定义传输的时候:
Julia 的 workers 必须不能通过
--worker
启动。如果启动的时候使用了--worker
,那么新启动的 worker 会默认使用基于 TCP/IP socket 的实现对于每个 worker 逻辑上的输入连接,必须调用
Base.process_messages(rd::IO, wr::IO)()
,这会创建一个新的 task 来处理 worker 消息的读写init_worker(cookie, manager::FooManager)
必须作为 worker 进程初始化的一部分呢被调用WorkerConfig
中的connect_at::Any
字段可以被集群管理器在调用launch
的时候设置,该字段的值会发送到所有的connect
回调中。通常,其中包含的是如何连接到一个 worker 的信息。例如,在 TCP/IP socket 传输中,用这个字段存储(host, port)
来声明如何连接到一个 worker。
kill(manager, pid, config)
用来从一个集群中删除一个 worker,在 master 进程中,对应的 IO
对象必须通过对应的实现来关闭,从而保证正确地释放资源。默认的实现简单地对指定的远端 worker 执行 exit()
即可。
在例子目录中,clustermanager/simple
展示了一个简单地实现,使用的是 UNIX 下的 socket。
LocalManager 和 SSHManager 的网络要求
Julia 集群设计的时候,默认是在一个安全的环境中执行,比如本地的笔记本,部门的集群,甚至是云端。这部分将介绍 LocalManager
和 SSHManager
的网络安全要点:
master 进程不监听任何端口,它只负责向外连接 worker
每个 worker 都只绑定一个本地的接口,同时监听一个操作系统分配的临时端口。
addprocs(N)
使用的LocalManager
,默认只会绑定到回环接口(loopback interface),这就意味着,之后在远程主机上(恶意)启动的 worker 无法连接到集群中,在执行addprocs(4)
之后,又跟一个addprocs(["remote_host"])
会失败。有些用户可能希望创建一个集群同时管理本地系统和几个远端系统,这可以通过在绑定LocalManager
到外部网络接口的时候,指定一个restrict
参数:addprocs(4; restrict=false)
addprocs(list_of_remote_hosts)
使用的SSHManager
会通过 SSH 启动远程机上的 worker。 默认 SSH 只会用来启动 Julia 的 worker。随后的 master-worker 和 worker-worker 连接使用的是普通的、未加密的 TCP/IP 通信。 远程机必须开启免密登陆。 额外的 SSH 标记或认证信息会通过关键字参数sshflags
指定。addprocs(list_of_remote_hosts; tunnel=true, sshflags=<ssh keys and other flags>)
在我们希望给 master-worker 也使用 SSH 连接的时候很有用。 一个典型的场景是本地的笔记本 运行 Julia ERPL (做为 master)和云上的其他机器,比如 Amazon EC2,构成集群。 这时候远程机器只要开启 22 端口就可以,然后要有 SSH 客户端 通过公约基础设施(PKI)认证过。授权信息可以通过sshflags
生效,比如sshflags=`-i <keyfile>`
。在一个所有节点联通的拓扑网中(默认情况下是这样的),所有的 worker 节点都通过普通 TCP socket 通信互相连接。 这样集群的安全策略就必须允许 worker 节点间 通过操作系统分配的临时端口范围自由连接。
所有 worker-worker 间(都是 SSH)的安全和加密或者信息的加密 都可以通过自定义
ClusterManager
完成。
[集群 Cookie](@id man-cluster-cookie)
集群上所有的进程都共享同一个 cookie,默认是 master 进程随机生成的字符串。
cluster_cookie()
返回 cookie,而cluster_cookie(cookie)()
设置并返回新的 cookie。所有的连接都进行双向认证,从而保证只有 master 启动的 worker 才能相互连接。
cookie 可以在 worker 启动的时候,通过参数
--worker=<cookie>
指定,如果参数--worker
没有指定 cookie,那么 worker 会从它的标准输入中 (stdin
) 读取,stdin
会在 cookie 获取之后立即关闭。ClusterManager
可以通过cluster_cookie()
从 master 中过去 cookie,不适用默认 TCP/IP 传输的集群管理器(即没有指定--worker
)必须用于 master 相同的 cookie 调用init_worker(cookie, manager)
。
注意,在对安全性要求很高的环境中,可以通过自定义 ClusterManager
实现。例如,cookie 可以提前共享,然后不必再启动参数中指定。
指定网络拓补结构(实验性功能)
可以通过传递到 addprocs
中的参数 topology
来指定 worker 之间如何连接。
:all_to_all
,默认的,所有 worker 之间相互都连接:master_worker
,只有主进程,即pid
为 1 的进程能够与 worker 建立连接:custom
: 集群管理器的launch
方法通过WorkerConfig
中的ident
和connect_idents
指定连接的拓补结构。一个 worker 通过集群管理器提供的ident
来连接到所有connect_idents
指定的 worker。
关键字参数 lazy=true|false
只会影响 topology
选项中的 :all_to_all
。如果是 true
,那么集群启动的时候 master 会连接所有的 worker,然后 worker 之间的特定连接会在初次唤醒的是建立连接,这有利于降低集群初始化的时候对资源的分配。lazy
的默认值是 true
。
目前,在没有建立连接的两个 worker 之间传递消息会出错,目前该行为是实验性的,未来的版本中可能会改变。
一些值得关注的外部库
除了 Julia 自带的并行机制之外,还有许多外部的库值得一提。例如 MPI.jl 提供了一个 MPI
协议的 Julia 的封装,或者是在 共享数组 提到的 DistributedArrays.jl,此外尤其值得一提的是 Julia 的 GPU 编程生态,其包括:
底层(C内核)的 OpenCL.jl 和 CUDAdrv.jl,分别提供了 OpenCL 和 CUDA 的封装。
底层(Julia 内核)的接口,如 CUDAnative.jl,提供了 Julia 原生的 CUDA 实现。
高层的特定抽象,如 CuArrays.jl 和 CLArrays.jl。
高层的库,如 ArrayFire.jl 和 GPUArrays.jl。
下面的例子将介绍如何用 DistributedArrays.jl
和 CuArrays.jl
通过 distribute()
和 CuArray()
将数组分配到多个进程。
记住在载入 DistributedArrays.jl
时,需要用 @everywhere
将其载入到多个进程中。
$ ./julia -p 4
julia> addprocs()
julia> @everywhere using DistributedArrays
julia> using CuArrays
julia> B = ones(10_000) ./ 2;
julia> A = ones(10_000) .* π;
julia> C = 2 .* A ./ B;
julia> all(C .≈ 4*π)
true
julia> typeof(C)
Array{Float64,1}
julia> dB = distribute(B);
julia> dA = distribute(A);
julia> dC = 2 .* dA ./ dB;
julia> all(dC .≈ 4*π)
true
julia> typeof(dC)
DistributedArrays.DArray{Float64,1,Array{Float64,1}}
julia> cuB = CuArray(B);
julia> cuA = CuArray(A);
julia> cuC = 2 .* cuA ./ cuB;
julia> all(cuC .≈ 4*π);
true
julia> typeof(cuC)
CuArray{Float64,1}
要牢记,当前一些 Julia 的特性并没有被 CUDAnative.jl [^2] 支持,尤其是一些像 sin
之类的函数需要换成 CUDAnative.sin
(cc: @maleadt)。
下面的例子中,通过 DistributedArrays.jl
和 CuArrays.jl
将一个数组分配到多个进程,然后调用一个函数。
function power_method(M, v)
for i in 1:100
v = M*v
v /= norm(v)
end
return v, norm(M*v) / norm(v) # or (M*v) ./ v
end
power_method
重复创建一个新的向量然后对其归一化,这里并没有在函数中指定类型信息,来看看是否对前面提到的类型适用:
julia> M = [2. 1; 1 1];
julia> v = rand(2)
2-element Array{Float64,1}:
0.40395
0.445877
julia> power_method(M,v)
([0.850651, 0.525731], 2.618033988749895)
julia> cuM = CuArray(M);
julia> cuv = CuArray(v);
julia> curesult = power_method(cuM, cuv);
julia> typeof(curesult)
CuArray{Float64,1}
julia> dM = distribute(M);
julia> dv = distribute(v);
julia> dC = power_method(dM, dv);
julia> typeof(dC)
Tuple{DistributedArrays.DArray{Float64,1,Array{Float64,1}},Float64}
最后,我们来看看 MPI.jl
,这个库时 Julia 对 MPI 协议的封装。一一介绍其中的每个函数太累赘了,这里领会其实现协议的方法就够了。
考虑下面这个简单的脚本,它做的只是调用每个子进程,然后初始化其 rank,然后在 master 访问时,对 rank 求和。
import MPI
MPI.Init()
comm = MPI.COMM_WORLD
MPI.Barrier(comm)
root = 0
r = MPI.Comm_rank(comm)
sr = MPI.Reduce(r, MPI.SUM, root, comm)
if(MPI.Comm_rank(comm) == root)
@printf("sum of ranks: %s\n", sr)
end
MPI.Finalize()
mpirun -np 4 ./julia example.jl
[^1]: In this context, MPI refers to the MPI-1 standard. Beginning with MPI-2, the MPI standards committee introduced a new set of communication mechanisms, collectively referred to as Remote Memory Access (RMA). The motivation for adding rma to the MPI standard was to facilitate one-sided communication patterns. For additional information on the latest MPI standard, see https://mpi-forum.org/docs.
[^2]: Julia GPU 手册
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论