对 Amazon Elastic MapReduce 上的 R 映射器脚本进行故障排除 - 结果不符合预期

发布于 2024-10-07 18:36:16 字数 2712 浏览 3 评论 0 原文

我正在尝试使用 Amazon Elastic Map Red 运行一系列数百万个案例的模拟。这是一个没有减速器的 Rscript 流作业。我在 EMR 调用 --reducer org.apache.hadoop.mapred.lib.IdentityReducer 中使用 IdentityReducer。

当手动传递一行字符串时,脚本文件在测试时工作正常,并从 Linux 机器上的命令行本地运行 echo "1,2443,2442,1,5" | ./mapper.R 我得到了我所期望的一行结果。然而,当我使用 EMR 输入文件中的大约 10,000 个案例(行)测试我的模拟时,我只得到了 10k 输入行中十几行左右的输出。我已经尝试了好几次,但我无法弄清楚为什么。 Hadoop 作业运行良好,没有任何错误。似乎输入行被跳过,或者身份缩减器可能发生了一些事情。对于有输出的情况,结果是正确的。

我的输入文件是具有以下数据格式的 csv,由逗号分隔的一系列五个整数:

1,2443,2442,1,5
2,2743,4712,99,8
3,2443,861,282,3177
etc...

这是我的 ma​​pper.R 的 R 脚本

#! /usr/bin/env Rscript

# Define Functions
trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)
splitIntoWords <- function(line) unlist(strsplit(line, "[[:space:]]+"))
# function to read in the relevant data from needed data files
get.data <- function(casename) {
    list <- lapply(casename, function(x) {
        read.csv(file = paste("./inputdata/",x, ".csv", sep = ""),
                 header = TRUE,
        stringsAsFactors = FALSE)})
    return(data.frame(list))
}

con <- file("stdin")            
line <- readLines(con, n = 1, warn = FALSE) 
line <- trimWhiteSpace(line)
values <- unlist(strsplit(line, ","))
lv <- length(values)
cases <- as.numeric(values[2:lv])
simid <- paste("sim", values[1], ":", sep = "")
l <- length(cases)                      # for indexing

## create a vector for the case names
names.vector <- paste("case", cases, sep = ".")

## read in metadata and necessary data columns using get.data function
metadata <- read.csv(file = "./inputdata/metadata.csv", header = TRUE,
                     stringsAsFactors = FALSE)
d <- cbind(metadata[,1:3], get.data(names.vector))

## Calculations that use df d and produce a string called 'output' 
## in the form of "id: value1 value2 value3 ..." to be used at a 
## later time for agregation.

cat(output, "\n")
close(con)

此模拟的(通用)EMR 调用是:

ruby elastic-mapreduce --create --stream --input s3n://bucket/project/input.txt --output s3n://bucket/project/output --mapper s3n://bucket/project/mapper.R --reducer org.apache.hadoop.mapred.lib.IdentityReducer --cache-archive s3n://bucket/project/inputdata.tar.gz#inputdata --name Simulation --num-instances 2

如果有人如果您对我可能遇到这些问题的原因有任何见解,我愿意接受建议以及对 R 脚本的任何更改/优化。

我的另一个选择是将脚本转换为函数并使用 R 多核包运行并行应用,但我还没有尝试过。我希望在 EMR 上实现此功能。我使用了 JD Long 的 原文

I am trying to use Amazon Elastic Map Reduce to run a series of simulations of several million cases. This is an Rscript streaming job with no reducer. I am using the Identity Reducer in my EMR call --reducer org.apache.hadoop.mapred.lib.IdentityReducer.

The script file works fine when tested and run locally from the command line on a Linux box when passing one line of string manually echo "1,2443,2442,1,5" | ./mapper.R and I get the one line of results that I am expecting. However, when I tested my simulation using about 10,000 cases (lines) from the input file on EMR, I only got output for a dozen lines or so out of 10k input lines. I've tried several times and I cannot figure out why. The Hadoop job runs fine without any errors. It seems like input lines are being skipped, or perhaps something is happening with the Identity reducer. The results are correct for the cases where there is output.

My input file is a csv with the following data format, a series of five integers separated by commas:

1,2443,2442,1,5
2,2743,4712,99,8
3,2443,861,282,3177
etc...

Here is my R script for mapper.R

#! /usr/bin/env Rscript

# Define Functions
trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)
splitIntoWords <- function(line) unlist(strsplit(line, "[[:space:]]+"))
# function to read in the relevant data from needed data files
get.data <- function(casename) {
    list <- lapply(casename, function(x) {
        read.csv(file = paste("./inputdata/",x, ".csv", sep = ""),
                 header = TRUE,
        stringsAsFactors = FALSE)})
    return(data.frame(list))
}

con <- file("stdin")            
line <- readLines(con, n = 1, warn = FALSE) 
line <- trimWhiteSpace(line)
values <- unlist(strsplit(line, ","))
lv <- length(values)
cases <- as.numeric(values[2:lv])
simid <- paste("sim", values[1], ":", sep = "")
l <- length(cases)                      # for indexing

## create a vector for the case names
names.vector <- paste("case", cases, sep = ".")

## read in metadata and necessary data columns using get.data function
metadata <- read.csv(file = "./inputdata/metadata.csv", header = TRUE,
                     stringsAsFactors = FALSE)
d <- cbind(metadata[,1:3], get.data(names.vector))

## Calculations that use df d and produce a string called 'output' 
## in the form of "id: value1 value2 value3 ..." to be used at a 
## later time for agregation.

cat(output, "\n")
close(con)

The (generalized) EMR call for this simulation is:

ruby elastic-mapreduce --create --stream --input s3n://bucket/project/input.txt --output s3n://bucket/project/output --mapper s3n://bucket/project/mapper.R --reducer org.apache.hadoop.mapred.lib.IdentityReducer --cache-archive s3n://bucket/project/inputdata.tar.gz#inputdata --name Simulation --num-instances 2

If anyone has any insights as to why I might be experiencing these issues, I am open to suggestions, as well as any changes/optimization to the R script.

My other option is to turn the script into a function and run a parallelized apply using R multicore packages, but I haven't tried it yet. I'd like to get this working on EMR. I used JD Long's and Pete Skomoroch's R/EMR examples as a basis for creating the script.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

烟火散人牵绊 2024-10-14 18:36:16

没有任何明显的东西跳出来。但是,您可以使用只有 10 行的简单输入文件来运行该作业吗?确保这 10 行是没有在您的大测试用例中运行的场景。尝试此操作以消除您的输入导致 R 脚本无法生成答案的可能性。

调试 EMR 作业本身就是一项技能。

编辑:

这是一次彻底的钓鱼探险,但使用 AWS GUI 启动 EMR 交互式猪会话。 “交互式猪”会话保持正常运行,以便您可以通过 ssh 访问它们。您也可以从命令行工具执行此操作,但从 GUI 中执行此操作会更容易一些,因为您只需执行一次即可。然后 ssh 进入集群,传输测试用例 infile 缓存文件和映射器,然后运行以下

命令:cat infile.txt | yourMapper.R> outfile.txt

这只是为了测试您的映射器是否可以在 EMR 环境中解析 infile,而无需使用 Hadoop 位。

编辑2:

我将上面的文本留在那里供后代使用,但真正的问题是你的脚本永远不会返回到标准输入来获取更多数据。因此,每个映射器都会运行一次,然后结束。如果运行上述一行,您将只能得到一个结果,而不是 infile.txt 中每一行的结果。如果您甚至在本地计算机上运行了 cat 测试,则应该会弹出错误!

让我们看看 Pete 的 R 示例中的字数

#! /usr/bin/env Rscript

trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)
splitIntoWords <- function(line) unlist(strsplit(line, "[[:space:]]+"))

## **** could wo with a single readLines or in blocks
con <- file("stdin", open = "r")
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
    line <- trimWhiteSpace(line)
    words <- splitIntoWords(line)
    ## **** can be done as cat(paste(words, "\t1\n", sep=""), sep="")
    for (w in words)
        cat(w, "\t1\n", sep="")
}
close(con)

您的脚本缺少的部分是这样的:

 while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
        #do your dance
        #do your dance quick
        #come on everybody tell me what's the word
        #word up
    }

你自然应该替换 Cameo 的 Word Up 的歌词!与你的实际逻辑。

请记住,正确的调试音乐可以减轻整个过程的痛苦:

http://www.youtube.com/观看?v=MZjAantupsA

Nothing obvious jumps out. However, can you run the job using a simple input file of only 10 lines? Make sure these 10 lines are scenarios which did not run in your big test case. Try this to eliminate the possibility that your inputs are causing the R script to not produce an answer.

Debugging EMR jobs is a skill of its own.

EDIT:

This is a total fishing expedition, but fire up a EMR interactive pig session using the AWS GUI. "Interactive pig" sessions stay up and running so you can ssh into them. You could also do this from the command line tools, but it's a little easier from the GUI since, hopefully, you only need to do this once. Then ssh into the cluster, transfer over your test case infile your cachefiles and your mapper and then run this:

cat infile.txt | yourMapper.R > outfile.txt

This is just to test if your mapper can parse the infile in the EMR environment with no Hadoop bits in the way.

EDIT 2:

I'm leaving the above text there for posterity but the real issue is your script never goes back to stdin to pick up more data. Thus you get one run for each mapper then it ends. If you run the above one liner you will only get out one result, not a result for each line in infile.txt. If you had run the cat test even on your local machine the error should pop out!

So let's look at Pete's word count in R example:

#! /usr/bin/env Rscript

trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)
splitIntoWords <- function(line) unlist(strsplit(line, "[[:space:]]+"))

## **** could wo with a single readLines or in blocks
con <- file("stdin", open = "r")
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
    line <- trimWhiteSpace(line)
    words <- splitIntoWords(line)
    ## **** can be done as cat(paste(words, "\t1\n", sep=""), sep="")
    for (w in words)
        cat(w, "\t1\n", sep="")
}
close(con)

The piece your script is missing is this bit:

 while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
        #do your dance
        #do your dance quick
        #come on everybody tell me what's the word
        #word up
    }

you should, naturally, replace the lyrics of Cameo's Word Up! with your actual logic.

Keep in mind that proper debugging music makes the process less painful:

http://www.youtube.com/watch?v=MZjAantupsA

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文