R并行计算及线程资源配置

R支持多线程(并行计算)的包有snow、multicore和parallel等。parallel是 R2.14.0开始引入的,parallel基于multicore(2009-)和snow(2003-),实现了这两个包的大部分函数。parallel包的开发背景是2000年后个人电脑逐渐普及了多核,原有的并行计算逻辑发生了改变,实际上,parallel包是可以跨以太网、跨不同的操作系统同时运行的。
这里的并行计算指的是同时计算许多”块”数据,这些”块”之间不相互依赖(一个块的计算无需另一个块的计算结果,相互依赖的计算情况常见于HPC)
并行计算的基本流程是:
1、启动并行计算程序,初始化(如设置核的使用数量)
2、给每个并行计算进程分配任务,一般是将任务分为相等数量的块(有时会用任务池,通过设置preschedule参数,一有线程计算完,就从任务池分配给它一个新任务,在R中,这个过程又叫做负载均衡,在python中称这个方法为线程池)
3、等待所有计算进程结束,合并结果
4、关闭
需要注意的是,进程间调用任务时使用了序列化和反序列化的对象处理方法。
parallel包的使用样例:
library(parallel)
# Calculate the number of cores
no_cores <- detectCores() – 1 #detectCores(logical = FALSE)返回物理核的数量,detectCores()默认返回逻辑CPU的核数
# Initiate cluster
cl <- makeCluster(no_cores)
system.time({
res <- parLapply(cl, 1:5000000, function(exponent) 2^exponent) #multicore和snow使用parLapply和mclapply实现了lapply、sapply和apply等,需要注意的是mclapply在Windows系统下不可使用
})
#关闭并行计算
stopCluster(cl)

在多线程工具的使用过程中,有一个问题需要特别注意,就是各个线程所需的资源问题,比如连接数据库、或者使用一个自定义的函数,线程的资源配置,可以使用clusterExport()和clusterEvalQ()函数。
例1:
library(parallel)
ex.df <- data.frame(a=seq(1,100,1),b=seq(10,1000,10),c=runif(100)) \# Define the threshold ths <- 20 \# These 2 statements in Base R are equivalent aa <- apply(ex.df,1, function(x) (x[1]+x[2])*x[3] > 20)
aa <- apply(ex.df,1, function(x) (x[1]+x[2])*x[3] > ths)
### Equivalent parallel execution ###
# Declare the cluster object. Here we use the default settings (SOCK)
# and the number of nodes is specified by the number given
clus <- makeCluster(3) \# The equivalent for the first alternative would be very easy aa <- parRapply(clus,ex.df, function(x) (x[1]+x[2])*x[3] > 20)
aa <- parRapply(clus,ex.df, function(x) (x[1]+x[2])*x[3] > ths)
Error in checkForRemoteErrors(val) :
3 nodes produced errors; first error: 找不到对象’ths’
#However, if the variable “ths” needs to be used, a line has to be added
clusterExport(clus,”ths”)
aa <- parRapply(clus,ex.df, function(x) (x[1]+x[2])*x[3] > ths)

例2:
并行计算处理数据库:
cl <- makeCluster(getOption(“cl.cores”, 4));
clusterEvalQ(cl, {
library(RODBC)
dbConn <- odbcDriverConnect(connection=”connection string”)
NULL
})

clusetEvalQ()与clusterExport()用法略有不同,clusetEvalQ是在函数内部进行函数定义等,clusterExport是先在环境中定义函数

#Create cluster
clus <- makeCluster(3)
#Option 1. Declare the function for each node
clusterEvalQ(clus, custom.function <- function(a,b,c){
result <- (a+b)c
return(result)})
#Option 2. Export it form base workspace
custom.function <- function(a,b,c){
result <- (a+b)
c
return(result)}
clusterExport(clus,”custom.function”)

ex.df <- data.frame(a=seq(1,100,1),b=seq(10,1000,10),c=runif(100))
#Apply the declared function
aa <- parRapply(clus,ex.df, function(x) custom.function(x[1],x[2],x[3]))
此外,clusterCall也可以实现线程资源配置,clusterCall(cl, function() { source(“test.R”) })

参考资料:
Arrayhttp://www.sfu.ca/~sblay/R/snow.html
Arrayhttps://stat.ethz.ch/R-manual/R-devel/library/parallel/doc/parallel.pdf
Arrayhttps://www.r-bloggers.com/how-to-go-parallel-in-r-basics-tips/
Arrayhttp://m.doc00.com/doc/1011025mh

发表评论