Efficient file buffering & scanning methods for large files in Python

Posted 2024-10-14 04:16:08


The description of the problem I am having is a bit complicated, and I will err on the side of providing more complete information. For the impatient, here is the briefest way I can summarize it:

What is the fastest (least execution time) way to split a text file into ALL (overlapping) substrings of size N (bound N, e.g. 36), while throwing out newline characters?

I am writing a module which parses files in the FASTA ascii-based genome format. These files comprise what is known as the 'hg18' human reference genome, which you can download from the UCSC genome browser (go slugs!) if you like.

As you will notice, the genome files are composed of chr[1..22].fa and chr[XY].fa, as well as a set of other small files which are not used in this module.

Several modules already exist for parsing FASTA files, such as BioPython's SeqIO. (Sorry, I'd post a link, but I don't have the points to do so yet.) Unfortunately, every module I've been able to find doesn't do the specific operation I am trying to do.

My module needs to split the genome data ('CAGTACGTCAGACTATACGGAGCTA' could be a line, for instance) into every single overlapping N-length substring. Let me give an example using a very small file (the actual chromosome files are between 355 and 20 million characters long) and N=8:

>>>import cStringIO
>>>example_file = cStringIO.StringIO("""\
>header
CAGTcag
TFgcACF
""")
>>>for read in parse(example_file):
...    print read
...
CAGTCAGT
AGTCAGTF
GTCAGTFG
TCAGTFGC
CAGTFGCA
AGTFGCAC
GTFGCACF

The function that I found to have the absolute best performance, of the methods I could think of, is this:


def parse(file):
  size = 8 # of course in my code this is a function argument
  file.readline() # skip past the header
  buffer = ''
  for line in file:
    buffer += line.rstrip().upper()
    while len(buffer) >= size:
      yield buffer[:size]
      buffer = buffer[1:]

This works, but unfortunately it still takes about 1.5 hours (see note below) to parse the human genome this way. Perhaps this is the very best I am going to see with this method (a complete code refactor might be in order, but I'd like to avoid it as this approach has some very specific advantages in other areas of the code), but I thought I would turn this over to the community.

Thanks!

  • Note, this time includes a lot of extra calculation, such as computing the opposing strand read and doing hashtable lookups on a hash of approximately 5G in size.

Post-answer conclusion: It turns out that using fileobj.read() and then manipulating the resulting string (string.replace(), etc.) took relatively little time and memory compared to the remainder of the program, and so I used that approach. Thanks everyone!
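
For reference, here is a minimal sketch of that read-everything-then-slice approach (illustrative only; it assumes Python 2 and a single-record FASTA file as in the examples above, and parse_whole_read is a made-up name):

def parse_whole_read(file, size=8):
    # sketch of the conclusion above: read the whole file at once, drop the
    # header line, strip newlines with str.replace, upper-case, then slice
    # out every overlapping window from the in-memory string
    file.readline()  # skip past the '>header' line
    whole = file.read().replace("\n", "").upper()
    for offset in xrange(0, len(whole) - size + 1):
        yield whole[offset:offset + size]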


Comments (4)

韬韬不绝 2024-10-21 04:16:09

Could you mmap the file and start pecking through it with a sliding window? I wrote a stupid little program that runs pretty small:

USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
sarnold  20919  0.0  0.0  33036  4960 pts/2    R+   22:23   0:00 /usr/bin/python ./sliding_window.py

Working through a 636229 byte fasta file (found via http://biostar.stackexchange.com/questions/1759) took .383 seconds.

#!/usr/bin/python

import mmap
import os

def parse(string, size):
    stride = 8
    start = string.find("\n")
    while start < size - stride:
        print string[start:start+stride]
        start += 1

fasta = open("small.fasta", 'r')
fasta_size = os.stat("small.fasta").st_size
fasta_map = mmap.mmap(fasta.fileno(), 0, mmap.MAP_PRIVATE, mmap.PROT_READ)
parse(fasta_map, fasta_size)

阿楠 2024-10-21 04:16:09


Some classic IO bound changes.

  • Use a lower level read operation like os.read and read into a large fixed buffer (see the sketch after this list).
  • Use threading/multiprocessing where one reads and buffers and the other processes.
  • If you have multiple processors/machines, use multiprocessing/mq to divvy up processing across CPUs, à la map-reduce.

Using a lower level read operation wouldn't be that much of a rewrite. The others would be pretty large rewrites.
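
As a hedged illustration of the first suggestion (not the answerer's code; parse_os_buffered, path and bufsize are made-up names, and a single-record FASTA file is assumed as in the question), reading in large fixed-size chunks with os.read and keeping a small carry-over so windows that straddle chunk boundaries are still produced exactly once might look like this:

import os

def parse_os_buffered(path, size=8, bufsize=1 << 20):
    # read the file in large fixed chunks with os.read, strip newlines,
    # and carry over the last (size - 1) characters between chunks
    fd = os.open(path, os.O_RDONLY)
    try:
        carry = ""
        first = True
        while True:
            chunk = os.read(fd, bufsize)
            if not chunk:
                break
            if first:
                # drop the '>header' line (assumes it fits in the first chunk)
                chunk = chunk[chunk.find("\n") + 1:]
                first = False
            data = carry + chunk.replace("\n", "").upper()
            for start in xrange(0, len(data) - size + 1):
                yield data[start:start + size]
            carry = data[max(len(data) - size + 1, 0):]
    finally:
        os.close(fd)

Note this only changes how bytes reach Python; the per-window Python loop itself is unchanged.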

凉薄对峙 2024-10-21 04:16:09


I suspect the problem is that you have so much data stored in string format, which is really wasteful for your use case, that you're running out of real memory and thrashing swap. 128 GB should be enough to avoid this... :)

Since you've indicated in comments that you need to store additional information anyway, a separate class which references a parent string would be my choice. I ran a short test using chr21.fa from chromFa.zip from hg18; the file is about 48MB and just under 1M lines. I only have 1GB of memory here, so I simply discard the objects afterwards. This test thus won't show problems with fragmentation, cache, or related, but I think it should be a good starting point for measuring parsing throughput:

import mmap
import os
import time
import sys

class Subseq(object):
  __slots__ = ("parent", "offset", "length")

  def __init__(self, parent, offset, length):
    self.parent = parent
    self.offset = offset
    self.length = length

  # these are discussed in comments:
  def __str__(self):
    return self.parent[self.offset:self.offset + self.length]

  def __hash__(self):
    return hash(str(self))

  def __getitem__(self, index):
    # doesn't currently handle slicing
    assert 0 <= index < self.length
    return self.parent[self.offset + index]

  # other methods

def parse(file, size=8):
  file.readline()  # skip header
  whole = "".join(line.rstrip().upper() for line in file)
  for offset in xrange(0, len(whole) - size + 1):
    yield Subseq(whole, offset, size)

class Seq(object):
  __slots__ = ("value", "offset")
  def __init__(self, value, offset):
    self.value = value
    self.offset = offset

def parse_sep_str(file, size=8):
  file.readline()  # skip header
  whole = "".join(line.rstrip().upper() for line in file)
  for offset in xrange(0, len(whole) - size + 1):
    yield Seq(whole[offset:offset + size], offset)

def parse_plain_str(file, size=8):
  file.readline()  # skip header
  whole = "".join(line.rstrip().upper() for line in file)
  for offset in xrange(0, len(whole) - size + 1):
    yield whole[offset:offset+size]

def parse_tuple(file, size=8):
  file.readline()  # skip header
  whole = "".join(line.rstrip().upper() for line in file)
  for offset in xrange(0, len(whole) - size + 1):
    yield (whole, offset, size)

def parse_orig(file, size=8):
  file.readline() # skip header
  buffer = ''
  for line in file:
    buffer += line.rstrip().upper()
    while len(buffer) >= size:
      yield buffer[:size]
      buffer = buffer[1:]

def parse_os_read(file, size=8):
  file.readline()  # skip header
  file_size = os.fstat(file.fileno()).st_size
  whole = os.read(file.fileno(), file_size).replace("\n", "").upper()
  for offset in xrange(0, len(whole) - size + 1):
    yield whole[offset:offset+size]

def parse_mmap(file, size=8):
  file.readline()  # skip past the header
  buffer = ""
  for line in file:
    buffer += line
    if len(buffer) >= size:
      for start in xrange(0, len(buffer) - size + 1):
        yield buffer[start:start + size].upper()
      buffer = buffer[-(len(buffer) - size + 1):]
  for start in xrange(0, len(buffer) - size + 1):
    yield buffer[start:start + size]

def length(x):
  return sum(1 for _ in x)

def duration(secs):
  return "%dm %ds" % divmod(secs, 60)


def main(argv):
  tests = [parse, parse_sep_str, parse_tuple, parse_plain_str, parse_orig, parse_os_read]
  n = 0
  for fn in tests:
    n += 1
    with open(argv[1]) as f:
      start = time.time()
      length(fn(f))
      end = time.time()
      print "%d  %-20s  %s" % (n, fn.__name__, duration(end - start))

  fn = parse_mmap
  n += 1
  with open(argv[1]) as f:
    f = mmap.mmap(f.fileno(), 0, mmap.MAP_PRIVATE, mmap.PROT_READ)
    start = time.time()
    length(fn(f))
    end = time.time()
  print "%d  %-20s  %s" % (n, fn.__name__, duration(end - start))


if __name__ == "__main__":
  sys.exit(main(sys.argv))

1  parse                 1m 42s
2  parse_sep_str         1m 42s
3  parse_tuple           0m 29s
4  parse_plain_str       0m 36s
5  parse_orig            0m 45s
6  parse_os_read         0m 34s
7  parse_mmap            0m 37s

The first four are my code, while orig is yours and the last two are from other answers here.

User-defined objects are much more costly to create and collect than tuples or plain strings! This shouldn't be that surprising, but I had not realized it would make this much of a difference (compare #1 and #3, which really only differ in a user-defined class vs tuple). You said you want to store additional information, like offset, with the string anyway (as in the parse and parse_sep_str cases), so you might consider implementing that type in a C extension module. Look at Cython and related if you don't want to write C directly.

Case #1 and #2 being identical is expected: by pointing to a parent string, I was trying to save memory rather than processing time, but this test doesn't measure that.
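
If you want to check the memory side of that trade-off, here is a rough sketch (CPython-specific; sys.getsizeof does not follow references, so the shared parent string is left out of the Subseq figure and the private 8-character slice is added by hand for Seq):

import sys

parent = "CAGTCAGTFGCACF" * 1000   # stand-in for a chromosome string
sub = Subseq(parent, 0, 8)         # stores a reference plus offset/length
sep = Seq(parent[0:8], 0)          # stores its own 8-character copy

print "Subseq overhead:", sys.getsizeof(sub)
print "Seq overhead:   ", sys.getsizeof(sep) + sys.getsizeof(sep.value)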

蛮可爱 2024-10-21 04:16:09


I have a function that processes a text file, using buffered reads and writes and parallel computation with an async pool of worker processes. On an AMD machine with 2 cores and 8GB RAM running GNU/Linux, it can process 300,000 lines in less than 1 second, 1,000,000 lines in approximately 4 seconds, and approximately 4,500,000 lines (more than 220MB) in approximately 20 seconds:

# -*- coding: utf-8 -*-
import sys
from multiprocessing import Pool

def process_file(f, fo="result.txt", fi=sys.argv[1]):
    fi = open(fi, "r", 4096)   # buffered input
    fo = open(fo, "w", 4096)   # buffered output
    b = []                     # current batch of lines
    x = 0                      # total lines read so far
    result = None              # async result of the batch currently in flight
    pool = None
    for line in fi:
        b.append(line)
        x += 1
        if (x % 200000) == 0:  # hand off a full batch to the worker pool
            if pool is None:
                pool = Pool(processes=20)
            if result is None:
                result = pool.map_async(f, b)
            else:
                # write the previous batch's results while the new batch runs
                presult = result.get()
                result = pool.map_async(f, b)
                for l in presult:
                    fo.write(l)
            b = []
    if result is not None:     # flush the results of the last async batch
        for l in result.get():
            fo.write(l)
    if b:                      # process any leftover lines synchronously
        for l in b:
            fo.write(f(l))
    fo.close()
    fi.close()

The first argument is a function that receives one line, processes it, and returns the result to be written to the output file; the next is the output file, and the last is the input file (you can omit the last argument if your script receives the input file name as its first command-line parameter).
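
A hedged usage sketch (to_upper is a made-up worker): because the pool pickles the worker, it has to be a plain module-level function, and the __main__ guard keeps child processes from re-running the script body.

def to_upper(line):
    # hypothetical per-line worker: receives one input line, returns the
    # line that should be written to the output file
    return line.upper()

if __name__ == "__main__":
    process_file(to_upper)   # reads sys.argv[1], writes result.txt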
