Hadoop and MapReduce: how do I send arrays of lines extracted from a CSV to the map function, where each array contains lines x – y?

Posted on 2024-12-01 12:29:24


Okay, so I have been reading a lot about Hadoop and MapReduce, and maybe it's because I'm not as familiar with iterators as most, but I have a question I can't seem to find a direct answer to. Basically, as I understand it, the map function is executed in parallel by many machines and/or cores. Thus, whatever you are doing cannot depend on previously executed code if the program is to see any kind of speed gain. This works perfectly for me, but what I'm doing requires me to test information in small batches. Basically, I need to send batches of lines from a .csv as arrays of 32, 64, 128 or however many lines each. For example, lines 0–127 would go to core1's execution of the map function, lines 128–255 to core2's, and so on. I also need the contents of each batch available as a whole inside the function, as if I had passed it an array. I read a little about how the new Java API allows for something called push and pull, and that this allows things to be sent in batches, but I couldn't find any example code. I'm going to continue researching and I'll post anything I find, but if anyone knows, please post in this thread. I would really appreciate any help I might receive.

Edit

If you could simply ensure that the chunks of the .csv are sent in sequence, you could perform it this way. I guess this also assumes that there are globals in MapReduce.

//** concept, not code **//

GLOBAL_COUNTER = 0;
GLOBAL_ARRAY = NEW ARRAY();

map()
{
    GLOBAL_ARRAY[GLOBAL_COUNTER] = ITERATOR_VALUE;
    GLOBAL_COUNTER++;

    if (GLOBAL_COUNTER == 128)
    {
        // EXECUTE TEST WITH AN ARRAY OF 128 VALUES FOR COMPARISON
        GLOBAL_COUNTER = 0;
    }
}


Comments (3)

不即不离 2024-12-08 12:29:24


If you're trying to get a chunk of lines from your CSV file into the mapper, you might consider writing your own InputFormat/RecordReader and potentially your own WritableComparable object. With the custom InputFormat/RecordReader you'll be able to specify how objects are created and passed to the mapper based on the input you receive.

If the mapper is doing what you want but you need these chunks of lines sent to the reducer, give the mapper the same output key for every line you want handled by the same reduce call.

The default TextInputFormat will give input to your mapper like this (the keys/offsets in this example are just random numbers):

0    Hello World
123  My name is Sam
456  Foo bar bar foo

Each of those lines will be read into your mapper as a key,value pair. Just modify the key to be the same for each line you need and write it to the output:

0    Hello World
0    My name is Sam
1    Foo bar bar foo

The first time the reduce function is called, it will receive a key/value pair with the key being "0" and the value being an Iterable object containing "Hello World" and "My name is Sam". You'll be able to access both of these values in the same reduce method call by using the Iterable object.

Here is some pseudo code:

int count = 0
map (key, value) {
    int newKey = count / 2
    context.write(newKey, value)
    count++
}

reduce (key, values) {
    for value in values
        // Do something to each line
}
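
A minimal Java sketch of that pseudo code against the org.apache.hadoop.mapreduce API; the class names and the batch size of 128 are only illustrative:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Gives the same key to every run of 128 consecutive lines seen by one map task,
// so those lines arrive together in a single reduce() call.
public class BatchMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

    private static final int BATCH_SIZE = 128; // illustrative batch size
    private int count = 0;
    private final IntWritable outKey = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        outKey.set(count / BATCH_SIZE); // lines 0-127 -> key 0, 128-255 -> key 1, ...
        context.write(outKey, value);
        count++;
    }
}

// Sees every line that shares a batch key and can treat them as one group.
class BatchReducer extends Reducer<IntWritable, Text, IntWritable, Text> {

    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text line : values) {
            // Do something with each line; the whole batch is visible
            // inside this single reduce() call.
        }
    }
}

Note that the counter lives in each map task, so, just as in the pseudo code, batches that happen to get the same key in different map tasks will be merged into one reduce call.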

Hope that helps. :)

污味仙女 2024-12-08 12:29:24


If the end goal is to force certain sets of lines to go to certain machines for processing, you'll want to look into writing your own Partitioner. Otherwise, Hadoop will split the data for you automatically depending on the number of reducers.

I suggest reading the tutorial on the Hadoop site to get a better understanding of M/R.
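
For reference, a Partitioner in the newer API is a small class; this sketch (the class name and the assumption of non-negative IntWritable batch keys are mine) just spreads keys across the available reduce tasks:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Routes each (key, value) pair to a reduce task. Non-negative batch keys
// are spread across the reducers in round-robin fashion.
public class BatchPartitioner extends Partitioner<IntWritable, Text> {

    @Override
    public int getPartition(IntWritable key, Text value, int numReduceTasks) {
        return key.get() % numReduceTasks; // assumes keys are >= 0
    }
}

It is registered with job.setPartitionerClass(BatchPartitioner.class). Keep in mind that a Partitioner picks a reduce task, not a specific physical machine.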

枕花眠 2024-12-08 12:29:24


If you simply want to send N lines of input to a single mapper, you can use the NLineInputFormat class. You could then do the line parsing (splitting on commas, etc.) in the mapper.
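
A rough sketch of the job setup for that route, assuming a recent Hadoop release; the class name, the input/output paths, and the batch size of 128 are only for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BatchJobDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "csv batches");
        job.setJarByClass(BatchJobDriver.class);

        // Each map task gets a split of 128 lines of the CSV rather than a
        // whole HDFS block; map() is still called once per line.
        job.setInputFormatClass(NLineInputFormat.class);
        NLineInputFormat.setNumLinesPerSplit(job, 128);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}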

If you want to have access to the lines before and after the line the mapper is currently processing, you may have to write your own input format. Subclassing FileInputFormat is usually a good place to start. You could create an InputFormat that reads N lines, concatenates them, and sends them as one block to a mapper, which then splits the input into N lines again and begins processing.
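
Here is a hedged sketch of that custom-format idea: a FileInputFormat subclass whose RecordReader wraps the standard LineRecordReader and hands the mapper N newline-joined lines per record. The class names and the fixed batch size of 128 are made up for illustration, and batches never cross split boundaries, so the last batch of each split may simply be shorter.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

// Delivers batches of consecutive lines to the mapper as a single newline-joined value.
public class BatchLineInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        return new BatchLineRecordReader(128); // illustrative batch size
    }

    static class BatchLineRecordReader extends RecordReader<LongWritable, Text> {

        private final LineRecordReader lineReader = new LineRecordReader();
        private final int batchSize;
        private final LongWritable key = new LongWritable();
        private final Text value = new Text();

        BatchLineRecordReader(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            lineReader.initialize(split, context);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            StringBuilder batch = new StringBuilder();
            int lines = 0;
            while (lines < batchSize && lineReader.nextKeyValue()) {
                if (lines == 0) {
                    // Use the byte offset of the first line as the batch key.
                    key.set(lineReader.getCurrentKey().get());
                } else {
                    batch.append('\n');
                }
                batch.append(lineReader.getCurrentValue().toString());
                lines++;
            }
            if (lines == 0) {
                return false; // no more input in this split
            }
            value.set(batch.toString());
            return true;
        }

        @Override
        public LongWritable getCurrentKey() { return key; }

        @Override
        public Text getCurrentValue() { return value; }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return lineReader.getProgress();
        }

        @Override
        public void close() throws IOException {
            lineReader.close();
        }
    }
}

The mapper then receives one newline-joined batch per call to map() and can split the Text value back into individual lines; the format is plugged in with job.setInputFormatClass(BatchLineInputFormat.class).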

As far as globals in Hadoop go, you can specify some custom parameters when you create the job configuration, but as far as I know, you cannot change them in a worker and expect the change to propagate throughout the cluster. To set a job parameter that will be visible to workers, do the following where you are creating the job:

job.getConfiguration().set(Constants.SOME_PARAM, "my value");

Then, to read the parameter's value in the mapper or reducer:

public void map(Text key, Text value, Context context) {

        Configuration conf = context.getConfiguration();
        String someParam = conf.get(Constants.SOME_PARAM);

        // use someParam in processing input
}

Hadoop has support for basic types such as int, long, String, and boolean to be used as parameters.
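
For the typed variants, something along these lines (the property names are invented for the example):

// When building the job:
job.getConfiguration().setInt("my.batch.size", 128);
job.getConfiguration().setBoolean("my.debug.mode", false);

// Inside a mapper or reducer:
int batchSize = context.getConfiguration().getInt("my.batch.size", 64);        // 64 is the default
boolean debug = context.getConfiguration().getBoolean("my.debug.mode", false); // false is the default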
