优化超大文件的处理

发布于 2024-10-19 06:43:48 字数 3049 浏览 10 评论 0原文

我的任务相对简单:对于输入文件中的每一行,测试该行是否满足一组给定的条件,如果满足,则将该行的特定列写入新文件。我已经编写了一个Python脚本来执行此操作,但我需要一些帮助:1)提高速度,2)列名方面的最佳工作方式(因为列号可能因文件而异),以及3 )指定过滤条件和所需输出列的最佳方式。

1)我使用的文件包含天文图像的光度测定。每个文件大约有 1e6 行 x 150 列浮点,大小通常超过 1GB。我有一个旧的 AWK 脚本,可以在大约 1 分钟内处理这样的文件;我的 python 脚本需要 5 到 7 分钟。我经常需要调整过滤条件并重新运行几次,直到输出文件是我想要的,所以速度绝对是理想的。我发现 for 循环非常快;这就是我在循环内做事的方式,这会减慢它的速度。与将整行读入内存相比,使用 itemgetter 只挑选出我想要的列是一个很大的改进,但我不确定可以做什么来进一步提高速度。它能像 AWK 一样快吗?

2)我想根据列名而不是列号进行工作,因为特定数量(光子计数、背景、信噪比等)的列号可以在文件之间发生变化。在我的 AWK 脚本中,我始终需要检查指定条件和输出列的列号是否正确,即使过滤和输出适用于相同的数量。我在 python 中的解决方案是创建一个字典,为每个数量分配一个列号。当一个文件有不同的列时,我只需要指定一个新的字典。也许有更好的方法来做到这一点?

3)理想情况下,我只需要指定输入和输出文件的名称、过滤条件和所需的输出列,它们可以在我的脚本顶部找到,这样我就不需要进行搜索代码只是为了调整一些东西。我的主要问题是未定义的变量。例如,典型的条件是“SNR>1”。 4',但在开始从光度测定文件中读取行之前,“SNR”(信噪比)实际上并未分配值。我的解决方案是使用字符串和 eval/exec 的组合。再说一遍,也许有更好的方法?

我根本没有接受过计算机科学方面的培训(我是天文学的研究生)——我通常只是将一些东西组合在一起并进行调试,直到它起作用。然而,上述三点的优化对于我的研究来说变得极其重要。对于冗长的帖子,我深表歉意,但我认为详细信息会有所帮助。除了清理事情/编码风格之外,您对我的任何和所有建议都将不胜感激。

非常感谢, 杰克

#! /usr/bin/env python2.6

from operator import itemgetter


infile = 'ugc4305_1.phot'
outfile = 'ugc4305_1_filt.phot'

# names must belong to dicitonary
conditions = 'OBJ <= 2 and SNR1 > 4 and SNR2 > 4 and FLAG1 < 8 and FLAG2 < 8 and (SHARP1 + SHARP2)**2 < 0.075 and (CROWD1 + CROWD2) < 0.1'

input = 'OBJ, SNR1, SNR2, FLAG1, FLAG2, SHARP1, SHARP2, CROWD1, CROWD2'
    # should contain all quantities used in conditions

output = 'X, Y, OBJ, COUNTS1, BG1, ACS1, ERR1, CHI1, SNR1, SHARP1, ROUND1, CROWD1, FLAG1, COUNTS2, BG2, ACS2, ERR2, CHI2, SNR2, SHARP2, ROUND2, CROWD2, FLAG2'

# dictionary of col. numbers for the more important qunatities
columns = dict(EXT=0, CHIP=1, X=2, Y=3, CHI_GL=4, SNR_GL=5, SHARP_GL=6, ROUND_GL=7, MAJAX_GL=8, CROWD_GL=9, OBJ=10, COUNTS1=11, BG1=12, ACS1=13, STD1=14, ERR1=15, CHI1=16, SNR1=17, SHARP1=18, ROUND1=19, CROWD1=20, FWHM1=21, ELLIP1=22, PSFA1=23, PSFB1=24, PSFC1=25, FLAG1=26, COUNTS2=27, BG2=28, ACS2=29, STD2=30, ERR2=31, CHI2=32, SNR2=33, SHARP2=34, ROUND2=35, CROWD2=36, FWHM2=37, ELLIP2=38, PSFA2=39, PSFB2=40, PSFC2=41, FLAG2=42)



f = open(infile)
g = open(outfile, 'w')


# make string that extracts values for testing
input_items = []
for i in input.replace(',', ' ').split():
    input_items.append(columns[i])
input_items = ', '.join(str(i) for i in input_items)

var_assign = '%s = [eval(i) for i in itemgetter(%s)(line.split())]' % (input, input_items) 


# make string that specifies values for writing
output_items = []
for i in output.replace(',', ' ').split():
    output_items.append(columns[i])
output_items = ', '.join(str(i) for i in output_items)

output_values = 'itemgetter(%s)(line.split())' % output_items


# make string that specifies format for writing
string_format = []
for i in output.replace(',', ' ').split():
    string_format.append('%s')
string_format = ' '.join(string_format)+'\n'


# main loop
for line in f:
   exec(var_assign)
   if eval(conditions):
      g.write(string_format % tuple(eval(output_values)))
f.close()
g.close()

My task is relatively simple: for each line in an input file, test whether the line satisfies a given set of conditions, and if so, write specific columns of that line to a new file. I've written a python script that does this, but I'd like some help on 1) improving speed, 2) the best way to work in terms of column names (as column numbers can vary from file to file), and 3) the best way to specify my filtering conditions and desired output columns.

1) The files I work with contain photometry for astronomical images. Each file is around 1e6 lines by 150 columns of floats, typically over 1GB in size. I have an old AWK script that will process files like this in about 1 minute; my python script takes between 5 and 7 minutes. I often need to tweak the filtering conditions and rerun several times until the output file is what I want, so speed is definitely desirable. I've found that the for loop is plenty fast; it's how I do things inside the loop that slow it down. Using itemgetter to pick out just the columns I want was a big improvement over reading the entire line into memory, but I'm unsure of what I can do to further increase speed. Can this ever be as fast as AWK?

2) I'd like to work in terms of column names instead of column numbers since the column number of a particular quantity (photon counts, background, signal-to-noise, etc) can change between files. In my AWK script, I always need to check that the column numbers are correct where conditions and output columns are specified, even if the filtering and output apply to the same quantities. My solution in python has been to create a dictionary that assigns a column number to each quantity. When a file has different columns, I only need to specify a new dictionary. Perhaps there is a better way to do this?

3) Ideally, I would only need to specify the names of the input and output files, the filtering conditions, and desired columns to output, and they would be found at the top of my script so I wouldn't need to go searching through the code just to tweak something. My main issue has been with undefined variables. For example, a typical condition is 'SNR > 4', but 'SNR' (signal-to-noise) isn't actually assigned a value until lines start being read from the photometry file. My solution has been to use a combination of strings and eval/exec. Again, maybe there is a better way?

I'm not at all trained in computer science (I'm a grad student in astronomy) - I typically just hack something together and debug until it works. However, optimization with regard to my three points above has become extremely important for my research. I apologize for the lengthy post, but I felt that the details would be helpful. Any and all advice you have for me, in addition to just cleaning things up/coding style, would be greatly appreciated.

Thanks so much,
Jake

#! /usr/bin/env python2.6

from operator import itemgetter


infile = 'ugc4305_1.phot'
outfile = 'ugc4305_1_filt.phot'

# names must belong to dicitonary
conditions = 'OBJ <= 2 and SNR1 > 4 and SNR2 > 4 and FLAG1 < 8 and FLAG2 < 8 and (SHARP1 + SHARP2)**2 < 0.075 and (CROWD1 + CROWD2) < 0.1'

input = 'OBJ, SNR1, SNR2, FLAG1, FLAG2, SHARP1, SHARP2, CROWD1, CROWD2'
    # should contain all quantities used in conditions

output = 'X, Y, OBJ, COUNTS1, BG1, ACS1, ERR1, CHI1, SNR1, SHARP1, ROUND1, CROWD1, FLAG1, COUNTS2, BG2, ACS2, ERR2, CHI2, SNR2, SHARP2, ROUND2, CROWD2, FLAG2'

# dictionary of col. numbers for the more important qunatities
columns = dict(EXT=0, CHIP=1, X=2, Y=3, CHI_GL=4, SNR_GL=5, SHARP_GL=6, ROUND_GL=7, MAJAX_GL=8, CROWD_GL=9, OBJ=10, COUNTS1=11, BG1=12, ACS1=13, STD1=14, ERR1=15, CHI1=16, SNR1=17, SHARP1=18, ROUND1=19, CROWD1=20, FWHM1=21, ELLIP1=22, PSFA1=23, PSFB1=24, PSFC1=25, FLAG1=26, COUNTS2=27, BG2=28, ACS2=29, STD2=30, ERR2=31, CHI2=32, SNR2=33, SHARP2=34, ROUND2=35, CROWD2=36, FWHM2=37, ELLIP2=38, PSFA2=39, PSFB2=40, PSFC2=41, FLAG2=42)



f = open(infile)
g = open(outfile, 'w')


# make string that extracts values for testing
input_items = []
for i in input.replace(',', ' ').split():
    input_items.append(columns[i])
input_items = ', '.join(str(i) for i in input_items)

var_assign = '%s = [eval(i) for i in itemgetter(%s)(line.split())]' % (input, input_items) 


# make string that specifies values for writing
output_items = []
for i in output.replace(',', ' ').split():
    output_items.append(columns[i])
output_items = ', '.join(str(i) for i in output_items)

output_values = 'itemgetter(%s)(line.split())' % output_items


# make string that specifies format for writing
string_format = []
for i in output.replace(',', ' ').split():
    string_format.append('%s')
string_format = ' '.join(string_format)+'\n'


# main loop
for line in f:
   exec(var_assign)
   if eval(conditions):
      g.write(string_format % tuple(eval(output_values)))
f.close()
g.close()

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(6

反差帅 2024-10-26 06:43:48

我认为你没有提到它,但看起来你的数据是在 csv 中。使用 csv.DictReader 可能会让您受益匪浅。您可以一次迭代 1 行文件(避免将整个文件加载到内存中)并按列名称引用列。

如果您还没有看过的话,您还应该看看 cProfile(Python 的分析器)。它会告诉您程序的哪些部分执行时间最长。

I don't think you mentioned it, but it looks like your data is in csv. You might get a lot out of using csv.DictReader. You can iterate over files 1 line at a time (avoiding loading the whole thing into memory) and refer to columns by their names.

You should also take a look at cProfile, Python's profiler, if you haven't already. It will tell you what bits of your program are taking the most time to execute.

浮华 2024-10-26 06:43:48

我的第一步是摆脱 exec()eval() 调用。每次评估字符串时,都必须对其进行编译,然后执行,从而增加文件每一行上函数调用的开销。更不用说,eval 往往会导致混乱、难以调试的代码,通常应该避免。

您可以通过将逻辑放入小的、易于理解的函数中来开始重构。例如,您可以将 eval(conditions) 替换为函数,例如:

def conditions(d):
    return (d[OBJ] <= 2 and
            d[SNRI] > 4 and
            d[SNR2] > 4 and
            d[FLAG1] < 8 and ...

提示:如果某些条件语句失败的可能性较高,请将它们放在前面,Python 将跳过其余条件的求值。

我将摆脱列名字典,只需在文件顶部设置一堆变量,然后通过 line[COLNAME] 引用列。这可以帮助您简化某些部分,例如条件函数,并且您可以按名称引用列,而不必分配每个变量。

My first step here, would be to get rid of the exec() and eval() calls. Each time you eval a string, it has to be compiled, and then executed, adding to the overhead of your function call on every line of your file. Not to mention, eval tends to lead to messy, hard to debug code, and should generally be avoided.

You can start refactoring by putting your logic into a small, easily understandable functions. For example, you can replace eval(conditions) with a function, e.g.:

def conditions(d):
    return (d[OBJ] <= 2 and
            d[SNRI] > 4 and
            d[SNR2] > 4 and
            d[FLAG1] < 8 and ...

Tip: if some of your conditionals have higher probability of failing, put them in first, and python will skip the evaluation of the rest.

I would get rid of the dictionary of column names, and simply set a bunch of variables at the top of your file, then refer to columns by line[COLNAME]. This may help you simplify some parts like the conditions function, and you can refer to the columns by name, without having to assign each variable.

世俗缘 2024-10-26 06:43:48

以下是我将如何处理这样的事情...

这在我的机器上运行大约 35 秒,而原始脚本则运行大约 3 分钟。可以添加更多优化(例如,我们只需将一些列转换为浮点数),但这只会减少几秒钟的运行时间。

正如几个人所建议的那样,您也可以在这里轻松使用 csv.DictReader 。我正在避免它,因为您必须定义一个自定义方言,并且如果没有它,只需几行额外的代码即可完成相同的操作。 (各种 csv 模块类还检查更复杂的行为(例如带引号的字符串等),在这种特殊情况下您无需担心。它们在许多情况下非常非常方便,但在这种情况下它们有点矫枉过正。)

请注意,当您调用脚本时,您还可以轻松添加 infile 和 outfile 名称作为参数,而不是对它们进行硬编码(即 infile = sys.argv[0]< /代码>等)。这还允许您轻松地通过管道输入或输出数据...(您可以检查 sys.argv 的长度并设置 infile 或 outfile > 相应地 sys.stdin 和/或 sys.stdout

def main():
    infile = 'ugc4305_1.phot'
    outfile = 'ugc4305_1_filt.phot'
    process_data(infile, outfile)

def filter_conditions(row):
    for key, value in row.iteritems():
        row[key] = float(value)

    cond = (row['OBJ'] <= 2 and row['SNR1'] > 4 
       and row['SNR2'] > 4 and row['FLAG1'] < 8 
       and row['FLAG2'] < 8 
       and (row['SHARP1'] + row['SHARP2'])**2 < 0.075 
       and (row['CROWD1'] + row['CROWD2']) < 0.1
       )
    return cond

def format_output(row):
    output_columns = ('X', 'Y', 'OBJ', 'COUNTS1', 'BG1', 'ACS1', 'ERR1', 'CHI1', 
                     'SNR1', 'SHARP1', 'ROUND1', 'CROWD1', 'FLAG1', 'COUNTS2', 
                     'BG2', 'ACS2', 'ERR2', 'CHI2', 'SNR2', 'SHARP2', 'ROUND2', 
                     'CROWD2', 'FLAG2')
    delimiter = '\t'
    return delimiter.join((row[name] for name in output_columns))

def process_data(infilename, outfilename):
    column_names = ('EXT', 'CHIP', 'X', 'Y', 'CHI_GL', 'SNR_GL', 'SHARP_GL', 
                    'ROUND_GL', 'MAJAX_GL', 'CROWD_GL', 'OBJ', 'COUNTS1', 
                    'BG1', 'ACS1', 'STD1', 'ERR1', 'CHI1', 'SNR1', 'SHARP1', 
                    'ROUND1', 'CROWD1', 'FWHM1', 'ELLIP1', 'PSFA1', 'PSFB1', 
                    'PSFC1', 'FLAG1', 'COUNTS2', 'BG2', 'ACS2', 'STD2', 
                    'ERR2', 'CHI2', 'SNR2', 'SHARP2', 'ROUND2', 'CROWD2', 
                    'FWHM2', 'ELLIP2', 'PSFA2', 'PSFB2', 'PSFC2', 'FLAG2')

    with open(infilename) as infile:
        with open(outfilename, 'w') as outfile:
            for line in infile:
                line = line.strip().split()
                row = dict(zip(column_names, line))
                if filter_conditions(row.copy()):
                    outfile.write(format_output(row) + '\n')

if __name__ == '__main__':
    main()

Here's how I would go about something like this...

This runs in ~35 secs vs. ~3 minutes for your original script on my machine. It is possible to add a few more optimizations (we only need to convert a few of the columns to floats, for example), but that only shaves a few seconds off of the run time.

You could also easily use csv.DictReader here, as several people have suggested. I'm avoiding it, as you'd have to define a custom dialect, and it's only a couple of extra lines to do the same thing without it. (The various csv module classes also check for more complex behavior (e.g. quoted strings, etc) that you don't need to worry about in this particular case. They're very, very handy in many cases, but they're slight overkill in this case.)

Note that you can also easily add your infile and outfile names as arguments when you call the script instead of hardcoding them in (i.e. infile = sys.argv[0], etc). This would also allow you to pipe data in or out easily... (You can check the length of sys.argv and set infile or outfile to sys.stdin and/or sys.stdout accordingly)

def main():
    infile = 'ugc4305_1.phot'
    outfile = 'ugc4305_1_filt.phot'
    process_data(infile, outfile)

def filter_conditions(row):
    for key, value in row.iteritems():
        row[key] = float(value)

    cond = (row['OBJ'] <= 2 and row['SNR1'] > 4 
       and row['SNR2'] > 4 and row['FLAG1'] < 8 
       and row['FLAG2'] < 8 
       and (row['SHARP1'] + row['SHARP2'])**2 < 0.075 
       and (row['CROWD1'] + row['CROWD2']) < 0.1
       )
    return cond

def format_output(row):
    output_columns = ('X', 'Y', 'OBJ', 'COUNTS1', 'BG1', 'ACS1', 'ERR1', 'CHI1', 
                     'SNR1', 'SHARP1', 'ROUND1', 'CROWD1', 'FLAG1', 'COUNTS2', 
                     'BG2', 'ACS2', 'ERR2', 'CHI2', 'SNR2', 'SHARP2', 'ROUND2', 
                     'CROWD2', 'FLAG2')
    delimiter = '\t'
    return delimiter.join((row[name] for name in output_columns))

def process_data(infilename, outfilename):
    column_names = ('EXT', 'CHIP', 'X', 'Y', 'CHI_GL', 'SNR_GL', 'SHARP_GL', 
                    'ROUND_GL', 'MAJAX_GL', 'CROWD_GL', 'OBJ', 'COUNTS1', 
                    'BG1', 'ACS1', 'STD1', 'ERR1', 'CHI1', 'SNR1', 'SHARP1', 
                    'ROUND1', 'CROWD1', 'FWHM1', 'ELLIP1', 'PSFA1', 'PSFB1', 
                    'PSFC1', 'FLAG1', 'COUNTS2', 'BG2', 'ACS2', 'STD2', 
                    'ERR2', 'CHI2', 'SNR2', 'SHARP2', 'ROUND2', 'CROWD2', 
                    'FWHM2', 'ELLIP2', 'PSFA2', 'PSFB2', 'PSFC2', 'FLAG2')

    with open(infilename) as infile:
        with open(outfilename, 'w') as outfile:
            for line in infile:
                line = line.strip().split()
                row = dict(zip(column_names, line))
                if filter_conditions(row.copy()):
                    outfile.write(format_output(row) + '\n')

if __name__ == '__main__':
    main()
背叛残局 2024-10-26 06:43:48

正如nmichaels所说,您可以使用csv.DictReaderfieldnamesdialect参数来读取该文件。然后,对于每一行你都会有一个字典。有了字典,您就不必使用 eval,并且可以使用类似这样的语句:

if data_item['OBJ'] <= 2 and data_item['SNR1']:
     g.write(data_item['X'], data_item['Y'], data_item['OBJ'])

由于所有的 eval,您现在执行的方式既缓慢又复杂。没有必要那么复杂。

Like what nmichaels said, you can use the fieldnames and dialect parameters of csv.DictReader to read this file. Then, for each line you will have a dictionary. With the dictionary, you won't have to use eval, and can use statments like

if data_item['OBJ'] <= 2 and data_item['SNR1']:
     g.write(data_item['X'], data_item['Y'], data_item['OBJ'])

The way you're doing it now is slow and complicated because of all the evals. There no need for that complexity.

韶华倾负 2024-10-26 06:43:48

如果分析显示大量时间花费在实际读取和解析文件上,并且您将多次处理相同的原始文件,您可以尝试创建一个针对使用 Python 读取而优化的中间文件格式。

可以尝试的一件事是读取文件一次,使用 pickle/cPickle。然后在过滤器脚本中使用 pickle/cpickle 读取中间文件。

由于不太了解 python,无法判断这是否比读取每一行并拆分它们更快。 (在 c# 中,我会使用二进制序列化器,但我不知道在 python 中是否可用)。

如果磁盘 IO 是瓶颈,您还可以尝试压缩输入文件并使用 gzip< /a> 模块。

If profiling shows that a lot of time is spent on the actual reading and parsing of the files and you will process the same raw file many times you can try to create an intermediate file format optimized for reading with Python.

One thing to try can be to read the file once, parse and output the result with pickle/cPickle. Then read the intermediate file with pickle/cpickle in your filter script.

In don't know python well enough to tell if this will be faster than reading each line and split them. (In c# I would use a binary serializer, but I don't know if that is available in python).

If disk IO is a bottle neck you may also try to zip your input files and read them with the gzip module.

忘年祭陌 2024-10-26 06:43:48

你尝试过熊猫吗?

我相信 OBJ、SNR1...是列名称,我希望您在所有行上应用相同的条件。
如果是这样的话,我建议你选择熊猫。

你的代码片段会是这样的......

import pandas as pd

infile = 'ugc4305_1.phot'
outfile = 'ugc4305_1_filt.phot'

df = pd.read_csv(infile)

condition = (df['OBJ'] <= 2) & (df['SRN1'] > 4) & (df['SRN2'] > 4) & (df['FLAG1'] < 8) & (df['FLAG2'] < 8) & ((df['SHARP1'] + df['SHARP2'])**2 < 0.075) & ((df['CROWD1'] + df['CROWD2']) < 0.1)
newDf = df[condition]

columnNames = ['col1', 'col2', ...] # column names you want in result

newDf = df[columnNames]

newDf.to_csv(outfile)

Have you tried pandas?

I believe OBJ, SNR1, ... are column names and I hope you are applying the same condition on all your rows.
If that's the case I suggest you to go with pandas.

Your code snippet would go something like this...

import pandas as pd

infile = 'ugc4305_1.phot'
outfile = 'ugc4305_1_filt.phot'

df = pd.read_csv(infile)

condition = (df['OBJ'] <= 2) & (df['SRN1'] > 4) & (df['SRN2'] > 4) & (df['FLAG1'] < 8) & (df['FLAG2'] < 8) & ((df['SHARP1'] + df['SHARP2'])**2 < 0.075) & ((df['CROWD1'] + df['CROWD2']) < 0.1)
newDf = df[condition]

columnNames = ['col1', 'col2', ...] # column names you want in result

newDf = df[columnNames]

newDf.to_csv(outfile)
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文