返回介绍

并行计算

发布于 2019-07-03 15:53:51 字数 22743 浏览 1115 评论 0 收藏 0

对于多线程和并行计算的新手来说,首先了解Jullia所提供的 不同层级并行是非常有用的。这里我们主要将其分成三类:

  1. Julia 协程(绿色线程)
  2. 多线程
  3. 多核心或分布式处理

我们首先考虑 Julia 任务 [Task(也就是协程)]

共享数组使用系统共享内存将数组映射到多个进程上,尽管和 DArray 有点像,但其实际表现有很大不同。在 DArray 中,每个进程可以访问数据中的一块,但任意两个进程都不能共享同一块数据,而对于 SharedArray,每个进程都可以访问整个数组。如果你想在一台机器上,让一大块数据能够被多个进程访问到,那么 SharedArray 是个不错的选择。

共享数组由 SharedArray 提供,必须在所有相关的 worker 中都显式地加载。

SharedArray 索引(访问和复制)操作就跟普通的数组一样,由于底层的内存对本地的进程是可见的,索引的效率很高,因此大多数单进程上的算法对 SharedArray 来说都是适用的,除非某些算法必须使用 Array 类型(此时可以通过调用 sdata 来获取 SharedArray 数组)。对于其它类型的 AbstractArray 类型数组来说,sdata 仅仅会返回数组本身,因此,可以放心地使用 sdata 对任意类型的 Array 进行操作。

共享数组可以通过以下形式构造:

SharedArray{T,N}(dims::NTuple; init=false, pids=Int[])

上面的代码会创建一个 N 维,类型为 T,大小为 dims 的共享数组,通过 pids 指定可见的进程。与分布式数组不同的是,只有通过 pids 指定的 worker 才可见。

如果提供了一个类型为 initfn(S::SharedArray)init 函数,那么所有相关的 worker 都会调用它。你可以让每个 worker 都在共享数组不同的地方执行 init 函数,从而实现并行初始化。

下面是个例子:

julia> using Distributed

julia> addprocs(3)
3-element Array{Int64,1}:
 2
 3
 4

julia> @everywhere using SharedArrays

julia> S = SharedArray{Int,2}((3,4), init = S -> S[localindices(S)] = myid())
3×4 SharedArray{Int64,2}:
 2  2  3  4
 2  3  3  4
 2  3  4  4

julia> S[3,2] = 7
7

julia> S
3×4 SharedArray{Int64,2}:
 2  2  3  4
 2  3  3  4
 2  7  4  4

SharedArrays.localindices 提供了一个以为的切片,可以很方便地用来将 task 分配到各个进程上。当然你可以按你想要的方式做区分:

julia> S = SharedArray{Int,2}((3,4), init = S -> S[indexpids(S):length(procs(S)):length(S)] = myid())
3×4 SharedArray{Int64,2}:
 2  2  2  2
 3  3  3  3
 4  4  4  4

由于所有的进程都能够访问底层的数据,因此一定要小心避免出现冲突:

@sync begin
    for p in procs(S)
        @async begin
            remotecall_wait(fill!, p, S, p)
        end
    end
end

上面的代码会导致不确定的结果,因为每个进程都将整个数组赋值为其 pid,从而导致最后一个执行完成的进程会保留其 pid

考虑更复杂的一种情况:

q[i,j,t+1] = q[i,j,t] + u[i,j,t]

这个例子中,如果首先将任务用按照一维的索引作区分,那么就会出问题:如果 q[i,j,t] 位于分配给某个 worker 的最后一个位置,而 q[i,j,t+1] 位于下一个 worker 的开始位置,那么后面这个 worker 开始计算的时候,可能 q[i,j,t] 还没有准备好,这时候,更好的做法是,手动分区,比如可以定义一个函数,按照 (irange,jrange) 给每个 worker 分配任务。

julia> @everywhere function myrange(q::SharedArray)
           idx = indexpids(q)
           if idx == 0 # This worker is not assigned a piece
               return 1:0, 1:0
           end
           nchunks = length(procs(q))
           splits = [round(Int, s) for s in range(0, stop=size(q,2), length=nchunks+1)]
           1:size(q,1), splits[idx]+1:splits[idx+1]
       end

然后定义计算内核:

julia> @everywhere function advection_chunk!(q, u, irange, jrange, trange)
           @show (irange, jrange, trange)  # display so we can see what's happening
           for t in trange, j in jrange, i in irange
               q[i,j,t+1] = q[i,j,t] + u[i,j,t]
           end
           q
       end

然后定义一个 wrapper:

julia> @everywhere advection_shared_chunk!(q, u) =
           advection_chunk!(q, u, myrange(q)..., 1:size(q,3)-1)

接下来,比较三个不同的版本,第一个是单进程版本:

julia> advection_serial!(q, u) = advection_chunk!(q, u, 1:size(q,1), 1:size(q,2), 1:size(q,3)-1);

然后是使用 @distributed:

julia> function advection_parallel!(q, u)
           for t = 1:size(q,3)-1
               @sync @distributed for j = 1:size(q,2)
                   for i = 1:size(q,1)
                       q[i,j,t+1]= q[i,j,t] + u[i,j,t]
                   end
               end
           end
           q
       end;

最后是使用分区:

julia> function advection_shared!(q, u)
           @sync begin
               for p in procs(q)
                   @async remotecall_wait(advection_shared_chunk!, p, q, u)
               end
           end
           q
       end;

如果创建好了 SharedArray 之后,计算这些函数的执行时间,那么可以得到以下结果(用 julia -p 4 启动):

julia> q = SharedArray{Float64,3}((500,500,500));

julia> u = SharedArray{Float64,3}((500,500,500));

先执行一次以便 JIT 编译,然后用 @time 宏测试其第二次执行的时间:

julia> @time advection_serial!(q, u);
(irange,jrange,trange) = (1:500,1:500,1:499)
 830.220 milliseconds (216 allocations: 13820 bytes)

julia> @time advection_parallel!(q, u);
   2.495 seconds      (3999 k allocations: 289 MB, 2.09% gc time)

julia> @time advection_shared!(q,u);
        From worker 2:       (irange,jrange,trange) = (1:500,1:125,1:499)
        From worker 4:       (irange,jrange,trange) = (1:500,251:375,1:499)
        From worker 3:       (irange,jrange,trange) = (1:500,126:250,1:499)
        From worker 5:       (irange,jrange,trange) = (1:500,376:500,1:499)
 238.119 milliseconds (2264 allocations: 169 KB)

这里 advection_shared! 最大的优势在于,最小程度地降低了 woker 之间的通信,从而让每个 worker 能针对被分配的部分持续地计算一段时间。

共享数组与分布式垃圾回收

和远程引用一样,共享数组也依赖于创建节点上的垃圾回收来释放所有参与的 worker 上的引用。因此,创建大量生命周期比较短的数组,并尽可能快地显式 finilize 这些对象,代码会更高效,这样与之对用的内存和文件句柄都会更快地释放。

集群管理器

Julia 通过集群管理器实现对多个进程(所构成的逻辑上的集群)的启动,管理以及网络通信。一个 ClusterManager 负责:

  • 在一个集群环境中启动 worker 进程
  • 管理每个 worker 生命周期内的事件
  • (可选),提供数据传输

一个 Julia 集群由以下特点:

  • 初始进程,称为 master,其 id 为 1
  • 只有 master 进程可以增加或删除 worker 进程
  • 所有进程之间都可以直接通信

worker 之间的连接(用的是内置的 TCP/IP 传输)按照以下方式进行:

  • master 进程对一个 ClusterManager 对象调用 addprocs

  • addprocs 调用对应的 launch 方法,然后在对应的机器上启动相应数量的 worker 进程

  • 每个 worker 监听一个端口,然后将其 host 和 port 信息传给 stdout

  • 集群管理器捕获 stdout 中每个 worker 的信息,并提供给 master 进程

  • master 进程解析信息并与相应的 worker 建立 TCP/IP 连接

  • 每个 worker 都会被通知集群中的其它 worker

  • 每个 worker 与 id 小于自己的 worker 连接

  • 这样,一个网络就建立了,从而,每个 worker 都可以与其它 worker 建立连接

尽管默认的传输层使用的是 TCPSocket,对于一个自定义的集群管理器来说,完全可以使用其它传输方式。

Julia 提供了两种内置的集群管理器:

LocalManager 用来在同一个 host 上启动多个 worker,从而利用多核/多处理器硬件。

因此,一个最小的集群管理器需要:

  • 是一个 ClusterManager 抽象类的一个子类
  • 实现 launch 接口,用来启动新的 worker
  • 实现 manage,在一个 worker 的生命周期中多次被调用(例如,发送中断信号)

[addprocs(manager::FooManager)](http://127.0.0.5/@ref addprocs) 需要 FooManager 实现:

function launch(manager::FooManager, params::Dict, launched::Array, c::Condition)
    [...]
end

function manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
    [...]
end

作为一个例子,我们来看下 LocalManager 是怎么实现的:

struct LocalManager <: ClusterManager
    np::Integer
end

function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition)
    [...]
end

function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
    [...]
end

launch 方法接收以下参数:

  • manager::ClusterManager: 调用 addprocs 时所用到的集群管理器
  • params::Dict: 所有的关键字参数都会传递到 addprocs
  • launched::Array: 用来存储一个或多个 WorkerConfig
  • c::Condition: 在 workers 启动后被通知的条件变量

launch 会在一个异步的task中调用,该 task 结束之后,意味着所有请求的 worker 都已经启动好了。因此,launch 函数必须在所有 worker 启动之后,尽快退出。

新启动的 worker 之间采用的是多对多的连接方式。在命令行中指定参数 --worker[=<cookie>] 会让所有启动的进程把自己当作 worker,然后通过 TCP/IP 构建连接。

集群中所有的 worker 默认使用同一个 master 的 [cookie](http://127.0.0.5/@ref man-cluster-cookie)。如果 cookie 没有指定,(比如没有通过 --worker 指定),那么 worker 会尝试从它的标准输入中读取。LocalManagerSSHManager 都是通过标准输入来将 cookie 传递给新启动的 worker。

默认情况下,一个 worker 会监听从 getipaddr() 函数返回的地址上的一个开放端口。若要指定监听的地址,可以通过额外的参数 --bind-to bind_addr[:port] 指定,这对于多 host 的情况来说很方便。

对于非 TCP/IP 传输,可以选择 MPI 作为一种实现,此时一定不要指定 --worker 参数,另外,新启动的 worker 必须调用 init_worker(cookie) 之后再使用并行的结构体。

对于每个已经启动的 worker,launch 方法必须往 launched 中添加一个 WorkerConfig 对象(相应的值已经初始化)。

mutable struct WorkerConfig
    # Common fields relevant to all cluster managers
    io::Union{IO, Nothing}
    host::Union{AbstractString, Nothing}
    port::Union{Integer, Nothing}

    # Used when launching additional workers at a host
    count::Union{Int, Symbol, Nothing}
    exename::Union{AbstractString, Cmd, Nothing}
    exeflags::Union{Cmd, Nothing}

    # External cluster managers can use this to store information at a per-worker level
    # Can be a dict if multiple fields need to be stored.
    userdata::Any

    # SSHManager / SSH tunnel connections to workers
    tunnel::Union{Bool, Nothing}
    bind_addr::Union{AbstractString, Nothing}
    sshflags::Union{Cmd, Nothing}
    max_parallel::Union{Integer, Nothing}

    # Used by Local/SSH managers
    connect_at::Any

    [...]
end

WorkerConfig 中的大多数字段都是内置的集群管理器会用到,对于自定义的管理器,通常只需要指定 iohost/port:

  • 如果指定了 io,那么就会用来读取 host/port 信息。每个 worker 会在启动时打印地址和端口,这样 worker 就可以自由监听可用的端口,而不必手动配置 worker 的端口。

  • 如果 io 没有指定,那么 hostport 就会用来连接。

  • countexenameexeflags 用于从一个 worker 上启动额外的 worker。例如,一个集群管理器可能对每个节点都只启动一个 worker,然后再用它来启动额外的 worker。

    • count 可以是一个整数 n,用来指定启动 n 个 worker
    • count 还可以是 :auto,用来启动跟那台机器上 CPU 个数(逻辑上的核的个数)相同的 worker
    • exenamejulia 可执行文件的全路径
    • exeflags 应该设置成传递给将要启动的 worker 命令行参数
  • tunnel, bind_addr, sshflagsmax_parallel 会在从 worker 与 master 进程建立 ssh 隧道时用到

  • userdata 用来提供给自定义集群管理器存储自己的 worker 相关的信息

manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol) 会在一个 worker 生命周期中的不同时刻被调用,其中 op 的值可能是:

  • :register/:deregister,从 Julia 的 worker 池子中添加/删除一个 worker

  • :interrupt,当 interrupt(workers) 被调用是,此时,ClusterManager 应该给相应的 worker 发送终端信号

  • :finalize,用于清理操作。

自定义集群管理器的传输方式

将默认的 TCP/IP 多对多 socket 连接替换成一个自定义的传输层需要做很多工作。每个 Julia 进程都有与其连接的 worker 数量相同的通信 task。例如,在一个有 32 个进程的多对多集群中:

  • 每个进程都有31个通信task

  • 每个 task 在一个消息处理循环中从一个远端 worker 读取所有的输入信息

  • 每个消息处理循环等待一个 IO 对象(比如,在默认实现中是一个 TCPSocket),然后读取整个信息,处理,等待下一个

  • 发送消息则可以直接在任意 Julia task 中完成,而不只是通信 task,同样,也是通过相应的 IO 对象

要替换默认的传输方式,需要新的实现能够在远程 worker 之间建立连接,同时提供一个可以用来被消息处理循环等待的 IO 对象。集群管理器的回调函数需要实现如下函数:

connect(manager::FooManager, pid::Integer, config::WorkerConfig)
kill(manager::FooManager, pid::Int, config::WorkerConfig)

默认的实现(使用的是 TCP/IP socket)是 connect(manager::ClusterManager, pid::Integer, config::WorkerConfig)

connect 需要返回一对 IO 对象,一个用于从 pid worker 读取数据,另一个用于往 pid 写数据。自定义的集群管理器可以用内存中的 BUfferStream 作为一个管道将自定义的(很可能是非 IO 的)传输与 Julia 内置的并行基础设施衔接起来。

BufferStream 是一个内存中的 IOBuffer,其表现很像 IO,就是一个(stream),可以异步地处理。

Examples repositoryclustermanager/0mq 目录中,包含一个使用 ZeroMQ 连接 Julia worker 的例子,用的是星型拓补结构。需要注意的是:Julia 的进程仍然是逻辑上相互连接的,任意 worker 都可以与其它 worker 直接相连而无需感知到 0MQ 作为传输层的存在。

在使用自定义传输的时候:

  • Julia 的 workers 必须不能通过 --worker 启动。如果启动的时候使用了 --worker,那么新启动的 worker 会默认使用基于 TCP/IP socket 的实现

  • 对于每个 worker 逻辑上的输入连接,必须调用 Base.process_messages(rd::IO, wr::IO)(),这会创建一个新的 task 来处理 worker 消息的读写

  • init_worker(cookie, manager::FooManager) 必须作为 worker 进程初始化的一部分呢被调用

  • WorkerConfig中的 connect_at::Any 字段可以被集群管理器在调用 launch 的时候设置,该字段的值会发送到所有的 connect 回调中。通常,其中包含的是如何连接到一个 worker 的信息。例如,在 TCP/IP socket 传输中,用这个字段存储 (host, port) 来声明如何连接到一个 worker。

kill(manager, pid, config) 用来从一个集群中删除一个 worker,在 master 进程中,对应的 IO 对象必须通过对应的实现来关闭,从而保证正确地释放资源。默认的实现简单地对指定的远端 worker 执行 exit() 即可。

在例子目录中,clustermanager/simple 展示了一个简单地实现,使用的是 UNIX 下的 socket。

LocalManager 和 SSHManager 的网络要求

Julia 集群设计的时候,默认是在一个安全的环境中执行,比如本地的笔记本,部门的集群,甚至是云端。这部分将介绍 LocalManagerSSHManager 的网络安全要点:

  • master 进程不监听任何端口,它只负责向外连接 worker

  • 每个 worker 都只绑定一个本地的接口,同时监听一个操作系统分配的临时端口。

  • addprocs(N) 使用的 LocalManager,默认只会绑定到回环接口(loopback interface),这就意味着,之后在远程主机上(恶意)启动的 worker 无法连接到集群中,在执行 addprocs(4) 之后,又跟一个 addprocs(["remote_host"]) 会失败。有些用户可能希望创建一个集群同时管理本地系统和几个远端系统,这可以通过在绑定 LocalManager 到外部网络接口的时候,指定一个 restrict 参数:addprocs(4; restrict=false)

  • addprocs(list_of_remote_hosts) 使用的 SSHManager 会通过 SSH 启动远程机上的 worker。 默认 SSH 只会用来启动 Julia 的 worker。随后的 master-worker 和 worker-worker 连接使用的是普通的、未加密的 TCP/IP 通信。 远程机必须开启免密登陆。 额外的 SSH 标记或认证信息会通过关键字参数 sshflags 指定。

  • addprocs(list_of_remote_hosts; tunnel=true, sshflags=<ssh keys and other flags>) 在我们希望给 master-worker 也使用 SSH 连接的时候很有用。 一个典型的场景是本地的笔记本 运行 Julia ERPL (做为 master)和云上的其他机器,比如 Amazon EC2,构成集群。 这时候远程机器只要开启 22 端口就可以,然后要有 SSH 客户端 通过公约基础设施(PKI)认证过。授权信息可以通过 sshflags 生效,比如 sshflags=`-i <keyfile>`

    在一个所有节点联通的拓扑网中(默认情况下是这样的),所有的 worker 节点都通过普通 TCP socket 通信互相连接。 这样集群的安全策略就必须允许 worker 节点间 通过操作系统分配的临时端口范围自由连接。

    所有 worker-worker 间(都是 SSH)的安全和加密或者信息的加密 都可以通过自定义 ClusterManager 完成。

[集群 Cookie](@id man-cluster-cookie)

集群上所有的进程都共享同一个 cookie,默认是 master 进程随机生成的字符串。

  • cluster_cookie() 返回 cookie,而 cluster_cookie(cookie)() 设置并返回新的 cookie。

  • 所有的连接都进行双向认证,从而保证只有 master 启动的 worker 才能相互连接。

  • cookie 可以在 worker 启动的时候,通过参数 --worker=<cookie> 指定,如果参数 --worker 没有指定 cookie,那么 worker 会从它的标准输入中 (stdin) 读取, stdin 会在 cookie 获取之后立即关闭。

  • ClusterManager 可以通过 cluster_cookie() 从 master 中过去 cookie,不适用默认 TCP/IP 传输的集群管理器(即没有指定 --worker)必须用于 master 相同的 cookie 调用 init_worker(cookie, manager)

注意,在对安全性要求很高的环境中,可以通过自定义 ClusterManager 实现。例如,cookie 可以提前共享,然后不必再启动参数中指定。

指定网络拓补结构(实验性功能)

可以通过传递到 addprocs 中的参数 topology 来指定 worker 之间如何连接。

  • :all_to_all,默认的,所有 worker 之间相互都连接
  • :master_worker,只有主进程,即 pid 为 1 的进程能够与 worker 建立连接
  • :custom: 集群管理器的 launch 方法通过 WorkerConfig 中的 identconnect_idents 指定连接的拓补结构。一个 worker 通过集群管理器提供的 ident 来连接到所有 connect_idents 指定的 worker。

关键字参数 lazy=true|false 只会影响 topology 选项中的 :all_to_all。如果是 true,那么集群启动的时候 master 会连接所有的 worker,然后 worker 之间的特定连接会在初次唤醒的是建立连接,这有利于降低集群初始化的时候对资源的分配。lazy 的默认值是 true

目前,在没有建立连接的两个 worker 之间传递消息会出错,目前该行为是实验性的,未来的版本中可能会改变。

一些值得关注的外部库

除了 Julia 自带的并行机制之外,还有许多外部的库值得一提。例如 MPI.jl 提供了一个 MPI 协议的 Julia 的封装,或者是在 共享数组 提到的 DistributedArrays.jl,此外尤其值得一提的是 Julia 的 GPU 编程生态,其包括:

  1. 底层(C内核)的 OpenCL.jlCUDAdrv.jl,分别提供了 OpenCL 和 CUDA 的封装。

  2. 底层(Julia 内核)的接口,如 CUDAnative.jl,提供了 Julia 原生的 CUDA 实现。

  3. 高层的特定抽象,如 CuArrays.jlCLArrays.jl

  4. 高层的库,如 ArrayFire.jlGPUArrays.jl

下面的例子将介绍如何用 DistributedArrays.jlCuArrays.jl 通过 distribute()CuArray() 将数组分配到多个进程。

记住在载入 DistributedArrays.jl 时,需要用 @everywhere 将其载入到多个进程中。

$ ./julia -p 4

julia> addprocs()

julia> @everywhere using DistributedArrays

julia> using CuArrays

julia> B = ones(10_000) ./ 2;

julia> A = ones(10_000) .* π;

julia> C = 2 .* A ./ B;

julia> all(C .≈ 4*π)
true

julia> typeof(C)
Array{Float64,1}

julia> dB = distribute(B);

julia> dA = distribute(A);

julia> dC = 2 .* dA ./ dB;

julia> all(dC .≈ 4*π)
true

julia> typeof(dC)
DistributedArrays.DArray{Float64,1,Array{Float64,1}}

julia> cuB = CuArray(B);

julia> cuA = CuArray(A);

julia> cuC = 2 .* cuA ./ cuB;

julia> all(cuC .≈ 4*π);
true

julia> typeof(cuC)
CuArray{Float64,1}

要牢记,当前一些 Julia 的特性并没有被 CUDAnative.jl [^2] 支持,尤其是一些像 sin 之类的函数需要换成 CUDAnative.sin(cc: @maleadt)。

下面的例子中,通过 DistributedArrays.jlCuArrays.jl 将一个数组分配到多个进程,然后调用一个函数。

function power_method(M, v)
    for i in 1:100
        v = M*v
        v /= norm(v)
    end

    return v, norm(M*v) / norm(v)  # or  (M*v) ./ v
end

power_method 重复创建一个新的向量然后对其归一化,这里并没有在函数中指定类型信息,来看看是否对前面提到的类型适用:

julia> M = [2. 1; 1 1];

julia> v = rand(2)
2-element Array{Float64,1}:
0.40395
0.445877

julia> power_method(M,v)
([0.850651, 0.525731], 2.618033988749895)

julia> cuM = CuArray(M);

julia> cuv = CuArray(v);

julia> curesult = power_method(cuM, cuv);

julia> typeof(curesult)
CuArray{Float64,1}

julia> dM = distribute(M);

julia> dv = distribute(v);

julia> dC = power_method(dM, dv);

julia> typeof(dC)
Tuple{DistributedArrays.DArray{Float64,1,Array{Float64,1}},Float64}

最后,我们来看看 MPI.jl,这个库时 Julia 对 MPI 协议的封装。一一介绍其中的每个函数太累赘了,这里领会其实现协议的方法就够了。

考虑下面这个简单的脚本,它做的只是调用每个子进程,然后初始化其 rank,然后在 master 访问时,对 rank 求和。

import MPI

MPI.Init()

comm = MPI.COMM_WORLD
MPI.Barrier(comm)

root = 0
r = MPI.Comm_rank(comm)

sr = MPI.Reduce(r, MPI.SUM, root, comm)

if(MPI.Comm_rank(comm) == root)
   @printf("sum of ranks: %s\n", sr)
end

MPI.Finalize()
mpirun -np 4 ./julia example.jl

[^1]: In this context, MPI refers to the MPI-1 standard. Beginning with MPI-2, the MPI standards committee introduced a new set of communication mechanisms, collectively referred to as Remote Memory Access (RMA). The motivation for adding rma to the MPI standard was to facilitate one-sided communication patterns. For additional information on the latest MPI standard, see https://mpi-forum.org/docs.

[^2]: Julia GPU 手册

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文