How to implement a Pythonic equivalent of tail -F?

Posted 2024-08-10 09:18:16

What is the pythonic way of watching the tail end of a growing file for the occurrence of certain keywords?

In shell I might say:

tail -f "$file" | grep "$string" | while read hit; do
    #stuff
done

10 Answers

一花一树开 2024-08-17 09:18:16

Well, the simplest way would be to constantly read from the file, check what's new and test for hits.

import time

def watch(fn, words):
    fp = open(fn, 'r')
    while True:
        new = fp.readline()
        # Once all lines are read this just returns ''
        # until the file changes and a new line appears

        if new:
            for word in words:
                if word in new:
                    yield (word, new)
        else:
            time.sleep(0.5)

fn = 'test.py'
words = ['word']
for hit_word, hit_sentence in watch(fn, words):
    print "Found %r in line: %r" % (hit_word, hit_sentence)

This solution with readline works if you know your data will appear in lines.

If the data is some sort of stream you need a buffer, larger than the largest word you're looking for, and fill it first. It gets a bit more complicated that way...
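
If the data really does arrive as an unstructured stream, here is a minimal sketch of that buffered approach (watch_chunks, the chunk size, and the poll interval are illustrative assumptions, not part of the original answer):

import time

def watch_chunks(fn, word, chunk_size=4096):
    # Like watch() above, but reads fixed-size chunks instead of lines and
    # keeps a small overlap so a word split across two reads is still found.
    with open(fn, 'r') as fp:
        carry = ""
        while True:
            chunk = fp.read(chunk_size)
            if not chunk:
                time.sleep(0.5)   # no new data yet; poll again
                continue
            data = carry + chunk
            if word in data:
                yield data
            # Carry just enough characters to complete a word that
            # straddles the boundary into the next chunk.
            carry = data[-(len(word) - 1):] if len(word) > 1 else ""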

伤感在游骋 2024-08-17 09:18:16
import time

def tail(f):
    f.seek(0, 2)

    while True:
        line = f.readline()

        if not line:
            time.sleep(0.1)
            continue

        yield line

def process_matches(matchtext):
    while True:
        line = (yield)  
        if matchtext in line:
            do_something_useful() # email alert, etc.


list_of_matches = ['ERROR', 'CRITICAL']
matches = [process_matches(string_match) for string_match in list_of_matches]    

for m in matches: # prime matches
    next(m)

while True:
    auditlog = tail( open(log_file_to_monitor) )
    for line in auditlog:
        for m in matches:
            m.send(line)

I use this to monitor log files. In the full implementation, I keep list_of_matches in a configuration file so it can be used for multiple purposes. On my list of enhancements is support for regex instead of a simple 'in' match.
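
A minimal sketch of that regex enhancement, reusing the coroutine pattern above (process_matches_re and the print placeholder are illustrative assumptions):

import re

def process_matches_re(pattern):
    # Coroutine variant of process_matches() above that uses a compiled
    # regular expression instead of a plain 'in' test.
    regex = re.compile(pattern)
    while True:
        line = (yield)
        if regex.search(line):
            print("matched:", line.rstrip())   # placeholder for the real alert

It is primed and fed exactly like the coroutines above: call next() on it once, then send() it each line.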

年华零落成诗 2024-08-17 09:18:16

EDIT: as the comment below notes, O_NONBLOCK doesn't work for files on disk. This will still help if anyone else comes along looking to tail data coming from a socket or named pipe or another process, but it doesn't answer the actual question that was asked. Original answer remains below for posterity. (Calling out to tail and grep will work, but is a non-answer of sorts anyway.)

Either open the file with O_NONBLOCK, use select to poll for read availability, then read the new data and filter lines with string methods... or just use the subprocess module and let tail and grep do the work for you, just as you would in the shell.
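
For the pipe/socket case mentioned in the edit, here is a minimal sketch of the O_NONBLOCK + select approach (the FIFO path /tmp/mypipe and the keyword ERROR are illustrative assumptions):

import os
import select
import time

# Open a FIFO (named pipe) without blocking on open/read.
fd = os.open("/tmp/mypipe", os.O_RDONLY | os.O_NONBLOCK)

buf = b""
while True:
    # Block here until the descriptor is reported readable (1s timeout).
    ready, _, _ = select.select([fd], [], [], 1.0)
    if not ready:
        continue
    chunk = os.read(fd, 4096)
    if not chunk:
        # No writer connected yet, or the writer closed the pipe; retry.
        time.sleep(0.1)
        continue
    buf += chunk
    *complete, buf = buf.split(b"\n")   # keep any trailing partial line
    for line in complete:
        if b"ERROR" in line:
            print(line.decode(errors="replace"))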

绮筵 2024-08-17 09:18:16

You can use select to poll for new contents in a file.

import os
import select

def tail(filename, bufsize=1024):
    # Note: select() always reports a regular file as readable, so at EOF
    # this will spin and yield b''; it is best suited to pipes and sockets.
    fds = [os.open(filename, os.O_RDONLY)]
    while True:
        reads, _, _ = select.select(fds, [], [])
        if reads:
            yield os.read(reads[0], bufsize)

孤芳又自赏 2024-08-17 09:18:16

Looks like there's a package for that: https://github.com/kasun/python-tail

江挽川 2024-08-17 09:18:16

If you can't constraint the problem to work for a line-based read, you need to resort to blocks.

This should work:

import sys

needle = "needle"

blocks = []

inf = sys.stdin

if len(sys.argv) == 2:
    inf = open(sys.argv[1])

while True:
    block = inf.read()
    blocks.append(block)
    if len(blocks) >= 2:
        data = "".join((blocks[-2], blocks[-1]))
    else:
        data = blocks[-1]

    # attention, this needs to be changed if you are interested
    # in *all* matches separately, not just whether there was any match at all
    if needle in data:
        print("found")
        blocks = []
    blocks[:-2] = []

    if block == "":
        break

The challenge lies in ensuring that you match needle even if it's separated by two block-boundaries.
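
One common way to handle that is to carry the last len(needle) - 1 characters of the previous chunk into the next check; here is a minimal sketch (the helper name is an illustrative assumption):

def contains_across_chunks(chunks, needle):
    # True if needle occurs anywhere in the concatenation of the chunks,
    # even when it straddles a chunk boundary.
    carry = ""
    for chunk in chunks:
        data = carry + chunk
        if needle in data:
            return True
        # Keep just enough of the tail to complete a needle that starts
        # near the end of this chunk.
        carry = data[-(len(needle) - 1):] if len(needle) > 1 else ""
    return False

For example, contains_across_chunks(["nee", "dle here"], "needle") returns True even though neither chunk contains the full word.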

夏の忆 2024-08-17 09:18:16

You can use pytailf: a simple Python tail -f wrapper.

from tailf import tailf

for line in tailf("myfile.log"):
    print(line)

骑趴 2024-08-17 09:18:16

To my knowledge there's no equivalent of "tail" in the Python standard library. A solution would be to use tell() (to get the file size) and read() to work out the final lines.

This blog post (not by me) has the function written out; it looks right to me:
http://www.manugarg.com/2007/04/real-tailing-in-python.html
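
A minimal sketch of that seek-from-the-end idea (the block size and function name are illustrative assumptions, not taken from the linked post):

import os

def last_lines(path, n=10, block_size=1024):
    # Read backwards in fixed-size blocks until at least n newlines have
    # been seen, then return the final n lines.
    with open(path, "rb") as f:
        f.seek(0, os.SEEK_END)
        pos = f.tell()
        data = b""
        while pos > 0 and data.count(b"\n") <= n:
            step = min(block_size, pos)
            pos -= step
            f.seek(pos)
            data = f.read(step) + data
        return [line.decode(errors="replace") for line in data.splitlines()[-n:]]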

零時差 2024-08-17 09:18:16

If you just need a dead simple Python 3 solution for processing the lines of a text file as they're written, and you don't need Windows support, this worked well for me:

import subprocess
def tailf(filename):
    #returns lines from a file, starting from the beginning
    command = "tail -n +1 -F " + filename
    p = subprocess.Popen(command.split(), stdout=subprocess.PIPE, universal_newlines=True)
    for line in p.stdout:
        yield line

for line in tailf("logfile"):
    #do stuff

It blocks waiting for new lines to be written, so this isn't suitable for asynchronous use without some modifications.
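
If you do need it asynchronously, one option is asyncio's subprocess support; here is a sketch under that assumption (tailf_async is an illustrative name, reusing the same tail command):

import asyncio

async def tailf_async(filename):
    # Same idea as above, but readline() awaits instead of blocking the thread.
    proc = await asyncio.create_subprocess_exec(
        "tail", "-n", "+1", "-F", filename,
        stdout=asyncio.subprocess.PIPE,
    )
    while True:
        line = await proc.stdout.readline()
        if not line:          # tail exited
            break
        yield line.decode()

async def main():
    async for line in tailf_async("logfile"):
        print(line, end="")

asyncio.run(main())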

天暗了我发光 2024-08-17 09:18:16

You can use collections.deque to implement tail.

From http://docs.python.org/library/collections.html#deque-recipes ...

from collections import deque

def tail(filename, n=10):
    'Return the last n lines of a file'
    return deque(open(filename), n)

Of course, this reads the entire file contents, but it's a neat and terse way of implementing tail.
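
A quick usage sketch (the file name app.log is an illustrative assumption; the deque holds lines with their trailing newlines):

for line in tail("app.log", n=10):
    print(line, end="")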
