pyparsing - 并行日志处理的性能技巧

发布于 2024-12-01 20:44:42 字数 4475 浏览 1 评论 0原文

我正在使用 2 个进程池来并行解析多个日志文件,

po = Pool(processes=2)
pool_object = po.apply_async(log_parse, (hostgroup_sender_dir, hostname, host_depot_dir,        synced_log, prev_last_pos, get_report_rate), )

(curr_last_pos, remote_report_datetime, report_gen_rate) = pool_object.get()

但是在初始运行时它非常慢, 大约 16 分钟,大约 12 个 ~20Mb 文件。

考虑到我将每 2 或 3 分钟解析一次日志新字节,在下一次迭代中不会有太大问题, 但我在第一次运行时的表现肯定还有改进的空间。 将原木预先分割成几个较小尺寸的接头 (这样 pyparse 就不必将整个盎司的日志分配到内存中) 加快速度吗?

我仍在双核开发虚拟机上运行它, 但很快就必须迁移到四核物理服务器(我将尝试获得额外的四核 CPU),并且它可能需要能够管理约 50 个日志。

从日志中进行拼接,

log_splice = """
# XX_MAIN     (23143) Report at 2011-08-30 20:00:00.003    Type:  Periodic     #
# Report number 1790                                        State: Active      #
################################################################################
# Running since                  : 2011-08-12 04:40:06.153                     #
# Total execution time           :  18 day(s) 15:19:53.850                     #
# Last report date               : 2011-08-30 19:45:00.002                     #
# Time since last periodic report:   0 day(s) 00:15:00.000                     #
################################################################################
                            ----------------------------------------------------
                            |       Periodic        |          Global          |
----------------------------|-----------------------|--------------------------|
Simultaneous Accesses       |  Curr  Max Cumulative |      Max    Cumulative   |
--------------------------- |  ---- ---- ---------- |     ---- -------------   |
Accesses                    |     1    5          - |      180             -   |
- in start/stop state       |     1    5      12736 |      180      16314223   |
-------------------------------------------------------------------------------|
Accesses per Second         |    Max   Occurr. Date |      Max Occurrence Date |
--------------------------- | ------ -------------- |   ------ --------------- |
Accesses per second         |  21.00 08-30 19:52:33 |    40.04  08-16 20:19:18 |
-------------------------------------------------------------------------------|
Service Statistics          |  Success    Total  %  |   Success      Total  %  |
--------------------------- | -------- -------- --- | --------- ---------- --- |
Services accepted accesses  |    17926    17927  99 |  21635954   21637230 -98 |
- 98: NF                    |     7546     7546 100 |  10992492   10992492 100 |
- 99: XFC                   |    10380    10380 100 |  10643462   10643462 100 |
 ----------------------------------------------------------------------------- |
Services succ. terminations |    12736    12736 100 |  16311566   16314222  99 |
- 98: NF                    |     7547     7547 100 |  10991401   10992492  99 |
- 99: XFC                   |     5189     5189 100 |   5320165    5321730  99 |
 ----------------------------------------------------------------------------- |
""" 

使用 pyparse

unparsed_log_data = input_log.read()

#------------------------------------------------------------------------
# Define Grammars
#------------------------------------------------------------------------
integer = Word( nums )

# XX_MAIN     ( 4801) Report at 2010-01-25 06:55:00
binary_name = "# XX_MAIN"
pid = "(" + Word(nums) + ")"
report_id = Suppress(binary_name) + Suppress(pid)

# Word as a contiguous set of characters found in the string nums
year = Word(nums, max=4)
month = Word(nums, max=2)
day = Word(nums, max=2)
# 2010-01-25 grammar
yearly_day_bnf = Combine(year + "-" + month + "-" + day)
# 06:55:00. grammar
clock24h_bnf = Combine(Word(nums, max=2) + ":" + Word(nums, max=2) + ":" + Word(nums,     max=2) + Suppress("."))
timestamp_bnf = Combine(yearly_day_bnf + White(' ') + clock24h_bnf)("timestamp")

report_bnf = report_id + Suppress("Report at ") + timestamp_bnf

# Service Statistics          |  Success    Total  %  | 
# Services succ. terminations |       40       40 100 |   3494775    3497059  99 |
partial_report_ignore = Suppress(SkipTo("Services succ. terminations", include=True))
succ_term_bnf = Suppress("|") + integer("succTerms") + integer("totalTerms")
terminations_report_bnf = report_bnf + partial_report_ignore + succ_term_bnf

# Apply the BNF to the unparsed data
terms_parsing = terminations_report_bnf.searchString(unparsed_log_data)

I'm using a 2 processes Pool to parallel parse several log files,

po = Pool(processes=2)
pool_object = po.apply_async(log_parse, (hostgroup_sender_dir, hostname, host_depot_dir,        synced_log, prev_last_pos, get_report_rate), )

(curr_last_pos, remote_report_datetime, report_gen_rate) = pool_object.get()

However it's quite slow on the initial run,
~16min for about twelve ~20Mb files.

There won't be much of a problem in the next iterations, considering I'll parse the logs new bytes each 2 or 3 min,
but surely there's room for improvement on how I'm doing it on the first run.
Would pre-splitting the logs into several lower sized splices
(so that pyparse won't have to allocate the entirety of the log at ounce into memory)
speed it up?

I'm still running it on a dual core dev VM,
but soon will have to migrate to a quad core physical Server (I'll try to get an extra quad-core CPU) and it may need to be able to manage ~50 logs.

A splice from the log,

log_splice = """
# XX_MAIN     (23143) Report at 2011-08-30 20:00:00.003    Type:  Periodic     #
# Report number 1790                                        State: Active      #
################################################################################
# Running since                  : 2011-08-12 04:40:06.153                     #
# Total execution time           :  18 day(s) 15:19:53.850                     #
# Last report date               : 2011-08-30 19:45:00.002                     #
# Time since last periodic report:   0 day(s) 00:15:00.000                     #
################################################################################
                            ----------------------------------------------------
                            |       Periodic        |          Global          |
----------------------------|-----------------------|--------------------------|
Simultaneous Accesses       |  Curr  Max Cumulative |      Max    Cumulative   |
--------------------------- |  ---- ---- ---------- |     ---- -------------   |
Accesses                    |     1    5          - |      180             -   |
- in start/stop state       |     1    5      12736 |      180      16314223   |
-------------------------------------------------------------------------------|
Accesses per Second         |    Max   Occurr. Date |      Max Occurrence Date |
--------------------------- | ------ -------------- |   ------ --------------- |
Accesses per second         |  21.00 08-30 19:52:33 |    40.04  08-16 20:19:18 |
-------------------------------------------------------------------------------|
Service Statistics          |  Success    Total  %  |   Success      Total  %  |
--------------------------- | -------- -------- --- | --------- ---------- --- |
Services accepted accesses  |    17926    17927  99 |  21635954   21637230 -98 |
- 98: NF                    |     7546     7546 100 |  10992492   10992492 100 |
- 99: XFC                   |    10380    10380 100 |  10643462   10643462 100 |
 ----------------------------------------------------------------------------- |
Services succ. terminations |    12736    12736 100 |  16311566   16314222  99 |
- 98: NF                    |     7547     7547 100 |  10991401   10992492  99 |
- 99: XFC                   |     5189     5189 100 |   5320165    5321730  99 |
 ----------------------------------------------------------------------------- |
""" 

using pyparse,

unparsed_log_data = input_log.read()

#------------------------------------------------------------------------
# Define Grammars
#------------------------------------------------------------------------
integer = Word( nums )

# XX_MAIN     ( 4801) Report at 2010-01-25 06:55:00
binary_name = "# XX_MAIN"
pid = "(" + Word(nums) + ")"
report_id = Suppress(binary_name) + Suppress(pid)

# Word as a contiguous set of characters found in the string nums
year = Word(nums, max=4)
month = Word(nums, max=2)
day = Word(nums, max=2)
# 2010-01-25 grammar
yearly_day_bnf = Combine(year + "-" + month + "-" + day)
# 06:55:00. grammar
clock24h_bnf = Combine(Word(nums, max=2) + ":" + Word(nums, max=2) + ":" + Word(nums,     max=2) + Suppress("."))
timestamp_bnf = Combine(yearly_day_bnf + White(' ') + clock24h_bnf)("timestamp")

report_bnf = report_id + Suppress("Report at ") + timestamp_bnf

# Service Statistics          |  Success    Total  %  | 
# Services succ. terminations |       40       40 100 |   3494775    3497059  99 |
partial_report_ignore = Suppress(SkipTo("Services succ. terminations", include=True))
succ_term_bnf = Suppress("|") + integer("succTerms") + integer("totalTerms")
terminations_report_bnf = report_bnf + partial_report_ignore + succ_term_bnf

# Apply the BNF to the unparsed data
terms_parsing = terminations_report_bnf.searchString(unparsed_log_data)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

临走之时 2024-12-08 20:44:42

我将围绕解析单个日志条目构建解析器。这完成了两件事:

  1. 它将问题分解为易于并行化的块,
  2. 在处理初始积压的日志数据后,将您的解析器定位为处理增量日志处理,

然后您的并行化块大小就是一个很好打包的单个项目,并且每个进程都可以单独解析该项目(假设您不需要将任何状态或经过的时间信息从一条日志消息转移到下一条日志消息)。

编辑(这个问题已经演变成更多有关 pyparsing 调优的主题...)

我发现最好定义使用Combine(lots)构建的低级原语+of+表达式+此处) 使用 pyparsing Regex 表达式。这通常适用于实数或时间戳等表达式,例如:

# 2010-01-25 grammar
yearly_day_bnf = Combine(year + "-" + month + "-" + day)
yearly_day_bnf = Regex(r"\d{4}-\d{2}-\d{2}")

# 06:55:00. grammar
clock24h_bnf = Combine(Word(nums, max=2) + ":" + 
                       Word(nums, max=2) + ":" + 
                       Word(nums, max=2) + Suppress("."))
clock24h_bnf = Regex(r"\d{2}:\d{2}:\d{2}\.")
clock24h_bnf.setParseAction(lambda tokens:tokens[0][:-1])

timestamp_bnf = Combine(yearly_day_bnf + White(' ') + clock24h_bnf)
timestamp_bnf = Regex(r"\d{4}-\d{2}-\d{2}\s+\d{1,2}:\d{2}:\d{2}")

不过,无需过度。像 integer=Word(nums) 这样的东西已经在幕后生成 RE。

请注意,我还从 timestamp_bnf 中删除了结果名称。我通常会省略原始定义中的结果名称,并在将它们组装成更高级别的表达式时添加它们,这样我就可以多次使用相同的原始名称,但名称不同,例如:

summary = ("Started:" + timestamp_bnf("startTime") + 
           "Ended:" + timestamp_bnf("endTime"))

我发现这也可以帮助我组织我的结果名称。解析的结构。

将结果名称移至更高的表达式也会导致我为该字段指定一个更具描述性的名称:

report_bnf = report_id + Suppress("Report at ") + timestamp_bnf("reportTime")

查看您的语法,您并没有真正破解所有此报告信息,只是从这一行中提取报告时间:

# XX_MAIN     (23143) Report at 2011-08-30 20:00:00.003

以及来自的 2 个整数字段这行:

Services succ. terminations |    12736    12736 100 |  16311566   16314222  99 |

试试这个:

report_bnf = report_id + Suppress("Report at") + timestamp_bnf("reportTime")
succ_term_bnf = (Suppress("Services succ. terminations") + Suppress("|") + 
                        integer("succTerms") + integer("totalTerms"))
log_data_sources_bnf = report_bnf | succ_term_bnf

extractLogData = lambda logentry : sum(log_data_sources_bnf.searchString(logentry))

print extractLogData(log_slice).dump()

Pyparsing 总是比 RE 慢,并且在您的情况下 pyparsing 解析器可能只是原型设计的垫脚石。我确信使用 pyparsing 解析器无法让您获得 500 倍的性能,您可能只需要使用基于 RE 的解决方案来处理 Mb 的日志文件。

I would structure the parser around parsing a single log entry. This accomplishes 2 things:

  1. it breaks up the problem into easily parallelizable chunks
  2. it positions your parser to handle the incremental log processing after the initial backlog of log data has been processed

Your parallelizing chunk size is then a nicely packaged single item, and each process can parse the item separately (assuming that you don't need to carry forward any state or elapsed time info from one log message to the next).

EDIT (this question has morphed into more of a topic on pyparsing tuning...)

I've found that it is better to define low-level primitives that are built up using Combine(lots+of+expressions+here) using a pyparsing Regex expression. This usually applies to expressions like real numbers or timestamps, such as:

# 2010-01-25 grammar
yearly_day_bnf = Combine(year + "-" + month + "-" + day)
yearly_day_bnf = Regex(r"\d{4}-\d{2}-\d{2}")

# 06:55:00. grammar
clock24h_bnf = Combine(Word(nums, max=2) + ":" + 
                       Word(nums, max=2) + ":" + 
                       Word(nums, max=2) + Suppress("."))
clock24h_bnf = Regex(r"\d{2}:\d{2}:\d{2}\.")
clock24h_bnf.setParseAction(lambda tokens:tokens[0][:-1])

timestamp_bnf = Combine(yearly_day_bnf + White(' ') + clock24h_bnf)
timestamp_bnf = Regex(r"\d{4}-\d{2}-\d{2}\s+\d{1,2}:\d{2}:\d{2}")

No need to overdo, though. Things like integer=Word(nums) are already generating RE's under the covers.

Note that I also removed the results name from timestamp_bnf. I usually leave off the results names from the primitive definitions, and add them as I assemble them into higher-level expressions, so I can use the same primitive multiple times, with different names, like:

summary = ("Started:" + timestamp_bnf("startTime") + 
           "Ended:" + timestamp_bnf("endTime"))

I find that this also helps me organize my parsed structures.

Moving the results name to the higher expression also leads me to give the field a more descriptive name:

report_bnf = report_id + Suppress("Report at ") + timestamp_bnf("reportTime")

Looking at your grammar, you are not really cracking all of this report info, just extracting the report time from this line:

# XX_MAIN     (23143) Report at 2011-08-30 20:00:00.003

and 2 integer fields from this line:

Services succ. terminations |    12736    12736 100 |  16311566   16314222  99 |

Try this instead:

report_bnf = report_id + Suppress("Report at") + timestamp_bnf("reportTime")
succ_term_bnf = (Suppress("Services succ. terminations") + Suppress("|") + 
                        integer("succTerms") + integer("totalTerms"))
log_data_sources_bnf = report_bnf | succ_term_bnf

extractLogData = lambda logentry : sum(log_data_sources_bnf.searchString(logentry))

print extractLogData(log_slice).dump()

Pyparsing will always be slower than RE's, and it may be that a pyparsing parser in your case is just a prototyping stepping stone. I'm sure I can't get you 500X performance with a pyparsing parser, and you may just have to use the RE-based solution to process Mb's worth of log files.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文