Combining thread results with OpenMP

Posted on 2024-11-07 09:01:57

I have some problems combining the processing results I receive from several threads, and I'm not sure whether I'm using OpenMP correctly. The code extract below shows the OpenMP portion of my code.

Parameters:

thread private:

it: map iterator (timestamp, userkey)

ite: map iterator ((timestamp,userkey)/int amount)

thread_result_map: typedef map < userkey(str),timestamp(str) >

when, who: matching regex (timestamp, userkey)

shared among threads:

log: char array
size: log.size()
identifier, timestamp, userkey: boost::regex patterns
combined_result_map: typedef map < thread_result_map, hits(int) >

#pragma omp parallel shared(log, size, identifier, timestamp, userkey) private(it, ite, str_time, str_key, vec_str_result, i, id, str_current, when, who, thread_result_map)
    {
#pragma omp for
        for (i = 0 ; i < size ; i++){
            str_current.push_back(log[i]);
            if (log[i] == '\n') {
                if (boost::regex_search(str_current, identifier)){
                    boost::regex_search(str_current, when, timestamp);
                    str_time = when[0];
                    boost::regex_search(str_current, who, userkey);
                    str_key = who[0];
                    thread_result_map.insert(make_pair(str_time, str_key));
                }
                str_current = ""; //reset temp string
            }
        }
#pragma omp critical
        {
            for (it=thread_result_map.begin(); it!=thread_result_map.end(); it++) {
                id = omp_get_thread_num();
                cout << thread_result_map[it->first] <<
                        thread_result_map[it->second];
                cout << "tID_" << id << " reducing" << endl;
            }
        }
    }

As you can see, every thread gets its own partition of the char array and parses it line by line. If the current string is matched by "identifier", the timestamp and userkey are added to the thread's private result map (string/string).

Now, after the loop, I have several thread-private result maps. The combined_result_map is a map whose key is itself a map: the key is the key/value combination from a thread's result, and the value is the number of occurrences of that combination.

I'm parsing only a portion of the timestamp, so when the same userkey appears multiple times within 1 hour, the hit counter is increased.

The result should look something like this:

TIME(MMM/DD/HH/);USERKEY;HITS
May/25/13;SOMEKEY124345;3
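
For illustration only, the combined result described above could be modelled as a flat map keyed by the (timestamp, userkey) pair, with the hit count as the value. This is a minimal sketch under that assumption, not the code from the post: the names TimeKey, CombinedMap, combine and to_csv are hypothetical, and the key is a std::pair rather than a whole thread_result_map.

```cpp
#include <cstddef>
#include <map>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// Hypothetical types: (timestamp, userkey) -> number of hits.
typedef std::pair<std::string, std::string> TimeKey;
typedef std::map<TimeKey, int> CombinedMap;

// Count how often each (timestamp, userkey) combination occurs.
CombinedMap combine(const std::vector<TimeKey>& hits) {
    CombinedMap combined;
    for (std::size_t k = 0; k < hits.size(); ++k) {
        // operator[] creates a missing entry with the value 0 before the increment.
        ++combined[hits[k]];
    }
    return combined;
}

// Render the combined map in the TIME;USERKEY;HITS layout shown above.
std::string to_csv(const CombinedMap& combined) {
    std::ostringstream out;
    out << "TIME;USERKEY;HITS\n";
    for (CombinedMap::const_iterator it = combined.begin(); it != combined.end(); ++it) {
        out << it->first.first << ';' << it->first.second << ';' << it->second << '\n';
    }
    return out.str();
}
```

Passing three identical ("May/25/13", "SOMEKEY124345") pairs to combine and the result to to_csv yields the header line followed by May/25/13;SOMEKEY124345;3.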

I have no problem combining the hit counts in the critical section (which I removed here) by writing combined += results.

But how can I combine my result maps the same way? I know I have to iterate through the threads' maps, but when I put a "cout" inside the loop for testing, every thread calls it only once.

A test run on my local syslog gives me the following output when I set all the regexes to "error" (to make sure every identified line has a userkey and a timestamp with the same name):

Pattern for parsing Access String: error
Pattern for parsing Timestamp: error
Pattern for parsing Userkey: error

 *** Parsing File /var/log/syslog

errortID_0 reducing
errortID_1 reducing
errortID_2 reducing
errortID_3 reducing

 *** Ok!   ________________   hits : 418   worktime: 0.0253871s

(The calculated hits come from thread-private counters that I removed from the code above.)

So each of my 4 threads does a single cout and leaves the loop, although all together there should be 418 hits. What am I doing wrong? How do I iterate through my results from inside my OpenMP region?

Comments (1)

流云如水 2024-11-14 09:01:57

Found the problem myself, sorry for asking stupid questions.

I was trying to add the same key multiple times. A map keeps only one entry per key, which is why the map size didn't increase and every thread looped only once.
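
A minimal sketch of that effect, assuming a std::map of string to string as in the question. The helper names map_size_after_inserts and vector_size_after_pushes are hypothetical and only serve to compare the two containers: std::map::insert leaves the map unchanged when the key already exists, while a vector keeps every element.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Insert the same ("error", "error") pair n times into a std::map.
// Only the first insert adds an element; later inserts with an equal key are ignored.
std::size_t map_size_after_inserts(int n) {
    std::map<std::string, std::string> result;
    for (int k = 0; k < n; ++k) {
        result.insert(std::make_pair(std::string("error"), std::string("error")));
    }
    return result.size();
}

// Append the same pair n times to a vector of pairs; every push_back adds an element.
std::size_t vector_size_after_pushes(int n) {
    std::vector<std::pair<std::string, std::string> > result;
    for (int k = 0; k < n; ++k) {
        result.push_back(std::make_pair(std::string("error"), std::string("error")));
    }
    return result.size();
}
```

With n = 418, the map ends up with a single element while the vector holds 418, which matches the single loop pass per thread seen in the test run.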

Edit:

If anybody is interested in how to combine the thread results, this is how I did it. Perhaps you will see something that could be improved.

I just changed the thread-local result map to a vector of pairs (str, str).

This is the full working OpenMP code section. Perhaps it's useful to someone:

#pragma omp parallel shared(log, size, identifier, timestamp, userkey) private(it, ite, str_time, str_key, i, id, str_current, when, who, local_res)
    {
#pragma omp for
        for (i = 0 ; i < size ; i++){

            str_current.push_back(log[i]);

            if (log[i] == '\n') {   // if char is newline character
                if (boost::regex_search(str_current, identifier)){  // if current line is access string
                    boost::regex_search(str_current, when, timestamp);  // get timestamp from string
                    str_time = when[0];
                    boost::regex_search(str_current, who, userkey); // get userkey from string
                    str_key = who[0];
                    local_res.push_back((make_pair(str_time, str_key)));    // append key-value-pair(timestamp/userkey)
                    id = omp_get_thread_num();
                    //cout << "tID_" << id << " - adding pair - my local result map size is now: " << local_res.size() << endl;
                }
                str_current = "";
            }
        }

#pragma omp critical
        {
            id = omp_get_thread_num();
            hits += local_res.size();
            cout << "tID_" << id << " had HITS: " << local_res.size() << endl;
            for (i = 0; i < local_res.size(); i++) {
                acc_key = local_res[i].second;
                acc_time = local_res[i].first;
                if(m_KeyDatesHits.count(acc_key) == 0) { // if there are no items for this key yet, make a new entry
                    m_KeyDatesHits.insert(make_pair(acc_key, str_int_MapType()));
                }
                if (m_KeyDatesHits[acc_key].count(acc_time) == 0) { // "acc_time" is a key value, if it doesn't exist yet, add it and set "1" as value
                    m_KeyDatesHits[acc_key].insert(make_pair(acc_time, 1 ));
                    it = m_KeyDatesHits.begin(); // iterator for userkeys/maps
                    ite = m_KeyDatesHits[acc_key].begin(); // iterator for  times/clicks
                } else m_KeyDatesHits[acc_key][acc_time]++; // if userkey already exist and timestamp already exists, count hits +1 for it

            }
        }
    }
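
As one possible simplification of the merge step above, here is a self-contained sketch. It assumes the same nested layout (userkey, then timestamp, then hits) and replaces the regex parsing with already-extracted pairs; the function count_hits and the typedefs KeyDatesHitsType and PairVec are hypothetical names. It relies on std::map::operator[] creating a missing entry (an empty inner map, or a count of 0), so the explicit count() checks are not needed.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

typedef std::map<std::string, int> str_int_MapType;                 // timestamp -> hits
typedef std::map<std::string, str_int_MapType> KeyDatesHitsType;    // userkey -> (timestamp -> hits)
typedef std::vector<std::pair<std::string, std::string> > PairVec;  // (timestamp, userkey)

// Each thread collects pairs into its own vector, then merges them into the
// shared map inside a critical section. "records" stands in for the parsed log lines.
KeyDatesHitsType count_hits(const PairVec& records) {
    KeyDatesHitsType combined;
    const long n = static_cast<long>(records.size());
#pragma omp parallel
    {
        PairVec local_res;  // declared inside the region, so each thread has its own copy
#pragma omp for nowait
        for (long i = 0; i < n; ++i) {
            local_res.push_back(records[i]);
        }
#pragma omp critical
        {
            for (std::size_t k = 0; k < local_res.size(); ++k) {
                // Creates the userkey and timestamp entries on first use, then counts the hit.
                ++combined[local_res[k].second][local_res[k].first];
            }
        }
    }
    return combined;
}
```

Declaring local_res inside the parallel region avoids the long private(...) list. With GCC the pragmas take effect when compiling with -fopenmp; without that flag they are ignored and the function runs on one thread with the same result.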

I did some tests and it runs really fast.

Using 4 threads, this searches a 150MB log file for access events, parses a custom user key and date from every event, and combines the results in about 4 seconds.

At the end it creates an export list. This is the program output:

HELLO, welcome to LogMap 0.1!

C++/OpenMP Memory Map Parsing Engine
__________________ Number of processors available = 4
Number of threads = 4

Pattern for parsing Access String: GET /_openbooknow/key/
Pattern for parsing Timestamp: \d{2}/\w{3}/\d{4}
Pattern for parsing Userkey: [a-zA-Z0-9]{20,32}

* Parsing File /home/c0d31n/Desktop/access_log-test.txt

HITS: 169147 HITS: 169146 HITS: 169146
HITS: 169147

* Ok! ________ hits : 676586 worktime: 4.03816s

* new export file created: "./test.csv"

root@c0d3b0x:~/workspace/OpenBookMap/Release# cat test.csv
"1nDh0gV6eE3MzK0517aE6VIU0";"28/Mar/2011";"18813"
"215VIU1wBN2O2Fmd63MVmv6QTZy";"28/Mar/2011";"6272"
"36Pu0A2Wly3uYeIPZ4YPAuBy";"18/Mar/2011";"18816"
"36Pu0A2Wly3uYeIPZ4YPAuBy";"21/Mar/2011";"12544"
"36Pu0A2Wly3uYeIPZ4YPAuBy";"22/Mar/2011";"12544"
"36Pu0A2Wly3uYeIPZ4YPAuBy";"23/Mar/2011";"18816"
"9E1608JFGk2GZQ4ppe1Grtv2";"28/Mar/2011";"12544"
"pachCsiog05bpK0kDA3K2lhEY";"17/Mar/2011";"18029"
"pachCsiog05bpK0kDA3K2lhEY";"18/Mar/2011";"12544"
"pachCsiog05bpK0kDA3K2lhEY";"21/Mar/2011";"18816"
"pachCsiog05bpK0kDA3K2lhEY";"22/Mar/2011";"6272"
"pachCsiog05bpK0kDA3K2lhEY";"23/Mar/2011";"18816"
"pachCsiog05bpK0kDA3K2lhEY";"28/Mar/2011";"501760"
"1nDh0gV6eE3MzK0517aE6VIU0";"28/Mar/2011";"18813"
"215VIU1wBN2O2Fmd63MVmv6QTZy";"28/Mar/2011";"6272"
"36Pu0A2Wly3uYeIPZ4YPAuBy";"18/Mar/2011";"18816"
"36Pu0A2Wly3uYeIPZ4YPAuBy";"21/Mar/2011";"12544"
"36Pu0A2Wly3uYeIPZ4YPAuBy";"22/Mar/2011";"12544"
"36Pu0A2Wly3uYeIPZ4YPAuBy";"23/Mar/2011";"18816"
"9E1608JFGk2GZQ4ppe1Grtv2";"28/Mar/2011";"12544"
"pachCsiog05bpK0kDA3K2lhEY";"17/Mar/2011";"18029"
"pachCsiog05bpK0kDA3K2lhEY";"18/Mar/2011";"12544"
"pachCsiog05bpK0kDA3K2lhEY";"21/Mar/2011";"18816"
"pachCsiog05bpK0kDA3K2lhEY";"22/Mar/2011";"6272"
"pachCsiog05bpK0kDA3K2lhEY";"23/Mar/2011";"18816"
"pachCsiog05bpK0kDA3K2lhEY";"28/Mar/2011";"501760"
