用于合并排序文件的Python类,如何改进?

发布于 2024-07-23 17:11:45 字数 6146 浏览 6 评论 0原文

背景:

我正在清理大型(无法保存在内存中)制表符分隔的文件。 当我清理输入文件时,我在内存中建立了一个列表; 当达到 1,000,000 个条目(内存约为 1GB)时,我对其进行排序(使用下面的默认键)并将列表写入文件。 此类用于将排序后的文件重新组合在一起。 它适用于我迄今为止遇到的文件。 到目前为止,我最大的案例是合并 66 个已排序的文件。

问题:

  1. 我的逻辑是否存在漏洞(哪里脆弱)?
  2. 我实现了合并排序吗 算法正确吗?
  3. 有没有明显的改进 可以做吗?

示例数据

这是以下文件之一中一行的抽象:

'hash_of_SomeStringId\tSome String Id\t\t\twww.somelink.com\t\tOtherData\t\n '

要点是我使用 'SomeStringId'.lower().replace(' ', '') 作为排序键。

原始代码:

class SortedFileMerger():
    """ A one-time use object that merges any number of smaller sorted 
        files into one large sorted file.

        ARGS:
            paths - list of paths to sorted files
            output_path - string path to desired output file
            dedup - (boolean) remove lines with duplicate keys, default = True
            key - use to override sort key, default = "line.split('\t')[1].lower().replace(' ', '')"
                  will be prepended by "lambda line: ".  This should be the same 
                  key that was used to sort the files being merged!
    """
    def __init__(self, paths, output_path, dedup=True, key="line.split('\t')[1].lower().replace(' ', '')"):
        self.key = eval("lambda line: %s" % key)
        self.dedup = dedup
        self.handles = [open(path, 'r') for path in paths]
        # holds one line from each file
        self.lines = [file_handle.readline() for file_handle in self.handles]
        self.output_file = open(output_path, 'w')
        self.lines_written = 0
        self._mergeSortedFiles() #call the main method

    def __del__(self):
        """ Clean-up file handles.
        """
        for handle in self.handles:
            if not handle.closed:
                handle.close()
        if self.output_file and (not self.output_file.closed):
            self.output_file.close()

    def _mergeSortedFiles(self):
        """ Merge the small sorted files to 'self.output_file'. This can 
            and should only be called once.
            Called from __init__().
        """
        previous_comparable = ''
        min_line = self._getNextMin()
        while min_line:
            index = self.lines.index(min_line)
            comparable = self.key(min_line)
            if not self.dedup:                      
                #not removing duplicates
                self._writeLine(index)
            elif comparable != previous_comparable: 
                #removing duplicates and this isn't one
                self._writeLine(index)
            else:                                   
                #removing duplicates and this is one
                self._readNextLine(index)
            previous_comparable = comparable
            min_line = self._getNextMin()
        #finished merging
        self.output_file.close()

    def _getNextMin(self):
        """ Returns the next "smallest" line in sorted order.
            Returns None when there are no more values to get.
        """
        while '' in self.lines:
            index = self.lines.index('')
            if self._isLastLine(index):
                # file.readline() is returning '' because 
                # it has reached the end of a file.
                self._closeFile(index)
            else:
                # an empty line got mixed in
                self._readNextLine(index)
        if len(self.lines) == 0:
            return None
        return min(self.lines, key=self.key)

    def _writeLine(self, index):
        """ Write line to output file and update self.lines
        """
        self.output_file.write(self.lines[index])
        self.lines_written += 1
        self._readNextLine(index)

    def _readNextLine(self, index):
        """ Read the next line from handles[index] into lines[index]
        """
        self.lines[index] = self.handles[index].readline()

    def _closeFile(self, index):
        """ If there are no more lines to get in a file, it 
            needs to be closed and removed from 'self.handles'.
            It's entry in 'self.lines' also need to be removed.
        """
        handle = self.handles.pop(index)
        if not handle.closed:
            handle.close()
        # remove entry from self.lines to preserve order
        _ = self.lines.pop(index)

    def _isLastLine(self, index):
        """ Check that handles[index] is at the eof.
        """
        handle = self.handles[index]            
        if handle.tell() == os.path.getsize(handle.name):
            return True
        return False

编辑:实施我来的Brian的建议使用以下解决方案:

第二次编辑:更新了 John Machin 的代码建议:

def decorated_file(f, key):
    """ Yields an easily sortable tuple. 
    """
    for line in f:
        yield (key(line), line)

def standard_keyfunc(line):
    """ The standard key function in my application.
    """
    return line.split('\t', 2)[1].replace(' ', '').lower()

def mergeSortedFiles(paths, output_path, dedup=True, keyfunc=standard_keyfunc):
    """ Does the same thing SortedFileMerger class does. 
    """
    files = map(open, paths) #open defaults to mode='r'
    output_file = open(output_path, 'w')
    lines_written = 0
    previous_comparable = ''
    for line in heapq26.merge(*[decorated_file(f, keyfunc) for f in files]):
        comparable = line[0]
        if previous_comparable != comparable:
            output_file.write(line[1])
            lines_written += 1
        previous_comparable = comparable
    return lines_written

粗略测试

使用相同的输入文件(2.2 GB 数据):

  • SortedFileMerger 类需要 51 分钟(3068.4 秒)
  • Brian 的解决方案花了 40 分钟(2408.5 秒)
  • 添加 约翰·马钦的建议, 解决方案代码耗时 36 分钟 (2214.0 秒)

Background:

I'm cleaning large (cannot be held in memory) tab-delimited files. As I clean the input file, I build up a list in memory; when it gets to 1,000,000 entries (about 1GB in memory) I sort it (using the default key below) and write the list to a file. This class is for putting the sorted files back together. It works on the files I have encountered thus far. My largest case, so far, is merging 66 sorted files.

Questions:

  1. Are there holes in my logic (where is it fragile)?
  2. Have I implemented the merge-sort
    algorithm correctly?
  3. Are there any obvious improvements
    that could be made?

Example Data:

This is an abstraction of a line in one of these files:

'hash_of_SomeStringId\tSome String Id\t\t\twww.somelink.com\t\tOtherData\t\n'

The takeaway is that I use 'SomeStringId'.lower().replace(' ', '') as my sort key.

Original Code:

class SortedFileMerger():
    """ A one-time use object that merges any number of smaller sorted 
        files into one large sorted file.

        ARGS:
            paths - list of paths to sorted files
            output_path - string path to desired output file
            dedup - (boolean) remove lines with duplicate keys, default = True
            key - use to override sort key, default = "line.split('\t')[1].lower().replace(' ', '')"
                  will be prepended by "lambda line: ".  This should be the same 
                  key that was used to sort the files being merged!
    """
    def __init__(self, paths, output_path, dedup=True, key="line.split('\t')[1].lower().replace(' ', '')"):
        self.key = eval("lambda line: %s" % key)
        self.dedup = dedup
        self.handles = [open(path, 'r') for path in paths]
        # holds one line from each file
        self.lines = [file_handle.readline() for file_handle in self.handles]
        self.output_file = open(output_path, 'w')
        self.lines_written = 0
        self._mergeSortedFiles() #call the main method

    def __del__(self):
        """ Clean-up file handles.
        """
        for handle in self.handles:
            if not handle.closed:
                handle.close()
        if self.output_file and (not self.output_file.closed):
            self.output_file.close()

    def _mergeSortedFiles(self):
        """ Merge the small sorted files to 'self.output_file'. This can 
            and should only be called once.
            Called from __init__().
        """
        previous_comparable = ''
        min_line = self._getNextMin()
        while min_line:
            index = self.lines.index(min_line)
            comparable = self.key(min_line)
            if not self.dedup:                      
                #not removing duplicates
                self._writeLine(index)
            elif comparable != previous_comparable: 
                #removing duplicates and this isn't one
                self._writeLine(index)
            else:                                   
                #removing duplicates and this is one
                self._readNextLine(index)
            previous_comparable = comparable
            min_line = self._getNextMin()
        #finished merging
        self.output_file.close()

    def _getNextMin(self):
        """ Returns the next "smallest" line in sorted order.
            Returns None when there are no more values to get.
        """
        while '' in self.lines:
            index = self.lines.index('')
            if self._isLastLine(index):
                # file.readline() is returning '' because 
                # it has reached the end of a file.
                self._closeFile(index)
            else:
                # an empty line got mixed in
                self._readNextLine(index)
        if len(self.lines) == 0:
            return None
        return min(self.lines, key=self.key)

    def _writeLine(self, index):
        """ Write line to output file and update self.lines
        """
        self.output_file.write(self.lines[index])
        self.lines_written += 1
        self._readNextLine(index)

    def _readNextLine(self, index):
        """ Read the next line from handles[index] into lines[index]
        """
        self.lines[index] = self.handles[index].readline()

    def _closeFile(self, index):
        """ If there are no more lines to get in a file, it 
            needs to be closed and removed from 'self.handles'.
            It's entry in 'self.lines' also need to be removed.
        """
        handle = self.handles.pop(index)
        if not handle.closed:
            handle.close()
        # remove entry from self.lines to preserve order
        _ = self.lines.pop(index)

    def _isLastLine(self, index):
        """ Check that handles[index] is at the eof.
        """
        handle = self.handles[index]            
        if handle.tell() == os.path.getsize(handle.name):
            return True
        return False

Edit: Implementing the suggestions from Brian I came up with the following solution:

Second Edit: Updated the code per John Machin's suggestion:

def decorated_file(f, key):
    """ Yields an easily sortable tuple. 
    """
    for line in f:
        yield (key(line), line)

def standard_keyfunc(line):
    """ The standard key function in my application.
    """
    return line.split('\t', 2)[1].replace(' ', '').lower()

def mergeSortedFiles(paths, output_path, dedup=True, keyfunc=standard_keyfunc):
    """ Does the same thing SortedFileMerger class does. 
    """
    files = map(open, paths) #open defaults to mode='r'
    output_file = open(output_path, 'w')
    lines_written = 0
    previous_comparable = ''
    for line in heapq26.merge(*[decorated_file(f, keyfunc) for f in files]):
        comparable = line[0]
        if previous_comparable != comparable:
            output_file.write(line[1])
            lines_written += 1
        previous_comparable = comparable
    return lines_written

Rough Test

Using the same input files (2.2 GB of data):

  • SortedFileMerger class took 51
    minutes (3068.4 seconds)
  • Brian's solution took 40 minutes (2408.5 seconds)
  • After adding John Machin's suggestions,
    the solution code took 36 minutes
    (2214.0 seconds)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

疑心病 2024-07-30 17:11:45

请注意,在 python2.6 中,heapq 有一个新的 merge 函数,它将为你做这件事。

要处理自定义键函数,您只需用一些装饰它的东西包装文件迭代器,以便它根据键进行比较,然后将其删除:

def decorated_file(f, key):
    for line in f: 
        yield (key(line), line)

filenames = ['file1.txt','file2.txt','file3.txt']
files = map(open, filenames)
outfile = open('merged.txt')

for line in heapq.merge(*[decorated_file(f, keyfunc) for f in files]):
    outfile.write(line[1])

[Edit]即使在早期版本的Python中,简单地从后面的 heapq 模块中获取 merge 的实现可能是值得的。 它是纯Python,并且在Python2.5中未经修改地运行,并且由于它使用堆来获取下一个最小值,因此在合并大量文件时应该非常高效。

您应该能够简单地从 python2.6 安装中复制 heapq.py,将其作为“heapq26.py”复制到源中并使用“from heapq26 import merge” - 没有 2.6 特定的其中使用的功能。 或者,您可以只复制合并函数(重写 heappop 等调用以引用 python2.5 heapq 模块)。

Note that in python2.6, heapq has a new merge function which will do this for you.

To handle the custom key function, you can just wrap the file iterator with something that decorates it so that it compares based on the key, and strip it out afterwards:

def decorated_file(f, key):
    for line in f: 
        yield (key(line), line)

filenames = ['file1.txt','file2.txt','file3.txt']
files = map(open, filenames)
outfile = open('merged.txt')

for line in heapq.merge(*[decorated_file(f, keyfunc) for f in files]):
    outfile.write(line[1])

[Edit] Even in earlier versions of python, it's probably worthwhile simply to take the implementation of merge from the later heapq module. It's pure python, and runs unmodified in python2.5, and since it uses a heap to get the next minimum should be very efficient when merging large numbers of files.

You should be able to simply copy the heapq.py from a python2.6 installation, copy it to your source as "heapq26.py" and use "from heapq26 import merge" - there are no 2.6 specific features used in it. Alternatively, you could just copy the merge function (rewriting the heappop etc calls to reference the python2.5 heapq module).

辞慾 2024-07-30 17:11:45

<< 这个“答案”是对原始提问者的结果代码的评论>>

建议:使用 eval() 是嗯,你正在做的事情限制了调用者使用 lambda —— 键提取可能需要多个一行,并且在任何情况下,你不需要为初步排序步骤使用相同的函数吗?

因此,将以下内容替换

def mergeSortedFiles(paths, output_path, dedup=True, key="line.split('\t')[1].lower().replace(' ', '')"):
    keyfunc = eval("lambda line: %s" % key)

为:

def my_keyfunc(line):
    return line.split('\t', 2)[1].replace(' ', '').lower()
    # minor tweaks may speed it up a little

def mergeSortedFiles(paths, output_path, keyfunc, dedup=True):    

<< This "answer" is a comment on the original questioner's resultant code >>

Suggestion: using eval() is ummmm and what you are doing restricts the caller to using lambda -- key extraction may require more than a one-liner, and in any case don't you need the same function for the preliminary sort step?

So replace this:

def mergeSortedFiles(paths, output_path, dedup=True, key="line.split('\t')[1].lower().replace(' ', '')"):
    keyfunc = eval("lambda line: %s" % key)

with this:

def my_keyfunc(line):
    return line.split('\t', 2)[1].replace(' ', '').lower()
    # minor tweaks may speed it up a little

def mergeSortedFiles(paths, output_path, keyfunc, dedup=True):    
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文