What is the most efficient way to get the first and last line of a text file?

Posted on 2024-09-11 19:49:23

I have a text file which contains a time stamp on each line. My goal is to find the time range. All the times are in order so the first line will be the earliest time and the last line will be the latest time. I only need the very first and very last line. What would be the most efficient way to get these lines in python?

Note: These files are relatively large in length, about 1-2 million lines each and I have to do this for several hundred files.

Comments (13)

旧瑾黎汐 2024-09-18 19:49:23

from os import SEEK_END, SEEK_CUR

def readlast(f):
    try:
        f.seek(-2, SEEK_END)       # Jump to the second last byte.
        while f.read(1) != b"\n":  #  Until newline is found ...
            f.seek(-2, SEEK_CUR)   #  ... jump back, over the read byte plus one.
    except OSError:                # Reached beginning of file
        f.seek(0)                  #  Set cursor to beginning of file as well.
    return f.read()                # Read all data from this point on.
        
with open(path, "rb") as f:
    first = f.readline()
    last  = readlast(f)

When using seek the format is seek(offset, whence=0)

Quote from docs.python.org:

Change the stream position to the given byte offset. offset is interpreted relative to the position indicated by whence. The default value for whence is SEEK_SET. Values for whence are:

  • SEEK_SET or 0 = start of the stream (the default); offset should be zero or positive
  • SEEK_CUR or 1 = current stream position; offset may be negative
  • SEEK_END or 2 = end of the stream; offset is usually negative
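
A quick illustration of the three whence values (using io.BytesIO here for brevity; a file opened in binary mode behaves the same way):

```python
import io
from os import SEEK_SET, SEEK_CUR, SEEK_END

f = io.BytesIO(b"ab\ncd\n")          # 6 bytes in total
assert f.seek(0, SEEK_SET) == 0      # absolute offset from the start
assert f.seek(2, SEEK_CUR) == 2      # relative to the current position
assert f.seek(-1, SEEK_END) == 5     # negative offset from the end
assert f.read() == b"\n"             # the final byte
```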

Galloping search (2.7+)

from collections import deque
from os import SEEK_CUR, SEEK_END

def readlast(f, d = b'\n'):
    """"readlast(f: io.IOBase, d: bytes = b'\n') -> bytes

    Return the last segment of file `f`, containing data segments separated by
    `d`.
    """
    arr = deque(); step = 1; pos = -1
    try:
        # Seek to last byte of file, save it to arr as to not check for newline.
        pos = f.seek(-1, SEEK_END) 
        arr.appendleft(f.read())
        # Seek past the byte read, plus one to use as the first segment.
        pos = f.seek(-2, SEEK_END) 
        seg = f.read(1)
        # Break when 'd' occurs, store index of the rightmost match in 'i'.
        while seg.rfind(d) == -1:
            # Store segments with no b'\n' in a memory-efficient 'deque'.
            arr.appendleft(seg)
            # Step back in file, past the bytes just read plus twice that.
            pos = f.seek(-step*3, SEEK_CUR)
            # Read new segment, twice as big as the one read previous iteration.
            step *= 2
            seg = f.read(step)
        # Ignore the characters up to 'i', and the triggering newline character.
        arr.appendleft(seg[seg.rfind(d)+1:])
    except OSError: 
        # Reached beginning of file. Read remaining data and check for newline.
        f.seek(0)
        seg = f.read(pos)
        arr.appendleft(seg[seg.rfind(d)+1:])
    return b"".join(arr)

I'd probably go for a function that makes use of an exponentially growing step size today, so I've added such an example here and will keep it alongside the original answer (for now).

It handles edge cases well, apart from multibyte delimiters and files opened in text mode (see "Edge cases" for an example that handle those).

Usage:

f.write(b'X\nY\nZ\n'); f.seek(0)
assert readlast(f) == b'Z\n'
f.write(b'\n\n'); f.seek(0)
assert readlast(f) == b'\n'

Edge cases (2.7+)

I've refrained from editing the original answer as the question specifically asks for efficiency, as well as to respect previous upvotes.

This version addresses all comments and issues raised over the years while preserving the logic and backward compatibility (at the cost of readability).

The issues raised and addressed at the point of writing are:

  • Return empty string when parsing empty file, noted in comment by Loïc.
  • Return all content when no delimiter is found, raised in comment by LazyLeopard.
  • Avoid relative offsets to support text mode, raised in comment by AnotherParker.
  • UTF16/UTF32 hack, noted in comment by Pietro Battiston.

Also supports multibyte delimiters.

from os import SEEK_CUR, SEEK_END, SEEK_SET

def _readlast__bytes(f, sep, size, step):
    # Point cursor 'size' + 'step' bytes away from the end of the file.
    o = f.seek(0 - size - step, SEEK_END)
    # Step 'step' bytes each iteration, halt when 'sep' occurs.
    while f.read(size) != sep:
        f.seek(0 - size - step, SEEK_CUR)

def _readlast__text(f, sep, size, step):
    # Text mode, same principle but without the use of relative offsets.
    o = f.seek(0, SEEK_END)
    o = f.seek(o - size - step)
    while f.read(size) != sep:
        o = f.seek(o - step)

def readlast(f, sep, fixed = False):
    """readlast(f: io.BaseIO, sep: bytes|str, fixed: bool = False) -> bytes|str

    Return the last segment of file `f`, containing data segments separated by
    `sep`.

    Set `fixed` to True when parsing UTF-32 or UTF-16 encoded data (don't forget
    to pass the correct delimiter) in files opened in byte mode.
    """
    size = len(sep)
    step = size if fixed else 1
    if not size:
        raise ValueError("Zero-length separator.")
    try:
        if 'b' in f.mode:
            # Process file opened in byte mode.
            _readlast__bytes(f, sep, size, step)
        else:
            # Process file opened in text mode.
            _readlast__text(f, sep, size, step)
    except (OSError, ValueError): 
        # Beginning of file reached.
        f.seek(0, SEEK_SET)
    return f.read()

Usage:

f.write("X\nY\nZ\n".encode('utf32'); f.seek(0)
assert readlast(f, "\n".encode('utf32')[4:]) == "Z\n"
f.write(b'X<br>Y</br>'; f.seek(0)
assert readlast(f, b'<br>', fixed=False) == "Y</br>"

Efficiency

Code used to compare against this answer (optimised version of the most upvoted answer [at the point of posting]):

with open(file, "rb") as f:
    first = f.readline()     # Read and store the first line.
    for last in f: pass      # Read all lines, keep final value.

Results:

10k iterations processing a file of 6k lines totalling 200kB: 1.62s vs  6.92s
100 iterations processing a file of 6k lines totalling 1.3GB: 8.93s vs 86.95s

"1-2 millions lines each", as the question stated, would of course increase the difference a lot more.

GRAY°灰色天空 2024-09-18 19:49:23

docs for io module

with open(fname, 'rb') as fh:
    first = next(fh).decode()

    fh.seek(-1024, 2)
    last = fh.readlines()[-1].decode()

The adjustable value here is 1024: it represents the average string length. I chose 1024 only as an example. If you have an estimate of the average line length you could just use that value times 2.

Since you have no idea whatsoever about the possible upper bound for the line length, the obvious solution would be to loop over the file:

for line in fh:
    pass
last = line

You don't need to bother with the binary flag; you could just use open(fname).

ETA: Since you have many files to work on, you could create a sample of a couple of dozen files using random.sample and run this code on them to determine the length of the last line, with an a priori large value of the position shift (say, 1 MB). This will help you to estimate the value for the full run.
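
A sketch of that sampling idea (the function name and the 1 MB shift are illustrative assumptions, not part of the original answer):

```python
import os
import random
import tempfile

def estimate_max_last_line(paths, k=20):
    """Sample up to k files and return the longest last-line length seen."""
    longest = 0
    for path in random.sample(paths, min(k, len(paths))):
        with open(path, "rb") as fh:
            try:
                fh.seek(-1024 * 1024, os.SEEK_END)  # a priori large shift: 1 MB
            except OSError:
                fh.seek(0)                          # file is smaller than 1 MB
            lines = fh.read().splitlines()
            if lines:
                longest = max(longest, len(lines[-1]))
    return longest

# Smoke test on throwaway files whose last lines are 10, 11 and 12 bytes long.
d = tempfile.mkdtemp()
paths = []
for i in range(3):
    p = os.path.join(d, "f%d.txt" % i)
    with open(p, "wb") as fh:
        fh.write(b"head\n" + b"x" * (10 + i) + b"\n")
    paths.append(p)
print(estimate_max_last_line(paths, k=3))  # → 12
```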

猥琐帝 2024-09-18 19:49:23

Here's a modified version of SilentGhost's answer that will do what you want.

with open(fname, 'rb') as fh:
    first = next(fh)
    offs = -100
    while True:
        fh.seek(offs, 2)
        lines = fh.readlines()
        if len(lines)>1:
            last = lines[-1]
            break
        offs *= 2
    print(first)
    print(last)

No need for an upper bound for line length here.

╰つ倒转 2024-09-18 19:49:23

Can you use unix commands? I think using head -1 and tail -n 1 are probably the most efficient methods. Alternatively, you could use a simple fid.readline() to get the first line and fid.readlines()[-1], but that may take too much memory.
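
If shelling out is acceptable, that suggestion might be wrapped like this (Unix-only; assumes head and tail are on PATH, and the function name is made up for illustration):

```python
import subprocess

def first_and_last(path):
    # head -1 prints the first line; tail -n 1 prints the last.
    first = subprocess.run(["head", "-1", path],
                           capture_output=True, check=True).stdout
    last = subprocess.run(["tail", "-n", "1", path],
                          capture_output=True, check=True).stdout
    return first, last
```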

叹沉浮 2024-09-18 19:49:23

This is my solution, also compatible with Python 3. It also manages border cases, but it lacks utf-16 support:

import os

def tail(filepath):
    """
    @author Marco Sulla ([email protected])
    @date May 31, 2016
    """

    try:
        filepath.is_file
        fp = str(filepath)
    except AttributeError:
        fp = filepath

    with open(fp, "rb") as f:
        size = os.stat(fp).st_size
        start_pos = 0 if size - 1 < 0 else size - 1

        if start_pos != 0:
            f.seek(start_pos)
            char = f.read(1)

            if char == b"\n":
                start_pos -= 1
                f.seek(start_pos)

            if start_pos == 0:
                f.seek(start_pos)
            else:
                char = ""

                for pos in range(start_pos, -1, -1):
                    f.seek(pos)

                    char = f.read(1)

                    if char == b"\n":
                        break

        return f.readline()

It's inspired by Trasp's answer and AnotherParker's comment.

渔村楼浪 2024-09-18 19:49:23

First open the file in read mode. Then use the readlines() method to read it line by line. All the lines are stored in a list. Now you can use list slices to get the first and last lines of the file.

    with open('file.txt', 'rb') as a:
        lines = a.readlines()
    if lines:
        first_line = lines[0]
        last_line = lines[-1]
黯淡〆 2024-09-18 19:49:23
w = open('file.txt', 'r')
print ('first line is : ',w.readline())
for line in w:  
    x= line
print ('last line is : ',x)
w.close()

The for loop runs through the lines and x gets the last line on the final iteration.

椒妓 2024-09-18 19:49:23
with open("myfile.txt") as f:
    lines = f.readlines()
    first_row = lines[0]
    print first_row
    last_row = lines[-1]
    print last_row
with open("myfile.txt") as f:
    lines = f.readlines()
    first_row = lines[0]
    print first_row
    last_row = lines[-1]
    print last_row
戏蝶舞 2024-09-18 19:49:23

Here is an extension of @Trasp's answer that has additional logic for handling the corner case of a file that has only one line. It may be useful to handle this case if you repeatedly want to read the last line of a file that is continuously being updated. Without this, if you try to grab the last line of a file that has just been created and has only one line, IOError: [Errno 22] Invalid argument will be raised.

def tail(filepath):
    with open(filepath, "rb") as f:
        first = f.readline()      # Read the first line.
        f.seek(-2, 2)             # Jump to the second last byte.
        while f.read(1) != b"\n": # Until EOL is found...
            try:
                f.seek(-2, 1)     # ...jump back the read byte plus one more.
            except IOError:
                f.seek(-1, 1)
                if f.tell() == 0:
                    break
        last = f.readline()       # Read last line.
    return last
旧梦荧光笔 2024-09-18 19:49:23

Nobody mentioned using reversed:

f=open(file,"r")
r=reversed(f.readlines())
last_line_of_file = r.next()
长途伴 2024-09-18 19:49:23

Getting the first line is trivially easy. For the last line, presuming you know an approximate upper bound on the line length, os.lseek some amount back from SEEK_END to find the second-to-last line ending, then readline() the last line.
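
A sketch of that approach (the helper name and the 4096-byte upper bound are assumptions for illustration):

```python
import os

def first_last(path, max_line=4096):
    # max_line is the assumed upper bound on line length mentioned above.
    fd = os.open(path, os.O_RDONLY)
    try:
        # First line: read forward until a newline shows up.
        buf = b""
        while b"\n" not in buf:
            chunk = os.read(fd, 256)
            if not chunk:
                break
            buf += chunk
        first = buf.split(b"\n", 1)[0] + b"\n"
        # Last line: lseek back at most max_line bytes from the end,
        # then keep whatever follows the last newline in that window.
        size = os.fstat(fd).st_size
        os.lseek(fd, -min(max_line, size), os.SEEK_END)
        window = os.read(fd, max_line)
        last = window.rstrip(b"\n").rsplit(b"\n", 1)[-1] + b"\n"
        return first, last
    finally:
        os.close(fd)
```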

故事和酒 2024-09-18 19:49:23
with open(filename, "rb") as f:#Needs to be in binary mode for the seek from the end to work
    first = f.readline()
    if f.read(1) == '':
        return first
    f.seek(-2, 2)  # Jump to the second last byte.
    while f.read(1) != b"\n":  # Until EOL is found...
        f.seek(-2, 1)  # ...jump back the read byte plus one more.
    last = f.readline()  # Read last line.
    return last

上面的答案是上述答案的修改版本,它处理文件中只有一行的情况

with open(filename, "rb") as f:#Needs to be in binary mode for the seek from the end to work
    first = f.readline()
    if f.read(1) == '':
        return first
    f.seek(-2, 2)  # Jump to the second last byte.
    while f.read(1) != b"\n":  # Until EOL is found...
        f.seek(-2, 1)  # ...jump back the read byte plus one more.
    last = f.readline()  # Read last line.
    return last

The above is a modified version of the earlier answers that handles the case where there is only one line in the file.

情绪少女 2024-09-18 19:49:23

If you're only looking for a convenient small snippet and it's suitable to read the whole file, consider deque.

from collections import deque

with open("/path/to/file", "rb+") as f:
    first = f.readline()
    try:
        last = deque(f, 1)[0]
    except IndexError:
        last = ""
        

Passing the file object f to deque will cause the built-in functions in the io library to split the stream into individual lines, while deque keeps only the last line in memory.
