Reading from an open file handle that is being written to in Python

Posted 2024-10-27 04:27:27

I know this is a classic problem in stream processing, but I don't know how to handle it in Python. I have a file handle that is being written to by an active process. I want to consume content from that file handle on a line-by-line basis, but I don't want to deadlock waiting to read. I will keep reading until EOF or 60 seconds of looped reading, whichever comes first. Advice on how to do this would be appreciated. My pseudo code description of this problem is below.

import time

proc = genprocess("command")  # pseudocode helper: starts the command
found_a = False
found_b = False
start = time.time()
while True:
    line = proc.readline()
    while line:
        if not found_a and grep(pattern_a, line):  # grep(): pseudocode pattern match
            found_a = True
            print("Found A, now looking for B")
        elif not found_b and grep(pattern_b, line):
            found_b = True
            print("Found B, all done")
            break
    if time.time() - start > 60:
        break
    else:
        time.sleep(5)

proc.kill()

The problem is that this only reads one line on each interval. Instead I want the inside of the loop to iterate as many times as possible, but not to block waiting for new content to be written to the file. Once it has read as much as is available, it should sleep for 5 seconds to allow more content to accumulate.

Comments (2)

森末i 2024-11-03 04:27:27

If you're running in a Unix environment, you could use Python's select module to wait for data on the file handle. You can also use Python's fcntl module to switch a file handle to non-blocking mode, as described in this question.

For example, assuming your proc variable is a regular file handle that supports fileno():

import fcntl
import os

file_num = proc.fileno()
old_flags = fcntl.fcntl(file_num, fcntl.F_GETFL)  # current file status flags
fcntl.fcntl(file_num, fcntl.F_SETFL, old_flags | os.O_NONBLOCK)  # add non-blocking mode
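
As a rough sketch of how the non-blocking flag could be put to work for the behaviour the question asks for (read everything that is currently available, then sleep, and give up after a timeout), something along these lines might do. The helper names set_nonblocking, drain_lines and find_in_order are made up for this illustration, patterns are matched as plain byte substrings rather than with a grep-style regex, and the code is Unix-only because it relies on fcntl.

```python
import fcntl
import os
import time


def set_nonblocking(fd):
    """Add O_NONBLOCK to the status flags of file descriptor fd."""
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)


def drain_lines(fd, pending=b""):
    """Read whatever is available on fd right now, without blocking.

    Returns (complete_lines, leftover_bytes, eof). leftover_bytes holds a
    trailing partial line that should be passed back in on the next call.
    """
    eof = False
    while True:
        try:
            chunk = os.read(fd, 4096)
        except BlockingIOError:
            break  # nothing more available at the moment
        if not chunk:
            eof = True  # the writer closed its end
            break
        pending += chunk
    *lines, pending = pending.split(b"\n")
    return lines, pending, eof


def find_in_order(fd, patterns, timeout=60.0, interval=5.0):
    """Poll fd until each pattern has been seen in order, or EOF/timeout.

    Hypothetical helper: patterns are bytes, matched as substrings, and at
    most one pattern is consumed per line.
    """
    set_nonblocking(fd)
    remaining = list(patterns)
    pending = b""
    deadline = time.monotonic() + timeout
    while remaining:
        lines, pending, eof = drain_lines(fd, pending)
        for line in lines:
            if remaining and remaining[0] in line:
                remaining.pop(0)
        if eof or time.monotonic() >= deadline:
            break
        if remaining:
            time.sleep(interval)  # let more output accumulate
    return not remaining
```

With a subprocess.Popen object started with stdout=subprocess.PIPE, the descriptor to pass in would be proc.stdout.fileno(). Reading with os.read on the raw descriptor, rather than readline() on the file object, sidesteps Python's own buffering, which does not mix well with non-blocking descriptors.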
背叛残局 2024-11-03 04:27:27

The fcntl example linked to above is OK (except that it puts the process in a busy loop polling), however I ended up using "select" to achieve more-or-less the desired functionality.

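    import re
    import sys
    import time
    from select import select

    # Assumes proc is a subprocess.Popen created with stdout=subprocess.PIPE and
    # text=True, and that wait_for and wait_interval (seconds) are already defined.
    start = time.time()
    started = False
    while True:
        if (time.time() - start > wait_for) or started:
            break
        (rlist, wlist, xlist) = select([proc.stdout], [], [], wait_interval)
        if len(rlist) > 0:
            line = rlist[0].readline() # read one line (this blocks until '\n' is read)
            if not line: # empty string means EOF: the process closed its stdout
                break
        else: # nothing available to read from proc.stdout
            print(".", end="")
            sys.stdout.flush()
            time.sleep(1)
            continue
        if re.search("daemon started", line):
            started = True

    if not started:
        proc.kill() # don't leave the process running if it didn't start properly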

And if this is the sort of thing a user might CTRL-C, then putting the whole thing in a try/except block and looking for KeyboardInterrupt allows proc.kill() to be called instead of leaving the process running in the background.
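
To make the CTRL-C point concrete, here is one way the select loop and the KeyboardInterrupt handling could fit together. This is only a sketch under a few assumptions: wait_for_line is a hypothetical helper name, the marker is matched as a byte substring instead of with re.search, the code reads the raw descriptor with os.read instead of calling readline(), and it is Unix-only because select on pipes is not supported on Windows.

```python
import os
import select
import subprocess
import time


def wait_for_line(cmd, needle, wait_for=60.0, wait_interval=1.0):
    """Start cmd and watch its stdout for a line containing needle (bytes).

    Returns (proc, found). The child is killed if needle is not seen
    before EOF or the timeout, or if the user presses CTRL-C.
    """
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    fd = proc.stdout.fileno()
    deadline = time.monotonic() + wait_for
    buf = b""
    found = False
    try:
        while not found:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break  # timed out
            # Sleep inside select() until data arrives or the interval ends.
            rlist, _, _ = select.select([fd], [], [], min(wait_interval, remaining))
            if not rlist:
                continue
            chunk = os.read(fd, 4096)  # returns at once: select said fd is readable
            if not chunk:
                found = needle in buf  # EOF: check a final line with no newline
                break
            buf += chunk
            *lines, buf = buf.split(b"\n")
            found = any(needle in line for line in lines)
    except KeyboardInterrupt:
        proc.kill()  # don't leave the child running after CTRL-C
        proc.wait()
        raise
    if not found:
        proc.kill()
        proc.wait()
    return proc, found
```

A call might look like wait_for_line(["mydaemon"], b"daemon started"), where "mydaemon" stands in for whatever command is being launched. Because select already sleeps for up to wait_interval, no extra time.sleep is needed in the loop, and reading the descriptor directly avoids the case where data sits in Python's read buffer while select reports nothing to read.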
