What is the easiest way to parallelize a vectorized function in R?

I have a very large list X and a vectorized function f. I want to calculate f(X), but this will take a long time if I do it with a single core. I have (access to) a 48-core server. What is the easiest way to parallelize the calculation of f(X)? The following is not the right answer:

library(foreach)
library(doMC)
registerDoMC()

foreach(x=X, .combine=c) %dopar% f(x)

The above code will indeed parallelize the calculation of f(X), but it will do so by applying f separately to every element of X. This ignores the vectorized nature of f and will probably make things slower as a result, not faster. Rather than applying f elementwise to X, I want to split X into reasonably-sized chunks and apply f to those.

So, should I just manually split X into 48 equal-sized sublists and then apply f to each in parallel, then manually put together the result? Or is there a package designed for this?

In case anyone is wondering, my specific use case is here.
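
For concreteness, the manual version would look roughly like this (a sketch only; whether c() is the right way to recombine the per-chunk results depends on what f returns):

## Split X into 48 roughly equal contiguous sublists, apply f to each
## sublist in parallel, then recombine the per-chunk results.
library(parallel)
chunks <- split(X, cut(seq_along(X), breaks = 48, labels = FALSE))
result <- do.call(c, mclapply(chunks, f, mc.cores = 48))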

Comments (5)

暖伴 2024-11-07 00:01:41

Although this is an older question this might be interesting for everyone who stumbled upon this via google (like me): Have a look at the pvec function in the multicore package. I think it does exactly what you want.
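
For example, a minimal sketch (assuming X is an atomic vector; pvec nowadays ships with the base parallel package, into which multicore was merged, and it relies on forking, so it parallelizes on Unix-alikes but not on Windows):

library(parallel)
## pvec splits the vector into one chunk per core, applies f to each
## chunk, and concatenates the results.
result <- pvec(X, f, mc.cores = 48)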

亢潮 2024-11-07 00:01:41

Here's my implementation. It's a function chunkapply that takes a
vectorized function, a list of arguments that should be vectorized,
and a list of arguments that should not be vectorized (i.e.
constants), and returns the same result as calling the function on the
arguments directly, except that the result is calculated in parallel.
For a function f, vector arguments v1, v2, v3, and scalar
arguments s1, s2, the following should return identical results:

f(a=v1, b=v2, c=v3, d=s1, e=s2)
f(c=v3, b=v2, e=s2, a=v1, d=s1)
chunkapply(FUN=f, VECTOR.ARGS=list(a=v1, b=v2, c=v3), SCALAR.ARGS=list(d=s1, e=s2))
chunkapply(FUN=f, SCALAR.ARGS=list(e=s2, d=s1), VECTOR.ARGS=list(a=v1, c=v3, b=v2))

Since it is impossible for the chunkapply function to know which
arguments of f are vectorized and which are not, it is up to you to
specify this when you call it, or else you will get wrong results. You
should generally name your arguments to ensure that they get bound
correctly.

library(foreach)
library(iterators)
# Use your favorite foreach parallel backend here
library(doMC)
registerDoMC()

## Options passed to the multicore backend by chunkapply() below;
## prescheduling is disabled because we build our own chunks.
mcoptions <- list(preschedule = FALSE)

get.chunk.size <- function(vec.length,
                           min.chunk.size=NULL, max.chunk.size=NULL,
                           max.chunks=NULL) {
  if (is.null(max.chunks)) {
    max.chunks <- getDoParWorkers()
  }
  size <- vec.length / max.chunks
  if (!is.null(max.chunk.size)) {
    size <- min(size, max.chunk.size)
  }
  if (!is.null(min.chunk.size)) {
    size <- max(size, min.chunk.size)
  }
  num.chunks <- ceiling(vec.length / size)
  actual.size <- ceiling(vec.length / num.chunks)
  return(actual.size)
}

ichunk.vectors <- function(vectors=NULL,
                           min.chunk.size=NULL,
                           max.chunk.size=NULL,
                           max.chunks=NULL) {
  ## Calculate number of chunks
  recycle.length <- max(sapply(vectors, length))
  actual.chunk.size <- get.chunk.size(recycle.length, min.chunk.size, max.chunk.size, max.chunks)
  num.chunks <- ceiling(recycle.length / actual.chunk.size)

  ## Make the chunk iterator
  i <- 1
  it <- idiv(recycle.length, chunks=num.chunks)
  nextEl <- function() {
    n <- nextElem(it)
    ix <- seq(i, length = n)
    i <<- i + n
    ## Extract the current chunk from each vector, recycling shorter
    ## vectors against the longest one (as R's arithmetic would):
    vchunks <- foreach(v=vectors) %do% v[1 + (ix-1) %% length(v)]
    names(vchunks) <- names(vectors)
    vchunks
  }
  obj <- list(nextElem = nextEl)
  class(obj) <- c("ichunk", "abstractiter", "iter")
  obj
}

chunkapply <- function(FUN, VECTOR.ARGS, SCALAR.ARGS=list(), MERGE=TRUE, ...) {
  ## Check that the arguments make sense
  stopifnot(is.list(VECTOR.ARGS))
  stopifnot(length(VECTOR.ARGS) >= 1)
  stopifnot(is.list(SCALAR.ARGS))
  ## Choose appropriate combine function
  if (MERGE) {
    combine.fun <- append                # concatenate chunk results into one vector/list
  } else {
    combine.fun <- foreach:::defcombine  # foreach's default: collect chunk results in a list
  }
  ## Chunk and apply, and maybe merge
  foreach(vchunk=ichunk.vectors(vectors=VECTOR.ARGS, ...),
          .combine=combine.fun,
          .options.multicore = mcoptions) %dopar%
  {
    do.call(FUN, args=append(vchunk, SCALAR.ARGS))
  }
}

## Only do chunkapply if it will run in parallel
maybe.chunkapply <- function(FUN, VECTOR.ARGS, SCALAR.ARGS=list(), ...) {
  if (getDoParWorkers() > 1) {
    chunkapply(FUN, VECTOR.ARGS, SCALAR.ARGS, ...)
  } else {
    do.call(FUN, append(VECTOR.ARGS, SCALAR.ARGS))
  }
}

Here are some examples showing that chunkapply(f, list(x)) produces identical results to f(x). I have set the max.chunk.size extremely small to ensure that the chunking algorithm is actually used.

> # Generate all even integers from 2 to 100 inclusive
> identical(chunkapply(function(x,y) x*y, list(1:50), list(2), max.chunk.size=10), 1:50 * 2)
[1] TRUE

> ## Sample from a standard normal distribution, then discard values greater than 1
> a <- rnorm(n=100)
> cutoff <- 1
> identical(chunkapply(function(x,limit) x[x<=limit], list(x=a), list(limit=cutoff), max.chunk.size=10), a[a <= cutoff])
[1] TRUE

If anyone has a better name than "chunkapply", please suggest it.

Edit:

As another answer points out, there is a function called pvec in the multicore package that has very similar functionality to what I have written. For simple cases, you should use that, and you should vote up Jonas Rauch's answer for it. However, my function is a bit more general, so if any of the following apply to you, you might want to consider using my function instead:

  • You need to use a parallel backend other than multicore (e.g. MPI). My function uses foreach, so you can use any parallelization framework that provides a backend for foreach.
  • You need to pass multiple vectorized arguments. pvec only vectorizes over a single argument, so you couldn't easily implement parallel vectorized addition with pvec, for example. My function allows you to specify arbitrary arguments (see the sketch after this list).
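
For instance, the parallel vectorized addition mentioned above becomes a one-line check (a small illustration, not a benchmark):

x <- runif(1e6)
y <- runif(1e6)
## Chunked parallel addition over two vector arguments at once:
identical(chunkapply(`+`, VECTOR.ARGS=list(x, y), max.chunk.size=1e5), x + y)
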
比忠 2024-11-07 00:01:41

The itertools package was designed to address this kind of problem. In this case, I would use isplitVector:

library(itertools)
n <- getDoParWorkers()
foreach(x=isplitVector(X, chunks=n), .combine='c') %dopar% f(x)

For this example, pvec is undoubtedly faster and simpler, but this approach can also be used on Windows with the doParallel package, for example.
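
For the Windows case, the setup would look something like this (a sketch; only the registered backend changes, the foreach loop stays the same):

library(doParallel)
library(itertools)

## Register a PSOCK cluster; unlike doMC this does not rely on fork(),
## so it also works on Windows.
cl <- makeCluster(detectCores())
registerDoParallel(cl)
result <- foreach(x=isplitVector(X, chunks=getDoParWorkers()), .combine='c') %dopar% f(x)
stopCluster(cl)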

旧梦荧光笔 2024-11-07 00:01:41

Map-Reduce might be what you're looking for; it has been ported to R.

拥抱我好吗 2024-11-07 00:01:41

How about something like this? R will take advantage of all the available memory and multicore will parallelize over all available cores.

library(multicore)
## With mc.preschedule=FALSE, a separate job is forked for each element of X.
result <- mclapply(X, f, mc.preschedule=FALSE, mc.set.seed=FALSE)